diff --git a/README.md b/README.md index 917883e..7c3e1c5 100644 --- a/README.md +++ b/README.md @@ -353,8 +353,15 @@ ALL TESTS PASSED. 10. 能够跟哪些系统交互使用? -### 架构设计 -![image](https://disk.0voice.com/p/py) - - - +## 项目收获 +reactor网络模型,用户态网络缓冲区的写法。\ +特殊字符串支持的引擎层数据结构设计,支持\0作为键值存储。\ +实现RESP协议的服务端协议解析。\ +使用fork的Copy On Write机制,实现的异步快照创建,不会受到原字符串的影响。\ +基于BinLog上OffSet的主从同步设计。\ +基于bpf的实时数据同步设计。\ +基于共享缓冲区+额外进程的实时数据同步设计。\ +基于bpf的内存泄露探测功能,实现热插拔。\ +实现支持分配可变长度内存块的内存池。\ +实现专门uring线程实现异步的增量、全量落盘操作。\ +使用配置文件规定端口、保存文件路径等,使用mmap加载到内存,使用libxml解析。\ \ No newline at end of file diff --git a/config/config.xml b/config/config.xml index df40a51..f13c407 100644 --- a/config/config.xml +++ b/config/config.xml @@ -18,7 +18,7 @@ - none + incremental data kvs_oplog.db diff --git a/diskuring/diskuring.c b/diskuring/diskuring.c index 5914653..b4ad9be 100644 --- a/diskuring/diskuring.c +++ b/diskuring/diskuring.c @@ -3,7 +3,8 @@ #include #include -static destroy_queue_t g_destroy_queue = {NULL, PTHREAD_MUTEX_INITIALIZER}; +static destroy_queue_t g_destroy_queue = {ATOMIC_VAR_INIT(NULL)}; +static destroy_queue_t g_submit_queue = {ATOMIC_VAR_INIT(NULL)}; static long long push_to_queue = 0; static long long push_to_sqe = 0; @@ -13,8 +14,6 @@ static long long release_cnt = 0; void task_init(task_t *t) { push_to_queue ++; - pthread_mutex_init(&t->m, NULL); - pthread_cond_init(&t->cv, NULL); t->done = 0; t->res = 0; t->next = NULL; @@ -22,27 +21,18 @@ void task_init(task_t *t) void task_finish(task_t *t, int res) { - pthread_mutex_lock(&t->m); t->res = res; t->done = 1; - pthread_cond_broadcast(&t->cv); - pthread_mutex_unlock(&t->m); } int task_wait(task_t *t) { - pthread_mutex_lock(&t->m); - while (!t->done) - pthread_cond_wait(&t->cv, &t->m); - int r = t->res; - pthread_mutex_unlock(&t->m); - return r; + } void task_destroy(task_t *t) { - pthread_mutex_destroy(&t->m); - pthread_cond_destroy(&t->cv); + if (t->iovs) { for (int i = 0; i < t->iovcnt; i++) { if (t->iovs[i].iov_base) { @@ -55,49 +45,71 @@ void task_destroy(task_t *t) kvs_free(t); } -static void queue_push(iouring_ctx_t *ctx, task_t *t) +static void submit_queue_push(iouring_ctx_t *ctx, task_t *t) { - pthread_mutex_lock(&ctx->q_m); - if (ctx->q_tail) - ctx->q_tail->next = t; - else - ctx->q_head = t; - ctx->q_tail = t; - pthread_cond_signal(&ctx->q_cv); - pthread_mutex_unlock(&ctx->q_m); + task_t *old_head; + do { + old_head = atomic_load_explicit(&g_submit_queue.head, memory_order_relaxed); + t->next = old_head; + } while (!atomic_compare_exchange_weak_explicit( + &g_submit_queue.head, &old_head, t, + memory_order_release, + memory_order_relaxed)); + + /* 若之前队列为空,通知 worker */ + if (old_head == NULL) { + uint64_t val = 1; + write(ctx->event_fd, &val, sizeof(val)); + } } -static task_t *queue_pop(iouring_ctx_t *ctx) +static task_t *submit_steal_all(iouring_ctx_t *ctx) { - pthread_mutex_lock(&ctx->q_m); - task_t *t = ctx->q_head; - if (t) { - ctx->q_head = t->next; - if (!ctx->q_head) { - ctx->q_tail = NULL; - } - t->next = NULL; - } - pthread_mutex_unlock(&ctx->q_m); - return t; + return atomic_exchange_explicit(&g_submit_queue.head, NULL, + memory_order_acquire); } -static void queue_push_front(iouring_ctx_t *ctx, task_t *t) +static void submit_queue_putback(iouring_ctx_t *ctx, task_t *head) { - pthread_mutex_lock(&ctx->q_m); - t->next = ctx->q_head; - ctx->q_head = t; - if (!ctx->q_tail) { - ctx->q_tail = t; + while (head) { + task_t *nxt = head->next; + submit_queue_push(ctx, head); + head = nxt; } - pthread_mutex_unlock(&ctx->q_m); } +static int submit_queue_empty(iouring_ctx_t *ctx) +{ + return atomic_load_explicit(&g_submit_queue.head, + memory_order_acquire) == NULL; +} + +static void destroy_queue_push(task_t *t) +{ + task_t *old_head; + do { + old_head = atomic_load_explicit(&g_destroy_queue.head, memory_order_relaxed); + t->next = old_head; + } while (!atomic_compare_exchange_strong_explicit( + &g_destroy_queue.head, + &old_head, + t, + memory_order_release, + memory_order_relaxed)); +} + +static task_t *destroy_queue_steal_all(void) +{ + return atomic_exchange_explicit(&g_destroy_queue.head, NULL, memory_order_acquire); +} + +/* =============================================================================================== */ + extern void sync_wakeup(); static void *worker_main(void *arg) { iouring_ctx_t *ctx = (iouring_ctx_t *)arg; - const int BATCH_SIZE = 256; // 每次最多准备这么多,防止一次占满 SQ + const int BATCH_SIZE = 512; // 每次最多准备这么多,防止一次占满 SQ while (!ctx->stop) { @@ -115,6 +127,9 @@ static void *worker_main(void *arg) unsigned head; io_uring_for_each_cqe(&ctx->ring, head, cqe) { + if (cqe->user_data == 0) { + continue; + } task_t *done = (task_t *)(uintptr_t)cqe->user_data; atomic_fetch_sub(&ctx->in_flight, 1); task_finish(done, cqe->res); @@ -123,10 +138,7 @@ static void *worker_main(void *arg) fprintf(stderr, "write fail: fd=%d res=%d, offset=%ld\n", done->fd, cqe->res, done->off); } - pthread_mutex_lock(&g_destroy_queue.lock); - done->next = g_destroy_queue.head; - g_destroy_queue.head = done; - pthread_mutex_unlock(&g_destroy_queue.lock); + destroy_queue_push(done); get_from_cqe++; cq_count++; @@ -145,19 +157,25 @@ static void *worker_main(void *arg) // ========== 2. 批量准备 SQE ========== int batch_count = 0; - while (true) { - int current_in_flight = atomic_load(&ctx->in_flight); - if (current_in_flight >= ctx->max_in_flight) { + task_t *pending = submit_steal_all(ctx); + while (pending) { + if (atomic_load(&ctx->in_flight) >= ctx->max_in_flight) { + submit_queue_putback(ctx, pending); + pending = NULL; break; // 满了,停止取任务 } - task_t *t = queue_pop(ctx); - if (!t) break; + struct io_uring_sqe *sqe = io_uring_get_sqe(&ctx->ring); if (!sqe) { - queue_push_front(ctx, t); + submit_queue_putback(ctx, pending); + pending = NULL; break; } + task_t *t = pending; + pending = pending->next; + t->next = NULL; + io_uring_prep_writev(sqe, t->fd, t->iovs, t->iovcnt, t->off); sqe->user_data = (uint64_t)(uintptr_t)t; batch_count++; @@ -168,24 +186,32 @@ static void *worker_main(void *arg) int submitted = io_uring_submit(&ctx->ring); push_to_sqe += submitted; atomic_fetch_add(&ctx->in_flight, submitted); - continue; } // ========== 4. 没事做就等待 ========== - if (batch_count == 0) { - int inflight = atomic_load(&ctx->in_flight); - if (inflight > 0) { - // 有任务在飞,等一个CQE - continue; - } else { - // 真没事了,等新任务 - pthread_mutex_lock(&ctx->q_m); - while (ctx->q_head == NULL && !ctx->stop) { - pthread_cond_wait(&ctx->q_cv, &ctx->q_m); + int inflight = atomic_load(&ctx->in_flight); + if (inflight > 0) { + io_uring_submit_and_wait(&ctx->ring, 1); + // 有任务在飞,等一个CQE + continue; + } else { + // 真没事了,等新任务 + if (submit_queue_empty(ctx)) { + struct io_uring_sqe *sqe = io_uring_get_sqe(&ctx->ring); + if (sqe) { + uint64_t buf; + io_uring_prep_read(sqe, ctx->event_fd, &buf, sizeof(buf), 0); + sqe->user_data = 0; // Special for event + io_uring_submit_and_wait(&ctx->ring, 1); + // After wait, consume the cqe + struct io_uring_cqe *cqe; + io_uring_peek_cqe(&ctx->ring, &cqe); + if (cqe && cqe->user_data == 0) { + io_uring_cq_advance(&ctx->ring, 1); + } } - pthread_mutex_unlock(&ctx->q_m); } } } @@ -201,10 +227,7 @@ static void *worker_main(void *arg) atomic_fetch_sub(&ctx->in_flight, 1); task_finish(done, cqe->res); - pthread_mutex_lock(&g_destroy_queue.lock); - done->next = g_destroy_queue.head; - g_destroy_queue.head = done; - pthread_mutex_unlock(&g_destroy_queue.lock); + destroy_queue_push(done); get_from_cqe++; final_cq++; @@ -229,16 +252,21 @@ static void *worker_main(void *arg) int iouring_init(iouring_ctx_t *ctx, unsigned entries) { memset(ctx, 0, sizeof(*ctx)); - pthread_mutex_init(&ctx->q_m, NULL); - pthread_cond_init(&ctx->q_cv, NULL); ctx->stop = 0; + atomic_init(&g_submit_queue.head, NULL); + atomic_init(&g_destroy_queue.head, NULL); + + + ctx->event_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + if (ctx->event_fd < 0) { + // Handle error + return -errno; + } + struct io_uring_params params; memset(¶ms, 0, sizeof(params)); - // params.flags |= IORING_SETUP_CQSIZE; - // params.cq_entries = 256 * 1024; - // params.sq_entries = 128 * 1024; int ret = io_uring_queue_init_params(entries, &ctx->ring, ¶ms); if (ret < 0) { fprintf(stderr, "io_uring_queue_init_params failed: %d (%s)\n", @@ -263,16 +291,15 @@ int iouring_init(iouring_ctx_t *ctx, unsigned entries) void iouring_shutdown(iouring_ctx_t *ctx) { - pthread_mutex_lock(&ctx->q_m); ctx->stop = 1; - pthread_cond_broadcast(&ctx->q_cv); - pthread_mutex_unlock(&ctx->q_m); + uint64_t val = 1; + write(ctx->event_fd, &val, sizeof(val)); + pthread_join(ctx->th, NULL); io_uring_queue_exit(&ctx->ring); - pthread_mutex_destroy(&ctx->q_m); - pthread_cond_destroy(&ctx->q_cv); + close(ctx->event_fd); } task_t* submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int count, off_t off){ @@ -304,37 +331,35 @@ task_t* submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int memcpy(buf, bufs[i], len); t->iovs[i].iov_base = buf; t->iovs[i].iov_len = len; + + // t->iovs[i].iov_base = bufs[i]; + // t->iovs[i].iov_len = lens[i]; + } t->iovcnt = count; - queue_push(ctx, t); + submit_queue_push(ctx, t); return t; } int uring_task_complete(iouring_ctx_t *ctx){ - pthread_mutex_lock(&ctx->q_m); - int notask = ctx->q_head == NULL; - pthread_mutex_unlock(&ctx->q_m); - int noflight = atomic_load(&ctx->in_flight); - // printf("%d\n", noflight); - return (noflight == 0) && notask; + return submit_queue_empty(ctx) && atomic_load(&ctx->in_flight) == 0; } // 主线程定期调用此函数清理 void cleanup_finished_iouring_tasks(iouring_ctx_t *ctx) { - pthread_mutex_lock(&g_destroy_queue.lock); - task_t *list = g_destroy_queue.head; - g_destroy_queue.head = NULL; - pthread_mutex_unlock(&g_destroy_queue.lock); + task_t *list = destroy_queue_steal_all(); int cnt = 0; - while (list) { - cnt ++; - task_t *next = list->next; - task_destroy(list); // 在主线程执行销毁 - list = next; + task_t *cur = list; + while (cur) { + cnt++; + task_t *next = cur->next; + task_destroy(cur); + cur = next; } + release_cnt += cnt; // printf("push:%lld, sqe:%lld, cqe:%lld, rls:%lld\n", push_to_queue, push_to_sqe, get_from_cqe, release_cnt); } \ No newline at end of file diff --git a/diskuring/diskuring.h b/diskuring/diskuring.h index 4f985d2..f5b5942 100644 --- a/diskuring/diskuring.h +++ b/diskuring/diskuring.h @@ -9,7 +9,7 @@ #include #include #include - +#include typedef enum { TASK_READ, TASK_WRITE } task_op_t; @@ -24,30 +24,28 @@ typedef struct task { struct iovec *iovs; // iovec 数组 int iovcnt; // iovec 数量 - pthread_mutex_t m; - pthread_cond_t cv; - struct task *next; } task_t; +typedef struct { + _Atomic(task_t *) head; +} task_stack_t; + +typedef struct { + _Atomic(task_t *) head; +} destroy_queue_t; + typedef struct { struct io_uring ring; pthread_t th; - pthread_mutex_t q_m; - pthread_cond_t q_cv; - task_t *q_head, *q_tail; + int event_fd; int stop; - atomic_int in_flight; + _Atomic int in_flight; int max_in_flight; } iouring_ctx_t; -typedef struct { - task_t *head; - pthread_mutex_t lock; -} destroy_queue_t; - void task_init(task_t *t); void task_finish(task_t *t, int res); diff --git a/dump/kvs_dump.h b/dump/kvs_dump.h index bab43b3..58fffe8 100644 --- a/dump/kvs_dump.h +++ b/dump/kvs_dump.h @@ -10,8 +10,8 @@ extern char global_rbtree_file[256]; extern char global_hash_file[256]; int kvs_create_snapshot(iouring_ctx_t *uring, const char* array_file, const char* rbtree_file, const char* hash_file); -int kvs_create_snapshot_async(const char *ip, int port); - +int kvs_create_snapshot_async_1(iouring_ctx_t *uring, const char* array_file, const char* rbtree_file, const char* hash_file); +int kvs_create_snapshot_async_2(const char *ip, int port); extern int global_oplog_fd; diff --git a/dump/kvs_snapshot.c b/dump/kvs_snapshot.c index 17d8564..8930225 100644 --- a/dump/kvs_snapshot.c +++ b/dump/kvs_snapshot.c @@ -40,6 +40,24 @@ int kvs_create_snapshot(iouring_ctx_t *uring, const char* array_file, const char return ret; } +int kvs_create_snapshot_async_1(iouring_ctx_t *uring, const char* array_file, const char* rbtree_file, const char* hash_file){ + pid_t pid = fork(); + if (pid == -1) { perror("fork"); return -1; } + + if (pid == 0) { + int ret = kvs_create_snapshot(uring, array_file, rbtree_file, hash_file); + if (ret != 0) { + fprintf(stderr, "snapshot creation failed\n"); + _exit(1); + } + + _exit(0); + } else { + + return 0; + } +} + static int send_file_to_ipport(const char *ip, int port, const char *filename) { int sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd < 0) { perror("socket"); return -1; } @@ -86,7 +104,7 @@ static int send_file_to_ipport(const char *ip, int port, const char *filename) { return 0; } -int kvs_create_snapshot_async(const char *ip, int port){ +int kvs_create_snapshot_async_2(const char *ip, int port){ pid_t pid = fork(); if (pid == -1) { perror("fork"); return -1; } diff --git a/ebpf/c/Makefile b/ebpf/c/Makefile index 5580d5d..3dab8ea 100644 --- a/ebpf/c/Makefile +++ b/ebpf/c/Makefile @@ -1,12 +1,12 @@ # SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause) OUTPUT := .output CLANG ?= clang -LIBBPF_SRC := $(abspath ../../libbpf/src) -BPFTOOL_SRC := $(abspath ../../bpftool/src) +LIBBPF_SRC := $(abspath ../../libbpf-bootstrap/libbpf/src) +BPFTOOL_SRC := $(abspath ../../libbpf-bootstrap/bpftool/src) LIBBPF_OBJ := $(abspath $(OUTPUT)/libbpf.a) BPFTOOL_OUTPUT ?= $(abspath $(OUTPUT)/bpftool) BPFTOOL ?= $(BPFTOOL_OUTPUT)/bootstrap/bpftool -LIBBLAZESYM_SRC := $(abspath ../../blazesym/) +LIBBLAZESYM_SRC := $(abspath ../../libbpf-bootstrap/blazesym/) LIBBLAZESYM_INC := $(abspath $(LIBBLAZESYM_SRC)/capi/include) LIBBLAZESYM_OBJ := $(abspath $(OUTPUT)/libblazesym_c.a) ARCH ?= $(shell uname -m | sed 's/x86_64/x86/' \ @@ -16,11 +16,11 @@ ARCH ?= $(shell uname -m | sed 's/x86_64/x86/' \ | sed 's/mips.*/mips/' \ | sed 's/riscv64/riscv/' \ | sed 's/loongarch64/loongarch/') -VMLINUX := ../../vmlinux.h/include/$(ARCH)/vmlinux.h +VMLINUX := ../../libbpf-bootstrap/vmlinux.h/include/$(ARCH)/vmlinux.h # Use our own libbpf API headers and Linux UAPI headers distributed with # libbpf to avoid dependency on system-wide headers, which could be missing or # outdated -INCLUDES := -I$(OUTPUT) -I../../libbpf/include/uapi -I$(dir $(VMLINUX)) -I$(LIBBLAZESYM_INC) +INCLUDES := -I$(OUTPUT) -I../../libbpf-bootstrap/libbpf/include/uapi -I$(dir $(VMLINUX)) -I$(LIBBLAZESYM_INC) CFLAGS := -g -Wall ALL_LDFLAGS := $(LDFLAGS) $(EXTRA_LDFLAGS) diff --git a/img/主从同步.png b/img/主从同步.png new file mode 100755 index 0000000..903b927 Binary files /dev/null and b/img/主从同步.png differ diff --git a/kvs_protocol_resp.c b/kvs_protocol_resp.c index 49dfb75..ee0225e 100644 --- a/kvs_protocol_resp.c +++ b/kvs_protocol_resp.c @@ -539,7 +539,7 @@ int resp_dispatch(const resp_cmd_t *cmd, resp_value_t *out_value) { /* ---------------- misc ---------------- */ case KVS_CMD_SAVE: { if (cmd->argc != 1) { *out_value = resp_error("ERR wrong number of arguments for 'save'"); return 0; } - int r = kvs_create_snapshot(&global_uring_ctx, global_array_file, global_rbtree_file, global_hash_file); + int r = kvs_create_snapshot_async_1(&global_uring_ctx, global_array_file, global_rbtree_file, global_hash_file); if(r == 0) ksv_clear_log(global_oplog_fd); if (r < 0) { *out_value = resp_error("ERR save failed"); return 0; } *out_value = resp_simple("OK"); @@ -547,7 +547,7 @@ int resp_dispatch(const resp_cmd_t *cmd, resp_value_t *out_value) { } case KVS_CMD_SSYNC: __ssync(cmd->argv[1].ptr, cmd->argv[1].len, atoi(cmd->argv[2].ptr), global_seq); - kvs_create_snapshot_async(cmd->argv[1].ptr, atoi(cmd->argv[2].ptr)); + kvs_create_snapshot_async_2(cmd->argv[1].ptr, atoi(cmd->argv[2].ptr)); *out_value = resp_simple("OK"); return 0; case KVS_CMD_SREADY: diff --git a/kvstore.c b/kvstore.c index 9d602bd..91bb092 100644 --- a/kvstore.c +++ b/kvstore.c @@ -45,7 +45,14 @@ void __completed_cmd(const uint8_t *cmd, size_t len, unsigned long long seq){ // asm volatile("" ::: "memory"); // } +#include +#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000) +#define TIME_SUB_US(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000000 + (tv1.tv_usec - tv2.tv_usec)) int kvs_protocol(struct conn* conn){ + // struct timeval func_start; + // gettimeofday(&func_start, NULL); + // long total_oplog_us = 0; + if (!conn) return -1; char *request = conn->rbuffer; int request_length = conn->rlength; @@ -80,65 +87,80 @@ int kvs_protocol(struct conn* conn){ int dr = resp_dispatch(&cmd, &val); + + // if(global_cfg.persistence == PERSIST_INCREMENTAL){ + // kvs_oplog_append(p, len, global_oplog_fd); + // } + /* * 语义建议: * - resp_dispatch() 即使返回 -1(比如 unknown command / wrong argc), * 一般也已经把 out_value 设置成了 RESP error,这样客户端能收到错误响应。 * - 如果 dr < 0 但 val.type 没被正确设置,兜底回一个通用错误。 */ - if(dr < 0){ - if (val.type != RESP_T_SIMPLE_STR && - val.type != RESP_T_ERROR && - val.type != RESP_T_INTEGER && - val.type != RESP_T_BULK_STR && - val.type != RESP_T_NIL) { - val = resp_error("ERR dispatch failed"); - } - } else { - // persist into oplog - /* 执行成功:在这里保存到日志中(只记录更新类命令) */ - if (cmd.argc > 0 && cmd.argv[0].ptr) { - /* 更新类命令:SET/DEL/MOD/RSET/RDEL/RMOD/HSET/HDEL/HMOD/SAVE */ - const resp_slice_t *c0 = &cmd.argv[0]; - int is_update = 0; - if (c0->ptr && c0->len) { - if (ascii_casecmp(c0->ptr, c0->len, "SET") == 0 || - ascii_casecmp(c0->ptr, c0->len, "DEL") == 0 || - ascii_casecmp(c0->ptr, c0->len, "MOD") == 0 || - ascii_casecmp(c0->ptr, c0->len, "RSET") == 0 || - ascii_casecmp(c0->ptr, c0->len, "RDEL") == 0 || - ascii_casecmp(c0->ptr, c0->len, "RMOD") == 0 || - ascii_casecmp(c0->ptr, c0->len, "HSET") == 0 || - ascii_casecmp(c0->ptr, c0->len, "HDEL") == 0 || - ascii_casecmp(c0->ptr, c0->len, "HMOD") == 0) { - is_update = 1; - } - } + // struct timeval oplog_start, oplog_end; + // gettimeofday(&oplog_start, NULL); - if (is_update) { - if(global_cfg.persistence == PERSIST_INCREMENTAL){ - kvs_oplog_append(p, len, global_oplog_fd); - } + // if(dr < 0){ + // if (val.type != RESP_T_SIMPLE_STR && + // val.type != RESP_T_ERROR && + // val.type != RESP_T_INTEGER && + // val.type != RESP_T_BULK_STR && + // val.type != RESP_T_NIL) { + // val = resp_error("ERR dispatch failed"); + // } + // } else { + // // persist into oplog + // /* 执行成功:在这里保存到日志中(只记录更新类命令) */ + // if (cmd.argc > 0 && cmd.argv[0].ptr) { + // /* 更新类命令:SET/DEL/MOD/RSET/RDEL/RMOD/HSET/HDEL/HMOD/SAVE */ + // const resp_slice_t *c0 = &cmd.argv[0]; + // int is_update = 0; + // if (c0->ptr && c0->len) { + // if (ascii_casecmp(c0->ptr, c0->len, "SET") == 0 || + // ascii_casecmp(c0->ptr, c0->len, "DEL") == 0 || + // ascii_casecmp(c0->ptr, c0->len, "MOD") == 0 || + // ascii_casecmp(c0->ptr, c0->len, "RSET") == 0 || + // ascii_casecmp(c0->ptr, c0->len, "RDEL") == 0 || + // ascii_casecmp(c0->ptr, c0->len, "RMOD") == 0 || + // ascii_casecmp(c0->ptr, c0->len, "HSET") == 0 || + // ascii_casecmp(c0->ptr, c0->len, "HDEL") == 0 || + // ascii_casecmp(c0->ptr, c0->len, "HMOD") == 0) { + // is_update = 1; + // } + // } - // __completed_cmd(p, len, global_seq); - // global_seq ++; + // if (is_update) { + // if(global_cfg.persistence == PERSIST_INCREMENTAL){ + // kvs_oplog_append(p, len, global_oplog_fd); + // } + + // // __completed_cmd(p, len, global_seq); + // // global_seq ++; - if (global_cfg.replica_mode == REPLICA_ENABLE) { - uint32_t off = 0; - int ar = replica_shm_append(&g_rep_shm, global_seq, p, (uint32_t)len, &off); - if (ar == 0) { - // __replica_notify(global_seq, off, (uint32_t)len); - global_seq++; + // if (global_cfg.replica_mode == REPLICA_ENABLE) { + // uint32_t off = 0; + // int ar = replica_shm_append(&g_rep_shm, global_seq, p, (uint32_t)len, &off); + // if (ar == 0) { + // // __replica_notify(global_seq, off, (uint32_t)len); + // global_seq++; - } else { - // shm 满或异常:你可以选择降级(比如直接跳过复制,或阻塞/丢弃) - // 为了不影响主路径,这里先打印并跳过 - fprintf(stderr, "replica_shm_append failed %d\n", ar); - } - } - } - } - } + // } else { + // // shm 满或异常:你可以选择降级(比如直接跳过复制,或阻塞/丢弃) + // // 为了不影响主路径,这里先打印并跳过 + // fprintf(stderr, "replica_shm_append failed %d\n", ar); + // } + // } + // } + // } + // } + + if(global_cfg.persistence == PERSIST_INCREMENTAL){ + kvs_oplog_append(p, len, global_oplog_fd); + } + // gettimeofday(&oplog_end, NULL); + // total_oplog_us += (oplog_end.tv_sec - oplog_start.tv_sec) * 1000000 + + // (oplog_end.tv_usec - oplog_start.tv_usec); /* 构建响应 */ int cap = KVS_MAX_RESPONSE - out_len; @@ -157,6 +179,13 @@ int kvs_protocol(struct conn* conn){ consumed += len; } + + // struct timeval func_end; + // gettimeofday(&func_end, NULL); + // long func_us = (func_end.tv_sec - func_start.tv_sec) * 1000000 + + // (func_end.tv_usec - func_start.tv_usec); + // fprintf(stderr, "kvs_protocol: total %ld us, oplog %ld us\n", func_us, total_oplog_us); + *response_length = out_len; return consumed; } diff --git a/memory/mempool.c b/memory/mempool.c index f06e2bf..70a12c6 100644 --- a/memory/mempool.c +++ b/memory/mempool.c @@ -27,29 +27,29 @@ static inline int size_to_index(size_t size){ // bitmap 操作函数 -static inline void bitmap_set(uint64_t *bitmap, uint16_t index){ - bitmap[index / 64] |= (1ULL << (index % 64)); -} +// static inline void bitmap_set(uint64_t *bitmap, uint16_t index){ +// bitmap[index / 64] |= (1ULL << (index % 64)); +// } -static inline void bitmap_clear(uint64_t *bitmap, uint16_t index){ - bitmap[index / 64] &= ~(1ULL << (index % 64)); -} +// static inline void bitmap_clear(uint64_t *bitmap, uint16_t index){ +// bitmap[index / 64] &= ~(1ULL << (index % 64)); +// } -static inline int bitmap_test(uint64_t *bitmap, uint16_t index){ - return (bitmap[index / 64] & (1ULL << (index % 64))) != 0; -} +// static inline int bitmap_test(uint64_t *bitmap, uint16_t index){ +// return (bitmap[index / 64] & (1ULL << (index % 64))) != 0; +// } -static inline void bitmap_clear_all(uint64_t *bitmap, size_t size){ - memset(bitmap, 0, size * sizeof(uint64_t)); -} +// static inline void bitmap_clear_all(uint64_t *bitmap, size_t size){ +// memset(bitmap, 0, size * sizeof(uint64_t)); +// } -// 根据指针计算在页中的块索引 -static inline uint16_t ptr_to_block_index(mp_page_t *pg, void *ptr){ - char *base = (char*)page_payload(pg); - char *p = (char*)ptr; - size_t offset = p - base; - return (uint16_t)(offset / pg->owner->block_size); -} +// // 根据指针计算在页中的块索引 +// static inline uint16_t ptr_to_block_index(mp_page_t *pg, void *ptr){ +// char *base = (char*)page_payload(pg); +// char *p = (char*)ptr; +// size_t offset = p - base; +// return (uint16_t)(offset / pg->owner->block_size); +// } static mp_page_t* mp_page_create(mp_bucket_t *owner){ @@ -74,7 +74,7 @@ static mp_page_t* mp_page_create(mp_bucket_t *owner){ pg->prev = NULL; pg->next = NULL; - bitmap_clear_all(pg->bitmap, 16); + // bitmap_clear_all(pg->bitmap, 16); char *p = (char*)page_payload(pg); for(uint16_t i = 0;i < cap - 1; ++ i){ @@ -95,8 +95,8 @@ static void *mp_page_alloc(mp_page_t *pg){ pg->free_count --; // 标记该块为已分配 - uint16_t index = ptr_to_block_index(pg, ret); - bitmap_set(pg->bitmap, index); + // uint16_t index = ptr_to_block_index(pg, ret); + // bitmap_set(pg->bitmap, index); return ret; } @@ -105,14 +105,14 @@ static int mp_page_free(mp_page_t *pg, void *ptr){ if(!pg || !ptr) return MEMPOOL_INVALID_INPUT; // 检查是否是 double free - uint16_t index = ptr_to_block_index(pg, ptr); - if(!bitmap_test(pg->bitmap, index)){ - // 该块未被分配,可能是 double free - return MEMPOOL_DOUBLE_FREE; - } + // uint16_t index = ptr_to_block_index(pg, ptr); + // if(!bitmap_test(pg->bitmap, index)){ + // // 该块未被分配,可能是 double free + // return MEMPOOL_DOUBLE_FREE; + // } // 标记该块为空闲 - bitmap_clear(pg->bitmap, index); + // bitmap_clear(pg->bitmap, index); *(void**)ptr = pg->free_list; pg->free_list = ptr; diff --git a/memory/mempool.h b/memory/mempool.h index e44d50e..38782d0 100644 --- a/memory/mempool.h +++ b/memory/mempool.h @@ -8,7 +8,7 @@ #include // #define MEMPOOL_PAGE_SIZE 4096 -#define MEMPOOL_PAGE_SIZE (4096*2) +#define MEMPOOL_PAGE_SIZE (256 * 1024) #define MEMPOOL_BLOCK_MAX_SIZE 512 #define MEMPOOL_ALIGNMENT 8 #define MEMPOOL_NUM_CLASSES (MEMPOOL_BLOCK_MAX_SIZE / MEMPOOL_ALIGNMENT) @@ -36,7 +36,7 @@ struct mp_page_s{ uint16_t free_count; uint16_t capacity; - uint64_t bitmap[16]; // 最多支持 512/1280 个块 (64*20) + // uint64_t bitmap[16]; }; struct mp_bucket_s{ diff --git a/test-redis/test.c b/test-redis/test.c index e432a93..1c86a9d 100644 --- a/test-redis/test.c +++ b/test-redis/test.c @@ -10,48 +10,49 @@ static void die(redisContext *c, const char *msg) { fprintf(stderr, "%s: %s\n", msg, c && c->err ? c->errstr : "unknown"); + redisFree(c); exit(1); } -static void must_ok(redisReply *r, const char *what) { - if (!r) { fprintf(stderr, "%s: reply null\n", what); exit(1); } +static int must_ok(redisReply *r, const char *what) { + if (!r) { fprintf(stderr, "%s: reply null\n", what); return -1; } if (!(r->type == REDIS_REPLY_STATUS && r->str && strcasecmp(r->str, "OK") == 0)) { fprintf(stderr, "%s: expect +OK, got type=%d str=%s\n", what, r->type, r->str ? r->str : "(null)"); freeReplyObject(r); - exit(1); + return -1; } freeReplyObject(r); } -static void must_int(redisReply *r, long long expect, const char *what) { - if (!r) { fprintf(stderr, "%s: reply null\n", what); exit(1); } +static int must_int(redisReply *r, long long expect, const char *what) { + if (!r) { fprintf(stderr, "%s: reply null\n", what); return -1; } if (r->type != REDIS_REPLY_INTEGER || r->integer != expect) { fprintf(stderr, "%s: expect :%lld, got type=%d int=%lld\n", what, expect, r->type, (long long)r->integer); freeReplyObject(r); - exit(1); + return -1; } freeReplyObject(r); } -static void must_bulk_eq(redisReply *r, const void *buf, size_t n, const char *what) { - if (!r) { fprintf(stderr, "%s: reply null\n", what); exit(1); } +static int must_bulk_eq(redisReply *r, const void *buf, size_t n, const char *what) { + if (!r) { fprintf(stderr, "%s: reply null\n", what); return -1; } if (r->type != REDIS_REPLY_STRING || r->len != n || memcmp(r->str, buf, n) != 0) { fprintf(stderr, "%s: bulk mismatch. type=%d len=%zu\n", what, r->type, r->len); fprintf(stderr, "expect:%s, truely:%s\n", (const char*)buf, r->str); freeReplyObject(r); - exit(1); + return -1; } freeReplyObject(r); } -static void must_nil(redisReply *r, const char *what) { - if (!r) { fprintf(stderr, "%s: reply null\n", what); exit(1); } +static int must_nil(redisReply *r, const char *what) { + if (!r) { fprintf(stderr, "%s: reply null\n", what); return -1; } if (r->type != REDIS_REPLY_NIL) { fprintf(stderr, "%s: expect nil, got type=%d\n", what, r->type); freeReplyObject(r); - exit(1); + return -1; } freeReplyObject(r); } diff --git a/test/test_client.c b/test/test_client.c deleted file mode 100644 index fb64e68..0000000 --- a/test/test_client.c +++ /dev/null @@ -1,220 +0,0 @@ - -#include "test_client.h" -#include -#include -#include - -int kvs_need(const uint8_t *p, const uint8_t *end, size_t n) { - return (p + n <= end) ? 0 : -1; -} - -// 注意u8类型不需要ntoh或者hton -int kvs_read_u8(const uint8_t **pp, const uint8_t *end, uint8_t *out) { - const uint8_t *p = *pp; - if (kvs_need(p, end, 1) < 0) return -1; - *out = *p; - *pp = p + 1; - return 0; -} - -int kvs_read_u16(const uint8_t **pp, const uint8_t *end, uint16_t *out) { - const uint8_t *p = *pp; - if (kvs_need(p, end, 2) < 0) return -1; - uint16_t v; - memcpy(&v, p, 2); - *out = ntohs(v); - *pp = p + 2; - return 0; -} - -int kvs_read_u32(const uint8_t **pp, const uint8_t *end, uint32_t *out) { - const uint8_t *p = *pp; - if (kvs_need(p, end, 4) < 0) return -1; - uint32_t v; - memcpy(&v, p, 4); - *out = ntohl(v); - *pp = p + 4; - return 0; -} - -int kvs_write_u8(uint8_t **pp, const uint8_t *end, uint8_t v) { - uint8_t *p = *pp; - if (kvs_need(p, end, 1) < 0) return -1; - *p = v; - *pp = p + 1; - return 0; -} - -int kvs_write_u16(uint8_t **pp, const uint8_t *end, uint16_t v) { - uint8_t *p = *pp; - if (kvs_need(p, end, 2) < 0) return -1; - uint16_t be = htons(v); - memcpy(p, &be, 2); - *pp = p + 2; - return 0; -} - -int kvs_write_u32(uint8_t **pp, const uint8_t *end, uint32_t v) { - uint8_t *p = *pp; - if (kvs_need(p, end, 4) < 0) return -1; - uint32_t be = htonl(v); - memcpy(p, &be, 4); - *pp = p + 4; - return 0; -} - - - -int getcmd(uint8_t op, const char *key, uint32_t key_len, const char *value, uint32_t value_len, uint8_t *buf){ - if(!buf) return -1; - uint8_t *end = buf + CMD_SIZE; - uint8_t *p = buf; - uint8_t argc = (key == NULL)?0:1; - argc += (value == NULL)?0:1; - - - if (kvs_write_u8(&p, end, op) < 0) return -1; - if (kvs_write_u8(&p, end, argc) < 0) return -1; - - - // 写入 key - if(key){ - int keylen = key_len; - if (kvs_write_u32(&p, end, keylen) < 0) return -1; - if (kvs_need(p, end, keylen) < 0) return -1; - if (keylen > 0) { - memcpy(p, key, keylen); - p += keylen; - } - } - - if(value){ - int vallen = value_len; - if (kvs_write_u32(&p, end, vallen) < 0) return -1; - if (kvs_need(p, end, vallen) < 0) return -1; - if (vallen > 0) { - memcpy(p, value, vallen); - p += vallen; - } - } - - return (p - buf); -} - -int parse_response(const uint8_t *buf, int buflen, kvs_response_t *rsp) { - if(buflen == 0) return 0; - const uint8_t *p = buf; - const uint8_t *end = buf + buflen; - - // 读取 OP - if (kvs_read_u8(&p, end, &rsp->op) < 0) { - fprintf(stderr, "Failed to read op\n"); - return -1; - } - - // 读取 status - if (kvs_read_u8(&p, end, &rsp->status) < 0) { - fprintf(stderr, "Failed to read status\n"); - return -1; - } - - // 读取 datalen - if (kvs_read_u32(&p, end, &rsp->datalen) < 0) { - fprintf(stderr, "Failed to read datalen\n"); - return -1; - } - - // 检查数据长度 - if (kvs_need(p, end, rsp->datalen) < 0) { - fprintf(stderr, "Data length mismatch: expected %u bytes, but only %ld available\n", - rsp->datalen, end - p); - return -1; - } - - // 指向数据部分 - rsp->data = (uint8_t *)p; - - return (p - buf) + rsp->datalen; -} - - -void print_response(const char *cmd_name, const kvs_response_t *rsp) { - printf("%s ", cmd_name); - if(rsp->op == KVS_CMD_GET || rsp->op == KVS_CMD_HGET || rsp->op == KVS_CMD_RGET){ - if (rsp->datalen > 0 && rsp->data != NULL) { - printf("Data: "); - // 尝试以字符串形式打印(如果是可打印字符) - int is_printable = 1; - for (uint32_t i = 0; i < rsp->datalen; i++) { - if (rsp->data[i] < 32 || rsp->data[i] > 126) { - is_printable = 0; - break; - } - } - - if (is_printable) { - printf("\""); - for (uint32_t i = 0; i < rsp->datalen; i++) { - printf("%c", rsp->data[i]); - } - printf("\"\n"); - } else { - // 以十六进制打印 - printf("0x"); - for (uint32_t i = 0; i < rsp->datalen; i++) { - printf("%02x", rsp->data[i]); - } - printf("\n"); - } - } else { - printf("Data: (empty)\n"); - } - }else { - switch (rsp->status) { - case KVS_STATUS_OK: - printf("(OK)\n"); - break; - case KVS_STATUS_ERROR: - printf("(ERROR)\n"); - break; - case KVS_STATUS_NO_EXIST: - printf("(NO_EXIST)\n"); - break; - case KVS_STATUS_EXIST: - printf("(EXISTS)\n"); - break; - default: - printf("(UNKNOWN)\n"); - break; - } - } -} - -int verify_response(const kvs_response_t *rsp, uint8_t expected_op, - uint8_t expected_status, const char *expected_data, uint32_t expected_len) { - if (rsp->op != expected_op) { - printf("❌ OP mismatch: expected %u, got %u\n", expected_op, rsp->op); - return 0; - } - - if (rsp->status != expected_status) { - printf("❌ Status mismatch: expected %u, got %u\n", expected_status, rsp->status); - return 0; - } - - if (expected_data != NULL) { - if (rsp->datalen != expected_len) { - printf("❌ Data length mismatch: expected %u, got %u\n", expected_len, rsp->datalen); - return 0; - } - - if (memcmp(rsp->data, expected_data, expected_len) != 0) { - printf("❌ Data content mismatch\n"); - return 0; - } - } - - return 1; -} - - diff --git a/test/test_client.h b/test/test_client.h deleted file mode 100644 index 7325862..0000000 --- a/test/test_client.h +++ /dev/null @@ -1,161 +0,0 @@ - - /** - * Request - * Cmd: | OP(1) | argc(1) | repeat { arglen(4) | arg } | - * - * Response - * Rsp: | OP(1) | status(1) | datalen(4) | data | - */ -#include -#include -#include -#include -#include -#include -#include - -#define CMD_SIZE (1024) -#define BATCH_SIZE (65536) -#define KVS_BATCH_MAX 128 -#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000) - -// #define PRESP print_response -#define PRESP - - -typedef enum { - KVS_STATUS_OK = 0, - KVS_STATUS_ERROR = 1, - KVS_STATUS_NO_EXIST = 2, - KVS_STATUS_EXIST = 3, - KVS_STATUS_BADREQ = 4 -}rsp_ret_status_e; - -enum { - KVS_CMD_START = 0, - // array - KVS_CMD_SET = KVS_CMD_START, - KVS_CMD_GET, - KVS_CMD_DEL, - KVS_CMD_MOD, - KVS_CMD_EXIST, - // rbtree - KVS_CMD_RSET, - KVS_CMD_RGET, - KVS_CMD_RDEL, - KVS_CMD_RMOD, - KVS_CMD_REXIST, - // hash - KVS_CMD_HSET, - KVS_CMD_HGET, - KVS_CMD_HDEL, - KVS_CMD_HMOD, - KVS_CMD_HEXIST, - - KVS_CMD_SSYNC, - KVS_CMD_SAVE, - - KVS_CMD_COUNT, -}; - -typedef struct { - uint8_t op; - uint8_t status; - uint32_t datalen; - uint8_t *data; -} kvs_response_t; - -int kvs_need(const uint8_t *p, const uint8_t *end, size_t n); -int kvs_read_u8(const uint8_t **pp, const uint8_t *end, uint8_t *out); -int kvs_read_u16(const uint8_t **pp, const uint8_t *end, uint16_t *out); -int kvs_read_u32(const uint8_t **pp, const uint8_t *end, uint32_t *out); - -int kvs_write_u8(uint8_t **pp, const uint8_t *end, uint8_t v); -int kvs_write_u16(uint8_t **pp, const uint8_t *end, uint16_t v); -int kvs_write_u32(uint8_t **pp, const uint8_t *end, uint32_t v); - -int getcmd(uint8_t op, const char *key, uint32_t key_len, const char *value, uint32_t value_len, uint8_t *buf); - -int parse_response(const uint8_t *buf, int buflen, kvs_response_t *rsp); -void print_response(const char *cmd_name, const kvs_response_t *rsp); -int verify_response(const kvs_response_t *rsp, uint8_t expected_op, - uint8_t expected_status, const char *expected_data, uint32_t expected_len); - - - - - -typedef struct { - uint8_t buf[BATCH_SIZE]; - int len; // 当前已写入的 batch 字节数 - int cnt; // 当前 batch 里命令条数 - int cmd_len[KVS_BATCH_MAX]; -} kvs_batch_t; - -static void kvs_batch_init(kvs_batch_t *b) -{ - b->len = 0; - b->cnt = 0; - memset(b->cmd_len, 0, sizeof(b->cmd_len)); -} - -/** - * 用 getcmd() 生成单条命令,然后 append 到 batch buffer - * 返回:0 成功,-1 失败(太多条 or buffer 不够) - */ -static int kvs_batch_add(kvs_batch_t *b, uint8_t op, const char *key, uint32_t key_len, const char *value, uint32_t value_len){ - if (b->cnt >= KVS_BATCH_MAX) return -1; - uint8_t tmp[CMD_SIZE]; - int n = getcmd(op, key, key_len, value, value_len, tmp); // 你提供的函数 - if (n <= 0) return -1; - - if (b->len + n > (int)sizeof(b->buf)) return -1; - - memcpy(b->buf + b->len, tmp, n); - b->cmd_len[b->cnt] = n; - b->cnt++; - b->len += n; - return 0; -} - -/** - * 一次性发送 batch - * 返回:发送字节数,<0 表示失败 - */ -static int kvs_batch_send(int fd, const kvs_batch_t *b) -{ - // printf("send : %d\n", b->len); - return (int)send(fd, b->buf, b->len, 0); -} - -/** - * 一次 recv 收回所有响应,然后批量解析为 rsp 数组 - * - * 返回:成功解析出的响应条数(期望是 b->cnt) - */ -static int kvs_batch_recv_parse(int fd, - const kvs_batch_t *b, - kvs_response_t *rsps, // 输出数组,长度 >= b->cnt - uint8_t *recvbuf, - int recvbuf_cap) -{ - int parsed = 0; - int used = 0; - - while(parsed < b->cnt){ - int nrecv = (int)recv(fd, recvbuf+used, recvbuf_cap, 0); - if (nrecv <= 0) return -1; - - int off = 0; - - while (parsed < b->cnt) { - int consumed = parse_response(recvbuf + used, nrecv - off, &rsps[parsed]); - if (consumed <= 0) break; // 不够解析/失败,简单处理:直接退出 - - off += consumed; - used+= consumed; - parsed++; - } - } - return parsed; -} \ No newline at end of file diff --git a/test/testcase.c b/test/testcase.c deleted file mode 100644 index 3e03fdb..0000000 --- a/test/testcase.c +++ /dev/null @@ -1,539 +0,0 @@ - -#include "test_client.h" -#include - - -int connect_tcpserver(const char *ip, unsigned short port) { - - int connfd = socket(AF_INET, SOCK_STREAM, 0); - - struct sockaddr_in server_addr; - memset(&server_addr, 0, sizeof(struct sockaddr_in)); - - server_addr.sin_family = AF_INET; - server_addr.sin_addr.s_addr = inet_addr(ip); - server_addr.sin_port = htons(port); - - if (0 != connect(connfd, (struct sockaddr*)&server_addr, sizeof(struct sockaddr_in))) { - perror("connect"); - return -1; - } - - return connfd; - -} - - -int send_msg(int connfd, char *msg, int length) { - - int res = send(connfd, msg, length, 0); - if (res < 0) { - perror("send"); - exit(1); - } - return res; -} - -int recv_msg(int connfd, char *msg, int length) { - - int res = recv(connfd, msg, length, 0); - if (res < 0) { - perror("recv"); - exit(1); - } - return res; - -} - -void testcase(int connfd, uint8_t op, const char* key, uint32_t key_len, const char* value, - uint32_t value_len, rsp_ret_status_e st, const char* rsp_value, uint32_t expect_len, const char* command_name){ - uint8_t buf[CMD_SIZE]; - uint8_t result[CMD_SIZE]; - kvs_response_t rsp; - int len, recv_len; - - len = getcmd(op, key, key_len, value, value_len, buf); - send_msg(connfd, buf, len); - recv_len = recv_msg(connfd, result, CMD_SIZE); - if (parse_response(result, recv_len, &rsp) > 0) { - PRESP(command_name, &rsp); - if(!verify_response(&rsp, op, st, rsp_value, expect_len)) printf("%s\n", command_name); - }else{ - printf("parser error\n"); - } - - return ; -} - - -void array_testcase_1w(int connfd) { - - int count = 1000; - int i = 0; - - struct timeval tv_begin; - gettimeofday(&tv_begin, NULL); - - for (i = 0;i < count;i ++) { - testcase(connfd, KVS_CMD_SET, "name", 4, "l\r\0n", 4, KVS_STATUS_OK, NULL, 0, "SET NAME"); - testcase(connfd, KVS_CMD_GET, "name", 4, NULL, 0, KVS_STATUS_OK, "l\r\0n", 4, "GET NAME"); - testcase(connfd, KVS_CMD_MOD, "name", 4, "liu", 3, KVS_STATUS_OK, NULL, 0, "MOD NAME"); - testcase(connfd, KVS_CMD_GET, "name", 4, NULL, 0, KVS_STATUS_OK, "liu", 3, "GET NAME"); - testcase(connfd, KVS_CMD_EXIST, "name", 4, NULL, 0, KVS_STATUS_EXIST, NULL, 0, "EXIST NAME"); - testcase(connfd, KVS_CMD_DEL, "name", 4, NULL, 0, KVS_STATUS_OK, NULL, 0, "DEL NAME"); - testcase(connfd, KVS_CMD_EXIST, "name", 4, NULL, 0, KVS_STATUS_NO_EXIST, NULL, 0, "EXIST NAME"); - - testcase(connfd, KVS_CMD_MOD, "stu", 3, "liu", 3, KVS_STATUS_NO_EXIST, NULL, 0, "MOD NAME"); - testcase(connfd, KVS_CMD_DEL, "stu", 3, NULL, 0, KVS_STATUS_NO_EXIST, NULL, 0, "DEL SUT"); - } - - struct timeval tv_end; - gettimeofday(&tv_end, NULL); - - int time_used = TIME_SUB_MS(tv_end, tv_begin); // ms - - printf("array testcase --> time_used: %d, qps: %d\n", time_used, 9000 * 1000 / time_used); - -} - -void rbtree_testcase_1w(int connfd) { - - int count = 1000; - int i = 0; - - struct timeval tv_begin; - gettimeofday(&tv_begin, NULL); - - for (i = 0;i < count;i ++) { - testcase(connfd, KVS_CMD_RSET, "name", 4, "l\r\0n", 4, KVS_STATUS_OK, NULL, 0, "RSET NAME"); - testcase(connfd, KVS_CMD_RGET, "name", 4, NULL, 0, KVS_STATUS_OK, "l\r\0n", 4, "RGET NAME"); - testcase(connfd, KVS_CMD_RMOD, "name", 4, "liu", 3, KVS_STATUS_OK, NULL, 0, "RMOD NAME"); - testcase(connfd, KVS_CMD_RGET, "name", 4, NULL, 0, KVS_STATUS_OK, "liu", 3, "RGET NAME"); - testcase(connfd, KVS_CMD_REXIST, "name", 4, NULL, 0, KVS_STATUS_EXIST, NULL, 0, "REXIST NAME"); - testcase(connfd, KVS_CMD_RDEL, "name", 4, NULL, 0, KVS_STATUS_OK, NULL, 0, "RDEL NAME"); - testcase(connfd, KVS_CMD_REXIST, "name", 4, NULL, 0, KVS_STATUS_NO_EXIST, NULL, 0, "REXIST NAME"); - - testcase(connfd, KVS_CMD_RMOD, "stu", 3, "liu", 3, KVS_STATUS_NO_EXIST, NULL, 0, "RMOD NAME"); - testcase(connfd, KVS_CMD_RDEL, "stu", 3, NULL, 0, KVS_STATUS_NO_EXIST, NULL, 0, "RDEL SUT"); - } - - struct timeval tv_end; - gettimeofday(&tv_end, NULL); - - int time_used = TIME_SUB_MS(tv_end, tv_begin); // ms - - printf("array testcase --> time_used: %d, qps: %d\n", time_used, 9000 * 1000 / time_used); - -} - -void hash_testcase_1w(int connfd) { - - int count = 1000; - int i = 0; - - struct timeval tv_begin; - gettimeofday(&tv_begin, NULL); - - for (i = 0;i < count;i ++) { - testcase(connfd, KVS_CMD_HSET, "name", 4, "l\r\0n", 4, KVS_STATUS_OK, NULL, 0, "HSET NAME"); - testcase(connfd, KVS_CMD_HGET, "name", 4, NULL, 0, KVS_STATUS_OK, "l\r\0n", 4, "HGET NAME"); - testcase(connfd, KVS_CMD_HMOD, "name", 4, "liu", 3, KVS_STATUS_OK, NULL, 0, "HMOD NAME"); - testcase(connfd, KVS_CMD_HGET, "name", 4, NULL, 0, KVS_STATUS_OK, "liu", 3, "HGET NAME"); - testcase(connfd, KVS_CMD_HEXIST, "name", 4, NULL, 0, KVS_STATUS_EXIST, NULL, 0, "HEXIST NAME"); - testcase(connfd, KVS_CMD_HDEL, "name", 4, NULL, 0, KVS_STATUS_OK, NULL, 0, "HDEL NAME"); - testcase(connfd, KVS_CMD_HEXIST, "name", 4, NULL, 0, KVS_STATUS_NO_EXIST, NULL, 0, "HEXIST NAME"); - - testcase(connfd, KVS_CMD_HMOD, "stu", 3, "liu", 3, KVS_STATUS_NO_EXIST, NULL, 0, "HMOD NAME"); - testcase(connfd, KVS_CMD_HDEL, "stu", 3, NULL, 0, KVS_STATUS_NO_EXIST, NULL, 0, "HDEL SUT"); - } - - struct timeval tv_end; - gettimeofday(&tv_end, NULL); - - int time_used = TIME_SUB_MS(tv_end, tv_begin); // ms - - printf("array testcase --> time_used: %d, qps: %d\n", time_used, 9*count * 1000 / time_used); - -} - -void do_batch_test(int fd, int op, const char *key, const char *value, rsp_ret_status_e st, const char *rsp_value){ - kvs_batch_t batch; - kvs_batch_init(&batch); - - char bkey[256]={0}, bval[256]={0}; - - // 组 batch(最多 64 条) - for(int i = 0;i < 100; ++ i){ - if(value == NULL){ - int klen = sprintf(bkey, "%s%d", key, i); - kvs_batch_add(&batch, op, bkey, klen, NULL, 0); - }else{ - int klen = sprintf(bkey, "%s%d", key, i); - int vlen = sprintf(bval, "%s%d", value, i); - kvs_batch_add(&batch, op, bkey, klen, bval, vlen); - } - } - - // 一次性发送 - kvs_batch_send(fd, &batch); - - // 一次性 recv + parse - uint8_t recvbuf[BATCH_SIZE]; - kvs_response_t rsps[KVS_BATCH_MAX]; - - int nrsp = kvs_batch_recv_parse(fd, &batch, rsps, recvbuf, sizeof(recvbuf)); - - // 打印/处理 - for (int i = 0; i < nrsp; i++) { - print_response(bkey, &rsps[i]); - int vlen; - if(rsp_value != NULL) vlen = sprintf(bval, "%s%d", rsp_value, i); - else vlen = 0; - verify_response(&rsps[i], op, st, bval, vlen); - } -} - -void array_testcase_1w_batch(int connfd) { - kvs_batch_t batch; - kvs_batch_init(&batch); - - int count = 1000; - int i = 0; - - struct timeval tv_begin; - gettimeofday(&tv_begin, NULL); - - for (i = 0;i < count;i ++) { - batch.cnt = 0; - batch.len = 0; - kvs_batch_add(&batch, KVS_CMD_SET, "name", 4, "l\r\0n", 4); - kvs_batch_add(&batch, KVS_CMD_GET, "name", 4, NULL, 0); - kvs_batch_add(&batch, KVS_CMD_MOD, "name", 4, "liu", 3); - kvs_batch_add(&batch, KVS_CMD_GET, "name", 4, NULL, 0); - kvs_batch_add(&batch, KVS_CMD_EXIST, "name", 4, NULL, 0); - kvs_batch_add(&batch, KVS_CMD_DEL, "name", 4, NULL, 0); - kvs_batch_add(&batch, KVS_CMD_EXIST, "name", 4, NULL, 0); - - kvs_batch_add(&batch, KVS_CMD_MOD, "stu", 3, "liu", 3); - kvs_batch_add(&batch, KVS_CMD_DEL, "stu", 3, NULL, 0); - - kvs_batch_send(connfd, &batch); - - uint8_t recvbuf[BATCH_SIZE]; - kvs_response_t rsps[KVS_BATCH_MAX]; - int nrsp = kvs_batch_recv_parse(connfd, &batch, rsps, recvbuf, sizeof(recvbuf)); - } - - struct timeval tv_end; - gettimeofday(&tv_end, NULL); - - int time_used = TIME_SUB_MS(tv_end, tv_begin); // ms - - printf("array testcase --> time_used: %d, qps: %d\n", time_used, 9000 * 1000 / time_used); - -} - -void batch_qps(int connfd) { - const int N = 1000000; - const int B = 100; // do_batch_test() 里写死 50 - static char valA[256]; - static char valB[256]; - static char valC[256]; - - static int inited = 0; - if (!inited) { - // 填充 255 字节,最后补 '\0' - memset(valA, 'A', 255); valA[255] = '\0'; - memset(valB, 'B', 255); valB[255] = '\0'; - memset(valC, 'C', 255); valC[255] = '\0'; - inited = 1; - } - - struct timeval tv_begin, tv_end; - gettimeofday(&tv_begin, NULL); - - // ---------------- Phase 1: ADD 两条 DEL 一条 (100w) ---------------- - // 每轮:ADD/SET A_i, ADD/SET B_i, DEL A_i - // 用 batch:每批处理 idx..idx+49 - for (int base = 0; base < N; base += B) { - // prefix 必须短,避免 do_batch_test 里 bkey[15] 溢出 - // do_batch_test 会生成: prefix + i (0..49) - // 我们把 base 编进 prefix,保证全局 key 唯一 - char preA[16], preB[16]; - - // 例:A123450_0..A123450_49(base=123450) - // 注意:这里 prefix 长度要尽量 <= 10 左右 - snprintf(preA, sizeof(preA), "A%d_", base/100); - snprintf(preB, sizeof(preB), "B%d_", base/100); - - do_batch_test(connfd, KVS_CMD_RSET, preA, valA, KVS_STATUS_OK, NULL); // 50次 RSET A - do_batch_test(connfd, KVS_CMD_RSET, preB, valB, KVS_STATUS_OK, NULL); // 50次 RSET B - do_batch_test(connfd, KVS_CMD_RDEL, preA, NULL, KVS_STATUS_OK, NULL); // 50次 RDEL A - - if (base % 10000 == 0) printf("P1 base:%d\n", base); - } - printf("phase 1 end\n"); - - // ---------------- Phase 2: ADD 一条 DEL 两条 (100w) ---------------- - // 每轮:ADD/SET C_i, DEL B_i, DEL C_i - for (int base = 0; base < N; base += B) { - char preB[16], preC[16]; - snprintf(preB, sizeof(preB), "B%d_", base/100); - snprintf(preC, sizeof(preC), "C%d_", base/100); - - do_batch_test(connfd, KVS_CMD_RSET, preC, valC, KVS_STATUS_OK, NULL); // 50次 RSET C - do_batch_test(connfd, KVS_CMD_RDEL, preB, NULL, KVS_STATUS_OK, NULL); // 50次 RDEL B - do_batch_test(connfd, KVS_CMD_RDEL, preC, NULL, KVS_STATUS_OK, NULL); // 50次 RDEL C - - if (base % 10000 == 0) printf("P2 base:%d\n", base); - } - printf("phase 2 end\n"); - - gettimeofday(&tv_end, NULL); - int time_used = TIME_SUB_MS(tv_end, tv_begin); - - // 真实总 ops 还是 6*N (每轮 6 个操作) - long long ops = (long long)N * 6; - long long qps = (time_used > 0) ? (ops * 1000 / time_used) : 0; - - printf("BATCH(do_batch_test) ADD2-DEL1 then ADD1-DEL2 (N=%d) --> time_used=%d ms, ops=%lld, qps=%lld\n", - N, time_used, ops, qps); -} - -void save(int connfd){ - testcase(connfd, KVS_CMD_SAVE, NULL, 0, NULL, 0, KVS_STATUS_OK, NULL, 0, "SAVE"); -} - -void testcase_add2_del1_then_add1_del2_100w(int connfd) { - const int N = 1000000; - - // 如果你有 KVS_CMD_ADD 就用 ADD;没有就用 SET(但 SET 会覆盖,意义不同) - // 这里按你说的“会返回 EXIST”,所以我用 KVS_CMD_ADD 来写。 - // 若你实际命令叫 KVS_CMD_SET 且语义是 ADD,请自行替换。 - const char *valA = "va"; - const char *valB = "vb"; - const char *valC = "vc"; - - char keyA[64], keyB[64], keyC[64]; - - struct timeval tv_begin, tv_end; - gettimeofday(&tv_begin, NULL); - - // ---------------- Phase 1: ADD 两条 DEL 一条 (100w) ---------------- - // 每轮:ADD A_i, ADD B_i, DEL A_i -> 留下 B_i - for (int i = 0; i < N; i++) { - int klenA = snprintf(keyA, sizeof(keyA), "A_%d", i); - int klenB = snprintf(keyB, sizeof(keyB), "B_%d", i); - - testcase(connfd, KVS_CMD_RSET, keyA, klenA, valA, 2, KVS_STATUS_OK, NULL, 0, "P1 ADD A_i"); - testcase(connfd, KVS_CMD_RSET, keyB, klenB, valB, 2, KVS_STATUS_OK, NULL, 0, "P1 ADD B_i"); - testcase(connfd, KVS_CMD_RDEL, keyA, klenA, NULL, 0, KVS_STATUS_OK, NULL, 0, "P1 DEL A_i"); - - if(i%10000 == 0) printf("i:%d\n", i); - } - - printf("phase 1 end\n"); - - // ---------------- Phase 2: ADD 一条 DEL 两条 (100w) ---------------- - // 每轮:ADD C_i, DEL B_i, DEL C_i -> 每轮净删一个 B_i - for (int i = 0; i < N; i++) { - int klenC = snprintf(keyC, sizeof(keyC), "C_%d", i); - int klenB = snprintf(keyB, sizeof(keyB), "B_%d", i); - - testcase(connfd, KVS_CMD_RSET, keyC, klenC, valC, 2, KVS_STATUS_OK, NULL, 0, "P2 ADD C_i"); - testcase(connfd, KVS_CMD_RDEL, keyB, klenB, NULL, 0, KVS_STATUS_OK, NULL, 0, "P2 DEL B_i"); - testcase(connfd, KVS_CMD_RDEL, keyC, klenC, NULL, 0, KVS_STATUS_OK, NULL, 0, "P2 DEL C_i"); - - if(i%10000 == 0) printf("i:%d\n", i); - } - - printf("phase 2 end\n"); - - // for (int j = 0; j < 5; j++) { - // int idx = (j == 0) ? 0 : (j == 1) ? (N/2) : (N-1); - - // int klenA = snprintf(keyA, sizeof(keyA), "A_%d", idx); - // int klenB = snprintf(keyB, sizeof(keyB), "B_%d", idx); - // int klenC = snprintf(keyC, sizeof(keyC), "C_%d", idx); - - // testcase(connfd, KVS_CMD_EXIST, keyA, klenA, NULL, 0, KVS_STATUS_NO_EXIST, NULL, 0, "FINAL A not exist"); - // testcase(connfd, KVS_CMD_EXIST, keyB, klenB, NULL, 0, KVS_STATUS_NO_EXIST, NULL, 0, "FINAL B not exist"); - // testcase(connfd, KVS_CMD_EXIST, keyC, klenC, NULL, 0, KVS_STATUS_NO_EXIST, NULL, 0, "FINAL C not exist"); - // } - - gettimeofday(&tv_end, NULL); - int time_used = TIME_SUB_MS(tv_end, tv_begin); - - // 统计:Phase1 每轮3 ops,Phase2 每轮3 ops - long long ops = (long long)N * 3 + (long long)N * 3; - long long qps = (time_used > 0) ? (ops * 1000 / time_used) : 0; - - printf("ADD2-DEL1 then ADD1-DEL2 (N=%d) --> time_used=%d ms, ops=%lld, qps=%lld\n", - N, time_used, ops, qps); -} - -void send_spec_chars(int connfd){ - const char *v_ws = "li an\tok\nend"; /* 内容含空格、\t、\n */ - int v_ws_len = 12; /* 手算:l i ' ' a n \t o k \n e n d = 12 */ - - testcase(connfd, KVS_CMD_RSET, - "ws", 2, - v_ws, v_ws_len, - KVS_STATUS_OK, - NULL, 0, - "RSET WHITESPACE"); - - testcase(connfd, KVS_CMD_RGET, - "ws", 2, - NULL, 0, - KVS_STATUS_OK, - v_ws, v_ws_len, - "RGET WHITESPACE"); - - /* 2) 引号与反斜杠:测试是否被错误转义 */ - const char *v_quote = "he\"llo\\world'!"; - /* 字节数:h e " l l o \ w o r l d ' ! = 15 */ - int v_quote_len = 15; - - testcase(connfd, KVS_CMD_RSET, - "quote", 5, - v_quote, v_quote_len, - KVS_STATUS_OK, - NULL, 0, - "RSET QUOTE BACKSLASH"); - - testcase(connfd, KVS_CMD_RGET, - "quote", 5, - NULL, 0, - KVS_STATUS_OK, - v_quote, v_quote_len, - "RGET QUOTE BACKSLASH"); - - - /* 3) 分隔符字符:冒号/逗号/分号/竖线 */ - const char *v_sep = "a:b,c;d|e"; - int v_sep_len = 9; /* a : b , c ; d | e = 9 */ - - testcase(connfd, KVS_CMD_RSET, - "sep", 3, - v_sep, v_sep_len, - KVS_STATUS_OK, - NULL, 0, - "RSET SEPARATORS"); - - testcase(connfd, KVS_CMD_RGET, - "sep", 3, - NULL, 0, - KVS_STATUS_OK, - v_sep, v_sep_len, - "RGET SEPARATORS"); - - /* 4) CRLF:\r\n 组合(最容易把一条请求拆错/响应拼错) */ - const char *v_crlf = "line1\r\nline2"; - int v_crlf_len = 12; /* line1(5) + \r(1) + \n(1) + line2(5) = 12 */ - - testcase(connfd, KVS_CMD_RSET, - "crlf", 4, - v_crlf, v_crlf_len, - KVS_STATUS_OK, - NULL, 0, - "RSET CRLF"); - - testcase(connfd, KVS_CMD_RGET, - "crlf", 4, - NULL, 0, - KVS_STATUS_OK, - v_crlf, v_crlf_len, - "RGET CRLF"); - - /* 5) 二进制数据:包含 \0,必须按长度处理,不能用 strlen */ - char v_bin[] = { 'A', 0x00, 'B', 'C', 0x00, 'D' }; - int v_bin_len = 6; - - testcase(connfd, KVS_CMD_RSET, - "bin", 3, - v_bin, v_bin_len, - KVS_STATUS_OK, - NULL, 0, - "RSET BINARY WITH NUL"); - - testcase(connfd, KVS_CMD_RGET, - "bin", 3, - NULL, 0, - KVS_STATUS_OK, - v_bin, v_bin_len, - "RGET BINARY WITH NUL"); - - /* 6) UTF-8:中文 + emoji(测试多字节) */ - const char *v_utf8 = "中文🙂"; - /* "中"(3字节) + "文"(3字节) + "🙂"(4字节) = 10字节 */ - int v_utf8_len = 10; - - testcase(connfd, KVS_CMD_RSET, - "utf8", 4, - v_utf8, v_utf8_len, - KVS_STATUS_OK, - NULL, 0, - "RSET UTF8"); - - testcase(connfd, KVS_CMD_RGET, - "utf8", 4, - NULL, 0, - KVS_STATUS_OK, - v_utf8, v_utf8_len, - "RGET UTF8"); - -} - -int main(int argc, char *argv[]) { - if (argc != 4) { - printf("arg error\n"); - return -1; - } - - char *ip = argv[1]; - int port = atoi(argv[2]); - int mode = atoi(argv[3]); - - int connfd = connect_tcpserver(ip, port); - - if(mode == 0){ - array_testcase_1w(connfd); - }else if(mode == 1){ - array_testcase_1w_batch(connfd); - }else if(mode == 2){ - rbtree_testcase_1w(connfd); - }else if(mode == 3){ - hash_testcase_1w(connfd); - }else if(mode == 4){ - testcase_add2_del1_then_add1_del2_100w(connfd); - }else if(mode == 10){ - do_batch_test(connfd, KVS_CMD_SET, "array_set", "array_val", KVS_STATUS_OK, NULL); - }else if(mode == 11){ - do_batch_test(connfd, KVS_CMD_GET, "array_set", NULL, KVS_STATUS_OK, "array_val"); - }else if(mode == 12){ - do_batch_test(connfd, KVS_CMD_EXIST, "array_set", NULL, KVS_STATUS_EXIST, NULL); - }else if(mode == 13){ - do_batch_test(connfd, KVS_CMD_DEL, "array_set", NULL, KVS_STATUS_OK, NULL); - }else if(mode == 20){ - do_batch_test(connfd, KVS_CMD_RSET, "rbtree_set", "rbtree_val", KVS_STATUS_OK, NULL); - }else if(mode == 21){ - do_batch_test(connfd, KVS_CMD_RGET, "rbtree_set", NULL, KVS_STATUS_OK, "rbtree_val"); - }else if(mode == 22){ - do_batch_test(connfd, KVS_CMD_REXIST, "rbtree_set", NULL, KVS_STATUS_OK, NULL); - }else if(mode == 30){ - do_batch_test(connfd, KVS_CMD_HSET, "hash_set", "hash_val", KVS_STATUS_OK, NULL); - }else if(mode == 31){ - do_batch_test(connfd, KVS_CMD_HGET, "hash_set", NULL, KVS_STATUS_OK, "hash_val"); - }else if(mode == 32){ - do_batch_test(connfd, KVS_CMD_HEXIST, "hash_set", NULL, KVS_STATUS_OK, NULL); - }else if(mode == -1){ - save(connfd); - }else if(mode == 5){ - batch_qps(connfd); - }else if(mode == 6){ - send_spec_chars(connfd); - } - - return 0; -} \ No newline at end of file