diff --git a/kvs_protocol_resp.c b/kvs_protocol_resp.c index c912c7a..d0a7084 100644 --- a/kvs_protocol_resp.c +++ b/kvs_protocol_resp.c @@ -275,6 +275,21 @@ int resp_build_value(const resp_value_t *v, uint8_t *out, size_t cap) { if (write_crlf(&p, end) < 0) return -1; break; + case RESP_T_ARRAY: + if (write_bytes(&p, end, "*", 1) < 0) return -1; + if (write_u32_ascii(&p, end, v->array.count) < 0) return -1; + if (write_crlf(&p, end) < 0) return -1; + for (uint32_t i = 0; i < v->array.count; ++i) { + if (write_bytes(&p, end, "$", 1) < 0) return -1; + if (write_u32_ascii(&p, end, v->array.items[i].len) < 0) return -1; + if (write_crlf(&p, end) < 0) return -1; + if (v->array.items[i].len > 0 && v->array.items[i].ptr) { + if (write_bytes(&p, end, v->array.items[i].ptr, v->array.items[i].len) < 0) return -1; + } + if (write_crlf(&p, end) < 0) return -1; + } + break; + default: return -1; } @@ -285,8 +300,8 @@ int resp_build_value(const resp_value_t *v, uint8_t *out, size_t cap) { /* helpers */ resp_value_t resp_simple(const char *s) { resp_value_t v; + memset(&v, 0, sizeof(v)); v.type = RESP_T_SIMPLE_STR; - v.i64 = 0; v.bulk.ptr = (const uint8_t*)s; v.bulk.len = (uint32_t)strlen(s); return v; @@ -294,8 +309,8 @@ resp_value_t resp_simple(const char *s) { resp_value_t resp_error(const char *s) { resp_value_t v; + memset(&v, 0, sizeof(v)); v.type = RESP_T_ERROR; - v.i64 = 0; v.bulk.ptr = (const uint8_t*)s; v.bulk.len = (uint32_t)strlen(s); return v; @@ -303,17 +318,16 @@ resp_value_t resp_error(const char *s) { resp_value_t resp_int(int64_t x) { resp_value_t v; + memset(&v, 0, sizeof(v)); v.type = RESP_T_INTEGER; v.i64 = x; - v.bulk.ptr = NULL; - v.bulk.len = 0; return v; } resp_value_t resp_bulk(const uint8_t *p, uint32_t n) { resp_value_t v; + memset(&v, 0, sizeof(v)); v.type = RESP_T_BULK_STR; - v.i64 = 0; v.bulk.ptr = p; v.bulk.len = n; return v; @@ -321,13 +335,31 @@ resp_value_t resp_bulk(const uint8_t *p, uint32_t n) { resp_value_t resp_nil(void) { resp_value_t v; + memset(&v, 0, sizeof(v)); v.type = RESP_T_NIL; - v.i64 = 0; - v.bulk.ptr = NULL; - v.bulk.len = 0; return v; } +resp_value_t resp_array(resp_slice_t *items, uint32_t count, int owns_items) { + resp_value_t v; + memset(&v, 0, sizeof(v)); + v.type = RESP_T_ARRAY; + v.array.items = items; + v.array.count = count; + v.array.owns_items = owns_items ? 1 : 0; + return v; +} + +void resp_value_release(resp_value_t *v) { + if (!v) return; + if (v->type == RESP_T_ARRAY && v->array.owns_items && v->array.items) { + kvs_free(v->array.items); + } + memset(v, 0, sizeof(*v)); + v->type = RESP_T_NIL; + return; +} + /* ----------------- dispatcher (minimal) ----------------- */ static int expect_argv(const resp_cmd_t *cmd, uint32_t n) { @@ -337,7 +369,7 @@ static int expect_argv(const resp_cmd_t *cmd, uint32_t n) { const char *command[] = { "PING", "AUTH", - "SET", "GET", "DEL", "SETEX", "MOD", "EXIST", + "SET", "GET", "DEL", "SETEX", "MOD", "EXIST", "SCANPREFIX", "KEYSPREFIX", "SCANRANGE", "KEYSRANGE", "RSET", "RGET", "RDEL", "RMOD", "REXIST", "HSET", "HGET", "HDEL", "HMOD", "HEXIST", "SAVE", "SSYNC", "SREADY", "MEMPRINT" @@ -382,6 +414,120 @@ static int match_auth_password(const resp_slice_t *password) { return memcmp(global_cfg.redis_auth_password, password->ptr, expect_len) == 0; } +static int prefix_match(const uint8_t *key, uint32_t key_len, + const uint8_t *prefix, uint32_t prefix_len) { + if (!key || !prefix) return 0; + if (key_len < prefix_len) return 0; + return memcmp(key, prefix, prefix_len) == 0; +} + +static rbtree_node *rbtree_lower_bound(rbtree *T, const uint8_t *key, uint32_t keylen) { + rbtree_node *cur; + rbtree_node *best; + + if (!T || !key) return NULL; + + cur = T->root; + best = T->nil; + while (cur != T->nil) { + const uint8_t *cur_key = kvs_rbtree_node_key_ptr(cur); + int cmp = kvs_keycmp(cur_key, cur->key_len, key, keylen); + if (cmp < 0) { + cur = cur->right; + } else { + best = cur; + cur = cur->left; + } + } + + return best; +} + +static int append_scan_item(resp_slice_t **items, uint32_t *count, uint32_t *cap, + const uint8_t *ptr, uint32_t len) { + resp_slice_t *new_items; + uint32_t new_cap; + + if (!items || !count || !cap) return -1; + + if (*count >= *cap) { + new_cap = (*cap == 0) ? 16u : (*cap * 2u); + new_items = (resp_slice_t *)kvs_malloc(sizeof(resp_slice_t) * (size_t)new_cap); + if (!new_items) return -1; + if (*items && *count > 0) { + memcpy(new_items, *items, sizeof(resp_slice_t) * (size_t)(*count)); + kvs_free(*items); + } + *items = new_items; + *cap = new_cap; + } + + (*items)[*count].ptr = ptr; + (*items)[*count].len = len; + (*count)++; + return 0; +} + +static int build_rbtree_scan_response(const uint8_t *begin, uint32_t begin_len, + const uint8_t *end, uint32_t end_len, + int prefix_mode, int keys_only, + resp_value_t *out_value) { + resp_slice_t *items = NULL; + uint32_t count = 0; + uint32_t cap = 0; + rbtree_node *node; + uint64_t now_ms; + + if (!out_value) return -1; + + now_ms = kvs_now_ms(); + if (begin && begin_len > 0) { + node = rbtree_lower_bound(&global_rbtree, begin, begin_len); + } else if (global_rbtree.root != global_rbtree.nil) { + node = rbtree_mini(&global_rbtree, global_rbtree.root); + } else { + node = global_rbtree.nil; + } + + while (node && node != global_rbtree.nil) { + const uint8_t *key = kvs_rbtree_node_key_ptr(node); + const uint8_t *value = kvs_rbtree_node_value_ptr(node); + rbtree_node *next = rbtree_successor(&global_rbtree, node); + + if (prefix_mode) { + if (begin && begin_len > 0 && !prefix_match(key, node->key_len, begin, begin_len)) { + break; + } + } else if (end && end_len > 0) { + int end_cmp = kvs_keycmp(key, node->key_len, end, end_len); + if (end_cmp > 0) { + break; + } + } + + if (node->expire_at_ms != 0 && node->expire_at_ms <= now_ms) { + (void)ttl_delete_schedule(key, node->key_len); + node = next; + continue; + } + + if (append_scan_item(&items, &count, &cap, key, node->key_len) < 0) { + if (items) kvs_free(items); + return -1; + } + if (!keys_only && + append_scan_item(&items, &count, &cap, value, node->value_len) < 0) { + if (items) kvs_free(items); + return -1; + } + + node = next; + } + + *out_value = resp_array(items, count, 1); + return 0; +} + /** * 输入:cmd @@ -462,6 +608,44 @@ int resp_dispatch(const resp_cmd_t *cmd, resp_value_t *out_value) { return 0; } + case KVS_CMD_SCANPREFIX: { + if (cmd->argc != 2) { *out_value = resp_error("ERR wrong number of arguments for 'scanprefix'"); return -1; } + if (build_rbtree_scan_response(cmd->argv[1].ptr, cmd->argv[1].len, + NULL, 0, 1, 0, out_value) < 0) { + *out_value = resp_error("ERR internal error"); + } + return 0; + } + + case KVS_CMD_KEYSPREFIX: { + if (cmd->argc != 2) { *out_value = resp_error("ERR wrong number of arguments for 'keysprefix'"); return -1; } + if (build_rbtree_scan_response(cmd->argv[1].ptr, cmd->argv[1].len, + NULL, 0, 1, 1, out_value) < 0) { + *out_value = resp_error("ERR internal error"); + } + return 0; + } + + case KVS_CMD_SCANRANGE: { + if (cmd->argc != 3) { *out_value = resp_error("ERR wrong number of arguments for 'scanrange'"); return -1; } + if (build_rbtree_scan_response(cmd->argv[1].ptr, cmd->argv[1].len, + cmd->argv[2].ptr, cmd->argv[2].len, + 0, 0, out_value) < 0) { + *out_value = resp_error("ERR internal error"); + } + return 0; + } + + case KVS_CMD_KEYSRANGE: { + if (cmd->argc != 3) { *out_value = resp_error("ERR wrong number of arguments for 'keysrange'"); return -1; } + if (build_rbtree_scan_response(cmd->argv[1].ptr, cmd->argv[1].len, + cmd->argv[2].ptr, cmd->argv[2].len, + 0, 1, out_value) < 0) { + *out_value = resp_error("ERR internal error"); + } + return 0; + } + case KVS_CMD_DEL: { if (cmd->argc != 2) { *out_value = resp_error("ERR wrong number of arguments for 'del'"); return -1; } int r = kvs_rbtree_del(&global_rbtree, cmd->argv[1].ptr, cmd->argv[1].len); diff --git a/kvs_protocol_resp.h b/kvs_protocol_resp.h index a624bfa..99d4262 100644 --- a/kvs_protocol_resp.h +++ b/kvs_protocol_resp.h @@ -26,6 +26,10 @@ typedef enum { KVS_CMD_SETEX, KVS_CMD_MOD, KVS_CMD_EXIST, + KVS_CMD_SCANPREFIX, + KVS_CMD_KEYSPREFIX, + KVS_CMD_SCANRANGE, + KVS_CMD_KEYSRANGE, // rbtree KVS_CMD_RSET, KVS_CMD_RGET, @@ -62,12 +66,20 @@ typedef enum resp_type_e { RESP_T_INTEGER, RESP_T_BULK_STR, RESP_T_NIL, /* nil bulk string ($-1\r\n) */ + RESP_T_ARRAY, } resp_type_t; +typedef struct resp_array_s { + resp_slice_t *items; + uint32_t count; + int owns_items; +} resp_array_t; + typedef struct resp_value_s { resp_type_t type; int64_t i64; /* for integer */ resp_slice_t bulk; /* for simple/error/bulk string (bytes) */ + resp_array_t array; /* for arrays */ } resp_value_t; /* ----------------- parsing ----------------- */ @@ -86,6 +98,8 @@ resp_value_t resp_error(const char *s); /* -s\r\n */ resp_value_t resp_int(int64_t x); /* :x\r\n */ resp_value_t resp_bulk(const uint8_t *p, uint32_t n); /* $n\r\n...\r\n */ resp_value_t resp_nil(void); /* $-1\r\n */ +resp_value_t resp_array(resp_slice_t *items, uint32_t count, int owns_items); +void resp_value_release(resp_value_t *v); /* ----------------- dispatcher ----------------- */ /* Execute one command and return a RESP value. diff --git a/kvstore.c b/kvstore.c index 78c9e4b..db5cd6e 100644 --- a/kvstore.c +++ b/kvstore.c @@ -163,10 +163,10 @@ static int resp_value_encoded_len(const resp_value_t *v, size_t *out_len) { len = 5; /* "$-1\r\n" */ break; - case RESP_T_BULK_STR: { - char tmp[32]; - int n; - size_t t; + case RESP_T_BULK_STR: { + char tmp[32]; + int n; + size_t t; if (v->bulk.len > 0 && !v->bulk.ptr) { return -1; @@ -182,12 +182,57 @@ static int resp_value_encoded_len(const resp_value_t *v, size_t *out_len) { checked_size_add(t, (size_t)v->bulk.len, &t) < 0 || checked_size_add(t, 2, &len) < 0) { /* trailing \r\n */ return -1; - } - break; - } - - default: - return -1; + } + break; + } + + case RESP_T_ARRAY: { + char tmp[32]; + int n; + size_t t = 0; + + if (v->array.count > 0 && !v->array.items) { + return -1; + } + + n = snprintf(tmp, sizeof(tmp), "%u", (unsigned)v->array.count); + if (n <= 0) { + return -1; + } + + if (checked_size_add(1, (size_t)n, &t) < 0 || + checked_size_add(t, 2, &t) < 0) { + return -1; + } + + for (uint32_t i = 0; i < v->array.count; ++i) { + size_t item_len = 0; + int item_digits; + + if (v->array.items[i].len > 0 && !v->array.items[i].ptr) { + return -1; + } + + item_digits = snprintf(tmp, sizeof(tmp), "%u", (unsigned)v->array.items[i].len); + if (item_digits <= 0) { + return -1; + } + + if (checked_size_add(1, (size_t)item_digits, &item_len) < 0 || + checked_size_add(item_len, 2, &item_len) < 0 || + checked_size_add(item_len, (size_t)v->array.items[i].len, &item_len) < 0 || + checked_size_add(item_len, 2, &item_len) < 0 || + checked_size_add(t, item_len, &t) < 0) { + return -1; + } + } + + len = t; + break; + } + + default: + return -1; } *out_len = len; @@ -324,33 +369,37 @@ int kvs_protocol(struct conn* conn){ resp_len = resp_build_value(&val, response, sizeof(response)); if (resp_len < 0) { - size_t resp_need = 0; - uint8_t *resp_heap = NULL; - - if (resp_value_encoded_len(&val, &resp_need) < 0) { - return -1; - } - - resp_heap = (uint8_t *)kvs_malloc(resp_need); - if (!resp_heap) { - return -1; - } - - resp_len = resp_build_value(&val, resp_heap, resp_need); - if (resp_len < 0 || - chain_buffer_append(&conn->wbuf, resp_heap, (size_t)resp_len) < 0) { - free(resp_heap); - return -1; - } - - free(resp_heap); - resp_len = 0; - } - } - - out_len += (size_t)resp_len; - - // __completed_cmd(request, consumed, 0); + size_t resp_need = 0; + uint8_t *resp_heap = NULL; + + if (resp_value_encoded_len(&val, &resp_need) < 0) { + resp_value_release(&val); + return -1; + } + + resp_heap = (uint8_t *)kvs_malloc(resp_need); + if (!resp_heap) { + resp_value_release(&val); + return -1; + } + + resp_len = resp_build_value(&val, resp_heap, resp_need); + if (resp_len < 0 || + chain_buffer_append(&conn->wbuf, resp_heap, (size_t)resp_len) < 0) { + free(resp_heap); + resp_value_release(&val); + return -1; + } + + free(resp_heap); + resp_len = 0; + } + } + + out_len += (size_t)resp_len; + resp_value_release(&val); + + // __completed_cmd(request, consumed, 0); consumed += len; } diff --git a/kvstore.h b/kvstore.h index 2897f4f..619bc01 100644 --- a/kvstore.h +++ b/kvstore.h @@ -165,8 +165,11 @@ rbtree_node *rbtree_search(rbtree *T, const uint8_t *key, uint32_t keylen); typedef int (*kvs_rbtree_visit_cb)(rbtree_node *node, void *arg); rbtree_node *rbtree_search_with_visit(rbtree *T, const uint8_t *key, uint32_t keylen, kvs_rbtree_visit_cb cb, void *arg); +rbtree_node *rbtree_mini(rbtree *T, rbtree_node *x); +rbtree_node *rbtree_successor(rbtree *T, rbtree_node *x); const uint8_t *kvs_rbtree_node_key_ptr(const rbtree_node *node); const uint8_t *kvs_rbtree_node_value_ptr(const rbtree_node *node); +int kvs_keycmp(const uint8_t *a, uint32_t alen, const uint8_t *b, uint32_t blen); int kvs_rbtree_save(iouring_ctx_t *uring, kvs_rbtree_t *inst, const char* filename); int kvs_rbtree_load(kvs_rbtree_t *inst, const char* filename);