From 78519fbfe57b08a375ecc43f5392478555a51792 Mon Sep 17 00:00:00 2001 From: 1iaan <139833683+1iaan@users.noreply.github.com> Date: Sat, 4 Apr 2026 22:31:00 +0800 Subject: [PATCH] =?UTF-8?q?=E7=AE=80=E5=8D=95ttl=E6=87=92=E5=88=A0?= =?UTF-8?q?=E9=99=A4=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 6 +- README.md | 12 +--- common/config.c | 19 ++++++ common/config.h | 1 + config/config.xml | 10 ++- kvs_protocol_resp.c | 151 +++++++++++++++++++++++++++++++++++--------- kvs_protocol_resp.h | 7 +- kvs_rbtree_bin.c | 72 +++++++++++++++++++-- kvstore.c | 116 ++++++++++++++++++++++++++++------ kvstore.h | 76 +++++++++++++--------- reactor.c | 3 + 11 files changed, 375 insertions(+), 98 deletions(-) diff --git a/.gitignore b/.gitignore index 3b3abec..e07b62c 100644 --- a/.gitignore +++ b/.gitignore @@ -5,8 +5,10 @@ *.a /ebpf/libbpf-bootstrap -/doc -/test-redis/results +doc +.workspace.codex +test-redis/results +test-redis/bench kvstore testcase diff --git a/README.md b/README.md index 89acc67..f3ebe00 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# 9.1 Kvstore +# Kvstore ## 环境安装与编译 ```shell @@ -74,6 +74,8 @@ make | 12 | `rbtree_insert` | 2.08 | 0.02 | 926531 | ## 其他测试 +流水线深度高,模拟多客户端并发测试。 + ### 测试1:性能测试 测试条件: 1. 不启用持久化。 @@ -86,23 +88,17 @@ make #### 内存分配: malloc ```bash -lian@ubuntu:~/share/9.1-kvstore$ ./test-redis/testcase 192.168.10.129 8888 3 average qps:880462 -ALL TESTS PASSED. ``` #### 内存分配: 自实现内存池 ```bash -lian@ubuntu:~/share/9.1-kvstore$ ./test-redis/testcase 192.168.10.129 8888 3 average qps:942837 -ALL TESTS PASSED. ``` #### 内存分配:jemalloc ```shell -lian@ubuntu:~/share/9.1-kvstore$ ./test-redis/testcase 192.168.10.129 8888 3 average qps:892493 -ALL TESTS PASSED. ``` ### 测试2:持久化 @@ -116,9 +112,7 @@ ALL TESTS PASSED. 5. 本机发送请求。 ```shell -lian@ubuntu:~/share/9.1-kvstore$ ./test-redis/testcase 192.168.10.129 8888 4 average qps:870227 -ALL TESTS PASSED. ``` ### 测试3:内存 diff --git a/common/config.c b/common/config.c index 31629e9..0c0e9c2 100644 --- a/common/config.c +++ b/common/config.c @@ -60,6 +60,7 @@ static void set_default_config(AppConfig *cfg) cfg->allocator = ALLOC_JEMALLOC; cfg->leak_mode = MEMLEAK_DETECT_OFF; cfg->replica_mode = REPLICA_DISABLE; + cfg->redis_auth_password[0] = '\0'; } /* ---- 字符串转枚举 ---- */ @@ -422,6 +423,22 @@ void memory_load(xmlNodePtr *root, AppConfig *out_cfg){ } } +void redis_compat_load(xmlNodePtr *root, AppConfig *out_cfg) { + xmlNodePtr redis_compat = find_child(*root, "redis_compat"); + if (!redis_compat) return; + + xmlNodePtr auth_password_node = find_child(redis_compat, "auth_password"); + if (auth_password_node) { + xmlChar *txt = xmlNodeGetContent(auth_password_node); + if (txt) { + strncpy(out_cfg->redis_auth_password, (char *)txt, + sizeof(out_cfg->redis_auth_password) - 1); + out_cfg->redis_auth_password[sizeof(out_cfg->redis_auth_password) - 1] = '\0'; + xmlFree(txt); + } + } +} + int config_load(const char *filename, AppConfig *out_cfg) { if (!filename || !out_cfg) return -1; @@ -470,6 +487,8 @@ int config_load(const char *filename, AppConfig *out_cfg) memory_load(&root, out_cfg); + redis_compat_load(&root, out_cfg); + rc = 0; cleanup: if (doc) xmlFreeDoc(doc); diff --git a/common/config.h b/common/config.h index d71a2ed..75a4c03 100644 --- a/common/config.h +++ b/common/config.h @@ -63,6 +63,7 @@ typedef struct { AllocatorType allocator; MemLeakDetectMode leak_mode; ReplicaMode replica_mode; + char redis_auth_password[128]; } AppConfig; /** diff --git a/config/config.xml b/config/config.xml index 1dc72c0..606c18b 100644 --- a/config/config.xml +++ b/config/config.xml @@ -18,8 +18,8 @@ - incremental - data/persist_no_20260307_053744_r1_a1 + none + data kvs_oplog.db none @@ -32,4 +32,8 @@ mypool disable - \ No newline at end of file + + + 123456 + + diff --git a/kvs_protocol_resp.c b/kvs_protocol_resp.c index 05d5d1c..c912c7a 100644 --- a/kvs_protocol_resp.c +++ b/kvs_protocol_resp.c @@ -336,12 +336,52 @@ static int expect_argv(const resp_cmd_t *cmd, uint32_t n) { const char *command[] = { - "SET", "GET", "DEL", "MOD", "EXIST", + "PING", "AUTH", + "SET", "GET", "DEL", "SETEX", "MOD", "EXIST", "RSET", "RGET", "RDEL", "RMOD", "REXIST", "HSET", "HGET", "HDEL", "HMOD", "HEXIST", "SAVE", "SSYNC", "SREADY", "MEMPRINT" }; +typedef struct ttl_visit_ctx_s { + uint64_t now_ms; + int path_budget; + const uint8_t *target_key; + uint32_t target_key_len; +} ttl_visit_ctx_t; + +static int ttl_schedule_if_expired(rbtree_node *node, void *arg) { + ttl_visit_ctx_t *ctx; + const uint8_t *key; + + if (!node || !arg) return 0; + ctx = (ttl_visit_ctx_t *)arg; + if (node->expire_at_ms == 0 || node->expire_at_ms > ctx->now_ms) return 0; + + key = kvs_rbtree_node_key_ptr(node); + if (!key || node->key_len == 0) return 0; + + if (ctx->target_key_len == node->key_len && + memcmp(key, ctx->target_key, node->key_len) == 0) { + return 0; + } + if (ctx->path_budget <= 0) return 0; + + if (ttl_delete_schedule(key, node->key_len) == 0) { + ctx->path_budget--; + } + return 0; +} + +static int match_auth_password(const resp_slice_t *password) { + size_t expect_len; + + if (!password) return 0; + expect_len = strlen(global_cfg.redis_auth_password); + if (expect_len != (size_t)password->len) return 0; + return memcmp(global_cfg.redis_auth_password, password->ptr, expect_len) == 0; +} + /** * 输入:cmd @@ -373,45 +413,89 @@ int resp_dispatch(const resp_cmd_t *cmd, resp_value_t *out_value) { } switch (op) { -#if ENABLE_ARRAY + case KVS_CMD_PING: { + if (cmd->argc != 1) { *out_value = resp_error("ERR wrong number of arguments for 'ping'"); return -1; } + *out_value = resp_simple("PONG"); + return 0; + } + + case KVS_CMD_AUTH: { + if (cmd->argc != 2) { *out_value = resp_error("ERR wrong number of arguments for 'auth'"); return -1; } + if (global_cfg.redis_auth_password[0] == '\0' || match_auth_password(&cmd->argv[1])) { + *out_value = resp_simple("OK"); + } else { + *out_value = resp_error("ERR invalid password"); + } + return 0; + } + +#if ENABLE_RBTREE case KVS_CMD_SET: { if (cmd->argc != 3) { *out_value = resp_error("ERR wrong number of arguments for 'set'"); return -1; } - // <0 error; 0 success; 1 exist - int r = kvs_array_set_bin(&global_array, - cmd->argv[1].ptr, cmd->argv[1].len, - cmd->argv[2].ptr, cmd->argv[2].len); + int r = kvs_rbtree_set(&global_rbtree, + cmd->argv[1].ptr, cmd->argv[1].len, + cmd->argv[2].ptr, cmd->argv[2].len); if (r < 0) { *out_value = resp_error("ERR internal error"); return 0; } - else if (r == 1) { *out_value = resp_error("ERR key has exist"); return 0; } *out_value = resp_simple("OK"); return 0; } case KVS_CMD_GET: { + ttl_visit_ctx_t visit_ctx; + rbtree_node *node; + const uint8_t *v; if (cmd->argc != 2) { *out_value = resp_error("ERR wrong number of arguments for 'get'"); return -1; } - uint32_t vlen = 0; - // NULL not exist, NOTNULL exist - const char *v = kvs_array_get_bin(&global_array, cmd->argv[1].ptr, cmd->argv[1].len, &vlen); - if (!v) { *out_value = resp_nil(); return 0; } - *out_value = resp_bulk((const uint8_t*)v, vlen); + visit_ctx.now_ms = kvs_now_ms(); + visit_ctx.path_budget = 1; + visit_ctx.target_key = cmd->argv[1].ptr; + visit_ctx.target_key_len = cmd->argv[1].len; + node = rbtree_search_with_visit(&global_rbtree, cmd->argv[1].ptr, cmd->argv[1].len, + ttl_schedule_if_expired, &visit_ctx); + if (!node || node == global_rbtree.nil) { *out_value = resp_nil(); return 0; } + if (node->expire_at_ms != 0 && node->expire_at_ms <= visit_ctx.now_ms) { + (void)ttl_delete_schedule(cmd->argv[1].ptr, cmd->argv[1].len); + *out_value = resp_nil(); + return 0; + } + v = kvs_rbtree_node_value_ptr(node); + *out_value = resp_bulk(v, node->value_len); return 0; } case KVS_CMD_DEL: { if (cmd->argc != 2) { *out_value = resp_error("ERR wrong number of arguments for 'del'"); return -1; } - // <0 error; =0 success; >0 no exist - int r = kvs_array_del_bin(&global_array, cmd->argv[1].ptr, cmd->argv[1].len); + int r = kvs_rbtree_del(&global_rbtree, cmd->argv[1].ptr, cmd->argv[1].len); if (r < 0) { *out_value = resp_error("ERR internal error"); return 0; } - // r == 0, del 1; r > 0, del 0. *out_value = resp_int((r == 0) ? 1 : 0); return 0; } + case KVS_CMD_SETEX: { + int64_t ttl_sec = 0; + uint64_t now_ms; + uint64_t expire_at_ms; + int r; + if (cmd->argc != 4) { *out_value = resp_error("ERR wrong number of arguments for 'setex'"); return -1; } + if (parse_i64(cmd->argv[2].ptr, cmd->argv[2].ptr + cmd->argv[2].len, &ttl_sec) < 0 || ttl_sec <= 0) { + *out_value = resp_error("ERR invalid expire time in 'setex'"); + return -1; + } + now_ms = kvs_now_ms(); + expire_at_ms = now_ms + (uint64_t)ttl_sec * 1000ULL; + r = kvs_rbtree_set_ex(&global_rbtree, + cmd->argv[1].ptr, cmd->argv[1].len, + cmd->argv[3].ptr, cmd->argv[3].len, + expire_at_ms); + if (r < 0) { *out_value = resp_error("ERR internal error"); return 0; } + *out_value = resp_simple("OK"); + return 0; + } + case KVS_CMD_MOD: { if (cmd->argc != 3) { *out_value = resp_error("ERR wrong number of arguments for 'mod'"); return -1; } - // <0 error; =0 success; >0 no exist - int r = kvs_array_mod_bin(&global_array, - cmd->argv[1].ptr, cmd->argv[1].len, - cmd->argv[2].ptr, cmd->argv[2].len); + int r = kvs_rbtree_mod(&global_rbtree, + cmd->argv[1].ptr, cmd->argv[1].len, + cmd->argv[2].ptr, cmd->argv[2].len); if (r < 0) { *out_value = resp_error("ERR internal error"); return 0; } if (r == 0) *out_value = resp_simple("OK"); else *out_value = resp_error("ERR no such key"); @@ -420,34 +504,41 @@ int resp_dispatch(const resp_cmd_t *cmd, resp_value_t *out_value) { case KVS_CMD_EXIST: { if (cmd->argc != 2) { *out_value = resp_error("ERR wrong number of arguments for 'exist'"); return -1; } - // =0 exist, =1 no exist - int r = kvs_array_exist_bin(&global_array, cmd->argv[1].ptr, cmd->argv[1].len); + int r = kvs_rbtree_exist(&global_rbtree, cmd->argv[1].ptr, cmd->argv[1].len); if (r < 0) { *out_value = resp_error("ERR internal error"); return 0; } *out_value = resp_int((r == 0) ? 1 : 0); return 0; } -#endif -#if ENABLE_RBTREE case KVS_CMD_RSET: { if (cmd->argc != 3) { *out_value = resp_error("ERR wrong number of arguments for 'rset'"); return 0; } - // <0 error; 0 success; 1 exist int r = kvs_rbtree_set(&global_rbtree, cmd->argv[1].ptr, cmd->argv[1].len, cmd->argv[2].ptr, cmd->argv[2].len); if (r < 0) { *out_value = resp_error("ERR internal error"); return 0; } - else if (r == 1) { *out_value = resp_error("ERR key has exist"); return 0; } *out_value = resp_simple("OK"); return 0; } case KVS_CMD_RGET: { + ttl_visit_ctx_t visit_ctx; + rbtree_node *node; + const uint8_t *v; if (cmd->argc != 2) { *out_value = resp_error("ERR wrong number of arguments for 'rget'"); return 0; } - uint32_t vlen = 0; - // NULL notexist, NOTNULL exist。out_value_len 是长度。 - const char *v = kvs_rbtree_get(&global_rbtree, cmd->argv[1].ptr, cmd->argv[1].len, &vlen); - if (!v) { *out_value = resp_nil(); return 0; } - *out_value = resp_bulk((const uint8_t*)v, vlen); + visit_ctx.now_ms = kvs_now_ms(); + visit_ctx.path_budget = 1; + visit_ctx.target_key = cmd->argv[1].ptr; + visit_ctx.target_key_len = cmd->argv[1].len; + node = rbtree_search_with_visit(&global_rbtree, cmd->argv[1].ptr, cmd->argv[1].len, + ttl_schedule_if_expired, &visit_ctx); + if (!node || node == global_rbtree.nil) { *out_value = resp_nil(); return 0; } + if (node->expire_at_ms != 0 && node->expire_at_ms <= visit_ctx.now_ms) { + (void)ttl_delete_schedule(cmd->argv[1].ptr, cmd->argv[1].len); + *out_value = resp_nil(); + return 0; + } + v = kvs_rbtree_node_value_ptr(node); + *out_value = resp_bulk(v, node->value_len); return 0; } diff --git a/kvs_protocol_resp.h b/kvs_protocol_resp.h index 93fbf24..a624bfa 100644 --- a/kvs_protocol_resp.h +++ b/kvs_protocol_resp.h @@ -17,10 +17,13 @@ int ascii_casecmp(const uint8_t *a, uint32_t alen, const char *b); typedef enum { KVS_CMD_START = 0, + KVS_CMD_PING = KVS_CMD_START, + KVS_CMD_AUTH, // array - KVS_CMD_SET = KVS_CMD_START, + KVS_CMD_SET, KVS_CMD_GET, KVS_CMD_DEL, + KVS_CMD_SETEX, KVS_CMD_MOD, KVS_CMD_EXIST, // rbtree @@ -94,4 +97,4 @@ int resp_dispatch(const resp_cmd_t *cmd, resp_value_t *out_value); void __ssync(const uint8_t *ip, uint32_t ip_len, int port, unsigned long long seq); void __sready(); -#endif \ No newline at end of file +#endif diff --git a/kvs_rbtree_bin.c b/kvs_rbtree_bin.c index b61acee..552454b 100644 --- a/kvs_rbtree_bin.c +++ b/kvs_rbtree_bin.c @@ -2,6 +2,7 @@ #include "kvs_rw_tools.h" #include "memory/alloc_dispatch.h" #include "diskuring/diskuring.h" +#include /* ============================================================================ * 内存布局说明: @@ -31,6 +32,10 @@ static inline uint8_t* rbtree_node_get_key(rbtree_node *node) { return (uint8_t *)node + sizeof(rbtree_node_fixed); } +const uint8_t *kvs_rbtree_node_key_ptr(const rbtree_node *node) { + return node ? (const uint8_t *)rbtree_node_get_key((rbtree_node *)node) : NULL; +} + // ============================================================================ // 辅助函数:获取节点内的value指针 // ============================================================================ @@ -39,6 +44,10 @@ static inline uint8_t* rbtree_node_get_value(rbtree_node *node) { return (uint8_t *)node + sizeof(rbtree_node_fixed) + node->key_len; } +const uint8_t *kvs_rbtree_node_value_ptr(const rbtree_node *node) { + return node ? (const uint8_t *)rbtree_node_get_value((rbtree_node *)node) : NULL; +} + // ============================================================================ // 原始比较函数(保持不变) // ============================================================================ @@ -383,6 +392,27 @@ rbtree_node *rbtree_search(rbtree *T, const uint8_t *key, uint32_t keylen) { return T->nil; } +rbtree_node *rbtree_search_with_visit(rbtree *T, const uint8_t *key, uint32_t keylen, + kvs_rbtree_visit_cb cb, void *arg) { + rbtree_node *node; + + if (!T) return NULL; + + node = T->root; + while (node != T->nil) { + if (cb && cb(node, arg) != 0) { + break; + } + + uint8_t *node_key = rbtree_node_get_key(node); + int c = kvs_keycmp(key, keylen, node_key, node->key_len); + if (c < 0) node = node->left; + else if (c > 0) node = node->right; + else return node; + } + return T->nil; +} + void rbtree_traversal(rbtree *T, rbtree_node *node) { if (node != T->nil) { @@ -412,6 +442,7 @@ int kvs_rbtree_create(kvs_rbtree_t *inst) { inst->nil->color = BLACK; inst->nil->left = inst->nil->right = inst->nil->parent = inst->nil; + inst->nil->expire_at_ms = 0; inst->nil->key_len = 0; inst->nil->value_len = 0; inst->root = inst->nil; @@ -448,6 +479,11 @@ void kvs_rbtree_destroy(kvs_rbtree_t *inst) { * @return: <0 error; 0 success; 1 exist */ int kvs_rbtree_set(kvs_rbtree_t *inst, const void *key, uint32_t key_len, const void *value, uint32_t value_len) { + return kvs_rbtree_set_ex(inst, key, key_len, value, value_len, 0); +} + +int kvs_rbtree_set_ex(kvs_rbtree_t *inst, const void *key, uint32_t key_len, + const void *value, uint32_t value_len, uint64_t expire_at_ms) { if (!inst || !key || !value) return -1; // 1. 查找键是否已存在 @@ -467,6 +503,7 @@ int kvs_rbtree_set(kvs_rbtree_t *inst, const void *key, uint32_t key_len, const new_node->right = existing->right; new_node->left = existing->left; new_node->parent = existing->parent; + new_node->expire_at_ms = expire_at_ms; new_node->key_len = key_len; new_node->value_len = value_len; @@ -500,6 +537,7 @@ int kvs_rbtree_set(kvs_rbtree_t *inst, const void *key, uint32_t key_len, const // 大小相同,直接更新值 uint8_t *val = rbtree_node_get_value(existing); if (value_len > 0) memcpy(val, value, value_len); + existing->expire_at_ms = expire_at_ms; } return 0; } @@ -513,6 +551,7 @@ int kvs_rbtree_set(kvs_rbtree_t *inst, const void *key, uint32_t key_len, const node->key_len = key_len; node->value_len = value_len; + node->expire_at_ms = expire_at_ms; uint8_t *node_key = rbtree_node_get_key(node); uint8_t *node_val = rbtree_node_get_value(node); @@ -592,6 +631,7 @@ int kvs_rbtree_mod(kvs_rbtree_t *inst, const void *key, uint32_t key_len, const new_node->left = node->left; new_node->right = node->right; new_node->parent = node->parent; + new_node->expire_at_ms = node->expire_at_ms; new_node->key_len = node->key_len; new_node->value_len = value_len; @@ -645,9 +685,10 @@ static int kvs_rbtree_save_node(iouring_ctx_t *uring, int fd, off_t *current_off uint32_t klen = htonl(node->key_len); uint32_t vlen = htonl(node->value_len); + uint64_t expire_at_ms = htobe64(node->expire_at_ms); - void *bufs[4]; - size_t lens[4]; + void *bufs[5]; + size_t lens[5]; int count = 0; bufs[count] = &klen; @@ -658,6 +699,10 @@ static int kvs_rbtree_save_node(iouring_ctx_t *uring, int fd, off_t *current_off lens[count] = sizeof(vlen); count++; + bufs[count] = &expire_at_ms; + lens[count] = sizeof(expire_at_ms); + count++; + uint8_t *node_key = rbtree_node_get_key(node); if (node->key_len > 0) { bufs[count] = node_key; @@ -718,12 +763,25 @@ int kvs_rbtree_load(kvs_rbtree_t *inst, const char* filename){ for (;;) { uint32_t klen_n = 0, vlen_n = 0; + uint64_t expire_at_ms_n = 0; + size_t r = fread(&klen_n, 1, sizeof(klen_n), fp); + + if (r == 0) { + if (feof(fp)) { + fclose(fp); + return 0; + } + fclose(fp); + return -3; + } + if (r != sizeof(klen_n)) { fclose(fp); return -3; } - if (kvs_read_file(fp, &klen_n, 4) < 0) { fclose(fp); return -3; } if (kvs_read_file(fp, &vlen_n, 4) < 0) { fclose(fp); return -3; } + if (kvs_read_file(fp, &expire_at_ms_n, sizeof(expire_at_ms_n)) < 0) { fclose(fp); return -3; } uint32_t klen = ntohl(klen_n); uint32_t vlen = ntohl(vlen_n); + uint64_t expire_at_ms = be64toh(expire_at_ms_n); if (klen == 0) { fclose(fp); return -3; } @@ -735,6 +793,7 @@ int kvs_rbtree_load(kvs_rbtree_t *inst, const char* filename){ memset(node, 0, node_size); node->key_len = klen; node->value_len = vlen; + node->expire_at_ms = expire_at_ms; uint8_t *keybuf = rbtree_node_get_key(node); if (kvs_read_file(fp, keybuf, (size_t)klen) < 0) { @@ -753,6 +812,11 @@ int kvs_rbtree_load(kvs_rbtree_t *inst, const char* filename){ } } + if (node->expire_at_ms != 0 && node->expire_at_ms <= kvs_now_ms()) { + kvs_free(node); + continue; + } + // 使用原生 rbtree_insert 而非 kvs_rbtree_set // 因为 kvs_rbtree_set 会重新分配节点 if (rbtree_insert(inst, node) < 0) { @@ -764,4 +828,4 @@ int kvs_rbtree_load(kvs_rbtree_t *inst, const char* filename){ fclose(fp); return 0; -} \ No newline at end of file +} diff --git a/kvstore.c b/kvstore.c index 9376ab7..78c9e4b 100644 --- a/kvstore.c +++ b/kvstore.c @@ -17,9 +17,10 @@ #include #include #include -#include -#include -#include +#include +#include +#include +#include #define TIME_COLLECT 0 @@ -33,7 +34,16 @@ unsigned long long global_seq; extern int global_oplog_fd; -replica_shm_t g_rep_shm; +replica_shm_t g_rep_shm; + +typedef struct ttl_delete_task_s { + struct ttl_delete_task_s *next; + uint8_t *key; + uint32_t key_len; +} ttl_delete_task_t; + +static ttl_delete_task_t *g_ttl_delete_head = NULL; +static ttl_delete_task_t *g_ttl_delete_tail = NULL; __attribute__((noinline)) void __completed_cmd(const uint8_t *cmd, size_t len, unsigned long long seq){ @@ -45,13 +55,80 @@ void __completed_cmd(const uint8_t *cmd, size_t len, unsigned long long seq){ #define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000) #define TIME_SUB_US(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000000 + (tv1.tv_usec - tv2.tv_usec)) -static int checked_size_add(size_t a, size_t b, size_t *out) { +static int checked_size_add(size_t a, size_t b, size_t *out) { if (!out || a > SIZE_MAX - b) { return -1; } - *out = a + b; - return 0; -} + *out = a + b; + return 0; +} + +uint64_t kvs_now_ms(void) { + struct timespec ts; + + clock_gettime(CLOCK_REALTIME, &ts); + return (uint64_t)ts.tv_sec * 1000ULL + (uint64_t)ts.tv_nsec / 1000000ULL; +} + +int ttl_delete_schedule(const void *key, uint32_t key_len) { + ttl_delete_task_t *task; + + if (!key || key_len == 0) return -1; + + task = (ttl_delete_task_t *)kvs_malloc(sizeof(*task)); + if (!task) return -1; + memset(task, 0, sizeof(*task)); + + task->key = (uint8_t *)kvs_malloc(key_len); + if (!task->key) { + kvs_free(task); + return -1; + } + + memcpy(task->key, key, key_len); + task->key_len = key_len; + + if (g_ttl_delete_tail) { + g_ttl_delete_tail->next = task; + } else { + g_ttl_delete_head = task; + } + g_ttl_delete_tail = task; + return 0; +} + +int ttl_delete_drain(int budget) { + int drained = 0; + +#if !ENABLE_RBTREE + (void)budget; + return 0; +#else + while (budget > 0 && g_ttl_delete_head) { + ttl_delete_task_t *task = g_ttl_delete_head; + rbtree_node *node; + + g_ttl_delete_head = task->next; + if (!g_ttl_delete_head) { + g_ttl_delete_tail = NULL; + } + + node = rbtree_search(&global_rbtree, task->key, task->key_len); + if (node != global_rbtree.nil && + node->expire_at_ms != 0 && + node->expire_at_ms <= kvs_now_ms()) { + (void)kvs_rbtree_del(&global_rbtree, task->key, task->key_len); + } + + kvs_free(task->key); + kvs_free(task); + drained++; + budget--; + } + + return drained; +#endif +} static int resp_value_encoded_len(const resp_value_t *v, size_t *out_len) { size_t len = 0; @@ -138,11 +215,12 @@ static int is_update_cmd(const resp_cmd_t *cmd) { return 0; } - c0 = &cmd->argv[0]; - return ascii_casecmp(c0->ptr, c0->len, "SET") == 0 || - ascii_casecmp(c0->ptr, c0->len, "DEL") == 0 || - ascii_casecmp(c0->ptr, c0->len, "MOD") == 0 || - ascii_casecmp(c0->ptr, c0->len, "RSET") == 0 || + c0 = &cmd->argv[0]; + return ascii_casecmp(c0->ptr, c0->len, "SET") == 0 || + ascii_casecmp(c0->ptr, c0->len, "DEL") == 0 || + ascii_casecmp(c0->ptr, c0->len, "SETEX") == 0 || + ascii_casecmp(c0->ptr, c0->len, "MOD") == 0 || + ascii_casecmp(c0->ptr, c0->len, "RSET") == 0 || ascii_casecmp(c0->ptr, c0->len, "RDEL") == 0 || ascii_casecmp(c0->ptr, c0->len, "RMOD") == 0 || ascii_casecmp(c0->ptr, c0->len, "HSET") == 0 || @@ -420,11 +498,13 @@ int init_config(AppConfig *cfg){ printf("|—— Persist-rbtree : %s\n", cfg->rbtree_file); printf("|—— Persist-hash : %s\n", cfg->hash_file); - printf("Log level : %s\n", log_level_to_string(cfg->log_level)); - printf("Memory : \n"); - printf("|——Allocator : %s\n", allocator_to_string(cfg->allocator)); - printf("|——MemLeakDetectMode : %s\n", leakage_to_string(cfg->leak_mode)); - printf("=============== Config ===============\n"); + printf("Log level : %s\n", log_level_to_string(cfg->log_level)); + printf("Memory : \n"); + printf("|——Allocator : %s\n", allocator_to_string(cfg->allocator)); + printf("|——MemLeakDetectMode : %s\n", leakage_to_string(cfg->leak_mode)); + printf("Redis-Compat\t\t: \n"); + printf("|——Auth Password\t: %s\n", cfg->redis_auth_password[0] ? "[configured]" : "[empty]"); + printf("=============== Config ===============\n"); xmlCleanupParser(); return 0; diff --git a/kvstore.h b/kvstore.h index 5605818..2897f4f 100644 --- a/kvstore.h +++ b/kvstore.h @@ -4,8 +4,9 @@ #ifndef __KV_STORE_H__ #define __KV_STORE_H__ -#include "diskuring/diskuring.h" -#include +#include "diskuring/diskuring.h" +#include "common/config.h" +#include #include #include #include @@ -121,24 +122,26 @@ int kvs_array_exist(kvs_array_t *inst, char *key); typedef uint8_t KEY_TYPE; // key // 固定部分结构 -typedef struct { - unsigned char color; - struct _rbtree_node *right; - struct _rbtree_node *left; - struct _rbtree_node *parent; - uint32_t key_len; - uint32_t value_len; -} rbtree_node_fixed; +typedef struct { + unsigned char color; + struct _rbtree_node *right; + struct _rbtree_node *left; + struct _rbtree_node *parent; + uint64_t expire_at_ms; + uint32_t key_len; + uint32_t value_len; +} rbtree_node_fixed; // 完整节点结构(用于类型定义,实际内存大小由分配时确定) typedef struct _rbtree_node { unsigned char color; - struct _rbtree_node *right; - struct _rbtree_node *left; - struct _rbtree_node *parent; - uint32_t key_len; - uint32_t value_len; - // 动态数据:key[key_len] + value[value_len] + struct _rbtree_node *right; + struct _rbtree_node *left; + struct _rbtree_node *parent; + uint64_t expire_at_ms; + uint32_t key_len; + uint32_t value_len; + // 动态数据:key[key_len] + value[value_len] // 不存储为结构体成员,通过指针运算访问 } rbtree_node; @@ -149,13 +152,21 @@ typedef struct _rbtree { typedef struct _rbtree kvs_rbtree_t; -int kvs_rbtree_create(kvs_rbtree_t *inst); -void kvs_rbtree_destroy(kvs_rbtree_t *inst); -int kvs_rbtree_set(kvs_rbtree_t *inst, const void *key, uint32_t key_len, const void *value, uint32_t value_len); -void* kvs_rbtree_get(kvs_rbtree_t *inst, const void *key, uint32_t key_len, uint32_t *out_valuelen); -int kvs_rbtree_del(rbtree *inst, const void *key, uint32_t key_len); -int kvs_rbtree_mod(kvs_rbtree_t *inst, const void *key, uint32_t key_len, const void *value, uint32_t value_len); -int kvs_rbtree_exist(kvs_rbtree_t *inst, const void *key, uint32_t key_len); +int kvs_rbtree_create(kvs_rbtree_t *inst); +void kvs_rbtree_destroy(kvs_rbtree_t *inst); +int kvs_rbtree_set(kvs_rbtree_t *inst, const void *key, uint32_t key_len, const void *value, uint32_t value_len); +int kvs_rbtree_set_ex(kvs_rbtree_t *inst, const void *key, uint32_t key_len, + const void *value, uint32_t value_len, uint64_t expire_at_ms); +void* kvs_rbtree_get(kvs_rbtree_t *inst, const void *key, uint32_t key_len, uint32_t *out_valuelen); +int kvs_rbtree_del(rbtree *inst, const void *key, uint32_t key_len); +int kvs_rbtree_mod(kvs_rbtree_t *inst, const void *key, uint32_t key_len, const void *value, uint32_t value_len); +int kvs_rbtree_exist(kvs_rbtree_t *inst, const void *key, uint32_t key_len); +rbtree_node *rbtree_search(rbtree *T, const uint8_t *key, uint32_t keylen); +typedef int (*kvs_rbtree_visit_cb)(rbtree_node *node, void *arg); +rbtree_node *rbtree_search_with_visit(rbtree *T, const uint8_t *key, uint32_t keylen, + kvs_rbtree_visit_cb cb, void *arg); +const uint8_t *kvs_rbtree_node_key_ptr(const rbtree_node *node); +const uint8_t *kvs_rbtree_node_value_ptr(const rbtree_node *node); int kvs_rbtree_save(iouring_ctx_t *uring, kvs_rbtree_t *inst, const char* filename); int kvs_rbtree_load(kvs_rbtree_t *inst, const char* filename); @@ -292,13 +303,18 @@ extern kvs_array_t global_array; extern kvs_rbtree_t global_rbtree; #endif -#if ENABLE_HASH -extern kvs_hash_t global_hash; -#endif - - - -#endif +#if ENABLE_HASH +extern kvs_hash_t global_hash; +#endif + +extern AppConfig global_cfg; + +uint64_t kvs_now_ms(void); +int ttl_delete_schedule(const void *key, uint32_t key_len); +int ttl_delete_drain(int budget); + + +#endif diff --git a/reactor.c b/reactor.c index 440566a..dc4304c 100644 --- a/reactor.c +++ b/reactor.c @@ -30,6 +30,7 @@ typedef int (*msg_handler)(struct conn *conn); static msg_handler kvs_handler; extern iouring_ctx_t global_uring_ctx; +extern int ttl_delete_drain(int budget); int kvs_request(struct conn *c) { return kvs_handler ? kvs_handler(c) : -1; @@ -603,6 +604,8 @@ int reactor_start(unsigned short port, msg_handler handler) { conn_list[connfd].send_callback(connfd); } } + + (void)ttl_delete_drain(8); } for (i = 0; i < listen_count; i++) {