diff --git a/.gitignore b/.gitignore index 704ab53..3b3abec 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ *.a /ebpf/libbpf-bootstrap +/doc /test-redis/results kvstore diff --git a/README.md b/README.md index d9f3c3d..c76f51b 100644 --- a/README.md +++ b/README.md @@ -1,20 +1,5 @@ # 9.1 Kvstore -## 需求 -1. ntyco需要作为kvstore的submodule,通过git clone一次下载。 **完成**。 -2. README需要包含编译步骤,测试方案与可行性,性能数据。 **完成**。 -3. 全量持久化保存数据集。 **BUG FIX**。 -4. 持久化的性能数据。 **完成**。 -5. 特殊字符,可以解决redis的resp协议。 **完成**。 -6. 实现配置文件,把日志级别,端口ip,主从模式,持久化方案。 **完成**。 -7. 持久化落盘用io_uring,加载配置文件用mmap。 **完成**。 -8. 主从同步的性能,开启与关闭性能做到5%?。 -9. 主从同步600w条,出现的coredump。 **BUG FIX**。 -10. 主从同步用ebpf实现。 **完成**。 -11. 内存池测试qps与虚拟内存,物理内存。 **完成**。 -12. 实现一个内存泄露检测组件。 **完成**。 - - ## 环境安装与编译 ```shell # xml @@ -101,9 +86,9 @@ BATCH (N=9000000) --> time_used=10033 ms, qps=1794079 VIRT 208M RES 155M ``` -![alt text](image11.png) -![alt text](image12.png) -![alt text](image13.png) +![alt text](https://disk.0voice.com/p/wl) +![alt text](https://disk.0voice.com/p/wm) +![alt text](https://disk.0voice.com/p/wO) #### jemalloc ```shell @@ -122,9 +107,9 @@ BATCH (N=9000000) --> time_used=9353 ms, qps=1924516 VIRT 356M RES 119M ``` -![alt text](image11.png) -![alt text](image22.png) -![alt text](image23.png) +![alt text](https://disk.0voice.com/p/wl) +![alt text](https://disk.0voice.com/p/wP) +![alt text](https://disk.0voice.com/p/wQ) #### mypool ```shell @@ -143,9 +128,9 @@ BATCH (N=3000000) --> time_used=3022 ms, qps=1985440 VIRT 122M RES 71492 ``` -![alt text](image31.png) -![alt text](image32.png) -![alt text](image33.png) +![alt text](https://disk.0voice.com/p/wR) +![alt text](https://disk.0voice.com/p/wn) +![alt text](https://disk.0voice.com/p/wo) ### 测试4:主从同步 测试条件: @@ -177,18 +162,56 @@ average qps:777838 ALL TESTS PASSED. ``` -### 面试题 -1. 为什么会实现kvstore,使用场景在哪里? -2. reactor, ntyco, io_uring的三种网络模型的性能差异? -3. 多线程的kvstore该如何改进? -4. 私有协议如何设计会更加安全可靠? -5. 协议改进以后,对已有的代码有哪些改变? -6. kv引擎实现了哪些? -7. 每个kv引擎的使用场景,以及性能差异? -8. 测试用例如何实现?并且保证代码覆盖率超过90% -9. 网络并发量如何?qps如何? -10. 能够跟哪些系统交互使用? +## REDIS 对比测试 +### 数据口径(2026-03-06 09:00:30) +- 轮次:3 轮(取均值) +- 参数:requests=1000000 pipeline=128 keyspace=1000000 value-size=32 +- 数据来源:`test-redis/results/hash_bench_summary_20260306_090030.csv` +### 仅看 mypool 与 Redis + +| 系统 | 场景 | set 均值QPS | set 均值us/op | get 均值QPS | get 均值us/op | +|---|---|---:|---:|---:|---:| +| kvstore (mypool) | none | 165554.00 | 6.04 | 169509.33 | 5.91 | +| kvstore (mypool) | incremental(oplog_sync=none) | 168248.67 | 5.96 | 181076.67 | 5.53 | +| kvstore (mypool) | incremental(oplog_sync=every_sec) | 164801.67 | 6.08 | 178469.33 | 5.60 | +| redis | none | 221558.00 | 4.52 | 254807.33 | 3.92 | +| redis | aof_no | 177826.67 | 5.63 | 256099.00 | 3.91 | +| redis | aof_everysec | 179159.67 | 5.59 | 243906.00 | 4.11 | +| redis | aof_always | 66807.33 | 14.97 | 236824.67 | 4.23 | + +### oplog 与 AOF 安全性级别 +1. `oplog_sync=none` 时,kvstore 仅异步写入,不做周期 fsync,安全级别仍接近 Redis `aof_no`。 +2. `oplog_sync=every_sec` 时,kvstore 每秒执行一次“flush + 每个 uring worker 提交 fsync(drain) 并等待回调”,可提供接近 Redis `aof_everysec` 的周期性落盘保证(仍可能丢失最近 1 秒窗口)。 +3. 与 Redis 相比,当前 kvstore 仍缺少 AOF 尾部校验/截断等恢复增强机制,崩溃恢复鲁棒性上略弱于 Redis AOF 体系。 + +### 落盘性能退化(写路径,set) +1. kvstore(mypool):`none 165554.00 -> incremental(oplog_sync=none) 168248.67`,变化 `+1.63%`(本轮无退化,属抖动范围)。 +2. kvstore(mypool):`none 165554.00 -> incremental(oplog_sync=every_sec) 164801.67`,退化 `0.45%`。 +3. redis:`none 221558.00 -> aof_no 177826.67`,退化 `19.74%`。 +4. redis:`none 221558.00 -> aof_everysec 179159.67`,退化 `19.14%`。 +5. redis:`none 221558.00 -> aof_always 66807.33`,退化 `69.85%`。 +6. 对应 set 平均时延(us/op)变化:kvstore every_sec `6.04 -> 6.08`(`+0.61%`),redis aof_no `4.52 -> 5.63`(`+24.72%`),redis aof_everysec `4.52 -> 5.59`(`+23.69%`),redis aof_always `4.52 -> 14.97`(`+231.51%`)。 + +结论:本轮下 kvstore(mypool) 的 `every_sec` 持久化性能代价远低于 Redis AOF 系列,同时安全性目标已从 `aof_no` 抬升到接近 `aof_everysec` 的级别。 + +## 调用开销 +### gprof Flat Profile(Top 12,按 self time) + +| 排名 | 函数 | self time % | self seconds | calls | +|---:|---|---:|---:|---:| +| 1 | `rbtree_node_get_key` | 58.34 | 0.56 | 103209091 | +| 2 | `rbtree_search` | 7.29 | 0.07 | 1874362 | +| 3 | `kvs_keycmp` | 5.73 | 0.06 | 74287566 | +| 4 | `ascii_casecmp` | 3.13 | 0.03 | 21490996 | +| 5 | `task_init` | 3.13 | 0.03 | 1397974 | +| 6 | `mp_page_create` | 3.13 | 0.03 | 23556 | +| 7 | `need` | 2.08 | 0.02 | 14042759 | +| 8 | `parse_i64` | 2.08 | 0.02 | 7029783 | +| 9 | `mp_page_alloc` | 2.08 | 0.02 | 5122487 | +| 10 | `rbtree_node_size` | 2.08 | 0.02 | 1860739 | +| 11 | `submit_write` | 2.08 | 0.02 | 1394599 | +| 12 | `rbtree_insert` | 2.08 | 0.02 | 926531 | ## 项目收获 #### reactor网络模型,用户态网络缓冲区的写法。 @@ -356,4 +379,4 @@ TCP协议栈是分界点, 7. 每线程独立内存池,相对malloc更少的锁竞争。 8. 大块分配自动退化为 malloc 处理。 - 相比 ptmalloc,该设计消除了外部碎片,降低了系统调用次数,并在多线程场景下显著提升分配性能与内存利用率。 \ No newline at end of file + 相比 ptmalloc,该设计消除了外部碎片,降低了系统调用次数,并在多线程场景下显著提升分配性能与内存利用率。 diff --git a/common/config.c b/common/config.c index 2ecedd1..31629e9 100644 --- a/common/config.c +++ b/common/config.c @@ -56,6 +56,7 @@ static void set_default_config(AppConfig *cfg) cfg->mode = MODE_MASTER; cfg->master_port = 8888; cfg->persistence = PERSIST_NONE; + cfg->oplog_sync_mode = OPLOG_SYNC_NONE; cfg->allocator = ALLOC_JEMALLOC; cfg->leak_mode = MEMLEAK_DETECT_OFF; cfg->replica_mode = REPLICA_DISABLE; @@ -87,6 +88,16 @@ static void parse_persistence(const char *s, PersistenceType *out) else if (!strcasecmp(s, "none")) *out = PERSIST_NONE; } +static void parse_oplog_sync_mode(const char *s, OplogSyncMode *out) +{ + if (!s || !out) return; + if (!strcasecmp(s, "every_sec") || !strcasecmp(s, "everysec")) { + *out = OPLOG_SYNC_EVERY_SEC; + } else if (!strcasecmp(s, "none")) { + *out = OPLOG_SYNC_NONE; + } +} + static void parse_allocator(const char *s, AllocatorType *out) { if (!s || !out) return; @@ -188,6 +199,15 @@ const char *persistence_to_string(PersistenceType p) } } +const char *oplog_sync_mode_to_string(OplogSyncMode m) +{ + switch (m) { + case OPLOG_SYNC_NONE: return "none"; + case OPLOG_SYNC_EVERY_SEC: return "every_sec"; + default: return "unknown"; + } +} + const char *allocator_to_string(AllocatorType a) { switch (a) { @@ -337,6 +357,15 @@ void persist_load(xmlNodePtr *root, AppConfig *out_cfg){ } } + xmlNodePtr oplog_sync_node = find_child(pers, "oplog_sync"); + if (oplog_sync_node) { + xmlChar *txt = xmlNodeGetContent(oplog_sync_node); + if (txt) { + parse_oplog_sync_mode((char *)txt, &out_cfg->oplog_sync_mode); + xmlFree(txt); + } + } + xmlNodePtr array_node = find_child(pers, "array"); if (array_node) { xmlChar *txt = xmlNodeGetContent(array_node); diff --git a/common/config.h b/common/config.h index 8a7a069..d71a2ed 100644 --- a/common/config.h +++ b/common/config.h @@ -20,6 +20,11 @@ typedef enum { PERSIST_NONE } PersistenceType; +typedef enum { + OPLOG_SYNC_NONE, + OPLOG_SYNC_EVERY_SEC +} OplogSyncMode; + typedef enum { REPLICA_DISABLE, REPLICA_ENABLE @@ -53,6 +58,7 @@ typedef struct { char array_file[256]; char rbtree_file[256]; char hash_file[256]; + OplogSyncMode oplog_sync_mode; AllocatorType allocator; MemLeakDetectMode leak_mode; @@ -68,6 +74,7 @@ int config_load(const char *filename, AppConfig *out_cfg); const char *log_level_to_string(LogLevel lvl); const char *server_mode_to_string(ServerMode mode); const char *persistence_to_string(PersistenceType p); +const char *oplog_sync_mode_to_string(OplogSyncMode m); const char *allocator_to_string(AllocatorType a); const char *leakage_to_string(MemLeakDetectMode a); const char *replica_to_string(ReplicaMode a); diff --git a/config/config.xml b/config/config.xml index acdb818..355b40e 100644 --- a/config/config.xml +++ b/config/config.xml @@ -1,11 +1,11 @@ - + - 127.0.0.1 + 192.168.220.134 8888 - master + master - + disable 192.168.220.134 @@ -14,21 +14,22 @@ - INFO + INFO - incremental - data/persist_mypool_20260305_072256 + none + data kvs_oplog.db + none kvs_array.db kvs_rbtree.db kvs_hash.db - mypool - disable + mypool + disable - \ No newline at end of file + diff --git a/diskuring/diskuring.c b/diskuring/diskuring.c index 513e102..67f2e50 100644 --- a/diskuring/diskuring.c +++ b/diskuring/diskuring.c @@ -284,7 +284,16 @@ static void *worker_main(void *arg) { break; } - io_uring_prep_writev(sqe, t->fd, t->iovs, t->iovcnt, t->off); + if (t->op == TASK_WRITE) { + io_uring_prep_writev(sqe, t->fd, t->iovs, t->iovcnt, t->off); + } else if (t->op == TASK_FSYNC) { + io_uring_prep_fsync(sqe, t->fd, t->fsync_flags); + } else { + task_finish(t, -EINVAL); + destroy_queue_push(w->parent, t); + continue; + } + sqe->flags |= (unsigned char)t->sqe_flags; sqe->user_data = (uint64_t)(uintptr_t)t; prepared++; } @@ -327,8 +336,18 @@ void task_init(task_t *t) { if (!t) { return; } + t->op = TASK_WRITE; + t->fd = -1; + t->off = 0; + t->fsync_flags = 0; + t->sqe_flags = 0; t->done = 0; t->res = 0; + t->iovs = NULL; + t->iovcnt = 0; + t->free_iov_bases = 1; + t->on_destroy = NULL; + t->on_destroy_arg = NULL; t->next = NULL; } @@ -355,10 +374,16 @@ void task_destroy(task_t *t) { return; } + if (t->on_destroy) { + t->on_destroy(t, t->on_destroy_arg); + } + if (t->iovs) { - for (int i = 0; i < t->iovcnt; i++) { - if (t->iovs[i].iov_base) { - kvs_free(t->iovs[i].iov_base); + if (t->free_iov_bases) { + for (int i = 0; i < t->iovcnt; i++) { + if (t->iovs[i].iov_base) { + kvs_free(t->iovs[i].iov_base); + } } } kvs_free(t->iovs); @@ -678,6 +703,33 @@ static int queue_task_with_backpressure(iouring_ctx_t *ctx, task_t *t) { return -1; } +static int queue_task_to_worker_with_backpressure(iouring_ctx_t *ctx, task_t *t, int worker_id) { + iouring_worker_t *w; + + if (!ctx || !ctx->workers || !t) { + return -1; + } + if (worker_id < 0 || worker_id >= ctx->worker_nr) { + return -1; + } + + w = &ctx->workers[worker_id]; + while (atomic_load_explicit(&ctx->stop, memory_order_acquire) == 0) { + int need_notify = 0; + if (spsc_try_push(&w->submit_q, t, &need_notify) == 0) { + if (need_notify) { + worker_notify(w); + } + return 0; + } + + cleanup_finished_iouring_tasks(ctx); + sched_yield(); + } + + return -1; +} + task_t *submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int count, off_t off) { task_t *t; size_t total = 0; @@ -782,6 +834,92 @@ task_t *submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int return t; } +task_t *submit_write_ref(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int count, off_t off, + int free_iov_bases, task_destroy_cb_t on_destroy, void *on_destroy_arg) { + task_t *t; + size_t total = 0; + + if (!ctx || !ctx->workers || !bufs || !lens || count <= 0) { + return NULL; + } + + t = (task_t *)kvs_malloc(sizeof(task_t)); + if (!t) { + return NULL; + } + task_init(t); + t->op = TASK_WRITE; + t->fd = fd; + t->off = off; + t->iovcnt = count; + t->free_iov_bases = free_iov_bases ? 1 : 0; + t->on_destroy = on_destroy; + t->on_destroy_arg = on_destroy_arg; + t->iovs = (struct iovec *)kvs_malloc(sizeof(struct iovec) * (size_t)count); + if (!t->iovs) { + kvs_free(t); + return NULL; + } + + for (int i = 0; i < count; ++i) { + if (!bufs[i] || lens[i] == 0) { + task_destroy(t); + return NULL; + } + if (lens[i] > SIZE_MAX - total) { + task_destroy(t); + return NULL; + } + t->iovs[i].iov_base = bufs[i]; + t->iovs[i].iov_len = lens[i]; + total += lens[i]; + } + + if (total == 0) { + task_destroy(t); + return NULL; + } + + if (queue_task_with_backpressure(ctx, t) != 0) { + task_destroy(t); + return NULL; + } + + return t; +} + +task_t *submit_fsync_ref(iouring_ctx_t *ctx, int fd, int worker_id, int drain, + task_destroy_cb_t on_destroy, void *on_destroy_arg) { + task_t *t; + + if (!ctx || !ctx->workers || fd < 0) { + return NULL; + } + if (worker_id < 0 || worker_id >= ctx->worker_nr) { + return NULL; + } + + t = (task_t *)kvs_malloc(sizeof(task_t)); + if (!t) { + return NULL; + } + + task_init(t); + t->op = TASK_FSYNC; + t->fd = fd; + t->fsync_flags = 0; + t->sqe_flags = drain ? IOSQE_IO_DRAIN : 0; + t->on_destroy = on_destroy; + t->on_destroy_arg = on_destroy_arg; + + if (queue_task_to_worker_with_backpressure(ctx, t, worker_id) != 0) { + task_destroy(t); + return NULL; + } + + return t; +} + int uring_task_complete(iouring_ctx_t *ctx) { if (!ctx || !ctx->workers) { return 1; diff --git a/diskuring/diskuring.h b/diskuring/diskuring.h index 0c8c9e7..a884f0f 100644 --- a/diskuring/diskuring.h +++ b/diskuring/diskuring.h @@ -11,18 +11,26 @@ #include #include -typedef enum { TASK_READ, TASK_WRITE } task_op_t; +typedef enum { TASK_READ, TASK_WRITE, TASK_FSYNC } task_op_t; + +struct task; +typedef void (*task_destroy_cb_t)(struct task *t, void *arg); typedef struct task { task_op_t op; int fd; off_t off; + unsigned fsync_flags; + unsigned sqe_flags; int res; _Atomic int done; struct iovec *iovs; int iovcnt; + int free_iov_bases; + task_destroy_cb_t on_destroy; + void *on_destroy_arg; struct task *next; } task_t; @@ -73,6 +81,10 @@ int iouring_init(iouring_ctx_t *ctx, unsigned entries); void iouring_shutdown(iouring_ctx_t *ctx); task_t *submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int count, off_t off); +task_t *submit_write_ref(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int count, off_t off, + int free_iov_bases, task_destroy_cb_t on_destroy, void *on_destroy_arg); +task_t *submit_fsync_ref(iouring_ctx_t *ctx, int fd, int worker_id, int drain, + task_destroy_cb_t on_destroy, void *on_destroy_arg); int uring_task_complete(iouring_ctx_t *ctx); void cleanup_finished_iouring_tasks(iouring_ctx_t *ctx); void iouring_profile_dump(iouring_ctx_t *ctx); diff --git a/dump/kvs_dump.h b/dump/kvs_dump.h index 58fffe8..e0d53df 100644 --- a/dump/kvs_dump.h +++ b/dump/kvs_dump.h @@ -18,9 +18,16 @@ extern int global_oplog_fd; int init_cmd_log(const char *file, int *logfd); int destroy_cmd_log(int logfd); +enum { + KVS_OPLOG_BUF_NOT_FULL = 0, + KVS_OPLOG_BUF_FULL = 1 +}; + +int kvs_oplog_buffer_append(const uint8_t *cmd, size_t len, int logfd); +int kvs_oplog_flush(int logfd, int force); int kvs_oplog_append(const uint8_t *cmd, size_t len, int logfd); int kvs_replay_log(int logfd); int ksv_clear_log(int logfd); -#endif \ No newline at end of file +#endif diff --git a/dump/kvs_oplog.c b/dump/kvs_oplog.c index a89017e..c698c19 100644 --- a/dump/kvs_oplog.c +++ b/dump/kvs_oplog.c @@ -3,58 +3,557 @@ #include "memory/alloc_dispatch.h" #include "kvs_protocol_resp.h" #include "diskuring/diskuring.h" +#include "common/config.h" #include #include +#include +#include +#include #include int global_oplog_fd = -1; static off_t g_log_off = -1; +extern AppConfig global_cfg; + +#define KVS_OPLOG_PAGE_SIZE (64u * 1024u) + +typedef struct oplog_buf { + struct oplog_buf *next; + off_t off; + size_t used; + uint8_t data[KVS_OPLOG_PAGE_SIZE]; +} oplog_buf_t; + +static oplog_buf_t *g_oplog_idle_head = NULL; +static oplog_buf_t *g_oplog_idle_tail = NULL; +static oplog_buf_t *g_oplog_ready_head = NULL; +static oplog_buf_t *g_oplog_ready_tail = NULL; +static oplog_buf_t *g_oplog_cur = NULL; + +static pthread_mutex_t g_oplog_mu = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t g_sync_cv = PTHREAD_COND_INITIALIZER; +static pthread_t g_sync_th; +static int g_sync_started = 0; +static int g_sync_stop = 0; +static int g_sync_logfd = -1; +static uint64_t g_sync_gen = 0; +static uint64_t g_sync_synced_gen = 0; + +static inline void oplog_mark_dirty_locked(void) { + g_sync_gen++; + pthread_cond_signal(&g_sync_cv); +} + +typedef struct { + pthread_mutex_t mu; + pthread_cond_t cv; + int pending; + int submit_failed; + int fsync_err; +} oplog_fsync_wait_t; + +typedef struct { + oplog_fsync_wait_t *waiter; +} oplog_fsync_arg_t; + +static void make_timeout_ms(struct timespec *ts, long ms) { + if (!ts) { + return; + } + clock_gettime(CLOCK_REALTIME, ts); + ts->tv_sec += ms / 1000; + ts->tv_nsec += (ms % 1000) * 1000000L; + if (ts->tv_nsec >= 1000000000L) { + ts->tv_sec += 1; + ts->tv_nsec -= 1000000000L; + } +} + +static void oplog_fsync_done(task_t *t, void *arg) { + oplog_fsync_arg_t *a = (oplog_fsync_arg_t *)arg; + oplog_fsync_wait_t *w; + + if (!a || !a->waiter) { + return; + } + w = a->waiter; + + pthread_mutex_lock(&w->mu); + if (t && t->res < 0 && w->fsync_err == 0) { + w->fsync_err = t->res; + } + if (w->pending > 0) { + w->pending--; + } + if (w->pending == 0) { + pthread_cond_signal(&w->cv); + } + pthread_mutex_unlock(&w->mu); +} + +static int kvs_oplog_fsync_all_workers(int fd) { + int i; + int n; + int rc = 0; + oplog_fsync_wait_t w; + oplog_fsync_arg_t *args = NULL; + + if (fd < 0 || !global_uring_ctx.workers || global_uring_ctx.worker_nr <= 0) { + return -1; + } + + memset(&w, 0, sizeof(w)); + pthread_mutex_init(&w.mu, NULL); + pthread_cond_init(&w.cv, NULL); + + n = global_uring_ctx.worker_nr; + args = (oplog_fsync_arg_t *)kvs_malloc(sizeof(oplog_fsync_arg_t) * (size_t)n); + if (!args) { + pthread_cond_destroy(&w.cv); + pthread_mutex_destroy(&w.mu); + return -1; + } + + for (i = 0; i < n; i++) { + task_t *t; + args[i].waiter = &w; + + pthread_mutex_lock(&w.mu); + w.pending++; + pthread_mutex_unlock(&w.mu); + + t = submit_fsync_ref(&global_uring_ctx, fd, i, 1, oplog_fsync_done, &args[i]); + if (!t) { + pthread_mutex_lock(&w.mu); + w.pending--; + w.submit_failed = 1; + if (w.pending == 0) { + pthread_cond_signal(&w.cv); + } + pthread_mutex_unlock(&w.mu); + rc = -1; + break; + } + } + + pthread_mutex_lock(&w.mu); + while (w.pending > 0) { + struct timespec ts; + make_timeout_ms(&ts, 10); + (void)pthread_cond_timedwait(&w.cv, &w.mu, &ts); + if (w.pending > 0) { + pthread_mutex_unlock(&w.mu); + cleanup_finished_iouring_tasks(&global_uring_ctx); + pthread_mutex_lock(&w.mu); + } + } + + if (w.fsync_err < 0 || w.submit_failed) { + rc = -1; + } + pthread_mutex_unlock(&w.mu); + + cleanup_finished_iouring_tasks(&global_uring_ctx); + kvs_free(args); + pthread_cond_destroy(&w.cv); + pthread_mutex_destroy(&w.mu); + return rc; +} + +static void oplog_push_tail(oplog_buf_t **head, oplog_buf_t **tail, oplog_buf_t *buf) { + if (!head || !tail || !buf) { + return; + } + buf->next = NULL; + if (!*tail) { + *head = *tail = buf; + return; + } + (*tail)->next = buf; + *tail = buf; +} + +static void oplog_push_front(oplog_buf_t **head, oplog_buf_t **tail, oplog_buf_t *buf) { + if (!head || !tail || !buf) { + return; + } + if (!*head) { + buf->next = NULL; + *head = *tail = buf; + return; + } + buf->next = *head; + *head = buf; +} + +static oplog_buf_t *oplog_pop_head(oplog_buf_t **head, oplog_buf_t **tail) { + oplog_buf_t *buf; + if (!head || !tail || !*head) { + return NULL; + } + buf = *head; + *head = buf->next; + if (!*head) { + *tail = NULL; + } + buf->next = NULL; + return buf; +} + +static oplog_buf_t *oplog_alloc_buf(void) { + oplog_buf_t *buf = (oplog_buf_t *)kvs_malloc(sizeof(oplog_buf_t)); + if (!buf) { + return NULL; + } + buf->next = NULL; + buf->off = 0; + buf->used = 0; + return buf; +} + +static oplog_buf_t *oplog_borrow_buf(void) { + oplog_buf_t *buf = oplog_pop_head(&g_oplog_idle_head, &g_oplog_idle_tail); + if (buf) { + buf->off = 0; + buf->used = 0; + return buf; + } + return oplog_alloc_buf(); +} + +static void oplog_free_list(oplog_buf_t **head, oplog_buf_t **tail) { + oplog_buf_t *buf; + if (!head || !tail) { + return; + } + while ((buf = oplog_pop_head(head, tail)) != NULL) { + kvs_free(buf); + } +} + +static void oplog_pool_release_all(void) { + if (g_oplog_cur) { + kvs_free(g_oplog_cur); + g_oplog_cur = NULL; + } + oplog_free_list(&g_oplog_idle_head, &g_oplog_idle_tail); + oplog_free_list(&g_oplog_ready_head, &g_oplog_ready_tail); +} + +static void oplog_recycle_done(task_t *t, void *arg) { + oplog_buf_t *buf = (oplog_buf_t *)arg; + (void)t; + if (!buf) { + return; + } + pthread_mutex_lock(&g_oplog_mu); + buf->off = 0; + buf->used = 0; + oplog_push_tail(&g_oplog_idle_head, &g_oplog_idle_tail, buf); + pthread_mutex_unlock(&g_oplog_mu); +} + +static int kvs_oplog_submit_ready_buf(oplog_buf_t *buf, int logfd) { + void *bufs[1]; + size_t lens[1]; + task_t *t; + + if (!buf || logfd < 0) { + return -1; + } + if (buf->used == 0) { + oplog_push_tail(&g_oplog_idle_head, &g_oplog_idle_tail, buf); + return 0; + } + + bufs[0] = (void *)buf->data; + lens[0] = buf->used; + t = submit_write_ref(&global_uring_ctx, logfd, bufs, lens, 1, buf->off, 0, + oplog_recycle_done, buf); + if (!t) { + return -1; + } + return 0; +} + +static int kvs_oplog_flush_internal(int logfd, int force) { + oplog_buf_t *buf; + if (logfd < 0) { + return -1; + } + + if (force && g_oplog_cur && g_oplog_cur->used > 0) { + oplog_push_tail(&g_oplog_ready_head, &g_oplog_ready_tail, g_oplog_cur); + g_oplog_cur = NULL; + } + + while ((buf = oplog_pop_head(&g_oplog_ready_head, &g_oplog_ready_tail)) != NULL) { + if (kvs_oplog_submit_ready_buf(buf, logfd) < 0) { + oplog_push_front(&g_oplog_ready_head, &g_oplog_ready_tail, buf); + return -1; + } + } + return 0; +} + +static void make_timeout_1s(struct timespec *ts) { + if (!ts) { + return; + } + clock_gettime(CLOCK_REALTIME, ts); + ts->tv_sec += 1; +} + +static void *oplog_sync_main(void *arg) { + (void)arg; + while (1) { + uint64_t target_gen = 0; + int fd = -1; + int flush_ok = 0; + struct timespec ts; + + make_timeout_1s(&ts); + pthread_mutex_lock(&g_oplog_mu); + (void)pthread_cond_timedwait(&g_sync_cv, &g_oplog_mu, &ts); + + if (g_sync_stop) { + pthread_mutex_unlock(&g_oplog_mu); + break; + } + + if (global_cfg.oplog_sync_mode != OPLOG_SYNC_EVERY_SEC || + g_sync_logfd < 0 || g_sync_synced_gen >= g_sync_gen) { + pthread_mutex_unlock(&g_oplog_mu); + continue; + } + + target_gen = g_sync_gen; + fd = g_sync_logfd; + flush_ok = (kvs_oplog_flush_internal(fd, 1) == 0); + pthread_mutex_unlock(&g_oplog_mu); + + if (!flush_ok) { + continue; + } + + if (kvs_oplog_fsync_all_workers(fd) == 0) { + pthread_mutex_lock(&g_oplog_mu); + if (g_sync_synced_gen < target_gen) { + g_sync_synced_gen = target_gen; + } + pthread_mutex_unlock(&g_oplog_mu); + } + } + return NULL; +} + +static void oplog_sync_thread_stop_locked(void) { + int need_join = g_sync_started; + + if (!need_join) { + return; + } + + g_sync_stop = 1; + pthread_cond_broadcast(&g_sync_cv); + pthread_mutex_unlock(&g_oplog_mu); + pthread_join(g_sync_th, NULL); + pthread_mutex_lock(&g_oplog_mu); + g_sync_started = 0; + g_sync_logfd = -1; +} + +static int oplog_sync_thread_start_locked(int logfd) { + if (g_sync_started) { + return 0; + } + g_sync_stop = 0; + g_sync_logfd = logfd; + if (pthread_create(&g_sync_th, NULL, oplog_sync_main, NULL) != 0) { + g_sync_logfd = -1; + return -1; + } + g_sync_started = 1; + return 0; +} + +static int kvs_oplog_append_direct(const uint8_t *cmd, size_t len, int logfd) { + uint32_t nlen; + void *bufs[2]; + size_t lens[2]; + size_t total; + off_t myoff; + task_t *t; + + nlen = htonl((uint32_t)len); + bufs[0] = (void *)&nlen; + lens[0] = sizeof(nlen); + bufs[1] = (void *)cmd; + lens[1] = len; + total = sizeof(nlen) + len; + + myoff = g_log_off; + g_log_off += (off_t)total; + t = submit_write(&global_uring_ctx, logfd, bufs, lens, 2, myoff); + if (!t) { + return -1; + } + oplog_mark_dirty_locked(); + return 0; +} int init_cmd_log(const char *file, int *logfd){ + int rc = 0; if(!file) return -1; int fd = open(file, O_RDWR | O_CREAT , 0644); if(fd < 0) return -2; + off_t off = lseek(fd, 0, SEEK_END); + if (off < 0) { + close(fd); + return -2; + } - g_log_off = lseek(fd, 0, SEEK_END); + pthread_mutex_lock(&g_oplog_mu); + oplog_sync_thread_stop_locked(); + g_log_off = off; + g_sync_gen = 0; + g_sync_synced_gen = 0; + g_sync_logfd = fd; + oplog_pool_release_all(); + if (global_cfg.oplog_sync_mode == OPLOG_SYNC_EVERY_SEC) { + rc = oplog_sync_thread_start_locked(fd); + } + pthread_mutex_unlock(&g_oplog_mu); + if (rc != 0) { + close(fd); + return -3; + } *logfd = fd; return 0; } int destroy_cmd_log(int logfd){ - fsync(logfd); + if (logfd < 0) { + return -1; + } + + pthread_mutex_lock(&g_oplog_mu); + oplog_sync_thread_stop_locked(); + if (kvs_oplog_flush_internal(logfd, 1) < 0) { + pthread_mutex_unlock(&g_oplog_mu); + return -2; + } + pthread_mutex_unlock(&g_oplog_mu); + + if (kvs_oplog_fsync_all_workers(logfd) < 0) { + return -3; + } + cleanup_finished_iouring_tasks(&global_uring_ctx); close(logfd); + + pthread_mutex_lock(&g_oplog_mu); + oplog_pool_release_all(); + g_log_off = -1; + g_sync_gen = 0; + g_sync_synced_gen = 0; + g_sync_logfd = -1; + pthread_mutex_unlock(&g_oplog_mu); global_oplog_fd = -1; return 0; } +int kvs_oplog_buffer_append(const uint8_t *cmd, size_t len, int logfd){ + if (logfd < 0 || !cmd || len == 0) return -1; + if (len > UINT32_MAX) return -2; + pthread_mutex_lock(&g_oplog_mu); + if (g_log_off < 0) { + pthread_mutex_unlock(&g_oplog_mu); + return -3; + } + + { + size_t need = sizeof(uint32_t) + len; + int became_full = 0; + uint32_t nlen = htonl((uint32_t)len); + + if (need > KVS_OPLOG_PAGE_SIZE) { + int rc = kvs_oplog_append_direct(cmd, len, logfd); + pthread_mutex_unlock(&g_oplog_mu); + return (rc == 0) ? KVS_OPLOG_BUF_NOT_FULL : -4; + } + + if (!g_oplog_cur) { + g_oplog_cur = oplog_borrow_buf(); + if (!g_oplog_cur) { + pthread_mutex_unlock(&g_oplog_mu); + return -4; + } + g_oplog_cur->off = g_log_off; + } + + if (g_oplog_cur->used + need > KVS_OPLOG_PAGE_SIZE) { + if (g_oplog_cur->used > 0) { + oplog_push_tail(&g_oplog_ready_head, &g_oplog_ready_tail, g_oplog_cur); + became_full = 1; + g_oplog_cur = NULL; + } + + g_oplog_cur = oplog_borrow_buf(); + if (!g_oplog_cur) { + pthread_mutex_unlock(&g_oplog_mu); + return -4; + } + g_oplog_cur->off = g_log_off; + } + + memcpy(g_oplog_cur->data + g_oplog_cur->used, &nlen, sizeof(nlen)); + g_oplog_cur->used += sizeof(nlen); + memcpy(g_oplog_cur->data + g_oplog_cur->used, cmd, len); + g_oplog_cur->used += len; + g_log_off += (off_t)need; + oplog_mark_dirty_locked(); + + if (g_oplog_cur->used == KVS_OPLOG_PAGE_SIZE) { + oplog_push_tail(&g_oplog_ready_head, &g_oplog_ready_tail, g_oplog_cur); + g_oplog_cur = NULL; + became_full = 1; + } + + pthread_mutex_unlock(&g_oplog_mu); + return became_full ? KVS_OPLOG_BUF_FULL : KVS_OPLOG_BUF_NOT_FULL; + } +} + +int kvs_oplog_flush(int logfd, int force) { + int rc; + pthread_mutex_lock(&g_oplog_mu); + rc = kvs_oplog_flush_internal(logfd, force); + pthread_mutex_unlock(&g_oplog_mu); + if (rc < 0) { + return -1; + } + return 0; +} + int kvs_oplog_append(const uint8_t *cmd, size_t len, int logfd){ if (logfd < 0 || !cmd || len == 0) return -1; if (len > UINT32_MAX) return -2; - - - uint32_t nlen = htonl((uint32_t)len); - - void *bufs[2]; - size_t lens[2]; - - bufs[0] = (void *)&nlen; - lens[0] = sizeof(nlen); - - bufs[1] = (void *)cmd; - lens[1] = len; - - size_t total = sizeof(nlen) + len; - - off_t myoff = g_log_off; - g_log_off += (off_t)total; - - task_t *t = submit_write(&global_uring_ctx, logfd, bufs, lens, 2, myoff); - if (!t) { - return -4; + pthread_mutex_lock(&g_oplog_mu); + if (g_log_off < 0) { + pthread_mutex_unlock(&g_oplog_mu); + return -3; } - + if (kvs_oplog_flush_internal(logfd, 1) < 0) { + pthread_mutex_unlock(&g_oplog_mu); + return -4; + } + if (kvs_oplog_append_direct(cmd, len, logfd) < 0) { + pthread_mutex_unlock(&g_oplog_mu); + return -4; + } + pthread_mutex_unlock(&g_oplog_mu); return 0; } @@ -69,8 +568,8 @@ int kvs_replay_log(int logfd){ uint32_t nlen = 0; int hr = read_full(logfd, &nlen, sizeof(nlen)); - if (hr == 0) break; /* EOF:正常结束 */ - if (hr < 0) { return -2; } /* 半截头 */ + if (hr == 0) break; + if (hr < 0) { return -2; } uint32_t len = ntohl(nlen); if (len == 0) { return -3; } @@ -79,24 +578,21 @@ int kvs_replay_log(int logfd){ if (!cmd_bytes ) { return -5; } int pr = read_full(logfd, cmd_bytes, len); - if (pr <= 0) { /* 半截 payload */ + if (pr <= 0) { kvs_free(cmd_bytes ); return -6; } - /* -------- RESP parse -------- */ resp_cmd_t cmd; memset(&cmd, 0, sizeof(cmd)); int clen = resp_parse_one_cmd(cmd_bytes, (int)len, &cmd); if (clen <= 0 || clen != (int)len) { - /* clen==0: need more data,但日志记录必须是一条完整命令,所以视为坏日志 */ kvs_free(cmd_bytes); return -7; } - /* -------- execute -------- */ resp_value_t outvalue; memset(&outvalue, 0, sizeof(outvalue)); @@ -106,14 +602,15 @@ int kvs_replay_log(int logfd){ return -8; } - /* 注意: - * outv 可能引用存储内存,但我们不 build response,因此无需处理。 - * cmd_bytes 可以释放,因为 cmd slice 指向 cmd_bytes,仅在 dispatch 期间使用。 - * */ kvs_free(cmd_bytes); } + pthread_mutex_lock(&g_oplog_mu); g_log_off = lseek(logfd, 0, SEEK_CUR); + if (g_sync_synced_gen < g_sync_gen) { + g_sync_synced_gen = g_sync_gen; + } + pthread_mutex_unlock(&g_oplog_mu); return 0; } @@ -123,8 +620,24 @@ int kvs_replay_log(int logfd){ */ int ksv_clear_log(int logfd){ if(logfd < 0) return -1; + + pthread_mutex_lock(&g_oplog_mu); + if (kvs_oplog_flush_internal(logfd, 1) < 0) { + pthread_mutex_unlock(&g_oplog_mu); + return -2; + } + pthread_mutex_unlock(&g_oplog_mu); + if (kvs_oplog_fsync_all_workers(logfd) < 0) { + return -3; + } + cleanup_finished_iouring_tasks(&global_uring_ctx); ftruncate(logfd, 0); lseek(logfd, 0, SEEK_SET); + + pthread_mutex_lock(&g_oplog_mu); g_log_off = 0; + g_sync_gen = 0; + g_sync_synced_gen = 0; + pthread_mutex_unlock(&g_oplog_mu); return 0; -} \ No newline at end of file +} diff --git a/kvstore.c b/kvstore.c index 3fca3f0..9376ab7 100644 --- a/kvstore.c +++ b/kvstore.c @@ -14,12 +14,12 @@ #include #include #include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include +#include #define TIME_COLLECT 0 @@ -41,147 +41,147 @@ void __completed_cmd(const uint8_t *cmd, size_t len, unsigned long long seq){ } -#include -#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000) -#define TIME_SUB_US(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000000 + (tv1.tv_usec - tv2.tv_usec)) - -static int checked_size_add(size_t a, size_t b, size_t *out) { - if (!out || a > SIZE_MAX - b) { - return -1; - } - *out = a + b; - return 0; -} - -static int resp_value_encoded_len(const resp_value_t *v, size_t *out_len) { - size_t len = 0; - - if (!v || !out_len) { - return -1; - } - - switch (v->type) { - case RESP_T_SIMPLE_STR: - case RESP_T_ERROR: - if (checked_size_add(1, (size_t)v->bulk.len, &len) < 0 || - checked_size_add(len, 2, &len) < 0) { - return -1; - } - break; - - case RESP_T_INTEGER: { - char tmp[64]; - int n = snprintf(tmp, sizeof(tmp), "%lld", (long long)v->i64); - if (n <= 0) { - return -1; - } - if (checked_size_add(1, (size_t)n, &len) < 0 || - checked_size_add(len, 2, &len) < 0) { - return -1; - } - break; - } - - case RESP_T_NIL: - len = 5; /* "$-1\r\n" */ - break; - - case RESP_T_BULK_STR: { - char tmp[32]; - int n; - size_t t; - - if (v->bulk.len > 0 && !v->bulk.ptr) { - return -1; - } - - n = snprintf(tmp, sizeof(tmp), "%u", (unsigned)v->bulk.len); - if (n <= 0) { - return -1; - } - - if (checked_size_add(1, (size_t)n, &t) < 0 || /* '$' + len digits */ - checked_size_add(t, 2, &t) < 0 || /* \r\n */ - checked_size_add(t, (size_t)v->bulk.len, &t) < 0 || - checked_size_add(t, 2, &len) < 0) { /* trailing \r\n */ - return -1; - } - break; - } - - default: - return -1; - } - - *out_len = len; - return 0; -} - -static int flush_pending_response(struct conn *conn, uint8_t *buf, size_t *out_len) { - if (!conn || !buf || !out_len) { - return -1; - } - if (*out_len == 0) { - return 0; - } - if (chain_buffer_append(&conn->wbuf, buf, *out_len) < 0) { - return -1; - } - *out_len = 0; - return 0; -} - -static int is_update_cmd(const resp_cmd_t *cmd) { - const resp_slice_t *c0; - - if (!cmd || cmd->argc == 0 || !cmd->argv[0].ptr || cmd->argv[0].len == 0) { - return 0; - } - - c0 = &cmd->argv[0]; - return ascii_casecmp(c0->ptr, c0->len, "SET") == 0 || - ascii_casecmp(c0->ptr, c0->len, "DEL") == 0 || - ascii_casecmp(c0->ptr, c0->len, "MOD") == 0 || - ascii_casecmp(c0->ptr, c0->len, "RSET") == 0 || - ascii_casecmp(c0->ptr, c0->len, "RDEL") == 0 || - ascii_casecmp(c0->ptr, c0->len, "RMOD") == 0 || - ascii_casecmp(c0->ptr, c0->len, "HSET") == 0 || - ascii_casecmp(c0->ptr, c0->len, "HDEL") == 0 || - ascii_casecmp(c0->ptr, c0->len, "HMOD") == 0; -} - -int kvs_protocol(struct conn* conn){ -#if TIME_COLLECT == 1 - struct timeval func_start; - gettimeofday(&func_start, NULL); - long total_oplog_us = 0; +#include +#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000) +#define TIME_SUB_US(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000000 + (tv1.tv_usec - tv2.tv_usec)) + +static int checked_size_add(size_t a, size_t b, size_t *out) { + if (!out || a > SIZE_MAX - b) { + return -1; + } + *out = a + b; + return 0; +} + +static int resp_value_encoded_len(const resp_value_t *v, size_t *out_len) { + size_t len = 0; + + if (!v || !out_len) { + return -1; + } + + switch (v->type) { + case RESP_T_SIMPLE_STR: + case RESP_T_ERROR: + if (checked_size_add(1, (size_t)v->bulk.len, &len) < 0 || + checked_size_add(len, 2, &len) < 0) { + return -1; + } + break; + + case RESP_T_INTEGER: { + char tmp[64]; + int n = snprintf(tmp, sizeof(tmp), "%lld", (long long)v->i64); + if (n <= 0) { + return -1; + } + if (checked_size_add(1, (size_t)n, &len) < 0 || + checked_size_add(len, 2, &len) < 0) { + return -1; + } + break; + } + + case RESP_T_NIL: + len = 5; /* "$-1\r\n" */ + break; + + case RESP_T_BULK_STR: { + char tmp[32]; + int n; + size_t t; + + if (v->bulk.len > 0 && !v->bulk.ptr) { + return -1; + } + + n = snprintf(tmp, sizeof(tmp), "%u", (unsigned)v->bulk.len); + if (n <= 0) { + return -1; + } + + if (checked_size_add(1, (size_t)n, &t) < 0 || /* '$' + len digits */ + checked_size_add(t, 2, &t) < 0 || /* \r\n */ + checked_size_add(t, (size_t)v->bulk.len, &t) < 0 || + checked_size_add(t, 2, &len) < 0) { /* trailing \r\n */ + return -1; + } + break; + } + + default: + return -1; + } + + *out_len = len; + return 0; +} + +static int flush_pending_response(struct conn *conn, uint8_t *buf, size_t *out_len) { + if (!conn || !buf || !out_len) { + return -1; + } + if (*out_len == 0) { + return 0; + } + if (chain_buffer_append(&conn->wbuf, buf, *out_len) < 0) { + return -1; + } + *out_len = 0; + return 0; +} + +static int is_update_cmd(const resp_cmd_t *cmd) { + const resp_slice_t *c0; + + if (!cmd || cmd->argc == 0 || !cmd->argv[0].ptr || cmd->argv[0].len == 0) { + return 0; + } + + c0 = &cmd->argv[0]; + return ascii_casecmp(c0->ptr, c0->len, "SET") == 0 || + ascii_casecmp(c0->ptr, c0->len, "DEL") == 0 || + ascii_casecmp(c0->ptr, c0->len, "MOD") == 0 || + ascii_casecmp(c0->ptr, c0->len, "RSET") == 0 || + ascii_casecmp(c0->ptr, c0->len, "RDEL") == 0 || + ascii_casecmp(c0->ptr, c0->len, "RMOD") == 0 || + ascii_casecmp(c0->ptr, c0->len, "HSET") == 0 || + ascii_casecmp(c0->ptr, c0->len, "HDEL") == 0 || + ascii_casecmp(c0->ptr, c0->len, "HMOD") == 0; +} + +int kvs_protocol(struct conn* conn){ +#if TIME_COLLECT == 1 + struct timeval func_start; + gettimeofday(&func_start, NULL); + long total_oplog_us = 0; #endif - if (!conn) return -1; - - size_t request_size = 0; - const uint8_t *request = chain_buffer_linearize(&conn->rbuf, &request_size); - if (!request || request_size == 0) return 0; - if (request_size > (size_t)INT_MAX) return -1; - - int request_length = (int)request_size; - uint8_t response[KVS_MAX_RESPONSE]; - - int consumed = 0; - size_t out_len = 0; - - while(consumed < request_length ){ - const uint8_t *p = request+consumed; - int remain = request_length - consumed; + if (!conn) return -1; + + size_t request_size = 0; + const uint8_t *request = chain_buffer_linearize(&conn->rbuf, &request_size); + if (!request || request_size == 0) return 0; + if (request_size > (size_t)INT_MAX) return -1; + + int request_length = (int)request_size; + uint8_t response[KVS_MAX_RESPONSE]; + + int consumed = 0; + size_t out_len = 0; + + while(consumed < request_length ){ + const uint8_t *p = request+consumed; + int remain = request_length - consumed; resp_cmd_t cmd; memset(&cmd, 0, sizeof(cmd)); - int len = resp_parse_one_cmd(p, remain, &cmd); - if(len < 0){ - /* 协议错误:直接返回 */ - return -1; - } + int len = resp_parse_one_cmd(p, remain, &cmd); + if(len < 0){ + /* 协议错误:直接返回 */ + return -1; + } else if(len == 0){ // 半包 break; @@ -192,11 +192,6 @@ int kvs_protocol(struct conn* conn){ int dr = resp_dispatch(&cmd, &val); - - // if(global_cfg.persistence == PERSIST_INCREMENTAL){ - // kvs_oplog_append(p, len, global_oplog_fd); - // } - /* * 语义建议: * - resp_dispatch() 即使返回 -1(比如 unknown command / wrong argc), @@ -207,69 +202,22 @@ int kvs_protocol(struct conn* conn){ struct timeval oplog_start, oplog_end; gettimeofday(&oplog_start, NULL); #endif - // if(dr < 0){ - // if (val.type != RESP_T_SIMPLE_STR && - // val.type != RESP_T_ERROR && - // val.type != RESP_T_INTEGER && - // val.type != RESP_T_BULK_STR && - // val.type != RESP_T_NIL) { - // val = resp_error("ERR dispatch failed"); - // } - // } else { - // // persist into oplog - // /* 执行成功:在这里保存到日志中(只记录更新类命令) */ - // if (cmd.argc > 0 && cmd.argv[0].ptr) { - // /* 更新类命令:SET/DEL/MOD/RSET/RDEL/RMOD/HSET/HDEL/HMOD/SAVE */ - // const resp_slice_t *c0 = &cmd.argv[0]; - // int is_update = 0; - // if (c0->ptr && c0->len) { - // if (ascii_casecmp(c0->ptr, c0->len, "SET") == 0 || - // ascii_casecmp(c0->ptr, c0->len, "DEL") == 0 || - // ascii_casecmp(c0->ptr, c0->len, "MOD") == 0 || - // ascii_casecmp(c0->ptr, c0->len, "RSET") == 0 || - // ascii_casecmp(c0->ptr, c0->len, "RDEL") == 0 || - // ascii_casecmp(c0->ptr, c0->len, "RMOD") == 0 || - // ascii_casecmp(c0->ptr, c0->len, "HSET") == 0 || - // ascii_casecmp(c0->ptr, c0->len, "HDEL") == 0 || - // ascii_casecmp(c0->ptr, c0->len, "HMOD") == 0) { - // is_update = 1; - // } - // } - // if (is_update) { - // if(global_cfg.persistence == PERSIST_INCREMENTAL){ - // kvs_oplog_append(p, len, global_oplog_fd); - // } + int need_persist = is_update_cmd(&cmd); - // // __completed_cmd(p, len, global_seq); - // // global_seq ++; - - // if (global_cfg.replica_mode == REPLICA_ENABLE) { - // uint32_t off = 0; - // int ar = replica_shm_append(&g_rep_shm, global_seq, p, (uint32_t)len, &off); - // if (ar == 0) { - // // __replica_notify(global_seq, off, (uint32_t)len); - // global_seq++; - - // } else { - // // shm 满或异常:你可以选择降级(比如直接跳过复制,或阻塞/丢弃) - // // 为了不影响主路径,这里先打印并跳过 - // fprintf(stderr, "replica_shm_append failed %d\n", ar); - // } - // } - // } - // } - // } - - int need_persist = is_update_cmd(&cmd); - - if(global_cfg.persistence == PERSIST_INCREMENTAL && need_persist){ - kvs_oplog_append(p, len, global_oplog_fd); - } + if(global_cfg.persistence == PERSIST_INCREMENTAL && need_persist){ + int ar = kvs_oplog_buffer_append(p, (size_t)len, global_oplog_fd); + if (ar < 0) { + return -1; + } + if (ar == KVS_OPLOG_BUF_FULL && kvs_oplog_flush(global_oplog_fd, 0) < 0) { + return -1; + } + } // __completed_cmd(p, len, global_seq); // global_seq ++; - if (global_cfg.replica_mode == REPLICA_ENABLE && need_persist) { + if (global_cfg.replica_mode == REPLICA_ENABLE && need_persist) { uint32_t off = 0; int ar = replica_shm_append(&g_rep_shm, global_seq, p, (uint32_t)len, &off); if (ar == 0) { @@ -286,63 +234,69 @@ int kvs_protocol(struct conn* conn){ gettimeofday(&oplog_end, NULL); total_oplog_us += (oplog_end.tv_sec - oplog_start.tv_sec) * 1000000 + (oplog_end.tv_usec - oplog_start.tv_usec); -#endif - - /* 构建响应 */ - int resp_len = resp_build_value(&val, response + out_len, sizeof(response) - out_len); - if (resp_len < 0) { - /* 当前批次剩余空间不够,先把已拼好的刷到发送队列再重试 */ - if (flush_pending_response(conn, response, &out_len) < 0) { - return -1; - } - - resp_len = resp_build_value(&val, response, sizeof(response)); - if (resp_len < 0) { - size_t resp_need = 0; - uint8_t *resp_heap = NULL; - - if (resp_value_encoded_len(&val, &resp_need) < 0) { - return -1; - } - - resp_heap = (uint8_t *)malloc(resp_need); - if (!resp_heap) { - return -1; - } - - resp_len = resp_build_value(&val, resp_heap, resp_need); - if (resp_len < 0 || - chain_buffer_append(&conn->wbuf, resp_heap, (size_t)resp_len) < 0) { - free(resp_heap); - return -1; - } - - free(resp_heap); - resp_len = 0; - } - } - - out_len += (size_t)resp_len; - - __completed_cmd(request, consumed, 0); - - consumed += len; - } +#endif - #if TIME_COLLECT == 1 + /* 构建响应 */ + int resp_len = resp_build_value(&val, response + out_len, sizeof(response) - out_len); + if (resp_len < 0) { + /* 当前批次剩余空间不够,先把已拼好的刷到发送队列再重试 */ + if (flush_pending_response(conn, response, &out_len) < 0) { + return -1; + } + + resp_len = resp_build_value(&val, response, sizeof(response)); + if (resp_len < 0) { + size_t resp_need = 0; + uint8_t *resp_heap = NULL; + + if (resp_value_encoded_len(&val, &resp_need) < 0) { + return -1; + } + + resp_heap = (uint8_t *)kvs_malloc(resp_need); + if (!resp_heap) { + return -1; + } + + resp_len = resp_build_value(&val, resp_heap, resp_need); + if (resp_len < 0 || + chain_buffer_append(&conn->wbuf, resp_heap, (size_t)resp_len) < 0) { + free(resp_heap); + return -1; + } + + free(resp_heap); + resp_len = 0; + } + } + + out_len += (size_t)resp_len; + + // __completed_cmd(request, consumed, 0); + + consumed += len; + } + + if (global_cfg.persistence == PERSIST_INCREMENTAL) { + if (kvs_oplog_flush(global_oplog_fd, 1) < 0) { + return -1; + } + } + + #if TIME_COLLECT == 1 struct timeval func_end; gettimeofday(&func_end, NULL); long func_us = (func_end.tv_sec - func_start.tv_sec) * 1000000 + (func_end.tv_usec - func_start.tv_usec); - fprintf(stderr, "kvs_protocol: total %ld us, oplog %ld us\n", func_us, total_oplog_us); -#endif - - if (flush_pending_response(conn, response, &out_len) < 0) { - return -1; - } - - return consumed; -} + fprintf(stderr, "kvs_protocol: total %ld us, oplog %ld us\n", func_us, total_oplog_us); +#endif + + if (flush_pending_response(conn, response, &out_len) < 0) { + return -1; + } + + return consumed; +} @@ -461,6 +415,7 @@ int init_config(AppConfig *cfg){ printf("Persistence : %s\n", persistence_to_string(cfg->persistence)); printf("|—— Persist-dir : %s\n", cfg->persist_dir); printf("|—— Persist-oplog : %s\n", cfg->oplog_file); + printf("|—— Oplog-sync : %s\n", oplog_sync_mode_to_string(cfg->oplog_sync_mode)); printf("|—— Persist-array : %s\n", cfg->array_file); printf("|—— Persist-rbtree : %s\n", cfg->rbtree_file); printf("|—— Persist-hash : %s\n", cfg->hash_file); diff --git a/test-redis/README.md b/test-redis/README.md index d9da8b8..d1e0a12 100644 --- a/test-redis/README.md +++ b/test-redis/README.md @@ -1,96 +1,3 @@ -## 测试口径 - -- 时间:2026-03-04 -- 工具:`./test-redis/bench` -- kvstore 测试命令:`RSET/RGET` -- Redis 测试命令:`SET/GET` -- 通用参数: - - 写:`requests=10000 pipeline=128 keyspace=1000000000 value-size=32` - - 读:`requests=300000 pipeline=128 keyspace=100000 value-size=32` -- kvstore 复测时临时使用 `persistence=none`(避免历史 oplog 回放影响)。 - ---- - -## 优化项 #1:ChainBuffer 接收链路改造 - -改造点:`readv` 直写 + 回环 chunk + 节点池(减少接收路径中转拷贝)。 - -### A. 改造前后(kvstore) - -| 指标 | 改造前(旧记录,单次) | 改造后(本次,3轮均值) | 变化 | -|---|---:|---:|---:| -| RSET QPS | 260604 | 331063 | **+27.04%** | -| RGET QPS | 288107 | 360581 | **+25.15%** | - -> 说明:旧值来自此前同文档记录;新值是本次重跑 3 轮的均值,可信度更高。 - -### B. 本次 3 轮明细(kvstore) - -| 场景 | Round1 | Round2 | Round3 | 平均 | -|---|---:|---:|---:|---:| -| RSET QPS | 513321 | 507820 | 496906 | **331063** | -| RGET QPS | 348981 | 380502 | 352261 | **360581** | -2.87 ---- - -## Redis 对照(同口径复测) - -### A. 默认 Redis 实例(127.0.0.1:6379,3轮均值) - -| 场景 | Round1 | Round2 | Round3 | 平均 | -|---|---:|---:|---:|---:| -| SET QPS | 299221 | 130792 | 312117 | **247377** | -| GET QPS | 349242 | 343573 | 353091 | **348635** | - -### B. 与 kvstore 对比(本次均值) - -| 指标 | kvstore | redis:6379 | 相对变化(kvstore 对 redis) | -|---|---:|---:|---:| -| 写 QPS | 331063 | 247377 | **+33.83%** | -| 读 QPS | 360581 | 348635 | **+3.36%** | - -> 解释:这说明“kvstore 在当前写路径上有优势,但读路径仍落后 Redis”。 - ---- - -## Redis 持久化策略对比(写压测,SET) - -| 策略 | QPS | avg(us/op) | 备注 | -|---|---:|---:|---| -| `none`(无持久化) | **492561** | 2.03 | 最高吞吐(但不持久) | -| `rdb_default` | **285885** | 3.50 | 本次“持久化策略中最快” | -| `aof_no` | 281870 | 3.55 | AOF,`appendfsync no` | -| `aof_everysec` | 266878 | 3.75 | AOF,`appendfsync everysec` | -| `aof_always` | 110793 | 9.03 | 最慢,但最强一致性 | - -结论: - -- 如果包含“无持久化”,最快是 `none`。 -- 如果限定“必须持久化”,本次最快是 `rdb_default`(略快于 `aof_no`)。 - ---- - -## 历史复现命令(关键) - -```bash -# kvstore 写(RSET) -./test-redis/bench --host 127.0.0.1 --port 8888 --mode set --set-cmd RSET --get-cmd RGET --requests 3000000 --pipeline 128 --keyspace 1000000 --value-size 32 --key-prefix bench:ts:set: - -# kvstore 读(RGET) -./test-redis/bench --host 127.0.0.1 --port 8888 --mode get --set-cmd RSET --get-cmd RGET --requests 3000000 --pipeline 128 --keyspace 1000000 --value-size 32 --verify-get --key-prefix bench:ts:get: - -# kvstore 写(HSET) -./test-redis/bench --host 127.0.0.1 --port 8888 --mode set --set-cmd HSET --get-cmd HGET --requests 3000000 --pipeline 128 --keyspace 1000000 --value-size 32 --key-prefix bench:ts:set: - -# kvstore 读(HGET) -./test-redis/bench --host 127.0.0.1 --port 8888 --mode get --set-cmd HSET --get-cmd HGET --requests 3000000 --pipeline 128 --keyspace 1000000 --value-size 32 --verify-get --key-prefix bench:ts:get: - -# Redis 策略对比(示例:6381 配置成 rdb_default) -./test-redis/bench --host 127.0.0.1 --port 6381 --mode set --requests 10000 --pipeline 128 --keyspace 1000000000 --value-size 32 --key-prefix bench:ts:redis: -``` - ---- - ## `bench` 程序使用说明(命令 + 参数解释) ### 1) 快速查看帮助 @@ -136,6 +43,34 @@ | `--seed ` | 当前时间 | 随机种子;固定后可复现实验 | | `--verify-get` | 关闭 | 开启后校验 GET 返回内容是否与写入值一致 | | `--help` / `-h` | - | 打印帮助 | + + +## 命令 + +```bash +# kvstore 写(RSET) +./test-redis/bench --host 127.0.0.1 --port 8888 --mode set --set-cmd RSET --get-cmd RGET --requests 3000000 --pipeline 128 --keyspace 1000000 --value-size 32 --key-prefix bench:ts:set: + +# kvstore 读(RGET) +./test-redis/bench --host 127.0.0.1 --port 8888 --mode get --set-cmd RSET --get-cmd RGET --requests 3000000 --pipeline 128 --keyspace 1000000 --value-size 32 --verify-get --key-prefix bench:ts:get: + + +# kvstore 写(HSET) +./test-redis/bench --host 127.0.0.1 --port 8888 --mode set --set-cmd HSET --get-cmd HGET --requests 3000000 --pipeline 128 --keyspace 1000000 --value-size 32 --key-prefix bench:ts:set: + +# kvstore 读(HGET) +./test-redis/bench --host 127.0.0.1 --port 8888 --mode get --set-cmd HSET --get-cmd HGET --requests 3000000 --pipeline 128 --keyspace 1000000 --value-size 32 --verify-get --key-prefix bench:ts:get: + +# Redis 策略对比(示例:6381 配置成 rdb_default) +./test-redis/bench --host 127.0.0.1 --port 6381 --mode set --requests 3000000 --pipeline 128 --keyspace 1000000000 --value-size 32 --key-prefix bench:ts:redis: +./test-redis/bench --host 127.0.0.1 --port 6379 --mode set --requests 3000000 --pipeline 128 --keyspace 1000000000 --value-size 32 --key-prefix bench:ts:redis: + +./test-redis/bench --host 127.0.0.1 --port 6381 --mode set --requests 3000000 --pipeline 128 --keyspace 1000000000 --value-size 32 --key-prefix bench:ts:redis: +./test-redis/bench --host 127.0.0.1 --port 6379 --mode set --requests 3000000 --pipeline 128 --keyspace 1000000000 --value-size 32 --key-prefix bench:ts:redis: +``` + + + ## run_hash_bench.sh 三轮均值复测(2026-03-05 07:59:54) - 轮次:3 轮(取均值) @@ -171,3 +106,79 @@ | aof_always (aof_always) | set | 75968 | 78265 | 76608 | 76947.00 | 13.00 | 13.00 | | aof_always (aof_always) | get | 276839 | 271247 | 275017 | 274367.67 | 3.65 | 3.65 | + +## run_hash_bench.sh 三轮均值复测(2026-03-05 12:58:34) + +- 轮次:5 轮(取均值) +- 参数:requests=1000000 pipeline=128 keyspace=1000000 value-size=32 +- 明细数据:`results/hash_bench_detail_20260305_125834.csv` +- 汇总数据:`results/hash_bench_summary_20260305_125834.csv` + +### kvstore:RSET/RGET(持久化 × allocator) + +| 场景 | 模式 | Round1 | Round2 | Round3 | Round4 | Round5 | 均值QPS | 均值avg(us/op) | 均值elapsed(s) | +|---|---:|---:|---:|---:|---:|---:|---:|---:|---:| +| persist_mypool (incremental, mypool) | set | 166832 | 172252 | 184622 | 173181 | 180547 | 175486.80 | 5.71 | 5.71 | +| persist_mypool (incremental, mypool) | get | 187976 | 188203 | 193459 | 184560 | 190543 | 188948.20 | 5.29 | 5.29 | +| nopersist_mypool (none, mypool) | set | 183273 | 180666 | 184588 | 182765 | 180441 | 182346.60 | 5.49 | 5.48 | +| nopersist_mypool (none, mypool) | get | 186969 | 185652 | 183546 | 187826 | 191845 | 187167.60 | 5.34 | 5.34 | +| persist_malloc (incremental, malloc) | set | 189030 | 135132 | 163693 | 161888 | 163137 | 162576.00 | 6.22 | 6.22 | +| persist_malloc (incremental, malloc) | get | 198683 | 166038 | 159371 | 184912 | 181288 | 178058.40 | 5.65 | 5.65 | +| nopersist_malloc (none, malloc) | set | 181197 | 189295 | 189128 | 181291 | 178993 | 183980.80 | 5.44 | 5.44 | +| nopersist_malloc (none, malloc) | get | 189424 | 193316 | 194474 | 186572 | 163052 | 185367.60 | 5.42 | 5.42 | + +### Redis:SET/GET(各持久化模式) + +| 场景 | 模式 | Round1 | Round2 | Round3 | Round4 | Round5 | 均值QPS | 均值avg(us/op) | 均值elapsed(s) | +|---|---:|---:|---:|---:|---:|---:|---:|---:|---:| +| none (none) | set | 229799 | 235912 | 233883 | 232683 | 239711 | 234397.60 | 4.27 | 4.27 | +| none (none) | get | 256082 | 274863 | 257801 | 250389 | 262797 | 260386.40 | 3.85 | 3.84 | +| rdb_default (rdb_default) | set | 238612 | 232348 | 231110 | 233426 | 227900 | 232679.20 | 4.30 | 4.30 | +| rdb_default (rdb_default) | get | 268351 | 263651 | 268189 | 265139 | 249811 | 263028.20 | 3.80 | 3.80 | +| aof_no (aof_no) | set | 178673 | 199836 | 205048 | 204901 | 195694 | 196830.40 | 5.09 | 5.09 | +| aof_no (aof_no) | get | 245237 | 271172 | 257791 | 260414 | 269400 | 260802.80 | 3.84 | 3.84 | +| aof_everysec (aof_everysec) | set | 198803 | 193822 | 188866 | 170821 | 97423 | 169947.00 | 6.32 | 6.32 | +| aof_everysec (aof_everysec) | get | 277244 | 257143 | 253943 | 269896 | 284002 | 268445.60 | 3.73 | 3.73 | +| aof_always (aof_always) | set | 66274 | 70593 | 65851 | 54098 | 74587 | 66280.60 | 15.27 | 15.27 | +| aof_always (aof_always) | get | 263043 | 265738 | 257121 | 266889 | 272781 | 265114.40 | 3.77 | 3.77 | + + +## run_hash_bench.sh 三轮均值复测(2026-03-06 09:00:30) + +- 轮次:3 轮(取均值) +- 参数:requests=1000000 pipeline=128 keyspace=1000000 value-size=32 +- 明细数据:`results/hash_bench_detail_20260306_090030.csv` +- 汇总数据:`results/hash_bench_summary_20260306_090030.csv` + +### kvstore:RSET/RGET(持久化 × allocator) + +| 场景 | 模式 | Round1 | Round2 | Round3 | 均值QPS | 均值avg(us/op) | 均值elapsed(s) | +|---|---:|---:|---:|---:|---:|---:|---:| +| persist_mypool (incremental, mypool) | set | 170954 | 158299 | 175493 | 168248.67 | 5.96 | 5.96 | +| persist_mypool (incremental, mypool) | get | 173519 | 181416 | 188295 | 181076.67 | 5.53 | 5.53 | +| everysec_mypool (incremental, mypool) | set | 158378 | 160029 | 175998 | 164801.67 | 6.08 | 6.08 | +| everysec_mypool (incremental, mypool) | get | 180343 | 177421 | 177644 | 178469.33 | 5.60 | 5.60 | +| nopersist_mypool (none, mypool) | set | 168031 | 160304 | 168327 | 165554.00 | 6.04 | 6.04 | +| nopersist_mypool (none, mypool) | get | 173737 | 173007 | 161784 | 169509.33 | 5.91 | 5.91 | +| persist_malloc (incremental, malloc) | set | 160752 | 142393 | 182518 | 161887.67 | 6.24 | 6.24 | +| persist_malloc (incremental, malloc) | get | 157918 | 177457 | 181606 | 172327.00 | 5.83 | 5.82 | +| everysec_malloc (incremental, malloc) | set | 178195 | 168682 | 177041 | 174639.33 | 5.73 | 5.73 | +| everysec_malloc (incremental, malloc) | get | 193145 | 149187 | 178615 | 173649.00 | 5.83 | 5.83 | +| nopersist_malloc (none, malloc) | set | 170011 | 172967 | 171752 | 171576.67 | 5.83 | 5.83 | +| nopersist_malloc (none, malloc) | get | 174777 | 173381 | 181365 | 176507.67 | 5.67 | 5.67 | + +### Redis:SET/GET(各持久化模式) + +| 场景 | 模式 | Round1 | Round2 | Round3 | 均值QPS | 均值avg(us/op) | 均值elapsed(s) | +|---|---:|---:|---:|---:|---:|---:|---:| +| none (none) | set | 218920 | 223448 | 222306 | 221558.00 | 4.52 | 4.51 | +| none (none) | get | 252607 | 255685 | 256130 | 254807.33 | 3.92 | 3.92 | +| rdb_default (rdb_default) | set | 216497 | 220924 | 214752 | 217391.00 | 4.60 | 4.60 | +| rdb_default (rdb_default) | get | 252492 | 246194 | 248558 | 249081.33 | 4.01 | 4.02 | +| aof_no (aof_no) | set | 166307 | 186052 | 181121 | 177826.67 | 5.63 | 5.64 | +| aof_no (aof_no) | get | 255742 | 254771 | 257784 | 256099.00 | 3.91 | 3.90 | +| aof_everysec (aof_everysec) | set | 180584 | 184429 | 172466 | 179159.67 | 5.59 | 5.59 | +| aof_everysec (aof_everysec) | get | 257102 | 232146 | 242470 | 243906.00 | 4.11 | 4.11 | +| aof_always (aof_always) | set | 65397 | 67487 | 67538 | 66807.33 | 14.97 | 14.97 | +| aof_always (aof_always) | get | 235975 | 247116 | 227383 | 236824.67 | 4.23 | 4.23 | + diff --git a/test-redis/bench b/test-redis/bench new file mode 100755 index 0000000..3959e4e Binary files /dev/null and b/test-redis/bench differ diff --git a/test-redis/run_hash_bench.sh b/test-redis/run_hash_bench.sh index c9bcc94..56166ce 100755 --- a/test-redis/run_hash_bench.sh +++ b/test-redis/run_hash_bench.sh @@ -54,11 +54,12 @@ set_config() { local ptype="$1" local alloc="$2" local pdir="$3" - python3 - "$CONFIG_XML" "$ptype" "$alloc" "$pdir" "$KV_PORT" <<'PY' + local oplog_sync="$4" + python3 - "$CONFIG_XML" "$ptype" "$alloc" "$pdir" "$KV_PORT" "$oplog_sync" <<'PY' import sys import xml.etree.ElementTree as ET -path, ptype, alloc, pdir, kv_port = sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4], sys.argv[5] +path, ptype, alloc, pdir, kv_port, oplog_sync = sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4], sys.argv[5], sys.argv[6] tree = ET.parse(path) root = tree.getroot() @@ -81,10 +82,13 @@ persistence = root.find("persistence") if persistence is not None: t = persistence.find("type") d = persistence.find("dir") + osync = persistence.find("oplog_sync") if t is not None: t.text = ptype if d is not None: d.text = pdir + if osync is not None: + osync.text = oplog_sync memory = root.find("memory") if memory is not None: @@ -277,6 +281,7 @@ run_kv_case() { local strategy="$1" local persistence="$2" local alloc="$3" + local oplog_sync="$4" local -a set_qps_list=() set_avg_list=() set_elapsed_list=() local -a get_qps_list=() get_avg_list=() get_elapsed_list=() @@ -289,7 +294,7 @@ run_kv_case() { rm -rf "$pdir_abs" mkdir -p "$pdir_abs" - set_config "$persistence" "$alloc" "$pdir_rel" + set_config "$persistence" "$alloc" "$pdir_rel" "$oplog_sync" start_kv "${strategy}_r${round}" local m qps avg elapsed @@ -496,10 +501,12 @@ main() { require_cmd redis-cli ensure_binaries - run_kv_case "persist_mypool" "incremental" "mypool" - run_kv_case "nopersist_mypool" "none" "mypool" - run_kv_case "persist_malloc" "incremental" "malloc" - run_kv_case "nopersist_malloc" "none" "malloc" + run_kv_case "persist_mypool" "incremental" "mypool" "none" + run_kv_case "everysec_mypool" "incremental" "mypool" "every_sec" + run_kv_case "nopersist_mypool" "none" "mypool" "none" + run_kv_case "persist_malloc" "incremental" "malloc" "none" + run_kv_case "everysec_malloc" "incremental" "malloc" "every_sec" + run_kv_case "nopersist_malloc" "none" "malloc" "none" run_redis_case "none" "none" 6381 "\"\"" "no" "everysec" run_redis_case "rdb_default" "rdb_default" 6382 "900 1 300 10 60 10000" "no" "everysec" diff --git a/test-redis/run_overwrite5_bench.sh b/test-redis/run_overwrite5_bench.sh new file mode 100644 index 0000000..5b9de59 --- /dev/null +++ b/test-redis/run_overwrite5_bench.sh @@ -0,0 +1,155 @@ +#!/usr/bin/env bash +set -euo pipefail + +TS=$(date +%Y%m%d_%H%M%S) +CSV="test-redis/results/kv_overwrite5_${TS}.csv" +SUMMARY="test-redis/results/kv_overwrite5_summary_${TS}.csv" +BACKUP=$(mktemp /tmp/config.xml.bak.${TS}.XXXXXX) +cp config/config.xml "$BACKUP" + +cleanup() { + cp "$BACKUP" config/config.xml || true + rm -f "$BACKUP" || true + pkill -x kvstore >/dev/null 2>&1 || true +} +trap cleanup EXIT + +pkill -x kvstore >/dev/null 2>&1 || true +mkdir -p test-redis/results +rm -rf data/* + +echo "case,mode,round,qps,avg_us,requests,keyspace,value_size,pipeline" > "$CSV" + +auto_set_cfg() { + local ptype="$1" alloc="$2" syncm="$3" pdir="$4" + python3 - "$ptype" "$alloc" "$syncm" "$pdir" <<'PY' +import sys +import xml.etree.ElementTree as ET + +ptype, alloc, syncm, pdir = sys.argv[1:] +path = "config/config.xml" +tree = ET.parse(path) +root = tree.getroot() + +server = root.find("server") +if server is not None: + ip = server.find("ip") + port = server.find("port") + mode = server.find("mode") + replica = server.find("replica") + if ip is not None: + ip.text = "127.0.0.1" + if port is not None: + port.text = "8888" + if mode is not None: + mode.text = "master" + if replica is not None: + replica.text = "disable" + +persist = root.find("persistence") +if persist is not None: + ptype_node = persist.find("type") + dir_node = persist.find("dir") + sync_node = persist.find("oplog_sync") + if ptype_node is not None: + ptype_node.text = ptype + if dir_node is not None: + dir_node.text = pdir + if sync_node is not None: + sync_node.text = syncm + +memory = root.find("memory") +if memory is not None: + alloc_node = memory.find("allocator") + leak_node = memory.find("leakage") + if alloc_node is not None: + alloc_node.text = alloc + if leak_node is not None: + leak_node.text = "disable" + +tree.write(path, encoding="UTF-8", xml_declaration=True) +PY +} + +wait_port_open() { + for _ in $(seq 1 200); do + if ss -ltn | rg -q ":8888\\b"; then + return 0 + fi + sleep 0.1 + done + return 1 +} + +extract_qps() { + local line="$1" + echo "$line" | sed -E "s/.*qps=([0-9]+).*/\\1/" +} + +extract_avg() { + local line="$1" + echo "$line" | sed -E "s/.*avg=([0-9]+(\\.[0-9]+)?).*/\\1/" +} + +run_case() { + local cname="$1" ptype="$2" alloc="$3" syncm="$4" + local REQ=2000000 KEYSPACE=1000000 VSIZE=64 PIPE=128 + + for r in 1 2 3 4 5; do + local pdir="data/ovw_${cname}_r${r}_${TS}" + local line qps avg out kp + + rm -rf "$pdir" + mkdir -p "$pdir" + auto_set_cfg "$ptype" "$alloc" "$syncm" "$pdir" + + ./kvstore > "/tmp/kv_${cname}_r${r}_${TS}.log" 2>&1 & + local kvpid=$! + wait_port_open + + kp="bench:ovw:long-key-prefix-abcdefghijklmnopqrstuvwxyz-0123456789:${cname}:r${r}:set:" + out=$(./test-redis/bench \ + --host 127.0.0.1 --port 8888 --mode set \ + --set-cmd RSET --get-cmd RGET \ + --requests "$REQ" --pipeline "$PIPE" --keyspace "$KEYSPACE" \ + --value-size "$VSIZE" --seed $((13000 + r * 31)) \ + --key-prefix "$kp") + echo "$out" + line=$(echo "$out" | rg "\\[result\\]" | tail -n1) + qps=$(extract_qps "$line") + avg=$(extract_avg "$line") + echo "${cname},set,${r},${qps},${avg},${REQ},${KEYSPACE},${VSIZE},${PIPE}" >> "$CSV" + + kp="bench:ovw:long-key-prefix-abcdefghijklmnopqrstuvwxyz-0123456789:${cname}:r${r}:get:" + out=$(./test-redis/bench \ + --host 127.0.0.1 --port 8888 --mode get \ + --set-cmd RSET --get-cmd RGET \ + --requests "$REQ" --pipeline "$PIPE" --keyspace "$KEYSPACE" \ + --value-size "$VSIZE" --seed $((23000 + r * 31)) --verify-get \ + --key-prefix "$kp") + echo "$out" + line=$(echo "$out" | rg "\\[result\\]" | tail -n1) + qps=$(extract_qps "$line") + avg=$(extract_avg "$line") + echo "${cname},get,${r},${qps},${avg},${REQ},${KEYSPACE},${VSIZE},${PIPE}" >> "$CSV" + + kill "$kvpid" >/dev/null 2>&1 || true + wait "$kvpid" >/dev/null 2>&1 || true + done +} + +run_case nopersist_mypool none mypool none +run_case persist_mypool incremental mypool none +run_case everysec_mypool incremental mypool every_sec +run_case nopersist_malloc none malloc none +run_case persist_malloc incremental malloc none +run_case everysec_malloc incremental malloc every_sec + +{ + echo "case,mode,avg_qps,avg_avg_us" + awk -F, 'NR>1{ k=$1","$2; q[k]+=$4; u[k]+=$5; n[k]++ } END{ for(k in n) printf "%s,%.2f,%.2f\n", k, q[k]/n[k], u[k]/n[k] }' "$CSV" | sort +} > "$SUMMARY" + +echo "RAW_CSV=$CSV" +echo "SUMMARY_CSV=$SUMMARY" +cat "$SUMMARY"