简单ttl懒删除支持

This commit is contained in:
1iaan
2026-04-04 22:31:00 +08:00
parent 6ede44bd80
commit 78519fbfe5
11 changed files with 375 additions and 98 deletions

6
.gitignore vendored
View File

@@ -5,8 +5,10 @@
*.a
/ebpf/libbpf-bootstrap
/doc
/test-redis/results
doc
.workspace.codex
test-redis/results
test-redis/bench
kvstore
testcase

View File

@@ -1,4 +1,4 @@
# 9.1 Kvstore
# Kvstore
## 环境安装与编译
```shell
@@ -74,6 +74,8 @@ make
| 12 | `rbtree_insert` | 2.08 | 0.02 | 926531 |
## 其他测试
流水线深度高,模拟多客户端并发测试。
### 测试1性能测试
测试条件:
1. 不启用持久化。
@@ -86,23 +88,17 @@ make
#### 内存分配: malloc
```bash
lian@ubuntu:~/share/9.1-kvstore$ ./test-redis/testcase 192.168.10.129 8888 3
average qps:880462
ALL TESTS PASSED.
```
#### 内存分配: 自实现内存池
```bash
lian@ubuntu:~/share/9.1-kvstore$ ./test-redis/testcase 192.168.10.129 8888 3
average qps:942837
ALL TESTS PASSED.
```
#### 内存分配jemalloc
```shell
lian@ubuntu:~/share/9.1-kvstore$ ./test-redis/testcase 192.168.10.129 8888 3
average qps:892493
ALL TESTS PASSED.
```
### 测试2持久化
@@ -116,9 +112,7 @@ ALL TESTS PASSED.
5. 本机发送请求。
```shell
lian@ubuntu:~/share/9.1-kvstore$ ./test-redis/testcase 192.168.10.129 8888 4
average qps:870227
ALL TESTS PASSED.
```
### 测试3内存

View File

@@ -60,6 +60,7 @@ static void set_default_config(AppConfig *cfg)
cfg->allocator = ALLOC_JEMALLOC;
cfg->leak_mode = MEMLEAK_DETECT_OFF;
cfg->replica_mode = REPLICA_DISABLE;
cfg->redis_auth_password[0] = '\0';
}
/* ---- 字符串转枚举 ---- */
@@ -422,6 +423,22 @@ void memory_load(xmlNodePtr *root, AppConfig *out_cfg){
}
}
void redis_compat_load(xmlNodePtr *root, AppConfig *out_cfg) {
xmlNodePtr redis_compat = find_child(*root, "redis_compat");
if (!redis_compat) return;
xmlNodePtr auth_password_node = find_child(redis_compat, "auth_password");
if (auth_password_node) {
xmlChar *txt = xmlNodeGetContent(auth_password_node);
if (txt) {
strncpy(out_cfg->redis_auth_password, (char *)txt,
sizeof(out_cfg->redis_auth_password) - 1);
out_cfg->redis_auth_password[sizeof(out_cfg->redis_auth_password) - 1] = '\0';
xmlFree(txt);
}
}
}
int config_load(const char *filename, AppConfig *out_cfg)
{
if (!filename || !out_cfg) return -1;
@@ -470,6 +487,8 @@ int config_load(const char *filename, AppConfig *out_cfg)
memory_load(&root, out_cfg);
redis_compat_load(&root, out_cfg);
rc = 0;
cleanup:
if (doc) xmlFreeDoc(doc);

View File

@@ -63,6 +63,7 @@ typedef struct {
AllocatorType allocator;
MemLeakDetectMode leak_mode;
ReplicaMode replica_mode;
char redis_auth_password[128];
} AppConfig;
/**

View File

@@ -18,8 +18,8 @@
</log>
<persistence>
<type>incremental</type>
<dir>data/persist_no_20260307_053744_r1_a1</dir>
<type>none</type>
<dir>data</dir>
<wal>kvs_oplog.db</wal>
<oplog_sync>none</oplog_sync>
@@ -32,4 +32,8 @@
<allocator>mypool</allocator>
<leakage>disable</leakage>
</memory>
</config>
<redis_compat>
<auth_password>123456</auth_password>
</redis_compat>
</config>

View File

@@ -336,12 +336,52 @@ static int expect_argv(const resp_cmd_t *cmd, uint32_t n) {
const char *command[] = {
"SET", "GET", "DEL", "MOD", "EXIST",
"PING", "AUTH",
"SET", "GET", "DEL", "SETEX", "MOD", "EXIST",
"RSET", "RGET", "RDEL", "RMOD", "REXIST",
"HSET", "HGET", "HDEL", "HMOD", "HEXIST",
"SAVE", "SSYNC", "SREADY", "MEMPRINT"
};
typedef struct ttl_visit_ctx_s {
uint64_t now_ms;
int path_budget;
const uint8_t *target_key;
uint32_t target_key_len;
} ttl_visit_ctx_t;
static int ttl_schedule_if_expired(rbtree_node *node, void *arg) {
ttl_visit_ctx_t *ctx;
const uint8_t *key;
if (!node || !arg) return 0;
ctx = (ttl_visit_ctx_t *)arg;
if (node->expire_at_ms == 0 || node->expire_at_ms > ctx->now_ms) return 0;
key = kvs_rbtree_node_key_ptr(node);
if (!key || node->key_len == 0) return 0;
if (ctx->target_key_len == node->key_len &&
memcmp(key, ctx->target_key, node->key_len) == 0) {
return 0;
}
if (ctx->path_budget <= 0) return 0;
if (ttl_delete_schedule(key, node->key_len) == 0) {
ctx->path_budget--;
}
return 0;
}
static int match_auth_password(const resp_slice_t *password) {
size_t expect_len;
if (!password) return 0;
expect_len = strlen(global_cfg.redis_auth_password);
if (expect_len != (size_t)password->len) return 0;
return memcmp(global_cfg.redis_auth_password, password->ptr, expect_len) == 0;
}
/**
* 输入cmd
@@ -373,45 +413,89 @@ int resp_dispatch(const resp_cmd_t *cmd, resp_value_t *out_value) {
}
switch (op) {
#if ENABLE_ARRAY
case KVS_CMD_PING: {
if (cmd->argc != 1) { *out_value = resp_error("ERR wrong number of arguments for 'ping'"); return -1; }
*out_value = resp_simple("PONG");
return 0;
}
case KVS_CMD_AUTH: {
if (cmd->argc != 2) { *out_value = resp_error("ERR wrong number of arguments for 'auth'"); return -1; }
if (global_cfg.redis_auth_password[0] == '\0' || match_auth_password(&cmd->argv[1])) {
*out_value = resp_simple("OK");
} else {
*out_value = resp_error("ERR invalid password");
}
return 0;
}
#if ENABLE_RBTREE
case KVS_CMD_SET: {
if (cmd->argc != 3) { *out_value = resp_error("ERR wrong number of arguments for 'set'"); return -1; }
// <0 error; 0 success; 1 exist
int r = kvs_array_set_bin(&global_array,
cmd->argv[1].ptr, cmd->argv[1].len,
cmd->argv[2].ptr, cmd->argv[2].len);
int r = kvs_rbtree_set(&global_rbtree,
cmd->argv[1].ptr, cmd->argv[1].len,
cmd->argv[2].ptr, cmd->argv[2].len);
if (r < 0) { *out_value = resp_error("ERR internal error"); return 0; }
else if (r == 1) { *out_value = resp_error("ERR key has exist"); return 0; }
*out_value = resp_simple("OK");
return 0;
}
case KVS_CMD_GET: {
ttl_visit_ctx_t visit_ctx;
rbtree_node *node;
const uint8_t *v;
if (cmd->argc != 2) { *out_value = resp_error("ERR wrong number of arguments for 'get'"); return -1; }
uint32_t vlen = 0;
// NULL not exist, NOTNULL exist
const char *v = kvs_array_get_bin(&global_array, cmd->argv[1].ptr, cmd->argv[1].len, &vlen);
if (!v) { *out_value = resp_nil(); return 0; }
*out_value = resp_bulk((const uint8_t*)v, vlen);
visit_ctx.now_ms = kvs_now_ms();
visit_ctx.path_budget = 1;
visit_ctx.target_key = cmd->argv[1].ptr;
visit_ctx.target_key_len = cmd->argv[1].len;
node = rbtree_search_with_visit(&global_rbtree, cmd->argv[1].ptr, cmd->argv[1].len,
ttl_schedule_if_expired, &visit_ctx);
if (!node || node == global_rbtree.nil) { *out_value = resp_nil(); return 0; }
if (node->expire_at_ms != 0 && node->expire_at_ms <= visit_ctx.now_ms) {
(void)ttl_delete_schedule(cmd->argv[1].ptr, cmd->argv[1].len);
*out_value = resp_nil();
return 0;
}
v = kvs_rbtree_node_value_ptr(node);
*out_value = resp_bulk(v, node->value_len);
return 0;
}
case KVS_CMD_DEL: {
if (cmd->argc != 2) { *out_value = resp_error("ERR wrong number of arguments for 'del'"); return -1; }
// <0 error; =0 success; >0 no exist
int r = kvs_array_del_bin(&global_array, cmd->argv[1].ptr, cmd->argv[1].len);
int r = kvs_rbtree_del(&global_rbtree, cmd->argv[1].ptr, cmd->argv[1].len);
if (r < 0) { *out_value = resp_error("ERR internal error"); return 0; }
// r == 0, del 1; r > 0, del 0.
*out_value = resp_int((r == 0) ? 1 : 0);
return 0;
}
case KVS_CMD_SETEX: {
int64_t ttl_sec = 0;
uint64_t now_ms;
uint64_t expire_at_ms;
int r;
if (cmd->argc != 4) { *out_value = resp_error("ERR wrong number of arguments for 'setex'"); return -1; }
if (parse_i64(cmd->argv[2].ptr, cmd->argv[2].ptr + cmd->argv[2].len, &ttl_sec) < 0 || ttl_sec <= 0) {
*out_value = resp_error("ERR invalid expire time in 'setex'");
return -1;
}
now_ms = kvs_now_ms();
expire_at_ms = now_ms + (uint64_t)ttl_sec * 1000ULL;
r = kvs_rbtree_set_ex(&global_rbtree,
cmd->argv[1].ptr, cmd->argv[1].len,
cmd->argv[3].ptr, cmd->argv[3].len,
expire_at_ms);
if (r < 0) { *out_value = resp_error("ERR internal error"); return 0; }
*out_value = resp_simple("OK");
return 0;
}
case KVS_CMD_MOD: {
if (cmd->argc != 3) { *out_value = resp_error("ERR wrong number of arguments for 'mod'"); return -1; }
// <0 error; =0 success; >0 no exist
int r = kvs_array_mod_bin(&global_array,
cmd->argv[1].ptr, cmd->argv[1].len,
cmd->argv[2].ptr, cmd->argv[2].len);
int r = kvs_rbtree_mod(&global_rbtree,
cmd->argv[1].ptr, cmd->argv[1].len,
cmd->argv[2].ptr, cmd->argv[2].len);
if (r < 0) { *out_value = resp_error("ERR internal error"); return 0; }
if (r == 0) *out_value = resp_simple("OK");
else *out_value = resp_error("ERR no such key");
@@ -420,34 +504,41 @@ int resp_dispatch(const resp_cmd_t *cmd, resp_value_t *out_value) {
case KVS_CMD_EXIST: {
if (cmd->argc != 2) { *out_value = resp_error("ERR wrong number of arguments for 'exist'"); return -1; }
// =0 exist, =1 no exist
int r = kvs_array_exist_bin(&global_array, cmd->argv[1].ptr, cmd->argv[1].len);
int r = kvs_rbtree_exist(&global_rbtree, cmd->argv[1].ptr, cmd->argv[1].len);
if (r < 0) { *out_value = resp_error("ERR internal error"); return 0; }
*out_value = resp_int((r == 0) ? 1 : 0);
return 0;
}
#endif
#if ENABLE_RBTREE
case KVS_CMD_RSET: {
if (cmd->argc != 3) { *out_value = resp_error("ERR wrong number of arguments for 'rset'"); return 0; }
// <0 error; 0 success; 1 exist
int r = kvs_rbtree_set(&global_rbtree,
cmd->argv[1].ptr, cmd->argv[1].len,
cmd->argv[2].ptr, cmd->argv[2].len);
if (r < 0) { *out_value = resp_error("ERR internal error"); return 0; }
else if (r == 1) { *out_value = resp_error("ERR key has exist"); return 0; }
*out_value = resp_simple("OK");
return 0;
}
case KVS_CMD_RGET: {
ttl_visit_ctx_t visit_ctx;
rbtree_node *node;
const uint8_t *v;
if (cmd->argc != 2) { *out_value = resp_error("ERR wrong number of arguments for 'rget'"); return 0; }
uint32_t vlen = 0;
// NULL notexist, NOTNULL exist。out_value_len 是长度。
const char *v = kvs_rbtree_get(&global_rbtree, cmd->argv[1].ptr, cmd->argv[1].len, &vlen);
if (!v) { *out_value = resp_nil(); return 0; }
*out_value = resp_bulk((const uint8_t*)v, vlen);
visit_ctx.now_ms = kvs_now_ms();
visit_ctx.path_budget = 1;
visit_ctx.target_key = cmd->argv[1].ptr;
visit_ctx.target_key_len = cmd->argv[1].len;
node = rbtree_search_with_visit(&global_rbtree, cmd->argv[1].ptr, cmd->argv[1].len,
ttl_schedule_if_expired, &visit_ctx);
if (!node || node == global_rbtree.nil) { *out_value = resp_nil(); return 0; }
if (node->expire_at_ms != 0 && node->expire_at_ms <= visit_ctx.now_ms) {
(void)ttl_delete_schedule(cmd->argv[1].ptr, cmd->argv[1].len);
*out_value = resp_nil();
return 0;
}
v = kvs_rbtree_node_value_ptr(node);
*out_value = resp_bulk(v, node->value_len);
return 0;
}

View File

@@ -17,10 +17,13 @@ int ascii_casecmp(const uint8_t *a, uint32_t alen, const char *b);
typedef enum {
KVS_CMD_START = 0,
KVS_CMD_PING = KVS_CMD_START,
KVS_CMD_AUTH,
// array
KVS_CMD_SET = KVS_CMD_START,
KVS_CMD_SET,
KVS_CMD_GET,
KVS_CMD_DEL,
KVS_CMD_SETEX,
KVS_CMD_MOD,
KVS_CMD_EXIST,
// rbtree
@@ -94,4 +97,4 @@ int resp_dispatch(const resp_cmd_t *cmd, resp_value_t *out_value);
void __ssync(const uint8_t *ip, uint32_t ip_len, int port, unsigned long long seq);
void __sready();
#endif
#endif

View File

@@ -2,6 +2,7 @@
#include "kvs_rw_tools.h"
#include "memory/alloc_dispatch.h"
#include "diskuring/diskuring.h"
#include <endian.h>
/* ============================================================================
* 内存布局说明:
@@ -31,6 +32,10 @@ static inline uint8_t* rbtree_node_get_key(rbtree_node *node) {
return (uint8_t *)node + sizeof(rbtree_node_fixed);
}
const uint8_t *kvs_rbtree_node_key_ptr(const rbtree_node *node) {
return node ? (const uint8_t *)rbtree_node_get_key((rbtree_node *)node) : NULL;
}
// ============================================================================
// 辅助函数获取节点内的value指针
// ============================================================================
@@ -39,6 +44,10 @@ static inline uint8_t* rbtree_node_get_value(rbtree_node *node) {
return (uint8_t *)node + sizeof(rbtree_node_fixed) + node->key_len;
}
const uint8_t *kvs_rbtree_node_value_ptr(const rbtree_node *node) {
return node ? (const uint8_t *)rbtree_node_get_value((rbtree_node *)node) : NULL;
}
// ============================================================================
// 原始比较函数(保持不变)
// ============================================================================
@@ -383,6 +392,27 @@ rbtree_node *rbtree_search(rbtree *T, const uint8_t *key, uint32_t keylen) {
return T->nil;
}
rbtree_node *rbtree_search_with_visit(rbtree *T, const uint8_t *key, uint32_t keylen,
kvs_rbtree_visit_cb cb, void *arg) {
rbtree_node *node;
if (!T) return NULL;
node = T->root;
while (node != T->nil) {
if (cb && cb(node, arg) != 0) {
break;
}
uint8_t *node_key = rbtree_node_get_key(node);
int c = kvs_keycmp(key, keylen, node_key, node->key_len);
if (c < 0) node = node->left;
else if (c > 0) node = node->right;
else return node;
}
return T->nil;
}
void rbtree_traversal(rbtree *T, rbtree_node *node) {
if (node != T->nil) {
@@ -412,6 +442,7 @@ int kvs_rbtree_create(kvs_rbtree_t *inst) {
inst->nil->color = BLACK;
inst->nil->left = inst->nil->right = inst->nil->parent = inst->nil;
inst->nil->expire_at_ms = 0;
inst->nil->key_len = 0;
inst->nil->value_len = 0;
inst->root = inst->nil;
@@ -448,6 +479,11 @@ void kvs_rbtree_destroy(kvs_rbtree_t *inst) {
* @return: <0 error; 0 success; 1 exist
*/
int kvs_rbtree_set(kvs_rbtree_t *inst, const void *key, uint32_t key_len, const void *value, uint32_t value_len) {
return kvs_rbtree_set_ex(inst, key, key_len, value, value_len, 0);
}
int kvs_rbtree_set_ex(kvs_rbtree_t *inst, const void *key, uint32_t key_len,
const void *value, uint32_t value_len, uint64_t expire_at_ms) {
if (!inst || !key || !value) return -1;
// 1. 查找键是否已存在
@@ -467,6 +503,7 @@ int kvs_rbtree_set(kvs_rbtree_t *inst, const void *key, uint32_t key_len, const
new_node->right = existing->right;
new_node->left = existing->left;
new_node->parent = existing->parent;
new_node->expire_at_ms = expire_at_ms;
new_node->key_len = key_len;
new_node->value_len = value_len;
@@ -500,6 +537,7 @@ int kvs_rbtree_set(kvs_rbtree_t *inst, const void *key, uint32_t key_len, const
// 大小相同,直接更新值
uint8_t *val = rbtree_node_get_value(existing);
if (value_len > 0) memcpy(val, value, value_len);
existing->expire_at_ms = expire_at_ms;
}
return 0;
}
@@ -513,6 +551,7 @@ int kvs_rbtree_set(kvs_rbtree_t *inst, const void *key, uint32_t key_len, const
node->key_len = key_len;
node->value_len = value_len;
node->expire_at_ms = expire_at_ms;
uint8_t *node_key = rbtree_node_get_key(node);
uint8_t *node_val = rbtree_node_get_value(node);
@@ -592,6 +631,7 @@ int kvs_rbtree_mod(kvs_rbtree_t *inst, const void *key, uint32_t key_len, const
new_node->left = node->left;
new_node->right = node->right;
new_node->parent = node->parent;
new_node->expire_at_ms = node->expire_at_ms;
new_node->key_len = node->key_len;
new_node->value_len = value_len;
@@ -645,9 +685,10 @@ static int kvs_rbtree_save_node(iouring_ctx_t *uring, int fd, off_t *current_off
uint32_t klen = htonl(node->key_len);
uint32_t vlen = htonl(node->value_len);
uint64_t expire_at_ms = htobe64(node->expire_at_ms);
void *bufs[4];
size_t lens[4];
void *bufs[5];
size_t lens[5];
int count = 0;
bufs[count] = &klen;
@@ -658,6 +699,10 @@ static int kvs_rbtree_save_node(iouring_ctx_t *uring, int fd, off_t *current_off
lens[count] = sizeof(vlen);
count++;
bufs[count] = &expire_at_ms;
lens[count] = sizeof(expire_at_ms);
count++;
uint8_t *node_key = rbtree_node_get_key(node);
if (node->key_len > 0) {
bufs[count] = node_key;
@@ -718,12 +763,25 @@ int kvs_rbtree_load(kvs_rbtree_t *inst, const char* filename){
for (;;) {
uint32_t klen_n = 0, vlen_n = 0;
uint64_t expire_at_ms_n = 0;
size_t r = fread(&klen_n, 1, sizeof(klen_n), fp);
if (r == 0) {
if (feof(fp)) {
fclose(fp);
return 0;
}
fclose(fp);
return -3;
}
if (r != sizeof(klen_n)) { fclose(fp); return -3; }
if (kvs_read_file(fp, &klen_n, 4) < 0) { fclose(fp); return -3; }
if (kvs_read_file(fp, &vlen_n, 4) < 0) { fclose(fp); return -3; }
if (kvs_read_file(fp, &expire_at_ms_n, sizeof(expire_at_ms_n)) < 0) { fclose(fp); return -3; }
uint32_t klen = ntohl(klen_n);
uint32_t vlen = ntohl(vlen_n);
uint64_t expire_at_ms = be64toh(expire_at_ms_n);
if (klen == 0) { fclose(fp); return -3; }
@@ -735,6 +793,7 @@ int kvs_rbtree_load(kvs_rbtree_t *inst, const char* filename){
memset(node, 0, node_size);
node->key_len = klen;
node->value_len = vlen;
node->expire_at_ms = expire_at_ms;
uint8_t *keybuf = rbtree_node_get_key(node);
if (kvs_read_file(fp, keybuf, (size_t)klen) < 0) {
@@ -753,6 +812,11 @@ int kvs_rbtree_load(kvs_rbtree_t *inst, const char* filename){
}
}
if (node->expire_at_ms != 0 && node->expire_at_ms <= kvs_now_ms()) {
kvs_free(node);
continue;
}
// 使用原生 rbtree_insert 而非 kvs_rbtree_set
// 因为 kvs_rbtree_set 会重新分配节点
if (rbtree_insert(inst, node) < 0) {
@@ -764,4 +828,4 @@ int kvs_rbtree_load(kvs_rbtree_t *inst, const char* filename){
fclose(fp);
return 0;
}
}

116
kvstore.c
View File

@@ -17,9 +17,10 @@
#include <pthread.h>
#include <errno.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <libxml/parser.h>
#include <limits.h>
#include <arpa/inet.h>
#include <libxml/parser.h>
#include <limits.h>
#include <time.h>
#define TIME_COLLECT 0
@@ -33,7 +34,16 @@ unsigned long long global_seq;
extern int global_oplog_fd;
replica_shm_t g_rep_shm;
replica_shm_t g_rep_shm;
typedef struct ttl_delete_task_s {
struct ttl_delete_task_s *next;
uint8_t *key;
uint32_t key_len;
} ttl_delete_task_t;
static ttl_delete_task_t *g_ttl_delete_head = NULL;
static ttl_delete_task_t *g_ttl_delete_tail = NULL;
__attribute__((noinline))
void __completed_cmd(const uint8_t *cmd, size_t len, unsigned long long seq){
@@ -45,13 +55,80 @@ void __completed_cmd(const uint8_t *cmd, size_t len, unsigned long long seq){
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
#define TIME_SUB_US(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000000 + (tv1.tv_usec - tv2.tv_usec))
static int checked_size_add(size_t a, size_t b, size_t *out) {
static int checked_size_add(size_t a, size_t b, size_t *out) {
if (!out || a > SIZE_MAX - b) {
return -1;
}
*out = a + b;
return 0;
}
*out = a + b;
return 0;
}
uint64_t kvs_now_ms(void) {
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
return (uint64_t)ts.tv_sec * 1000ULL + (uint64_t)ts.tv_nsec / 1000000ULL;
}
int ttl_delete_schedule(const void *key, uint32_t key_len) {
ttl_delete_task_t *task;
if (!key || key_len == 0) return -1;
task = (ttl_delete_task_t *)kvs_malloc(sizeof(*task));
if (!task) return -1;
memset(task, 0, sizeof(*task));
task->key = (uint8_t *)kvs_malloc(key_len);
if (!task->key) {
kvs_free(task);
return -1;
}
memcpy(task->key, key, key_len);
task->key_len = key_len;
if (g_ttl_delete_tail) {
g_ttl_delete_tail->next = task;
} else {
g_ttl_delete_head = task;
}
g_ttl_delete_tail = task;
return 0;
}
int ttl_delete_drain(int budget) {
int drained = 0;
#if !ENABLE_RBTREE
(void)budget;
return 0;
#else
while (budget > 0 && g_ttl_delete_head) {
ttl_delete_task_t *task = g_ttl_delete_head;
rbtree_node *node;
g_ttl_delete_head = task->next;
if (!g_ttl_delete_head) {
g_ttl_delete_tail = NULL;
}
node = rbtree_search(&global_rbtree, task->key, task->key_len);
if (node != global_rbtree.nil &&
node->expire_at_ms != 0 &&
node->expire_at_ms <= kvs_now_ms()) {
(void)kvs_rbtree_del(&global_rbtree, task->key, task->key_len);
}
kvs_free(task->key);
kvs_free(task);
drained++;
budget--;
}
return drained;
#endif
}
static int resp_value_encoded_len(const resp_value_t *v, size_t *out_len) {
size_t len = 0;
@@ -138,11 +215,12 @@ static int is_update_cmd(const resp_cmd_t *cmd) {
return 0;
}
c0 = &cmd->argv[0];
return ascii_casecmp(c0->ptr, c0->len, "SET") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "DEL") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "MOD") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "RSET") == 0 ||
c0 = &cmd->argv[0];
return ascii_casecmp(c0->ptr, c0->len, "SET") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "DEL") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "SETEX") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "MOD") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "RSET") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "RDEL") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "RMOD") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "HSET") == 0 ||
@@ -420,11 +498,13 @@ int init_config(AppConfig *cfg){
printf("|—— Persist-rbtree : %s\n", cfg->rbtree_file);
printf("|—— Persist-hash : %s\n", cfg->hash_file);
printf("Log level : %s\n", log_level_to_string(cfg->log_level));
printf("Memory : \n");
printf("|——Allocator : %s\n", allocator_to_string(cfg->allocator));
printf("|——MemLeakDetectMode : %s\n", leakage_to_string(cfg->leak_mode));
printf("=============== Config ===============\n");
printf("Log level : %s\n", log_level_to_string(cfg->log_level));
printf("Memory : \n");
printf("|——Allocator : %s\n", allocator_to_string(cfg->allocator));
printf("|——MemLeakDetectMode : %s\n", leakage_to_string(cfg->leak_mode));
printf("Redis-Compat\t\t: \n");
printf("|——Auth Password\t: %s\n", cfg->redis_auth_password[0] ? "[configured]" : "[empty]");
printf("=============== Config ===============\n");
xmlCleanupParser();
return 0;

View File

@@ -4,8 +4,9 @@
#ifndef __KV_STORE_H__
#define __KV_STORE_H__
#include "diskuring/diskuring.h"
#include <stdio.h>
#include "diskuring/diskuring.h"
#include "common/config.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <assert.h>
@@ -121,24 +122,26 @@ int kvs_array_exist(kvs_array_t *inst, char *key);
typedef uint8_t KEY_TYPE; // key
// 固定部分结构
typedef struct {
unsigned char color;
struct _rbtree_node *right;
struct _rbtree_node *left;
struct _rbtree_node *parent;
uint32_t key_len;
uint32_t value_len;
} rbtree_node_fixed;
typedef struct {
unsigned char color;
struct _rbtree_node *right;
struct _rbtree_node *left;
struct _rbtree_node *parent;
uint64_t expire_at_ms;
uint32_t key_len;
uint32_t value_len;
} rbtree_node_fixed;
// 完整节点结构(用于类型定义,实际内存大小由分配时确定)
typedef struct _rbtree_node {
unsigned char color;
struct _rbtree_node *right;
struct _rbtree_node *left;
struct _rbtree_node *parent;
uint32_t key_len;
uint32_t value_len;
// 动态数据key[key_len] + value[value_len]
struct _rbtree_node *right;
struct _rbtree_node *left;
struct _rbtree_node *parent;
uint64_t expire_at_ms;
uint32_t key_len;
uint32_t value_len;
// 动态数据key[key_len] + value[value_len]
// 不存储为结构体成员,通过指针运算访问
} rbtree_node;
@@ -149,13 +152,21 @@ typedef struct _rbtree {
typedef struct _rbtree kvs_rbtree_t;
int kvs_rbtree_create(kvs_rbtree_t *inst);
void kvs_rbtree_destroy(kvs_rbtree_t *inst);
int kvs_rbtree_set(kvs_rbtree_t *inst, const void *key, uint32_t key_len, const void *value, uint32_t value_len);
void* kvs_rbtree_get(kvs_rbtree_t *inst, const void *key, uint32_t key_len, uint32_t *out_valuelen);
int kvs_rbtree_del(rbtree *inst, const void *key, uint32_t key_len);
int kvs_rbtree_mod(kvs_rbtree_t *inst, const void *key, uint32_t key_len, const void *value, uint32_t value_len);
int kvs_rbtree_exist(kvs_rbtree_t *inst, const void *key, uint32_t key_len);
int kvs_rbtree_create(kvs_rbtree_t *inst);
void kvs_rbtree_destroy(kvs_rbtree_t *inst);
int kvs_rbtree_set(kvs_rbtree_t *inst, const void *key, uint32_t key_len, const void *value, uint32_t value_len);
int kvs_rbtree_set_ex(kvs_rbtree_t *inst, const void *key, uint32_t key_len,
const void *value, uint32_t value_len, uint64_t expire_at_ms);
void* kvs_rbtree_get(kvs_rbtree_t *inst, const void *key, uint32_t key_len, uint32_t *out_valuelen);
int kvs_rbtree_del(rbtree *inst, const void *key, uint32_t key_len);
int kvs_rbtree_mod(kvs_rbtree_t *inst, const void *key, uint32_t key_len, const void *value, uint32_t value_len);
int kvs_rbtree_exist(kvs_rbtree_t *inst, const void *key, uint32_t key_len);
rbtree_node *rbtree_search(rbtree *T, const uint8_t *key, uint32_t keylen);
typedef int (*kvs_rbtree_visit_cb)(rbtree_node *node, void *arg);
rbtree_node *rbtree_search_with_visit(rbtree *T, const uint8_t *key, uint32_t keylen,
kvs_rbtree_visit_cb cb, void *arg);
const uint8_t *kvs_rbtree_node_key_ptr(const rbtree_node *node);
const uint8_t *kvs_rbtree_node_value_ptr(const rbtree_node *node);
int kvs_rbtree_save(iouring_ctx_t *uring, kvs_rbtree_t *inst, const char* filename);
int kvs_rbtree_load(kvs_rbtree_t *inst, const char* filename);
@@ -292,13 +303,18 @@ extern kvs_array_t global_array;
extern kvs_rbtree_t global_rbtree;
#endif
#if ENABLE_HASH
extern kvs_hash_t global_hash;
#endif
#endif
#if ENABLE_HASH
extern kvs_hash_t global_hash;
#endif
extern AppConfig global_cfg;
uint64_t kvs_now_ms(void);
int ttl_delete_schedule(const void *key, uint32_t key_len);
int ttl_delete_drain(int budget);
#endif

View File

@@ -30,6 +30,7 @@ typedef int (*msg_handler)(struct conn *conn);
static msg_handler kvs_handler;
extern iouring_ctx_t global_uring_ctx;
extern int ttl_delete_drain(int budget);
int kvs_request(struct conn *c) {
return kvs_handler ? kvs_handler(c) : -1;
@@ -603,6 +604,8 @@ int reactor_start(unsigned short port, msg_handler handler) {
conn_list[connfd].send_callback(connfd);
}
}
(void)ttl_delete_drain(8);
}
for (i = 0; i < listen_count; i++) {