feat: add SCANPREFIX/KEYSPREFIX/SCANRANGE/KEYSRANGE

This commit is contained in:
1iaan
2026-04-07 12:39:55 +08:00
parent aee84df665
commit 8fdefc2100
4 changed files with 296 additions and 46 deletions

View File

@@ -275,6 +275,21 @@ int resp_build_value(const resp_value_t *v, uint8_t *out, size_t cap) {
if (write_crlf(&p, end) < 0) return -1; if (write_crlf(&p, end) < 0) return -1;
break; break;
case RESP_T_ARRAY:
if (write_bytes(&p, end, "*", 1) < 0) return -1;
if (write_u32_ascii(&p, end, v->array.count) < 0) return -1;
if (write_crlf(&p, end) < 0) return -1;
for (uint32_t i = 0; i < v->array.count; ++i) {
if (write_bytes(&p, end, "$", 1) < 0) return -1;
if (write_u32_ascii(&p, end, v->array.items[i].len) < 0) return -1;
if (write_crlf(&p, end) < 0) return -1;
if (v->array.items[i].len > 0 && v->array.items[i].ptr) {
if (write_bytes(&p, end, v->array.items[i].ptr, v->array.items[i].len) < 0) return -1;
}
if (write_crlf(&p, end) < 0) return -1;
}
break;
default: default:
return -1; return -1;
} }
@@ -285,8 +300,8 @@ int resp_build_value(const resp_value_t *v, uint8_t *out, size_t cap) {
/* helpers */ /* helpers */
resp_value_t resp_simple(const char *s) { resp_value_t resp_simple(const char *s) {
resp_value_t v; resp_value_t v;
memset(&v, 0, sizeof(v));
v.type = RESP_T_SIMPLE_STR; v.type = RESP_T_SIMPLE_STR;
v.i64 = 0;
v.bulk.ptr = (const uint8_t*)s; v.bulk.ptr = (const uint8_t*)s;
v.bulk.len = (uint32_t)strlen(s); v.bulk.len = (uint32_t)strlen(s);
return v; return v;
@@ -294,8 +309,8 @@ resp_value_t resp_simple(const char *s) {
resp_value_t resp_error(const char *s) { resp_value_t resp_error(const char *s) {
resp_value_t v; resp_value_t v;
memset(&v, 0, sizeof(v));
v.type = RESP_T_ERROR; v.type = RESP_T_ERROR;
v.i64 = 0;
v.bulk.ptr = (const uint8_t*)s; v.bulk.ptr = (const uint8_t*)s;
v.bulk.len = (uint32_t)strlen(s); v.bulk.len = (uint32_t)strlen(s);
return v; return v;
@@ -303,17 +318,16 @@ resp_value_t resp_error(const char *s) {
resp_value_t resp_int(int64_t x) { resp_value_t resp_int(int64_t x) {
resp_value_t v; resp_value_t v;
memset(&v, 0, sizeof(v));
v.type = RESP_T_INTEGER; v.type = RESP_T_INTEGER;
v.i64 = x; v.i64 = x;
v.bulk.ptr = NULL;
v.bulk.len = 0;
return v; return v;
} }
resp_value_t resp_bulk(const uint8_t *p, uint32_t n) { resp_value_t resp_bulk(const uint8_t *p, uint32_t n) {
resp_value_t v; resp_value_t v;
memset(&v, 0, sizeof(v));
v.type = RESP_T_BULK_STR; v.type = RESP_T_BULK_STR;
v.i64 = 0;
v.bulk.ptr = p; v.bulk.ptr = p;
v.bulk.len = n; v.bulk.len = n;
return v; return v;
@@ -321,13 +335,31 @@ resp_value_t resp_bulk(const uint8_t *p, uint32_t n) {
resp_value_t resp_nil(void) { resp_value_t resp_nil(void) {
resp_value_t v; resp_value_t v;
memset(&v, 0, sizeof(v));
v.type = RESP_T_NIL; v.type = RESP_T_NIL;
v.i64 = 0;
v.bulk.ptr = NULL;
v.bulk.len = 0;
return v; return v;
} }
resp_value_t resp_array(resp_slice_t *items, uint32_t count, int owns_items) {
resp_value_t v;
memset(&v, 0, sizeof(v));
v.type = RESP_T_ARRAY;
v.array.items = items;
v.array.count = count;
v.array.owns_items = owns_items ? 1 : 0;
return v;
}
void resp_value_release(resp_value_t *v) {
if (!v) return;
if (v->type == RESP_T_ARRAY && v->array.owns_items && v->array.items) {
kvs_free(v->array.items);
}
memset(v, 0, sizeof(*v));
v->type = RESP_T_NIL;
return;
}
/* ----------------- dispatcher (minimal) ----------------- */ /* ----------------- dispatcher (minimal) ----------------- */
static int expect_argv(const resp_cmd_t *cmd, uint32_t n) { static int expect_argv(const resp_cmd_t *cmd, uint32_t n) {
@@ -337,7 +369,7 @@ static int expect_argv(const resp_cmd_t *cmd, uint32_t n) {
const char *command[] = { const char *command[] = {
"PING", "AUTH", "PING", "AUTH",
"SET", "GET", "DEL", "SETEX", "MOD", "EXIST", "SET", "GET", "DEL", "SETEX", "MOD", "EXIST", "SCANPREFIX", "KEYSPREFIX", "SCANRANGE", "KEYSRANGE",
"RSET", "RGET", "RDEL", "RMOD", "REXIST", "RSET", "RGET", "RDEL", "RMOD", "REXIST",
"HSET", "HGET", "HDEL", "HMOD", "HEXIST", "HSET", "HGET", "HDEL", "HMOD", "HEXIST",
"SAVE", "SSYNC", "SREADY", "MEMPRINT" "SAVE", "SSYNC", "SREADY", "MEMPRINT"
@@ -382,6 +414,120 @@ static int match_auth_password(const resp_slice_t *password) {
return memcmp(global_cfg.redis_auth_password, password->ptr, expect_len) == 0; return memcmp(global_cfg.redis_auth_password, password->ptr, expect_len) == 0;
} }
static int prefix_match(const uint8_t *key, uint32_t key_len,
const uint8_t *prefix, uint32_t prefix_len) {
if (!key || !prefix) return 0;
if (key_len < prefix_len) return 0;
return memcmp(key, prefix, prefix_len) == 0;
}
static rbtree_node *rbtree_lower_bound(rbtree *T, const uint8_t *key, uint32_t keylen) {
rbtree_node *cur;
rbtree_node *best;
if (!T || !key) return NULL;
cur = T->root;
best = T->nil;
while (cur != T->nil) {
const uint8_t *cur_key = kvs_rbtree_node_key_ptr(cur);
int cmp = kvs_keycmp(cur_key, cur->key_len, key, keylen);
if (cmp < 0) {
cur = cur->right;
} else {
best = cur;
cur = cur->left;
}
}
return best;
}
static int append_scan_item(resp_slice_t **items, uint32_t *count, uint32_t *cap,
const uint8_t *ptr, uint32_t len) {
resp_slice_t *new_items;
uint32_t new_cap;
if (!items || !count || !cap) return -1;
if (*count >= *cap) {
new_cap = (*cap == 0) ? 16u : (*cap * 2u);
new_items = (resp_slice_t *)kvs_malloc(sizeof(resp_slice_t) * (size_t)new_cap);
if (!new_items) return -1;
if (*items && *count > 0) {
memcpy(new_items, *items, sizeof(resp_slice_t) * (size_t)(*count));
kvs_free(*items);
}
*items = new_items;
*cap = new_cap;
}
(*items)[*count].ptr = ptr;
(*items)[*count].len = len;
(*count)++;
return 0;
}
static int build_rbtree_scan_response(const uint8_t *begin, uint32_t begin_len,
const uint8_t *end, uint32_t end_len,
int prefix_mode, int keys_only,
resp_value_t *out_value) {
resp_slice_t *items = NULL;
uint32_t count = 0;
uint32_t cap = 0;
rbtree_node *node;
uint64_t now_ms;
if (!out_value) return -1;
now_ms = kvs_now_ms();
if (begin && begin_len > 0) {
node = rbtree_lower_bound(&global_rbtree, begin, begin_len);
} else if (global_rbtree.root != global_rbtree.nil) {
node = rbtree_mini(&global_rbtree, global_rbtree.root);
} else {
node = global_rbtree.nil;
}
while (node && node != global_rbtree.nil) {
const uint8_t *key = kvs_rbtree_node_key_ptr(node);
const uint8_t *value = kvs_rbtree_node_value_ptr(node);
rbtree_node *next = rbtree_successor(&global_rbtree, node);
if (prefix_mode) {
if (begin && begin_len > 0 && !prefix_match(key, node->key_len, begin, begin_len)) {
break;
}
} else if (end && end_len > 0) {
int end_cmp = kvs_keycmp(key, node->key_len, end, end_len);
if (end_cmp > 0) {
break;
}
}
if (node->expire_at_ms != 0 && node->expire_at_ms <= now_ms) {
(void)ttl_delete_schedule(key, node->key_len);
node = next;
continue;
}
if (append_scan_item(&items, &count, &cap, key, node->key_len) < 0) {
if (items) kvs_free(items);
return -1;
}
if (!keys_only &&
append_scan_item(&items, &count, &cap, value, node->value_len) < 0) {
if (items) kvs_free(items);
return -1;
}
node = next;
}
*out_value = resp_array(items, count, 1);
return 0;
}
/** /**
* 输入cmd * 输入cmd
@@ -462,6 +608,44 @@ int resp_dispatch(const resp_cmd_t *cmd, resp_value_t *out_value) {
return 0; return 0;
} }
case KVS_CMD_SCANPREFIX: {
if (cmd->argc != 2) { *out_value = resp_error("ERR wrong number of arguments for 'scanprefix'"); return -1; }
if (build_rbtree_scan_response(cmd->argv[1].ptr, cmd->argv[1].len,
NULL, 0, 1, 0, out_value) < 0) {
*out_value = resp_error("ERR internal error");
}
return 0;
}
case KVS_CMD_KEYSPREFIX: {
if (cmd->argc != 2) { *out_value = resp_error("ERR wrong number of arguments for 'keysprefix'"); return -1; }
if (build_rbtree_scan_response(cmd->argv[1].ptr, cmd->argv[1].len,
NULL, 0, 1, 1, out_value) < 0) {
*out_value = resp_error("ERR internal error");
}
return 0;
}
case KVS_CMD_SCANRANGE: {
if (cmd->argc != 3) { *out_value = resp_error("ERR wrong number of arguments for 'scanrange'"); return -1; }
if (build_rbtree_scan_response(cmd->argv[1].ptr, cmd->argv[1].len,
cmd->argv[2].ptr, cmd->argv[2].len,
0, 0, out_value) < 0) {
*out_value = resp_error("ERR internal error");
}
return 0;
}
case KVS_CMD_KEYSRANGE: {
if (cmd->argc != 3) { *out_value = resp_error("ERR wrong number of arguments for 'keysrange'"); return -1; }
if (build_rbtree_scan_response(cmd->argv[1].ptr, cmd->argv[1].len,
cmd->argv[2].ptr, cmd->argv[2].len,
0, 1, out_value) < 0) {
*out_value = resp_error("ERR internal error");
}
return 0;
}
case KVS_CMD_DEL: { case KVS_CMD_DEL: {
if (cmd->argc != 2) { *out_value = resp_error("ERR wrong number of arguments for 'del'"); return -1; } if (cmd->argc != 2) { *out_value = resp_error("ERR wrong number of arguments for 'del'"); return -1; }
int r = kvs_rbtree_del(&global_rbtree, cmd->argv[1].ptr, cmd->argv[1].len); int r = kvs_rbtree_del(&global_rbtree, cmd->argv[1].ptr, cmd->argv[1].len);

View File

@@ -26,6 +26,10 @@ typedef enum {
KVS_CMD_SETEX, KVS_CMD_SETEX,
KVS_CMD_MOD, KVS_CMD_MOD,
KVS_CMD_EXIST, KVS_CMD_EXIST,
KVS_CMD_SCANPREFIX,
KVS_CMD_KEYSPREFIX,
KVS_CMD_SCANRANGE,
KVS_CMD_KEYSRANGE,
// rbtree // rbtree
KVS_CMD_RSET, KVS_CMD_RSET,
KVS_CMD_RGET, KVS_CMD_RGET,
@@ -62,12 +66,20 @@ typedef enum resp_type_e {
RESP_T_INTEGER, RESP_T_INTEGER,
RESP_T_BULK_STR, RESP_T_BULK_STR,
RESP_T_NIL, /* nil bulk string ($-1\r\n) */ RESP_T_NIL, /* nil bulk string ($-1\r\n) */
RESP_T_ARRAY,
} resp_type_t; } resp_type_t;
typedef struct resp_array_s {
resp_slice_t *items;
uint32_t count;
int owns_items;
} resp_array_t;
typedef struct resp_value_s { typedef struct resp_value_s {
resp_type_t type; resp_type_t type;
int64_t i64; /* for integer */ int64_t i64; /* for integer */
resp_slice_t bulk; /* for simple/error/bulk string (bytes) */ resp_slice_t bulk; /* for simple/error/bulk string (bytes) */
resp_array_t array; /* for arrays */
} resp_value_t; } resp_value_t;
/* ----------------- parsing ----------------- */ /* ----------------- parsing ----------------- */
@@ -86,6 +98,8 @@ resp_value_t resp_error(const char *s); /* -s\r\n */
resp_value_t resp_int(int64_t x); /* :x\r\n */ resp_value_t resp_int(int64_t x); /* :x\r\n */
resp_value_t resp_bulk(const uint8_t *p, uint32_t n); /* $n\r\n...\r\n */ resp_value_t resp_bulk(const uint8_t *p, uint32_t n); /* $n\r\n...\r\n */
resp_value_t resp_nil(void); /* $-1\r\n */ resp_value_t resp_nil(void); /* $-1\r\n */
resp_value_t resp_array(resp_slice_t *items, uint32_t count, int owns_items);
void resp_value_release(resp_value_t *v);
/* ----------------- dispatcher ----------------- */ /* ----------------- dispatcher ----------------- */
/* Execute one command and return a RESP value. /* Execute one command and return a RESP value.

View File

@@ -186,6 +186,51 @@ static int resp_value_encoded_len(const resp_value_t *v, size_t *out_len) {
break; break;
} }
case RESP_T_ARRAY: {
char tmp[32];
int n;
size_t t = 0;
if (v->array.count > 0 && !v->array.items) {
return -1;
}
n = snprintf(tmp, sizeof(tmp), "%u", (unsigned)v->array.count);
if (n <= 0) {
return -1;
}
if (checked_size_add(1, (size_t)n, &t) < 0 ||
checked_size_add(t, 2, &t) < 0) {
return -1;
}
for (uint32_t i = 0; i < v->array.count; ++i) {
size_t item_len = 0;
int item_digits;
if (v->array.items[i].len > 0 && !v->array.items[i].ptr) {
return -1;
}
item_digits = snprintf(tmp, sizeof(tmp), "%u", (unsigned)v->array.items[i].len);
if (item_digits <= 0) {
return -1;
}
if (checked_size_add(1, (size_t)item_digits, &item_len) < 0 ||
checked_size_add(item_len, 2, &item_len) < 0 ||
checked_size_add(item_len, (size_t)v->array.items[i].len, &item_len) < 0 ||
checked_size_add(item_len, 2, &item_len) < 0 ||
checked_size_add(t, item_len, &t) < 0) {
return -1;
}
}
len = t;
break;
}
default: default:
return -1; return -1;
} }
@@ -328,11 +373,13 @@ int kvs_protocol(struct conn* conn){
uint8_t *resp_heap = NULL; uint8_t *resp_heap = NULL;
if (resp_value_encoded_len(&val, &resp_need) < 0) { if (resp_value_encoded_len(&val, &resp_need) < 0) {
resp_value_release(&val);
return -1; return -1;
} }
resp_heap = (uint8_t *)kvs_malloc(resp_need); resp_heap = (uint8_t *)kvs_malloc(resp_need);
if (!resp_heap) { if (!resp_heap) {
resp_value_release(&val);
return -1; return -1;
} }
@@ -340,6 +387,7 @@ int kvs_protocol(struct conn* conn){
if (resp_len < 0 || if (resp_len < 0 ||
chain_buffer_append(&conn->wbuf, resp_heap, (size_t)resp_len) < 0) { chain_buffer_append(&conn->wbuf, resp_heap, (size_t)resp_len) < 0) {
free(resp_heap); free(resp_heap);
resp_value_release(&val);
return -1; return -1;
} }
@@ -349,6 +397,7 @@ int kvs_protocol(struct conn* conn){
} }
out_len += (size_t)resp_len; out_len += (size_t)resp_len;
resp_value_release(&val);
// __completed_cmd(request, consumed, 0); // __completed_cmd(request, consumed, 0);

View File

@@ -165,8 +165,11 @@ rbtree_node *rbtree_search(rbtree *T, const uint8_t *key, uint32_t keylen);
typedef int (*kvs_rbtree_visit_cb)(rbtree_node *node, void *arg); typedef int (*kvs_rbtree_visit_cb)(rbtree_node *node, void *arg);
rbtree_node *rbtree_search_with_visit(rbtree *T, const uint8_t *key, uint32_t keylen, rbtree_node *rbtree_search_with_visit(rbtree *T, const uint8_t *key, uint32_t keylen,
kvs_rbtree_visit_cb cb, void *arg); kvs_rbtree_visit_cb cb, void *arg);
rbtree_node *rbtree_mini(rbtree *T, rbtree_node *x);
rbtree_node *rbtree_successor(rbtree *T, rbtree_node *x);
const uint8_t *kvs_rbtree_node_key_ptr(const rbtree_node *node); const uint8_t *kvs_rbtree_node_key_ptr(const rbtree_node *node);
const uint8_t *kvs_rbtree_node_value_ptr(const rbtree_node *node); const uint8_t *kvs_rbtree_node_value_ptr(const rbtree_node *node);
int kvs_keycmp(const uint8_t *a, uint32_t alen, const uint8_t *b, uint32_t blen);
int kvs_rbtree_save(iouring_ctx_t *uring, kvs_rbtree_t *inst, const char* filename); int kvs_rbtree_save(iouring_ctx_t *uring, kvs_rbtree_t *inst, const char* filename);
int kvs_rbtree_load(kvs_rbtree_t *inst, const char* filename); int kvs_rbtree_load(kvs_rbtree_t *inst, const char* filename);