diff --git a/Makefile b/Makefile index a2aa845..192026a 100644 --- a/Makefile +++ b/Makefile @@ -21,12 +21,14 @@ SUBDIR = ./NtyCo/ TEST_REDIS = ./test-redis/testcase TEST_REDIS_SRCS = ./test-redis/test.c +TEST_REDIS_BENCH = ./test-redis/bench +TEST_REDIS_BENCH_SRCS = ./test-redis/bench.c TEST_REDIS_LDFLAGS = -lhiredis OBJS = $(SRCS:.c=.o) -all: $(SUBDIR) $(TARGET) $(TEST_REDIS) +all: $(SUBDIR) $(TARGET) $(TEST_REDIS) $(TEST_REDIS_BENCH) $(SUBDIR): ECHO make -C $@ @@ -40,11 +42,14 @@ $(TARGET): $(OBJS) $(TEST_REDIS): $(TEST_REDIS_SRCS) $(CC) -g -o $@ $^ $(TEST_REDIS_LDFLAGS) +$(TEST_REDIS_BENCH): $(TEST_REDIS_BENCH_SRCS) + $(CC) -g -o $@ $^ $(TEST_REDIS_LDFLAGS) + %.o: %.c $(CC) $(CFLAGS) $(INC) -c $^ -g -o $@ clean: clmem cldata - rm -rf $(OBJS) $(TARGET) $(TEST_REDIS) + rm -rf $(OBJS) $(TARGET) $(TEST_REDIS) $(TEST_REDIS_BENCH) clmem: rm -rf mem_leak/* diff --git a/chainbuffer.plan.md b/chainbuffer.plan.md new file mode 100644 index 0000000..92d7348 --- /dev/null +++ b/chainbuffer.plan.md @@ -0,0 +1,189 @@ +# ChainBuffer + 全量 WAL(单一方案,slot 驱动) + +## 1. 目标与约束 + +目标: + +- 所有命令都落盘(不再区分 update/get)。 +- 主线程 A 尽量不做内存分配,不构造 uring task。 +- 主线程 A 只做:收包、解析命令边界、摘取 payload、写入预分配 slot。 +- WAL 线程 B 负责:从 slot 取 payload、构造 task、提交 io_uring、归还内存块。 + +约束: + +- 只保留一个实现方案,不引入 v1/v2 双路径。 +- 正确性优先:半包、多包、pipeline、断连、慢盘都可控。 + +--- + +## 2. 
ChainBuffer 设计(重点) + +## 2.1 结构定义 + +采用“回环 chunk 链表”: + +```c +typedef struct cb_chunk { + struct cb_chunk *next; + uint32_t cap; // 固定大小,默认 4096 + uint32_t rpos; // 读指针 [0, cap) + uint32_t wpos; // 写指针 [0, cap) + uint32_t used; // 已用字节数 [0, cap] + uint32_t refcnt; // 被 WAL payload 持有的引用计数 + uint8_t data[]; +} cb_chunk_t; + +typedef struct chain_buffer { + cb_chunk_t *head; + cb_chunk_t *tail; + size_t total_len; + uint32_t chunk_size; + + // 节点池:避免频繁 malloc/free + cb_chunk_t *free_list; + uint32_t free_count; + uint32_t free_limit; +} chain_buffer_t; +``` + +说明: + +- chunk 内部可回环读写(避免 memmove)。 +- 多 chunk 串联用于扩容。 +- 空 chunk 不释放到系统,先回收到 `free_list`。 + +## 2.2 接收路径(readv 直写) + +主线程不再 `recv -> tmp -> memcpy`,改为: + +1. `chain_buffer_prepare_recv_iov(buf, iov, max_iov)` + - 收集可写空闲段(优先尾 chunk,不够就从 free_list 取或新建 chunk)。 +2. `readv(fd, iov, iovcnt)` +3. `chain_buffer_commit_recv(buf, nread)` + - 推进 `wpos/used/total_len`。 + +这样可去掉接收中转拷贝。 + +## 2.3 消费与摘取 + +提供三个核心能力: + +- `chain_buffer_iter_*`:按字节流遍历,供 RESP 解析命令边界。 +- `chain_buffer_detach_prefix(buf, len, cb_payload_t *out)`:把前缀字节摘成 payload(尽量零拷贝,边界切分允许一次小拷贝)。 +- `chain_buffer_release_payload(buf, payload)`:WAL 线程用完后归还 chunk(refcnt--,归零后回 free_list)。 + +--- + +## 3. slot 队列设计(主线程零分配) + +## 3.1 预分配 ring + +定义单生产者单消费者 ring(A->B): + +```c +#define WAL_SLOT_CAP 65536 + +typedef struct wal_slot { + uint64_t seq; + uint32_t cmd_len; + cb_payload_t payload; // 命令字节 + uint8_t in_use; +} wal_slot_t; + +typedef struct wal_ring { + wal_slot_t slots[WAL_SLOT_CAP]; + _Atomic uint32_t head; // producer write + _Atomic uint32_t tail; // consumer read + _Atomic uint32_t size; +} wal_ring_t; +``` + +特点: + +- slot 全预分配。 +- 主线程写 slot 不 malloc。 +- WAL 线程消费后清空 slot 并前移 tail。 + +## 3.2 主线程流程(所有命令都落盘) + +对每条完整命令: + +1. 从 chainbuffer 解析得到 `cmd_len`。 +2. 申请一个空 slot(ring 未满)。 +3. `detach_prefix(cmd_len)` 得到 `payload`。 +4. 写入 slot:`seq/cmd_len/payload`。 +5. 
发布 head。 + +注意: + +- 不做命令类型判断。 +- 不构造 uring task。 +- 不进行额外 payload malloc。 + +## 3.3 WAL 线程流程 + +循环: + +1. 从 ring 取 slot。 +2. 生成长度头(4B)+ payload iov。 +3. 调 `submit_write()`(当前打包拷贝发生在此线程)。 +4. `chain_buffer_release_payload()` 归还 chunk。 +5. 清 slot,推进 tail。 + +--- + +## 4. 一致性与回压 + +## 4.1 日志顺序 + +- `g_log_off` 只在 WAL 线程维护。 +- 单消费者天然保证写入顺序与入队顺序一致。 + +## 4.2 回压(必须) + +当 ring 达到高水位(例如 80%): + +1. 暂停该连接读事件(优先)。 +2. 若持续超时(如 200ms)仍高水位,关闭连接保护系统。 + +本方案不再做“回退旧路径”。 + +## 4.3 异常处理 + +- `submit_write` 失败:记录错误并触发保护动作(可选停机/拒绝新连接)。 +- 连接断开:已入 slot 的 payload 继续由 WAL 线程完成归还。 +- 进程退出:先停读,再 drain wal_ring,再 shutdown uring。 + +--- + +## 5. 实施步骤(实际落地) + +1. 重构 `network/chainbuffer.*` + - 回环 chunk + free_list + - `prepare_recv_iov/commit_recv` + - `iter/detach_prefix/release_payload` +2. 修改 `reactor.c` + - `recv_cb` 使用 `readv` + commit +3. 修改 `kvstore.c` + - 按命令边界循环 + - 每条命令统一入 wal_slot(不分类) +4. 新增 `dump/wal_slot_queue.*`(或放 `dump/kvs_oplog.c`) + - SPSC ring + WAL worker +5. 调整 `dump/kvs_oplog.c` + - 改为 WAL 线程消费 slot 后调用 `submit_write` +6. 收敛退出与错误路径 + - close_conn / shutdown / submit 失败全覆盖 + +--- + +## 6. 
验收标准 + +- 功能: + - 半包/多包/pipeline 正确; + - 重启回放后数据一致。 +- 性能: + - 主线程不再出现 oplog task 构造热点; + - 接收路径 memcpy 次数下降(去掉 tmp 中转)。 +- 稳定性: + - 慢盘压测下无泄漏、无 double free、无 UAF; + - 高水位触发时系统行为可预期。 diff --git a/config/config.xml b/config/config.xml index 47db930..4df1944 100644 --- a/config/config.xml +++ b/config/config.xml @@ -18,7 +18,7 @@ - incremental + none data kvs_oplog.db diff --git a/network/chainbuffer.c b/network/chainbuffer.c index 079dfa9..cadc6a1 100644 --- a/network/chainbuffer.c +++ b/network/chainbuffer.c @@ -1,48 +1,245 @@ #include "network/chainbuffer.h" +#include #include #include #include #include -#include #define CHAINBUFFER_DEFAULT_CHUNK 4096 #define CHAINBUFFER_MAX_IOV 16 +#define CHAINBUFFER_DEFAULT_FREE_LIMIT 256 struct chain_buffer_node { struct chain_buffer_node *next; - size_t start; - size_t end; size_t cap; + size_t rpos; + size_t wpos; + size_t used; + unsigned refcnt; uint8_t data[]; }; static chain_buffer_node_t *alloc_node(size_t cap) { - chain_buffer_node_t *node = (chain_buffer_node_t *)malloc(sizeof(*node) + cap); + chain_buffer_node_t *node; + + node = (chain_buffer_node_t *)malloc(sizeof(*node) + cap); if (!node) { return NULL; } + node->next = NULL; - node->start = 0; - node->end = 0; node->cap = cap; + node->rpos = 0; + node->wpos = 0; + node->used = 0; + node->refcnt = 0; return node; } +static void reset_node_state(chain_buffer_node_t *node) { + if (!node) { + return; + } + node->next = NULL; + node->rpos = 0; + node->wpos = 0; + node->used = 0; + node->refcnt = 0; +} + +static size_t min_size(size_t a, size_t b) { + return a < b ? 
a : b; +} + +static size_t node_read_seg1_len(const chain_buffer_node_t *node) { + size_t tail_len; + + if (!node || node->used == 0) { + return 0; + } + + tail_len = node->cap - node->rpos; + return min_size(node->used, tail_len); +} + +static size_t node_write_seg1_len(const chain_buffer_node_t *node) { + size_t free_total; + size_t tail_len; + + if (!node || node->used >= node->cap) { + return 0; + } + + free_total = node->cap - node->used; + tail_len = node->cap - node->wpos; + return min_size(free_total, tail_len); +} + +static void node_read_segments(const chain_buffer_node_t *node, + uint8_t **p1, size_t *l1, + uint8_t **p2, size_t *l2) { + size_t seg1; + size_t seg2; + + seg1 = node_read_seg1_len(node); + seg2 = (node && node->used > seg1) ? (node->used - seg1) : 0; + + if (p1) { + *p1 = (seg1 > 0) ? (uint8_t *)(node->data + node->rpos) : NULL; + } + if (l1) { + *l1 = seg1; + } + if (p2) { + *p2 = (seg2 > 0) ? (uint8_t *)node->data : NULL; + } + if (l2) { + *l2 = seg2; + } +} + +static void node_write_segments(const chain_buffer_node_t *node, + uint8_t **p1, size_t *l1, + uint8_t **p2, size_t *l2) { + size_t free_total; + size_t seg1; + size_t seg2; + + if (!node || node->used >= node->cap) { + if (p1) *p1 = NULL; + if (l1) *l1 = 0; + if (p2) *p2 = NULL; + if (l2) *l2 = 0; + return; + } + + free_total = node->cap - node->used; + seg1 = node_write_seg1_len(node); + seg2 = free_total - seg1; + + if (p1) { + *p1 = (seg1 > 0) ? (uint8_t *)(node->data + node->wpos) : NULL; + } + if (l1) { + *l1 = seg1; + } + if (p2) { + *p2 = (seg2 > 0) ? 
(uint8_t *)node->data : NULL; + } + if (l2) { + *l2 = seg2; + } +} + +static void node_advance_read(chain_buffer_node_t *node, size_t n) { + if (!node || n == 0) { + return; + } + node->rpos = (node->rpos + n) % node->cap; + node->used -= n; +} + +static void node_advance_write(chain_buffer_node_t *node, size_t n) { + if (!node || n == 0) { + return; + } + node->wpos = (node->wpos + n) % node->cap; + node->used += n; +} + +static chain_buffer_node_t *acquire_node(chain_buffer_t *buf, size_t cap) { + chain_buffer_node_t *node = NULL; + + if (!buf || cap == 0) { + return NULL; + } + + if (cap == buf->chunk_size && buf->free_list) { + node = buf->free_list; + buf->free_list = node->next; + buf->free_count--; + reset_node_state(node); + return node; + } + + node = alloc_node(cap); + return node; +} + +static void recycle_node(chain_buffer_t *buf, chain_buffer_node_t *node) { + if (!node) { + return; + } + + if (!buf || node->cap != buf->chunk_size || buf->free_count >= buf->free_limit) { + free(node); + return; + } + + reset_node_state(node); + node->next = buf->free_list; + buf->free_list = node; + buf->free_count++; +} + +static int append_new_tail(chain_buffer_t *buf, size_t cap) { + chain_buffer_node_t *node; + + node = acquire_node(buf, cap); + if (!node) { + return -1; + } + + if (!buf->tail) { + buf->head = node; + buf->tail = node; + } else { + buf->tail->next = node; + buf->tail = node; + } + return 0; +} + +static void list_append_node(chain_buffer_list_t *list, chain_buffer_node_t *node) { + if (!list || !node) { + return; + } + node->next = NULL; + if (!list->tail) { + list->head = node; + list->tail = node; + } else { + list->tail->next = node; + list->tail = node; + } +} + void chain_buffer_init(chain_buffer_t *buf, size_t chunk_size) { if (!buf) { return; } + memset(buf, 0, sizeof(*buf)); buf->chunk_size = chunk_size ? 
chunk_size : CHAINBUFFER_DEFAULT_CHUNK;
+    buf->free_limit = CHAINBUFFER_DEFAULT_FREE_LIMIT;
 }

+/*
+ * Free every node on the live chain and on the free list, free the
+ * linearize cache, and return `buf` to its freshly initialised state.
+ *
+ * chunk_size is carried over, so the buffer can be reused without another
+ * chain_buffer_init() call. Nodes that were handed out earlier by
+ * chain_buffer_detach_prefix() are on neither list and are not freed here.
+ * A NULL `buf` is ignored.
+ */
 void chain_buffer_reset(chain_buffer_t *buf) {
+    chain_buffer_node_t *node;
+
     if (!buf) {
         return;
     }

-    chain_buffer_node_t *node = buf->head;
+    node = buf->head;
+    while (node) {
+        chain_buffer_node_t *next = node->next;
+        free(node);
+        node = next;
+    }
+
+    node = buf->free_list;
     while (node) {
         chain_buffer_node_t *next = node->next;
         free(node);
@@ -50,11 +247,7 @@ void chain_buffer_reset(chain_buffer_t *buf) {
     }

     free(buf->linear_cache);
-    buf->linear_cache = NULL;
-    buf->linear_cap = 0;
-    buf->head = NULL;
-    buf->tail = NULL;
-    buf->total_len = 0;
+    /*
+     * Do not plain-memset: that zeroes chunk_size and free_limit as well.
+     * With chunk_size == 0, chain_buffer_prepare_recv_iov() asks
+     * acquire_node() for a zero-byte node, which is rejected, so a reset
+     * buffer could never receive again. Re-initialising keeps the previous
+     * chunk size (the argument is read before the struct is cleared).
+     */
+    chain_buffer_init(buf, buf->chunk_size);
 }

 size_t chain_buffer_len(const chain_buffer_t *buf) {
@@ -62,8 +255,10 @@ size_t chain_buffer_len(const chain_buffer_t *buf) {
 }

 int chain_buffer_append(chain_buffer_t *buf, const void *data, size_t len) {
-    const uint8_t *src = (const uint8_t *)data;
-    if (!buf || (!src && len > 0)) {
+    const uint8_t *src;
+    size_t remain;
+
+    if (!buf || (!data && len > 0)) {
         errno = EINVAL;
         return -1;
     }
@@ -75,78 +270,97 @@ int chain_buffer_append(chain_buffer_t *buf, const void *data, size_t len) {
         return -1;
     }

-    size_t remain = len;
+    src = (const uint8_t *)data;
+    remain = len;
+
     while (remain > 0) {
-        chain_buffer_node_t *tail = buf->tail;
-        size_t writable = 0;
+        chain_buffer_node_t *tail;
+        uint8_t *p1;
+        uint8_t *p2;
+        size_t l1;
+        size_t l2;
+        size_t writable;
+        size_t n;
+        size_t c1;

-        if (tail && tail->end < tail->cap) {
-            writable = tail->cap - tail->end;
-        }
-
-        if (writable == 0) {
+        if (!buf->tail || buf->tail->used == buf->tail->cap) {
             size_t cap = remain > buf->chunk_size ?
remain : buf->chunk_size; - chain_buffer_node_t *node = alloc_node(cap); - if (!node) { + if (append_new_tail(buf, cap) < 0) { errno = ENOMEM; return -1; } - - if (buf->tail) { - buf->tail->next = node; - buf->tail = node; - } else { - buf->head = node; - buf->tail = node; - } - tail = node; - writable = tail->cap; } - size_t n = remain < writable ? remain : writable; - memcpy(tail->data + tail->end, src, n); - tail->end += n; + tail = buf->tail; + node_write_segments(tail, &p1, &l1, &p2, &l2); + writable = l1 + l2; + if (writable == 0) { + continue; + } + + n = min_size(remain, writable); + c1 = min_size(n, l1); + if (c1 > 0) { + memcpy(p1, src, c1); + } + if (n > c1) { + memcpy(p2, src + c1, n - c1); + } + + node_advance_write(tail, n); + buf->total_len += n; src += n; remain -= n; - buf->total_len += n; } return 0; } size_t chain_buffer_drain(chain_buffer_t *buf, size_t len) { + size_t remain; + size_t drained; + if (!buf || len == 0 || buf->total_len == 0) { return 0; } - size_t remain = len; - size_t drained = 0; + remain = len; + drained = 0; while (remain > 0 && buf->head) { chain_buffer_node_t *node = buf->head; - size_t avail = node->end - node->start; + size_t n = min_size(remain, node->used); - if (remain < avail) { - node->start += remain; - buf->total_len -= remain; - drained += remain; - break; + if (n == 0) { + buf->head = node->next; + if (!buf->head) { + buf->tail = NULL; + } + recycle_node(buf, node); + continue; } - remain -= avail; - drained += avail; - buf->total_len -= avail; - buf->head = node->next; - if (!buf->head) { - buf->tail = NULL; + node_advance_read(node, n); + buf->total_len -= n; + drained += n; + remain -= n; + + if (node->used == 0) { + assert(node->refcnt == 0); + buf->head = node->next; + if (!buf->head) { + buf->tail = NULL; + } + recycle_node(buf, node); } - free(node); } return drained; } const uint8_t *chain_buffer_linearize(chain_buffer_t *buf, size_t *out_len) { + size_t offset; + if (!buf) { return NULL; } @@ -160,7 
+374,13 @@ const uint8_t *chain_buffer_linearize(chain_buffer_t *buf, size_t *out_len) { } if (buf->head == buf->tail && buf->head) { - return buf->head->data + buf->head->start; + chain_buffer_node_t *node = buf->head; + if (node->used == 0) { + return NULL; + } + if (node_read_seg1_len(node) == node->used) { + return node->data + node->rpos; + } } if (buf->linear_cap < buf->total_len) { @@ -172,20 +392,33 @@ const uint8_t *chain_buffer_linearize(chain_buffer_t *buf, size_t *out_len) { buf->linear_cap = buf->total_len; } - size_t offset = 0; + offset = 0; for (chain_buffer_node_t *node = buf->head; node; node = node->next) { - size_t avail = node->end - node->start; - if (avail == 0) { - continue; + uint8_t *p1; + uint8_t *p2; + size_t l1; + size_t l2; + + node_read_segments(node, &p1, &l1, &p2, &l2); + if (l1 > 0) { + memcpy(buf->linear_cache + offset, p1, l1); + offset += l1; + } + if (l2 > 0) { + memcpy(buf->linear_cache + offset, p2, l2); + offset += l2; } - memcpy(buf->linear_cache + offset, node->data + node->start, avail); - offset += avail; } return buf->linear_cache; } ssize_t chain_buffer_send_fd(chain_buffer_t *buf, int fd, int flags) { + struct iovec iov[CHAINBUFFER_MAX_IOV]; + struct msghdr msg; + size_t iovcnt; + ssize_t n; + if (!buf) { errno = EINVAL; return -1; @@ -194,34 +427,277 @@ ssize_t chain_buffer_send_fd(chain_buffer_t *buf, int fd, int flags) { return 0; } - struct iovec iov[CHAINBUFFER_MAX_IOV]; - size_t iovcnt = 0; - + iovcnt = 0; for (chain_buffer_node_t *node = buf->head; node && iovcnt < CHAINBUFFER_MAX_IOV; node = node->next) { - size_t avail = node->end - node->start; - if (avail == 0) { - continue; + uint8_t *p1; + uint8_t *p2; + size_t l1; + size_t l2; + + node_read_segments(node, &p1, &l1, &p2, &l2); + if (l1 > 0) { + iov[iovcnt].iov_base = p1; + iov[iovcnt].iov_len = l1; + iovcnt++; + if (iovcnt >= CHAINBUFFER_MAX_IOV) { + break; + } + } + if (l2 > 0) { + iov[iovcnt].iov_base = p2; + iov[iovcnt].iov_len = l2; + iovcnt++; } - 
iov[iovcnt].iov_base = (void *)(node->data + node->start); - iov[iovcnt].iov_len = avail; - iovcnt++; } if (iovcnt == 0) { return 0; } - struct msghdr msg; memset(&msg, 0, sizeof(msg)); msg.msg_iov = iov; msg.msg_iovlen = iovcnt; - ssize_t n = sendmsg(fd, &msg, flags); + n = sendmsg(fd, &msg, flags); if (n > 0) { chain_buffer_drain(buf, (size_t)n); } return n; } + +int chain_buffer_prepare_recv_iov(chain_buffer_t *buf, struct iovec *iov, int max_iov) { + chain_buffer_node_t *tail; + uint8_t *p1; + uint8_t *p2; + size_t l1; + size_t l2; + int iovcnt; + + if (!buf || !iov || max_iov <= 0) { + errno = EINVAL; + return -1; + } + + if (!buf->tail || buf->tail->used == buf->tail->cap) { + if (append_new_tail(buf, buf->chunk_size) < 0) { + errno = ENOMEM; + return -1; + } + } + + tail = buf->tail; + node_write_segments(tail, &p1, &l1, &p2, &l2); + + iovcnt = 0; + if (l1 > 0) { + iov[iovcnt].iov_base = p1; + iov[iovcnt].iov_len = l1; + iovcnt++; + } + if (l2 > 0 && iovcnt < max_iov) { + iov[iovcnt].iov_base = p2; + iov[iovcnt].iov_len = l2; + iovcnt++; + } + + return iovcnt; +} + +size_t chain_buffer_commit_recv(chain_buffer_t *buf, size_t len) { + chain_buffer_node_t *tail; + size_t free_total; + + if (!buf || len == 0) { + return 0; + } + + if (!buf->tail || buf->tail->used == buf->tail->cap) { + return 0; + } + + tail = buf->tail; + free_total = tail->cap - tail->used; + if (len > free_total) { + len = free_total; + } + + node_advance_write(tail, len); + buf->total_len += len; + return len; +} + +void chain_buffer_list_init(chain_buffer_list_t *list) { + if (!list) { + return; + } + memset(list, 0, sizeof(*list)); +} + +size_t chain_buffer_list_len(const chain_buffer_list_t *list) { + return list ? 
list->total_len : 0;
+}
+
+/*
+ * Describe the readable bytes of `list` as iovec entries, in order.
+ *
+ * Each node contributes up to two entries (the part up to the end of its
+ * ring, then the wrapped part). At most `max_iov` entries are written; any
+ * segments beyond that are left out without an error.
+ *
+ * Returns the number of entries written, or -1 with errno = EINVAL when
+ * `list` or `iov` is NULL or `max_iov` is not positive.
+ */
+int chain_buffer_list_iov(const chain_buffer_list_t *list, struct iovec *iov, int max_iov) {
+    int iovcnt = 0;
+
+    if (!list || !iov || max_iov <= 0) {
+        errno = EINVAL;
+        return -1;
+    }
+
+    for (chain_buffer_node_t *node = list->head; node && iovcnt < max_iov; node = node->next) {
+        uint8_t *p1;
+        uint8_t *p2;
+        size_t l1;
+        size_t l2;
+
+        node_read_segments(node, &p1, &l1, &p2, &l2);
+        if (l1 > 0) {
+            iov[iovcnt].iov_base = p1;
+            iov[iovcnt].iov_len = l1;
+            iovcnt++;
+            if (iovcnt >= max_iov) {
+                break;
+            }
+        }
+        if (l2 > 0) {
+            iov[iovcnt].iov_base = p2;
+            iov[iovcnt].iov_len = l2;
+            iovcnt++;
+        }
+    }
+
+    return iovcnt;
+}
+
+/*
+ * Copy the first `len` readable bytes of `node` into `dst` without
+ * advancing the node's read position. Handles data that wraps around the
+ * end of the node's ring. Returns 0 on success, -1 when an argument is
+ * NULL, `len` is 0, or `len` exceeds the bytes stored in the node.
+ */
+static int copy_prefix_from_node(chain_buffer_node_t *node, size_t len, uint8_t *dst) {
+    uint8_t *p1;
+    uint8_t *p2;
+    size_t l1;
+    size_t l2;
+    size_t c1;
+
+    if (!node || !dst || len == 0 || len > node->used) {
+        return -1;
+    }
+
+    node_read_segments(node, &p1, &l1, &p2, &l2);
+    c1 = min_size(len, l1);
+    if (c1 > 0) {
+        memcpy(dst, p1, c1);
+    }
+    if (len > c1) {
+        memcpy(dst + c1, p2, len - c1);
+    }
+    return 0;
+}
+
+/*
+ * Detach the first `len` bytes of `buf` into `out`, transferring ownership.
+ *
+ * Nodes that lie completely inside the prefix are unlinked from `buf` and
+ * appended to `out` without copying. When the prefix ends inside a node,
+ * the leading bytes of that node are copied into a newly acquired node and
+ * the source node's read position is advanced past them.
+ *
+ * The operation is all-or-nothing: the split node is acquired and filled
+ * before `buf` is touched, so on any failure `buf` still holds every byte
+ * and `out` is left empty.
+ *
+ * Returns 0 on success (including len == 0). Returns -1 with
+ * errno = EINVAL when an argument is NULL, `len` exceeds
+ * buf->total_len, or the nodes hold fewer bytes than total_len claims;
+ * returns -1 with errno = ENOMEM when the split node cannot be acquired.
+ * The caller hands `out` back with chain_buffer_list_release().
+ */
+int chain_buffer_detach_prefix(chain_buffer_t *buf, size_t len, chain_buffer_list_t *out) {
+    chain_buffer_node_t *stop;
+    chain_buffer_node_t *part = NULL;
+    size_t split;
+
+    if (!buf || !out) {
+        errno = EINVAL;
+        return -1;
+    }
+
+    chain_buffer_list_init(out);
+
+    if (len == 0) {
+        return 0;
+    }
+    if (len > buf->total_len) {
+        errno = EINVAL;
+        return -1;
+    }
+
+    /*
+     * Pass 1 (read-only): skip the nodes that are consumed entirely.
+     * Afterwards `stop` is the first node that stays in `buf` and `split`
+     * is the number of bytes still to be taken from its front.
+     */
+    stop = buf->head;
+    split = len;
+    while (stop && split > 0 && split >= stop->used) {
+        split -= stop->used;
+        stop = stop->next;
+    }
+
+    if (split > 0) {
+        if (!stop) {
+            /* total_len promised more bytes than the nodes contain. */
+            errno = EINVAL;
+            return -1;
+        }
+
+        /* Everything that can fail happens here, before buf is modified. */
+        part = acquire_node(buf, split);
+        if (!part) {
+            errno = ENOMEM;
+            return -1;
+        }
+        if (copy_prefix_from_node(stop, split, part->data) < 0) {
+            recycle_node(buf, part);
+            errno = EINVAL;
+            return -1;
+        }
+
+        part->used = split;
+        part->wpos = split % part->cap;
+        part->rpos = 0;
+        part->refcnt = 1;
+        part->next = NULL;
+    }
+
+    /* Pass 2: commit. Move the fully consumed nodes over to `out`. */
+    while (buf->head != stop) {
+        chain_buffer_node_t *node = buf->head;
+        size_t take = node->used;
+
+        buf->head = node->next;
+        if (!buf->head) {
+            buf->tail = NULL;
+        }
+        node->next = NULL;
+        node->refcnt = 1;
+        list_append_node(out, node);
+        out->total_len += take;
+        buf->total_len -= take;
+    }
+
+    if (part) {
+        list_append_node(out, part);
+        out->total_len += split;
+
+        /* split < stop->used here, so `stop` keeps at least one byte. */
+        node_advance_read(stop, split);
+        buf->total_len -= split;
+    }
+
+    return 0;
+}
+
+/*
+ * Give back every node of `list`: each node's refcnt is decremented and,
+ * once it reaches zero, the node is recycled into `owner` (or freed when
+ * `owner` is NULL). The list is re-initialised to empty afterwards.
+ * NOTE(review): a node whose refcnt stays above zero is dropped from the
+ * list without being freed; confirm that another holder releases it.
+ */
+void chain_buffer_list_release(chain_buffer_t *owner, chain_buffer_list_t *list) {
+    chain_buffer_node_t *node;
+
+    if (!list) {
+        return;
+    }
+
+    node = list->head;
+    while (node) {
+        chain_buffer_node_t *next = node->next;
+
+        if (node->refcnt > 0) {
+            node->refcnt--;
+        }
+
+        if (node->refcnt == 0) {
+            if (owner) {
+                recycle_node(owner, node);
+            } else {
+                free(node);
+            }
+        }
+
+        node = next;
+    }
+
+    chain_buffer_list_init(list);
+}
diff --git a/network/chainbuffer.h b/network/chainbuffer.h
index 78e3c3f..1a3c4db 100644
--- a/network/chainbuffer.h
+++ b/network/chainbuffer.h
@@ -4,6 +4,7 @@
 #include
 #include
 #include
+#include

 typedef struct chain_buffer_node chain_buffer_node_t;

@@ -12,10 +13,21 @@ typedef struct chain_buffer_s {
     chain_buffer_node_t *tail;
     size_t total_len;
     size_t chunk_size;
+
+    chain_buffer_node_t *free_list;
+    size_t free_count;
+    size_t free_limit;
+
     uint8_t *linear_cache;
     size_t linear_cap;
 } chain_buffer_t;

+typedef struct chain_buffer_list_s {
+    chain_buffer_node_t *head;
+    chain_buffer_node_t *tail;
+    size_t total_len;
+} chain_buffer_list_t;
+
 void chain_buffer_init(chain_buffer_t *buf, size_t chunk_size);
 void chain_buffer_reset(chain_buffer_t *buf);

@@ -26,4 +38,15 @@ size_t chain_buffer_drain(chain_buffer_t *buf, size_t len);
 const uint8_t *chain_buffer_linearize(chain_buffer_t *buf, size_t *out_len);
 ssize_t chain_buffer_send_fd(chain_buffer_t *buf, int fd, int flags);

+/* readv zero-copy recv helpers */
+int
chain_buffer_prepare_recv_iov(chain_buffer_t *buf, struct iovec *iov, int max_iov); +size_t chain_buffer_commit_recv(chain_buffer_t *buf, size_t len); + +/* ownership transfer helpers */ +void chain_buffer_list_init(chain_buffer_list_t *list); +size_t chain_buffer_list_len(const chain_buffer_list_t *list); +int chain_buffer_list_iov(const chain_buffer_list_t *list, struct iovec *iov, int max_iov); +int chain_buffer_detach_prefix(chain_buffer_t *buf, size_t len, chain_buffer_list_t *out); +void chain_buffer_list_release(chain_buffer_t *owner, chain_buffer_list_t *list); + #endif diff --git a/reactor.c b/reactor.c index e6732f2..440566a 100644 --- a/reactor.c +++ b/reactor.c @@ -12,6 +12,7 @@ #include #include #include +#include #include #include "diskuring/diskuring.h" @@ -200,8 +201,17 @@ int recv_cb(int fd) { c = &conn_list[fd]; while (1) { - uint8_t tmp[RECV_BATCH_BYTES]; - ssize_t n = recv(fd, tmp, sizeof(tmp), 0); + struct iovec iov[4]; + int iovcnt = chain_buffer_prepare_recv_iov(&c->rbuf, iov, (int)(sizeof(iov) / sizeof(iov[0]))); + ssize_t n; + + if (iovcnt < 0) { + printf("fd=%d prepare recv iov failed: %s\n", fd, strerror(errno)); + close_conn(fd); + return 0; + } + + n = readv(fd, iov, iovcnt); if (n > 0) { size_t cur_len = chain_buffer_len(&c->rbuf); @@ -210,8 +220,8 @@ int recv_cb(int fd) { close_conn(fd); return 0; } - if (chain_buffer_append(&c->rbuf, tmp, (size_t)n) < 0) { - printf("fd=%d append read buffer failed: %s\n", fd, strerror(errno)); + if (chain_buffer_commit_recv(&c->rbuf, (size_t)n) != (size_t)n) { + printf("fd=%d commit recv buffer failed\n", fd); close_conn(fd); return 0; } diff --git a/test-redis/README.md b/test-redis/README.md new file mode 100644 index 0000000..57d6e2c --- /dev/null +++ b/test-redis/README.md @@ -0,0 +1,107 @@ +# test-redis 压测记录与优化对比(复测) + +## 先回答你的两个问题 + +### 1) 为什么之前看起来比 Redis 快很多? 
+ +结论:之前是**单次样本**,抖动很大,不足以说明稳定结论。 +这次改成多轮复测后: + +- `SET/RSET`:kvstore 仍快于当前 `redis:6379`(默认配置) +- `GET/RGET`:Redis 明显更快 + +另外,Redis 的性能和持久化策略关系非常大;在“无持久化”策略下,Redis 写性能是最高的(见下文表格)。 + +### 2) 为什么 GET 的 keyspace 比 SET 小很多? + +这是有意的: + +- `SET/RSET` 压测为了避免键冲突(`RSET` 冲突会报错),使用超大 keyspace(`1e9`)。 +- `GET/RGET` 必须先 prefill 全部 keyspace,若也设为 `1e9`,预填充成本过高,不适合日常回归测试。 + +--- + +## 测试口径 + +- 时间:2026-03-04 +- 工具:`./test-redis/bench` +- kvstore 测试命令:`RSET/RGET` +- Redis 测试命令:`SET/GET` +- 通用参数: + - 写:`requests=10000 pipeline=128 keyspace=1000000000 value-size=32` + - 读:`requests=300000 pipeline=128 keyspace=100000 value-size=32` +- kvstore 复测时临时使用 `persistence=none`(避免历史 oplog 回放影响)。 + +--- + +## 优化项 #1:ChainBuffer 接收链路改造 + +改造点:`readv` 直写 + 回环 chunk + 节点池(减少接收路径中转拷贝)。 + +### A. 改造前后(kvstore) + +| 指标 | 改造前(旧记录,单次) | 改造后(本次,3轮均值) | 变化 | +|---|---:|---:|---:| +| RSET QPS | 260604 | 331063 | **+27.04%** | +| RGET QPS | 294951 | 288107 | **-2.32%** | + +> 说明:旧值来自此前同文档记录;新值是本次重跑 3 轮的均值,可信度更高。 + +### B. 本次 3 轮明细(kvstore) + +| 场景 | Round1 | Round2 | Round3 | 平均 | +|---|---:|---:|---:|---:| +| RSET QPS | 323041 | 352476 | 317673 | **331063** | +| RGET QPS | 271069 | 313658 | 279593 | **288107** | + +--- + +## Redis 对照(同口径复测) + +### A. 默认 Redis 实例(127.0.0.1:6379,3轮均值) + +| 场景 | Round1 | Round2 | Round3 | 平均 | +|---|---:|---:|---:|---:| +| SET QPS | 299221 | 130792 | 312117 | **247377** | +| GET QPS | 349242 | 343573 | 353091 | **348635** | + +### B. 
与 kvstore 对比(本次均值) + +| 指标 | kvstore | redis:6379 | 相对变化(kvstore 对 redis) | +|---|---:|---:|---:| +| 写 QPS | 331063 | 247377 | **+33.83%** | +| 读 QPS | 288107 | 348635 | **-17.36%** | + +> 解释:这说明“kvstore 在当前写路径上有优势,但读路径仍落后 Redis”。 + +--- + +## Redis 持久化策略对比(写压测,SET) + +| 策略 | QPS | avg(us/op) | 备注 | +|---|---:|---:|---| +| `none`(无持久化) | **492561** | 2.03 | 最高吞吐(但不持久) | +| `rdb_default` | **285885** | 3.50 | 本次“持久化策略中最快” | +| `aof_no` | 281870 | 3.55 | AOF,`appendfsync no` | +| `aof_everysec` | 266878 | 3.75 | AOF,`appendfsync everysec` | +| `aof_always` | 110793 | 9.03 | 最慢,但最强一致性 | + +结论: + +- 如果包含“无持久化”,最快是 `none`。 +- 如果限定“必须持久化”,本次最快是 `rdb_default`(略快于 `aof_no`)。 + +--- + +## 复现命令(关键) + +```bash +# kvstore 写(RSET) +./test-redis/bench --host 127.0.0.1 --port 8888 --mode set --set-cmd RSET --get-cmd RGET --requests 10000 --pipeline 128 --keyspace 1000000000 --value-size 32 --key-prefix bench::set: + +# kvstore 读(RGET) +./test-redis/bench --host 127.0.0.1 --port 8888 --mode get --set-cmd RSET --get-cmd RGET --requests 300000 --pipeline 128 --keyspace 100000 --value-size 32 --verify-get --key-prefix bench::get: + +# Redis 策略对比(示例:6381 配置成 rdb_default) +./test-redis/bench --host 127.0.0.1 --port 6381 --mode set --requests 10000 --pipeline 128 --keyspace 1000000000 --value-size 32 --key-prefix bench::redis: +``` diff --git a/test-redis/bench b/test-redis/bench new file mode 100755 index 0000000..3959e4e Binary files /dev/null and b/test-redis/bench differ diff --git a/test-redis/bench.c b/test-redis/bench.c new file mode 100644 index 0000000..247134d --- /dev/null +++ b/test-redis/bench.c @@ -0,0 +1,447 @@ +#include +#include +#include +#include +#include +#include +#include + +#include + +typedef enum { + MODE_SET = 0, + MODE_GET = 1, + MODE_MIXED = 2, +} bench_mode_t; + +typedef struct { + const char *host; + int port; + bench_mode_t mode; + uint64_t requests; + uint32_t pipeline; + uint32_t keyspace; + uint32_t value_size; + uint32_t set_ratio; + const char 
*set_cmd;
+    const char *get_cmd;
+    const char *key_prefix;
+    uint64_t seed;
+    int verify_get;
+} bench_opts_t;
+
+typedef struct {
+    uint64_t set_ops;
+    uint64_t get_ops;
+    uint64_t errors;
+    double elapsed_sec;
+} bench_result_t;
+
+static void usage(const char *prog) {
+    fprintf(stderr,
+            "Usage: %s [options]\n"
+            " --host default: 127.0.0.1\n"
+            " --port default: 6379\n"
+            " --mode default: mixed\n"
+            " --requests default: 1000000\n"
+            " --pipeline default: 64\n"
+            " --keyspace default: 100000\n"
+            " --value-size default: 32\n"
+            " --set-ratio <0..100> default: 50 (mixed mode only)\n"
+            " --set-cmd default: SET\n"
+            " --get-cmd default: GET\n"
+            " --key-prefix default: bench:key:\n"
+            " --seed default: time-based\n"
+            " --verify-get verify GET value content\n"
+            "\nExamples:\n"
+            " # Benchmark Redis\n"
+            " %s --host 127.0.0.1 --port 6379 --mode mixed --requests 2000000\n"
+            "\n"
+            " # Benchmark kvstore with Redis-compatible commands\n"
+            " %s --host 127.0.0.1 --port 8888 --mode mixed --requests 2000000\n"
+            "\n"
+            " # Benchmark kvstore RBTree path\n"
+            " %s --host 127.0.0.1 --port 8888 --mode mixed --set-cmd RSET --get-cmd RGET\n",
+            prog, prog, prog, prog);
+}
+
+static void opts_init(bench_opts_t *o) {
+    memset(o, 0, sizeof(*o));
+    o->host = "127.0.0.1";
+    o->port = 6379;
+    o->mode = MODE_MIXED;
+    o->requests = 1000000;
+    o->pipeline = 64;
+    o->keyspace = 100000;
+    o->value_size = 32;
+    o->set_ratio = 50;
+    o->set_cmd = "SET";
+    o->get_cmd = "GET";
+    o->key_prefix = "bench:key:";
+    o->seed = (uint64_t)time(NULL);
+    o->verify_get = 0;
+}
+
+/*
+ * Parse `s` as an unsigned base-10 integer into `*out`.
+ *
+ * Returns 0 on success and -1 when `s` is NULL, is empty, has trailing
+ * characters, is out of range, or contains a minus sign.
+ */
+static int parse_u64(const char *s, uint64_t *out) {
+    char *end = NULL;
+    unsigned long long v;
+
+    /*
+     * strtoull() accepts a leading '-' and returns the negated value
+     * without setting errno, so "-1" would silently become 2^64-1.
+     * A valid unsigned decimal never contains '-', so reject it up front.
+     */
+    if (!s || strchr(s, '-') != NULL) {
+        return -1;
+    }
+
+    errno = 0;
+    v = strtoull(s, &end, 10);
+    if (errno != 0 || end == s || *end != 0) {
+        return -1;
+    }
+    *out = (uint64_t)v;
+    return 0;
+}
+
+static int parse_u32(const char *s, uint32_t *out) {
+    uint64_t v = 0;
+    if (parse_u64(s, &v) != 0 || v > UINT32_MAX) {
+        return -1;
+    }
+    *out = (uint32_t)v;
+    return 0;
+}
+
+static int
parse_args(int argc, char **argv, bench_opts_t *o) { + int i; + for (i = 1; i < argc; i++) { + if (strcmp(argv[i], "--host") == 0 && i + 1 < argc) { + o->host = argv[++i]; + } else if (strcmp(argv[i], "--port") == 0 && i + 1 < argc) { + uint32_t p = 0; + if (parse_u32(argv[++i], &p) != 0 || p == 0 || p > 65535) { + return -1; + } + o->port = (int)p; + } else if (strcmp(argv[i], "--mode") == 0 && i + 1 < argc) { + const char *m = argv[++i]; + if (strcmp(m, "set") == 0) { + o->mode = MODE_SET; + } else if (strcmp(m, "get") == 0) { + o->mode = MODE_GET; + } else if (strcmp(m, "mixed") == 0) { + o->mode = MODE_MIXED; + } else { + return -1; + } + } else if (strcmp(argv[i], "--requests") == 0 && i + 1 < argc) { + if (parse_u64(argv[++i], &o->requests) != 0 || o->requests == 0) { + return -1; + } + } else if (strcmp(argv[i], "--pipeline") == 0 && i + 1 < argc) { + if (parse_u32(argv[++i], &o->pipeline) != 0 || o->pipeline == 0) { + return -1; + } + } else if (strcmp(argv[i], "--keyspace") == 0 && i + 1 < argc) { + if (parse_u32(argv[++i], &o->keyspace) != 0 || o->keyspace == 0) { + return -1; + } + } else if (strcmp(argv[i], "--value-size") == 0 && i + 1 < argc) { + if (parse_u32(argv[++i], &o->value_size) != 0 || o->value_size == 0) { + return -1; + } + } else if (strcmp(argv[i], "--set-ratio") == 0 && i + 1 < argc) { + if (parse_u32(argv[++i], &o->set_ratio) != 0 || o->set_ratio > 100) { + return -1; + } + } else if (strcmp(argv[i], "--set-cmd") == 0 && i + 1 < argc) { + o->set_cmd = argv[++i]; + } else if (strcmp(argv[i], "--get-cmd") == 0 && i + 1 < argc) { + o->get_cmd = argv[++i]; + } else if (strcmp(argv[i], "--key-prefix") == 0 && i + 1 < argc) { + o->key_prefix = argv[++i]; + } else if (strcmp(argv[i], "--seed") == 0 && i + 1 < argc) { + if (parse_u64(argv[++i], &o->seed) != 0) { + return -1; + } + } else if (strcmp(argv[i], "--verify-get") == 0) { + o->verify_get = 1; + } else if (strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "-h") == 0) { + return 1; + } 
else {
+            return -1;
+        }
+    }
+    return 0;
+}
+
+/*
+ * Current CLOCK_MONOTONIC reading, in nanoseconds.
+ * The clock_gettime() result is not checked; ts is zero-initialized so a
+ * failed call yields 0 instead of reading an indeterminate value.
+ */
+static uint64_t mono_ns(void) {
+    struct timespec ts = {0};
+    clock_gettime(CLOCK_MONOTONIC, &ts);
+    return (uint64_t)ts.tv_sec * 1000000000ull + (uint64_t)ts.tv_nsec;
+}
+
+/*
+ * One xorshift step (shifts 13 / 7 / 17) over *state.
+ * Stores the new value back into *state and returns it.
+ * A zero state stays zero forever, so callers must seed with a non-zero value.
+ */
+static uint64_t xorshift64(uint64_t *state) {
+    uint64_t x = *state;
+    x ^= x << 13;
+    x ^= x >> 7;
+    x ^= x << 17;
+    *state = x;
+    return x;
+}
+
+/*
+ * Append a three-argument command "<cmd> <key> <val>" to the connection via
+ * redisAppendCommandArgv() without reading the reply.
+ * cmd must be NUL-terminated (its length is taken with strlen); key and val
+ * are passed with explicit lengths.
+ * Returns whatever redisAppendCommandArgv() returns.
+ */
+static int append_set(redisContext *c, const char *cmd,
+                      const char *key, size_t key_len,
+                      const char *val, size_t val_len) {
+    const char *argv[3];
+    size_t argvlen[3];
+    argv[0] = cmd;
+    argvlen[0] = strlen(cmd);
+    argv[1] = key;
+    argvlen[1] = key_len;
+    argv[2] = val;
+    argvlen[2] = val_len;
+    return redisAppendCommandArgv(c, 3, argv, argvlen);
+}
+
+/*
+ * Append a two-argument command "<cmd> <key>" to the connection via
+ * redisAppendCommandArgv() without reading the reply.
+ * cmd must be NUL-terminated; key is passed with an explicit length.
+ * Returns whatever redisAppendCommandArgv() returns.
+ */
+static int append_get(redisContext *c, const char *cmd,
+                      const char *key, size_t key_len) {
+    const char *argv[2];
+    size_t argvlen[2];
+    argv[0] = cmd;
+    argvlen[0] = strlen(cmd);
+    argv[1] = key;
+    argvlen[1] = key_len;
+    return redisAppendCommandArgv(c, 2, argv, argvlen);
+}
+
+/*
+ * Read one reply and check that it is a status reply equal to "OK"
+ * (compared case-insensitively). The reply object is always freed.
+ * Returns 0 on a matching reply, -1 on a read failure or any other reply.
+ */
+static int consume_set_reply(redisContext *c) {
+    redisReply *r = NULL;
+    if (redisGetReply(c, (void **)&r) != REDIS_OK || r == NULL) {
+        return -1;
+    }
+    if (r->type != REDIS_REPLY_STATUS || r->str == NULL || strcasecmp(r->str, "OK") != 0) {
+        freeReplyObject(r);
+        return -1;
+    }
+    freeReplyObject(r);
+    return 0;
+}
+
+/*
+ * Read one reply to a get-style command. The reply object is always freed.
+ *
+ *   - string reply: accepted; when verify is non-zero its length and bytes
+ *     must equal expect / expect_len.
+ *   - nil reply:    accepted only when verify is zero.
+ *   - anything else (or a read failure): rejected.
+ *
+ * Returns 0 when the reply is accepted, -1 otherwise.
+ */
+static int consume_get_reply(redisContext *c, const char *expect, size_t expect_len, int verify) {
+    redisReply *r = NULL;
+    if (redisGetReply(c, (void **)&r) != REDIS_OK || r == NULL) {
+        return -1;
+    }
+
+    if (r->type == REDIS_REPLY_STRING) {
+        if (verify && ((size_t)r->len != expect_len || memcmp(r->str, expect, expect_len) != 0)) {
+            freeReplyObject(r);
+            return -1;
+        }
+        freeReplyObject(r);
+        return 0;
+    }
+
+    if (r->type == REDIS_REPLY_NIL) {
+        freeReplyObject(r);
+        return verify ? -1 : 0;
+    }
+
+    freeReplyObject(r);
+    return -1;
+}
+
+/*
+ * Write `value` under every key "<key_prefix><n>" for n in [0, keyspace),
+ * using o->set_cmd. Keys are sent in batches of at most o->pipeline commands;
+ * all commands of a batch are appended first, then their replies are consumed.
+ * Returns 0 on success, -1 on the first failure (key does not fit the local
+ * buffer, append failure, unexpected reply, or a zero pipeline depth).
+ */
+static int prefill(redisContext *c, const bench_opts_t *o, const char *value, size_t value_len) {
+    uint64_t i = 0;
+    char key[256];
+
+    /* A zero batch size would never advance i and spin forever. */
+    if (o->pipeline == 0) {
+        return -1;
+    }
+
+    while (i < o->keyspace) {
+        uint32_t batch = o->pipeline;
+        if ((uint64_t)batch > (uint64_t)o->keyspace - i) {
+            batch = (uint32_t)((uint64_t)o->keyspace - i);
+        }
+
+        for (uint32_t j = 0; j < batch; j++) {
+            int klen = snprintf(key, sizeof(key), "%s%" PRIu64, o->key_prefix, i + j);
+            if (klen <= 0 || (size_t)klen >= sizeof(key)) {
+                return -1;
+            }
+            if (append_set(c, o->set_cmd, key, (size_t)klen, value, value_len) != REDIS_OK) {
+                return -1;
+            }
+        }
+
+        for (uint32_t j = 0; j < batch; j++) {
+            if (consume_set_reply(c) != 0) {
+                return -1;
+            }
+        }
+
+        i += batch;
+    }
+
+    return 0;
+}
+
+/*
+ * Run the timed benchmark: o->requests commands in batches of at most
+ * o->pipeline, against random keys "<key_prefix><n>" with n in [0, keyspace).
+ *
+ * For each batch, every command is appended first and the kind of each
+ * command (set / get) is recorded in a per-batch op mask; the replies are
+ * then consumed in the same order using that mask, so each reply is checked
+ * with the matching consume_*_reply(). The mask is allocated once and reused
+ * for every batch.
+ *
+ * *res is zeroed on entry; set_ops / get_ops count appended commands and
+ * elapsed_sec is filled in only when the whole run succeeds.
+ * Returns 0 on success, -1 on the first failure (invalid options, allocation
+ * failure, key formatting, append failure, or a rejected reply).
+ */
+static int run_bench_with_mask(redisContext *c, const bench_opts_t *o,
+                               const char *value, size_t value_len,
+                               bench_result_t *res) {
+    uint64_t done = 0;
+    uint64_t rng = o->seed ? o->seed : 1; /* xorshift state must be non-zero */
+    char key[256];
+    uint8_t *opmask = NULL;
+    uint64_t begin_ns;
+
+    memset(res, 0, sizeof(*res));
+
+    /*
+     * pipeline == 0 would make every batch empty (endless loop) and
+     * keyspace == 0 would divide by zero when picking a key.
+     */
+    if (o->pipeline == 0 || o->keyspace == 0) {
+        return -1;
+    }
+
+    opmask = (uint8_t *)malloc(o->pipeline);
+    if (!opmask) {
+        return -1;
+    }
+
+    begin_ns = mono_ns();
+
+    while (done < o->requests) {
+        uint32_t batch = o->pipeline;
+        if ((uint64_t)batch > o->requests - done) {
+            batch = (uint32_t)(o->requests - done);
+        }
+
+        for (uint32_t i = 0; i < batch; i++) {
+            uint64_t rnd = xorshift64(&rng);
+            uint64_t key_id = rnd % o->keyspace;
+            int is_set = 0;
+            int klen;
+
+            if (o->mode == MODE_SET) {
+                is_set = 1;
+            } else if (o->mode == MODE_GET) {
+                is_set = 0;
+            } else {
+                /*
+                 * Use a separate draw for the set/get decision. Reusing rnd
+                 * ties the decision to key_id whenever keyspace is a multiple
+                 * of 100 (rnd % 100 == key_id % 100), which would make every
+                 * key permanently set-only or get-only.
+                 */
+                is_set = (xorshift64(&rng) % 100) < o->set_ratio;
+            }
+            opmask[i] = (uint8_t)is_set;
+
+            klen = snprintf(key, sizeof(key), "%s%" PRIu64, o->key_prefix, key_id);
+            if (klen <= 0 || (size_t)klen >= sizeof(key)) {
+                free(opmask);
+                return -1;
+            }
+
+            if (is_set) {
+                if (append_set(c, o->set_cmd, key, (size_t)klen, value, value_len) != REDIS_OK) {
+                    free(opmask);
+                    return -1;
+                }
+                res->set_ops++;
+            } else {
+                if (append_get(c, o->get_cmd, key, (size_t)klen) != REDIS_OK) {
+                    free(opmask);
+                    return -1;
+                }
+                res->get_ops++;
+            }
+        }
+
+        /* Replies arrive in append order; the mask says how to check each. */
+        for (uint32_t i = 0; i < batch; i++) {
+            int rc = opmask[i] ? consume_set_reply(c)
+                               : consume_get_reply(c, value, value_len, o->verify_get);
+            if (rc != 0) {
+                res->errors++;
+                free(opmask);
+                return -1;
+            }
+        }
+
+        done += batch;
+    }
+
+    res->elapsed_sec = (double)(mono_ns() - begin_ns) / 1e9;
+    free(opmask);
+    return 0;
+}
+
+/*
+ * Benchmark entry point.
+ *
+ *   1. Parse options; parse_args() returning 1 prints usage and exits 0,
+ *      any other non-zero result prints usage and exits 2.
+ *   2. Connect to host:port with a 3 second connect timeout.
+ *   3. Build the value buffer (value_size bytes cycling through 'a'..'z').
+ *   4. Unless the mode is MODE_SET, prefill the whole keyspace so that gets
+ *      have something to read.
+ *   5. Run the timed benchmark and print totals, QPS and average latency.
+ *
+ * Exits 1 on connection, allocation, prefill or benchmark failure.
+ */
+int main(int argc, char **argv) {
+    bench_opts_t opts;
+    bench_result_t result;
+    redisContext *ctx;
+    struct timeval timeout;
+    char *value;
+    size_t value_len;
+    int parse_rc;
+
+    opts_init(&opts);
+    parse_rc = parse_args(argc, argv, &opts);
+    if (parse_rc == 1) {
+        usage(argv[0]);
+        return 0;
+    }
+    if (parse_rc != 0) {
+        usage(argv[0]);
+        return 2;
+    }
+
+    timeout.tv_sec = 3;
+    timeout.tv_usec = 0;
+    ctx = redisConnectWithTimeout(opts.host, opts.port, timeout);
+    if (!ctx || ctx->err) {
+        fprintf(stderr, "connect %s:%d failed: %s\n", opts.host, opts.port,
+                ctx ? ctx->errstr : "oom");
+        if (ctx) {
+            redisFree(ctx);
+        }
+        return 1;
+    }
+
+    value_len = opts.value_size;
+    /*
+     * malloc(0) may legitimately return NULL; always request at least one
+     * byte so an empty value is not misreported as an allocation failure.
+     */
+    value = (char *)malloc(value_len ? value_len : 1);
+    if (!value) {
+        fprintf(stderr, "malloc value buffer failed\n");
+        redisFree(ctx);
+        return 1;
+    }
+    for (size_t i = 0; i < value_len; i++) {
+        value[i] = (char)('a' + (int)(i % 26));
+    }
+
+    if (opts.mode != MODE_SET) {
+        fprintf(stdout, "[prefill] keyspace=%u using %s\n", opts.keyspace, opts.set_cmd);
+        if (prefill(ctx, &opts, value, value_len) != 0) {
+            fprintf(stderr, "prefill failed, err=%s\n", ctx->err ? ctx->errstr : "unknown");
+            free(value);
+            redisFree(ctx);
+            return 1;
+        }
+    }
+
+    fprintf(stdout,
+            "[bench] target=%s:%d mode=%s requests=%" PRIu64
+            " pipeline=%u keyspace=%u value_size=%u set_cmd=%s get_cmd=%s\n",
+            opts.host, opts.port,
+            opts.mode == MODE_SET ? "set" : (opts.mode == MODE_GET ? "get" : "mixed"),
+            opts.requests, opts.pipeline, opts.keyspace, opts.value_size,
+            opts.set_cmd, opts.get_cmd);
+
+    if (run_bench_with_mask(ctx, &opts, value, value_len, &result) != 0) {
+        fprintf(stderr, "benchmark failed, err=%s\n", ctx->err ? ctx->errstr : "reply mismatch");
+        free(value);
+        redisFree(ctx);
+        return 1;
+    }
+
+    {
+        /* Guard both ratios against a zero denominator. */
+        double qps = result.elapsed_sec > 0 ? (double)(result.set_ops + result.get_ops) / result.elapsed_sec : 0.0;
+        double avg_us = (result.set_ops + result.get_ops) > 0
+                            ? (result.elapsed_sec * 1e6) / (double)(result.set_ops + result.get_ops)
+                            : 0.0;
+        fprintf(stdout,
+                "[result] elapsed=%.3fs total=%" PRIu64 " set=%" PRIu64 " get=%" PRIu64
+                " errors=%" PRIu64 " qps=%.0f avg=%.2fus/op\n",
+                result.elapsed_sec,
+                result.set_ops + result.get_ops,
+                result.set_ops,
+                result.get_ops,
+                result.errors,
+                qps,
+                avg_us);
+    }
+
+    free(value);
+    redisFree(ctx);
+    return 0;
+}