From ba2004c2587b9c151c7270a82f65a5fa5a152f6b Mon Sep 17 00:00:00 2001 From: 1iaan <139833683+1iaan@users.noreply.github.com> Date: Thu, 22 Jan 2026 12:38:34 +0000 Subject: [PATCH] =?UTF-8?q?mmap=E5=8A=A0=E8=BD=BD=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E6=96=87=E4=BB=B6=EF=BC=8Curing=E5=AE=9E=E7=8E=B0=E6=8C=81?= =?UTF-8?q?=E4=B9=85=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Makefile | 4 +- README.md | 47 +++-- common/config.c | 156 +++++++++++--- diskuring/diskuring.c | 198 ++++++++++++++++++ diskuring/diskuring.h | 59 ++++++ kvs_array_bin.c | 55 ++++- kvs_hash.o | Bin 13600 -> 0 bytes kvs_hash_bin.c | 60 ++++-- kvs_inc_log.c => kvs_oplog.c | 50 ++++- kvs_inc_log.h => kvs_oplog.h | 4 +- kvs_protocol_resp.c | 2 + kvs_rbtree_bin.c | 77 ++++--- kvs_rw_tools.c | 380 +---------------------------------- kvs_rw_tools.h | 15 +- kvstore.c | 159 +++------------ 15 files changed, 627 insertions(+), 639 deletions(-) create mode 100644 diskuring/diskuring.c create mode 100644 diskuring/diskuring.h delete mode 100644 kvs_hash.o rename kvs_inc_log.c => kvs_oplog.c (70%) rename kvs_inc_log.h => kvs_oplog.h (62%) diff --git a/Makefile b/Makefile index bc295ad..97f4e39 100644 --- a/Makefile +++ b/Makefile @@ -1,8 +1,8 @@ CC = gcc -FLAGS = -g -DJEMALLOC_NO_DEMANGLE -I./NtyCo/core/ -I/usr/include/libxml2 -L ./NtyCo/ +FLAGS = -g -DJEMALLOC_NO_DEMANGLE -I./NtyCo/core/ -I/usr/include/libxml2 -L ./NtyCo/ -L ./mem_pool/ -L ./diskuring/ LDFLAGS = -lntyco -lpthread -luring -ldl -lxml2 -SRCS = kvstore.c ntyco.c proactor.c reactor.c kvs_array_bin.c kvs_rbtree_bin.c kvs_hash_bin.c kvs_rw_tools.c kvs_protocol_resp.c kvs_inc_log.c kvs_slave.c ./mem_pool/mem_pool.c ./common/config.c +SRCS = kvstore.c ntyco.c proactor.c reactor.c kvs_array_bin.c kvs_rbtree_bin.c kvs_hash_bin.c kvs_rw_tools.c kvs_protocol_resp.c kvs_oplog.c kvs_slave.c ./mem_pool/mem_pool.c ./common/config.c ./diskuring/diskuring.c TARGET = kvstore SUBDIR = ./NtyCo/ diff --git a/README.md b/README.md index 6335b4e..7c851b1 100644 --- a/README.md +++ b/README.md @@ -1,21 +1,11 @@ # 9.1 Kvstore -## 环境安装 -```shell -# xml -sudo apt install libxml2 libxml2-dev -# hiredis client -sudo apt install -y libhiredis-dev -``` - ## 需求 -- ntyco需要作为kvstore的submodule,通过git clone一次下载。 **完成**。 -- README需要包含编译步骤,测试方案与可行性,性能数据。 -- 增量持久化需要包含完整的指令。 -- 全量持久化保存数据集。 -- 持久化的性能数据。 - -- 特殊字符,可以解决redis的resp协议。 +1. ntyco需要作为kvstore的submodule,通过git clone一次下载。 **完成**。 +2. README需要包含编译步骤,测试方案与可行性,性能数据。 +3. 全量持久化保存数据集。 **BUG FIX,完成**。 +4. 持久化的性能数据。 +5. 特殊字符,可以解决redis的resp协议。 **完成**。 ``` 简单字符串 +OK\r\n @@ -28,15 +18,28 @@ $6\r\nfoobar\r\n 数组 *2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n ``` +6. 实现配置文件,把日志级别,端口ip,主从模式,持久化方案。 **完成**。 +7. 持久化落盘用io_uring,加载配置文件用mmap。 +8. 主从同步的性能,开启与关闭性能做到。 +9. 主从同步600w条,出现的coredump。 +10. 主从同步用ebpf实现。 +11. 内存池测试qps与虚拟内存,物理内存。 +12. 实现一个内存泄露检测组件。 -- 实现配置文件,把日志级别,端口ip,主从模式,持久化方案。 **完成**。 -- 持久化落盘用io_uring,加载配置文件用mmap。 -- 主从同步的性能,开启与关闭性能做到。 -- 主从同步60ew条,出现的coredump。 -- 主从同步用ebpf实现。 -- 内存池测试qps与虚拟内存,物理内存。 -- 实现一个内存泄露检测组件。 +## 环境安装与编译 +```shell +# xml +sudo apt install libxml2 libxml2-dev +# hiredis client +sudo apt install -y libhiredis-dev + +git clone git@gitlab.0voice.com:lianyiheng/9.1-kvstore.git +cd 9.1-kvstore/ +make +``` + +## 测试 ### 面试题 1. 为什么会实现kvstore,使用场景在哪里? diff --git a/common/config.c b/common/config.c index d0da03e..3477c51 100644 --- a/common/config.c +++ b/common/config.c @@ -6,6 +6,12 @@ #include // strcasecmp #include #include +#include +#include +#include +#include +#include +#include static xmlNodePtr find_child(xmlNodePtr parent, const char *name) { @@ -87,6 +93,52 @@ static void parse_allocator(const char *s, AllocatorType *out) else *out = ALLOC_OTHER; } +static int read_file_mmap(const char *filename, void **out_addr, size_t *out_len, int *out_fd) { + if (!filename || !out_addr || !out_len || !out_fd) return -1; + + *out_addr = NULL; + *out_len = 0; + *out_fd = -1; + + int fd = open(filename, O_RDONLY); + if (fd < 0) { + fprintf(stderr, "config_load: open(%s) failed: %s\n", filename, strerror(errno)); + return -1; + } + + struct stat st; + if (fstat(fd, &st) != 0) { + fprintf(stderr, "config_load: fstat(%s) failed: %s\n", filename, strerror(errno)); + close(fd); + return -1; + } + + if (!S_ISREG(st.st_mode)) { + fprintf(stderr, "config_load: %s is not a regular file\n", filename); + close(fd); + return -1; + } + + if (st.st_size <= 0) { + fprintf(stderr, "config_load: %s is empty\n", filename); + close(fd); + return -1; + } + + size_t len = (size_t)st.st_size; + void *addr = mmap(NULL, len, PROT_READ, MAP_PRIVATE, fd, 0); + if (addr == MAP_FAILED) { + fprintf(stderr, "config_load: mmap(%s) failed: %s\n", filename, strerror(errno)); + close(fd); + return -1; + } + + *out_addr = addr; + *out_len = len; + *out_fd = fd; + return 0; +} + /* ---- 对外的枚举转字符串工具 ---- */ const char *log_level_to_string(LogLevel lvl) @@ -129,27 +181,9 @@ const char *allocator_to_string(AllocatorType a) /* ---- 主函数:从 XML 加载配置 ---- */ -int config_load(const char *filename, AppConfig *out_cfg) -{ - if (!filename || !out_cfg) return -1; - - set_default_config(out_cfg); - - xmlDocPtr doc = xmlReadFile(filename, "UTF-8", XML_PARSE_NOBLANKS); - if (!doc) { - fprintf(stderr, "config_load: failed to read file %s\n", filename); - return -1; - } - - xmlNodePtr root = xmlDocGetRootElement(doc); - if (!root || xmlStrcmp(root->name, BAD_CAST "config") != 0) { - fprintf(stderr, "config_load: root element is not \n"); - xmlFreeDoc(doc); - return -1; - } - - /* server 部分 */ - xmlNodePtr server = find_child(root, "server"); +/* server 部分 */ +void server_load(xmlNodePtr *root, AppConfig *out_cfg){ + xmlNodePtr server = find_child(*root, "server"); if (server) { /* ip */ xmlNodePtr ip_node = find_child(server, "ip"); @@ -205,9 +239,11 @@ int config_load(const char *filename, AppConfig *out_cfg) } } } +} - /* log 部分 */ - xmlNodePtr log = find_child(root, "log"); +/* log 部分 */ +void log_load(xmlNodePtr *root, AppConfig *out_cfg){ + xmlNodePtr log = find_child(*root, "log"); if (log) { xmlNodePtr lvl_node = find_child(log, "level"); if (lvl_node) { @@ -218,9 +254,11 @@ int config_load(const char *filename, AppConfig *out_cfg) } } } +} - /* persistence 部分 */ - xmlNodePtr pers = find_child(root, "persistence"); +/* persistence 部分 */ +void persist_load(xmlNodePtr *root, AppConfig *out_cfg){ + xmlNodePtr pers = find_child(*root, "persistence"); if (pers) { xmlNodePtr type_node = find_child(pers, "type"); if (type_node) { @@ -281,9 +319,11 @@ int config_load(const char *filename, AppConfig *out_cfg) } } } +} - /* memory 部分 */ - xmlNodePtr mem = find_child(root, "memory"); +/* memory 部分 */ +void memory_load(xmlNodePtr *root, AppConfig *out_cfg){ + xmlNodePtr mem = find_child(*root, "memory"); if (mem) { xmlNodePtr alloc_node = find_child(mem, "allocator"); if (alloc_node) { @@ -294,9 +334,61 @@ int config_load(const char *filename, AppConfig *out_cfg) } } } - - xmlFreeDoc(doc); - /* xmlCleanupParser() 建议在程序退出时 main 里统一调用 */ - - return 0; +} + +int config_load(const char *filename, AppConfig *out_cfg) +{ + if (!filename || !out_cfg) return -1; + + set_default_config(out_cfg); + + int rc = -1; + xmlDocPtr doc = NULL; + + int fd = -1; + void *addr = NULL; + size_t len = 0; + + if (read_file_mmap(filename, &addr, &len, &fd) != 0) { + // read_file_mmap 已经打印错误 + return -1; + } + + /* + * 用 xmlReadMemory 从内存解析。 + * - "UTF-8":你原来指定了 UTF-8;如果希望自动探测,可以传 NULL。 + * - XML_PARSE_NONET:禁用网络访问(防 XXE/外部实体拉取) + * - XML_PARSE_NOBLANKS:保持你原来的行为 + * 你也可以加 XML_PARSE_NOERROR | XML_PARSE_NOWARNING 减少噪音,但调试阶段不建议。 + */ + int parse_opts = XML_PARSE_NOBLANKS | XML_PARSE_NONET; + + + // xmlDocPtr doc = xmlReadFile(filename, "UTF-8", XML_PARSE_NOBLANKS); + doc = xmlReadMemory((const char *)addr, (int)len, filename, "UTF-8", parse_opts); + if (!doc) { + fprintf(stderr, "config_load: failed to read file %s\n", filename); + goto cleanup; + } + + xmlNodePtr root = xmlDocGetRootElement(doc); + if (!root || xmlStrcmp(root->name, BAD_CAST "config") != 0) { + fprintf(stderr, "config_load: root element is not \n"); + goto cleanup; + } + + server_load(&root, out_cfg); + + log_load(&root, out_cfg); + + persist_load(&root, out_cfg); + + memory_load(&root, out_cfg); + + rc = 0; +cleanup: + if (doc) xmlFreeDoc(doc); + if (addr && len) munmap(addr, len); + if (fd >= 0) close(fd); + return rc; } diff --git a/diskuring/diskuring.c b/diskuring/diskuring.c new file mode 100644 index 0000000..288e506 --- /dev/null +++ b/diskuring/diskuring.c @@ -0,0 +1,198 @@ +#include "diskuring.h" +#include "../mem_pool/mem_pool.h" +#include +#include + +iouring_ctx_t global_uring_ctx; + +void task_init(task_t *t) +{ + pthread_mutex_init(&t->m, NULL); + pthread_cond_init(&t->cv, NULL); + t->done = 0; + t->res = 0; + t->next = NULL; +} + +void task_finish(task_t *t, int res) +{ + pthread_mutex_lock(&t->m); + t->res = res; + t->done = 1; + pthread_cond_broadcast(&t->cv); + pthread_mutex_unlock(&t->m); +} + +int task_wait(task_t *t) +{ + pthread_mutex_lock(&t->m); + while (!t->done) + pthread_cond_wait(&t->cv, &t->m); + int r = t->res; + pthread_mutex_unlock(&t->m); + return r; +} + +void task_destroy(task_t *t) +{ + pthread_mutex_destroy(&t->m); + pthread_cond_destroy(&t->cv); + if (t->iovs) { + for (int i = 0; i < t->iovcnt; i++) { + if (t->iovs[i].iov_base) { + kvs_free(t->iovs[i].iov_base); + } + } + kvs_free(t->iovs); + } + + kvs_free(t); +} + +static void queue_push(iouring_ctx_t *ctx, task_t *t) +{ + pthread_mutex_lock(&ctx->q_m); + if (ctx->q_tail) + ctx->q_tail->next = t; + else + ctx->q_head = t; + ctx->q_tail = t; + pthread_cond_signal(&ctx->q_cv); + pthread_mutex_unlock(&ctx->q_m); +} + +static task_t *queue_pop_all(iouring_ctx_t *ctx) +{ + pthread_mutex_lock(&ctx->q_m); + while (!ctx->stop && ctx->q_head == NULL) + { + pthread_cond_wait(&ctx->q_cv, &ctx->q_m); + } + task_t *list = ctx->q_head; + ctx->q_head = ctx->q_tail = NULL; + pthread_mutex_unlock(&ctx->q_m); + return list; +} + +static void *worker_main(void *arg) +{ + iouring_ctx_t *ctx = (iouring_ctx_t *)arg; + + while (!ctx->stop) + { + + // 1. 阻塞等任务(这里睡觉) + task_t *list = queue_pop_all(ctx); + if (!list) + continue; + + // 2. 提交所有任务 + task_t *t; + int submit_cnt = 0; + + for (t = list; t; t = t->next) + { + struct io_uring_sqe *sqe = io_uring_get_sqe(&ctx->ring); + if (!sqe) + break; + + io_uring_prep_writev(sqe, t->fd, t->iovs, t->iovcnt, t->off); + sqe->user_data = (uint64_t)(uintptr_t)t; + submit_cnt++; + } + + io_uring_submit(&ctx->ring); + + // 3. 等全部完成 + for (int i = 0; i < submit_cnt; ++i) + { + struct io_uring_cqe *cqe; + io_uring_wait_cqe(&ctx->ring, &cqe); + + task_t *done = (task_t *)(uintptr_t)cqe->user_data; + task_finish(done, cqe->res); + + io_uring_cqe_seen(&ctx->ring, cqe); + } + } + + return NULL; +} + +int iouring_register_fd(iouring_ctx_t *ctx, int fd) { + int fds[1] = {fd}; + int ret = io_uring_register_files(&ctx->ring, fds, 1); + return ret; +} + +int iouring_init(iouring_ctx_t *ctx, unsigned entries) +{ + memset(ctx, 0, sizeof(*ctx)); + pthread_mutex_init(&ctx->q_m, NULL); + pthread_cond_init(&ctx->q_cv, NULL); + + struct io_uring_params params; + memset(¶ms, 0, sizeof(params)); + int ret = io_uring_queue_init_params(entries, &ctx->ring, ¶ms); + if (ret < 0) + return ret; + + ret = pthread_create(&ctx->th, NULL, worker_main, ctx); + if (ret != 0) + { + io_uring_queue_exit(&ctx->ring); + return -ret; + } + return 0; +} + +void iouring_shutdown(iouring_ctx_t *ctx) +{ + pthread_mutex_lock(&ctx->q_m); + ctx->stop = 1; + pthread_cond_broadcast(&ctx->q_cv); + pthread_mutex_unlock(&ctx->q_m); + + pthread_join(ctx->th, NULL); + io_uring_queue_exit(&ctx->ring); + + pthread_mutex_destroy(&ctx->q_m); + pthread_cond_destroy(&ctx->q_cv); +} + +task_t* submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int count, off_t off){ + if (!bufs || !lens || count <= 0) return NULL; + + task_t *t = (task_t *)kvs_malloc(sizeof(task_t)); + task_init(t); + t->op = TASK_WRITE; + t->fd = fd; + t->off = off; + + t->iovs = (struct iovec *)kvs_malloc(sizeof(struct iovec) * count); + if(!t->iovs) { + kvs_free(t); + return NULL; + } + + for(int i = 0;i < count; ++ i){ + size_t len = lens[i]; + void *buf = kvs_malloc(len); + if(!buf){ + for(int j = 0; j < i; ++j){ + if(t->iovs[j].iov_base) kvs_free(t->iovs[j].iov_base); + } + kvs_free(t->iovs); + kvs_free(t); + return NULL; + } + memcpy(buf, bufs[i], len); + t->iovs[i].iov_base = buf; + t->iovs[i].iov_len = len; + } + + t->iovcnt = count; + + queue_push(ctx, t); + return t; +} diff --git a/diskuring/diskuring.h b/diskuring/diskuring.h new file mode 100644 index 0000000..29ba85b --- /dev/null +++ b/diskuring/diskuring.h @@ -0,0 +1,59 @@ +#ifndef __DISK_IOURING_H__ +#define __DISK_IOURING_H__ + +#include +#include +#include +#include +#include +#include +#include +#include + + +typedef enum { TASK_READ, TASK_WRITE } task_op_t; + +typedef struct task { + task_op_t op; + int fd; + off_t off; + + int res; // cqe->res + int done; // 0/1 + + struct iovec *iovs; // iovec 数组 + int iovcnt; // iovec 数量 + + pthread_mutex_t m; + pthread_cond_t cv; + + struct task *next; +} task_t; + +typedef struct { + struct io_uring ring; + pthread_t th; + + pthread_mutex_t q_m; + pthread_cond_t q_cv; + task_t *q_head, *q_tail; + + int stop; +} iouring_ctx_t; + +int iouring_register_fd(iouring_ctx_t *ctx, int fd); + +void task_init(task_t *t); +void task_finish(task_t *t, int res); + +int task_wait(task_t *t); +void task_destroy(task_t *t); + +int iouring_init(iouring_ctx_t *ctx, unsigned entries); +void iouring_shutdown(iouring_ctx_t *ctx); + +task_t* submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int count, off_t off); + +extern iouring_ctx_t global_uring_ctx; + +#endif \ No newline at end of file diff --git a/kvs_array_bin.c b/kvs_array_bin.c index 0d5d9a8..dfc50ac 100644 --- a/kvs_array_bin.c +++ b/kvs_array_bin.c @@ -3,8 +3,7 @@ #include "kvstore.h" #include "kvs_rw_tools.h" #include "mem_pool/mem_pool.h" -#include - +#include "diskuring/diskuring.h" // singleton @@ -194,28 +193,62 @@ int kvs_array_exist_bin(kvs_array_t *inst, const void *key, uint32_t key_len) { // return: 0 success, <0 error int kvs_array_save(kvs_array_t *inst, const char* filename){ if(!inst || !filename) return -1; - FILE *fp = fopen(filename, "wb"); - if(!fp) return -2; + int fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, 0644); + if(fd < 0) return -2; + + off_t current_off = 0; for(int i = 0;i < inst->total; ++ i){ kvs_array_item_t *it = &inst->table[i]; if(!it->key || it->key_len == 0) continue; // 跳过空槽 - if(it->value_len > 0 && !it->value) { fclose(fp); return -3; } + if(it->value_len > 0 && !it->value) { close(fd); return -3; } uint32_t klen = htonl(it->key_len); uint32_t vlen = htonl(it->value_len); - if (kvs_write_file(fp, &klen, 4) < 0) { fclose(fp); return -4; } - if (kvs_write_file(fp, &vlen, 4) < 0) { fclose(fp); return -4; } + void *bufs[4]; + size_t lens[4]; + int count = 0; - if (kvs_write_file(fp, it->key, it->key_len) < 0) { fclose(fp); return -4; } - if (it->value_len > 0) { - if (kvs_write_file(fp, it->value, it->value_len) < 0) { fclose(fp); return -4; } + bufs[count] = &klen; + lens[count] = sizeof(klen); + count++; + + bufs[count] = &vlen; + lens[count] = sizeof(vlen); + count++; + + if (it->key_len > 0){ + bufs[count] = it->key; + lens[count] = it->key_len; + count++; } + + if (it->value_len > 0) { + bufs[count] = it->value; + lens[count] = it->value_len; + count++; + } + + size_t total = 0; + for (int i = 0; i < count; i++) total += lens[i]; + + task_t *t = submit_write(&global_uring_ctx, fd, bufs, lens, count, current_off); + + if (!t) { close(fd); return -4; } + int res = task_wait(t); + task_destroy(t); + + if (res < 0) { + close(fd); + return -5; + } + + current_off += (off_t) total; } - fclose(fp); + close(fd); return 0; } diff --git a/kvs_hash.o b/kvs_hash.o deleted file mode 100644 index 1c85d5c303b860335c7b1332b03ff31c6730b383..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 13600 zcmb_i4RBP~b-r&`SK39=Nr{jBTwXf-zt)YlvVm7-R><_=ha3-6v_yYFF&O zCw^>df`}7Y(;792ja_U*8rSJK!A_@5sKG?G(+FyhD?4pWr*6a4xFbf)K#bcGn}nWo z@40W^UY1Fx(>t?z_k8E)p1*tFzPEa`b=@kDVK6lec00>c3T5m}Q+^&6(=eOF>e!{K z8K1UC20n@m>>eAi&qn%pk1@OK=v5fwt4CTdqv%NM6}*g39c;Z4iH}AGTgM{CXw^?x z)rn!F^>Tmf6+`Fj8@AsYY`y&SfA2m!FuM6oXO;f*&9Ktx$bdcivngP63=KyHz@S)o z!cnBpRYL@@3W$wJf&O8`nGdvH1mTO!CgBg!f?N2H+YF#DoF_OTL?F_>1Mb>V-3 z`DRh0{@tfp)xP~8G17V-1?eRo2HHrReX5`Qo9`oDXj9e{>9UL3y-m=|GoSzHIh z;40A9DnJcTPP*yT8S8qYzx6y;@bqBVx48BEBSWOE@;z?sFYq?3Iw~^IeiG_)nppw3 zj54bXE+?^3u$|An4IzN3(GwzmT=I8UuMV`HC-X(4X#^&C4|?sRk%3<+7er2Rn~m?1 z0u-j`%eO)JBrfDqpoA33S~W;w!SSlIF-KHNNJfQhs?!5)71x$52uT=&ZFL4h9+Q!u z!HtY{8}9)-XPmJ8S1{~P0E32yB7UM5@$z`HSXhL8#uWvGJwbB9zVRtE!va$;p1Rhr zBE`X~j5<}p(+Vu*tMbrif3PZZK%r<=tVxpNF2^sSUtmRXU|`A4{R#XbF`p_Z&?lXF z(Jfzg^!IosZY+?z&Nk1R65AY`0m|s!=HNYU(l(!-G2GW64b?6{NK}rj+{R|4dmMN@AgZ5avEf+$xA_Nh}h? z6iF-=L{Jh-1aX5TZWBbaB$f(dktCK0A}oopAR>~uLlA9}SRsf_lK67{GVpVYBv#d} z1fp9K5#e>OB-RMRlEj_mIS65gB<`Bn2E-motgpHkh`o~7;9CRM`XzC9)qOyGOA_~0 zJ_=&{CGo)I1aJ;W;=zUt5QikOS-A59Nqo(}8gyQfM07$U5Jx4kMff%>i4LC)27WG! z#LF6<1-S|EVI?;O%JEw2TLc27K4es2!O*Wt><&bmfgrJHe)@AoIm9lA5kxN}i{aS$?!@T6C zAtim!2QDft-yvaKSnwUL!N!#Hzi`PMamlT8JB`M_Y7^S!)T04 zH)AxOp|(ka`=25n&rmn4i}t0}kpioO;$Q(&2QN5QM`oZ^@6gdlrP+}i(X4mql}GE7 zi5<-Ndl8doXq{ZyfDcht&-Z^j7>p? z@n(U4VhMx~&g3fUH!13Fis*2G#(XED=N*m1A|lUFL9;nMwWg~4qr3~rkioLv~)_QK%4?yTM*qPSPwDR6oO|G+Q~{?=Lf-Y>|&BK1jT zJfI9ag^o%Mcw$7o%~a@oBiu%O*$A;}rxB>GYYIYF){KKU!Cg#-zJ@ZZk~OyyZK%=e zBL5IC4h5kaZ4ISenpTEiPCFPGW|0RBpsu|=RfA#WY- z2(`dn3Z<_wj5W0=UpW)(G=W@TflG0D0}55mgovuapig;+25SlhJ+P}MhK%J6Du%fY zYicpH@RTQ4G~g1o++`WX>i}>}A~4Bq5`lUE3M3|%L*gbvm8TS6>M6NYLm{`ojj16> zl?){BBIrSkH)G}CLubF3;UD+cF9(Flzd#duSsApSVSE;-gumil3(`$?F58q!b~JUx z<4rpjEsQQ)(3nc5?H!F>X}f80Q!*V-*$I;gIU=r=>quI;|Ak1CwX@eO6l82`UoP4m z%XNq1=yJ|VB+SlGH$G4aX_!K&IV)?&t>~&X>skSTaPl+INM-s9^*Fyh(9C!wu7xkox#F6q*Q`fi#`vbh4--M1cfOTNwd96rJ3Ro~;Dy?pvUs>J7H2I?SxSSC>ltiyeLPBX7O#toY{HN$dH zcA1M}%W+#42O>ViOD)dNa&Wrr7)D%ol6(7b4t1DthzAaHUq7#D;*mm!RdXx^msmpEiJqnI}b7~iV3;xgXDNT zu$j~5_Gmf-n{@U1_Tb8vTZ4;33qsAo#vaQ~o6EY)w3$uDfsAG2-OFH)H!fTdZ0rg) zcE)l$)A8yeswl@G-0BRoh%y9nla0iKI5JR2eU#x7wa>lW;(%g-I=Tv#OJ_V za+evkSW|bV$Ap6_mTt;*$FgPoJAR?b#%( z)oZ~DovBz?&hZBh96WbKz|kBNhY*WGmbRMN9P5jvY%tK7HO;JGu~%0p(Fk{8t761flJDqvZ$ykX41(*JY!=FAOkIt z5T#e8v(t9YOw7+Sv4o<{Q5>uYY2l$`E@7rvbbAuQPQm&}lGm1%DxB%KR+L=t5=?qk%JC;m~W_4u+lR*(epFqA* z54A7=*j;unT%Znk3rkBB!jh#Cy(*(u>2}g2CHiFNs-8y=^N|B3Hj}iUzb}d#XFmV9 zTVVB!ZLzeuLwbg-22QfoD_1TF-q79w6|&`#(%51Ps^yR7H;0-RgyzqaxOgdq@A%;n z0}mm*V-Lgq7;n^K)X~-0bq_OkR2$PO%6$9b4nG}m{NNDZS6c(sk9k&BmU=dWgyb!i zY$m=vE53KBfdZ)eAo0cbamDw1)zN>1_~L=V;lJSU4-#KI$~gQh4*xju58&0|KjH8{ zAij9cQ+)3{$GLXNaph^k*&DCN& zZ_!`YuvV&Cg<4)w-9ikXG~v$Ufa+NT0k7x+hviJ*3#c=9PrUL4h8q%duY7$}1h2RT zL%=Ivyh`w?U(X&!rX$11*L+`Qcvf zzRMPT!-%e4w_(K{>!KT0t@=vq#^}a7R;+7{iX*TmmV(0$A1>e!XFX<*JPxzzI2`$C z8t?x3!%}{2$T!6_$q4iE2L4k5oCusswerVsZV z{N^RbVHXZhtPcJo7w(pS$A!D)KXl=EMl1UlT)10a^#S^WHB<7_sjm=s%irL_-To|h z;q@*%_qcGkoh}#d_A}?g-SPu2-0kN-xNx^W1Jvi}zuTXeuY=F0K1X?c_oDI;cj35C z6h03HVd(NFuY-H3|B>zXr^B6UhYbyU_M%+ClgUc;wSyua0Owa-h5uliwx?AK~I{$PY7sNuD= z{~I;@$7E-ThW|pEVXHNK9nl*#yr1mEH2h)0(;EJdgg>m|i^>0QY54Dm{;r1q7uorq zhW~)D-93Oy2_raO9SB-HTegKR&gl#xg@W?7b*PD37<^s zEBv2`o~_}HRNod2UrV^ktFrSf$!*c(KcIEDYB(o5k7@YpB!57|my`VWHT)Rm;S~*k zjr#3<4WCJNKGN_fXx;M~zLRK!_$uBSTDMZe&k?TbrsPi(K3kK2hwQg#_!}g@Ov9fd zT%E7Vek<8&)8y+2e@MgI$Nlr_gM`;MD~9}IFjzCrB5~aACkQ4J0vkA^}T#L#jX6rpYm}m(QuWwHVs$(@c`k< zzIqp%%U#>7d836B0oRV}L-8Kc@V5wmiE!nQdQN*olUM#+&~UZxRSlo7@<@4A_SJLa zOv3fNE!S`r@7FY3t=psFva1-|t>MoTV;|wlA9arZnI^B|{i!B@3+4HwCa?T?Uz5Lw zIEGZfAhG?&lU|= z`QM}As!tASxa#L&4Ojc9uwYHgwpVHPzYY}!p}kQ0-I>~sW=3`$Ml+6YbV(;yW!;t{si;?GJ;?QQ{(O# zG#ti1FuJJUBqe3l9IyB+h(}czKJGE)^OA~Cf8w(0#lZJ_IOf0yaS}sFj2E?a z6;5STr1-%eZT?^N7lN*C6OF6&FKK@0vMnH6EdDLC&3Vfax{? zV$aZ*h3{%ijb!cD(Ql#$?PfIQ8oGWP@QV3AhX0KSL+PXc=)3NJ5-{B+KrBk@|5_`I zE}H<^V(U+#2YJ;exIUic`ua9tXjAzKQ!#F*7YbiRD7uZEA(S9K}2HmWSbg8>KCAK!UlOY8dhUPx;ZKtO+nxG?oTK~nkrD;nFZn0~NH NN*q&ybk)+q{s+7WBs%~A diff --git a/kvs_hash_bin.c b/kvs_hash_bin.c index cb89ebb..dbfddda 100755 --- a/kvs_hash_bin.c +++ b/kvs_hash_bin.c @@ -1,17 +1,13 @@ -#include -#include -#include -#include + #include "kvstore.h" #include "kvs_rw_tools.h" #include "mem_pool/mem_pool.h" -#include - +#include "diskuring/diskuring.h" // Key, Value --> // Modify @@ -269,30 +265,64 @@ int kvs_hash_exist_bin(kvs_hash_t *hash, const void *key, uint32_t key_len) { // 0 suc, <0 error int kvs_hash_save(kvs_hash_t *inst, const char* filename){ if(!inst || !filename) return -1; - FILE *fp = fopen(filename, "wb"); - if(!fp) return -2; + int fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, 0644); + if(fd < 0) return -2; + off_t current_off = 0; for(int i = 0;i < inst->max_slots; ++ i){ for (hashnode_t *n = inst->nodes[i]; n != NULL; n = n->next) { if (!n->key || n->key_len == 0) continue; - if (n->value_len > 0 && !n->value) { fclose(fp); return -3; } + if (n->value_len > 0 && !n->value) { close(fd); return -3; } uint32_t klen = htonl((uint32_t)n->key_len); uint32_t vlen = htonl((uint32_t)n->value_len); - if (kvs_write_file(fp, &klen, 4) < 0) { fclose(fp); return -4; } - if (kvs_write_file(fp, &vlen, 4) < 0) { fclose(fp); return -4; } + void *bufs[4]; + size_t lens[4]; + int count = 0; - if (kvs_write_file(fp, n->key, n->key_len) < 0) { fclose(fp); return -4; } - if (n->value_len > 0) { - if (kvs_write_file(fp, n->value, n->value_len) < 0) { fclose(fp); return -4; } + bufs[count] = &klen; + lens[count] = sizeof(klen); + count++; + + bufs[count] = &vlen; + lens[count] = sizeof(vlen); + count++; + + if (n->key_len > 0){ + bufs[count] = n->key; + lens[count] = n->key_len; + count++; } + + if (n->value_len > 0) { + bufs[count] = n->value; + lens[count] = n->value_len; + count++; + } + + size_t total = 0; + for (int i = 0; i < count; i++) total += lens[i]; + + + task_t *t = submit_write(&global_uring_ctx, fd, bufs, lens, count, current_off); + + if (!t) { close(fd); return -4; } + int res = task_wait(t); + task_destroy(t); + + if (res < 0) { + close(fd); + return -5; + } + + current_off += (off_t) total; } } - fclose(fp); + close(fd); return 0; } diff --git a/kvs_inc_log.c b/kvs_oplog.c similarity index 70% rename from kvs_inc_log.c rename to kvs_oplog.c index 5f559d9..48601d9 100644 --- a/kvs_inc_log.c +++ b/kvs_oplog.c @@ -2,15 +2,21 @@ #include "kvs_rw_tools.h" #include "mem_pool/mem_pool.h" #include "kvs_protocol_resp.h" +#include "diskuring/diskuring.h" #include #include #include +int global_cmd_log_fd = -1; +static off_t g_log_off = -1; + int init_cmd_log(const char *file, int *logfd){ if(!file) return -1; - int fd = open(file, O_RDWR | O_CREAT | O_APPEND, 0644); + int fd = open(file, O_RDWR | O_CREAT , 0644); if(fd < 0) return -2; + + g_log_off = lseek(fd, 0, SEEK_END); *logfd = fd; return 0; } @@ -18,29 +24,48 @@ int init_cmd_log(const char *file, int *logfd){ int destroy_cmd_log(int logfd){ fsync(logfd); close(logfd); + global_cmd_log_fd = -1; return 0; } -int kvs_save_cmd_to_logfile(const uint8_t *cmd, size_t len, int logfd){ - if (logfd < 0 || !cmd || len == 0) - return -1; +int kvs_oplog_append(const uint8_t *cmd, size_t len, int logfd){ + if (logfd < 0 || !cmd || len == 0) return -1; + if (len > UINT32_MAX) return -2; - if (len > UINT32_MAX) - return -2; uint32_t nlen = htonl((uint32_t)len); - if (write_full(logfd, &nlen, sizeof(nlen)) < 0) - return -3; + void *bufs[2]; + size_t lens[2]; - if (write_full(logfd, cmd, len) < 0) + bufs[0] = (void *)&nlen; + lens[0] = sizeof(nlen); + + bufs[1] = (void *)cmd; + lens[1] = len; + + size_t total = sizeof(nlen) + len; + + off_t myoff = g_log_off; + g_log_off += (off_t)total; + + task_t *t = submit_write(&global_uring_ctx, logfd, bufs, lens, 2, myoff); + if (!t) { return -4; + } + + task_wait(t); + task_destroy(t); return 0; } -int kvs_replay_log(const char *logfile, int logfd){ - if (!logfile|| logfd<0) return -1; +int kvs_replay_log(int logfd){ + if (logfd < 0) return -1; + + if (lseek(logfd, 0, SEEK_SET) < 0) { + return -1; + } for (;;) { uint32_t nlen = 0; @@ -90,6 +115,8 @@ int kvs_replay_log(const char *logfile, int logfd){ kvs_free(cmd_bytes); } + g_log_off = lseek(logfd, 0, SEEK_CUR); + return 0; } @@ -100,5 +127,6 @@ int ksv_clear_log(int logfd){ if(logfd < 0) return -1; ftruncate(logfd, 0); lseek(logfd, 0, SEEK_SET); + g_log_off = 0; return 0; } \ No newline at end of file diff --git a/kvs_inc_log.h b/kvs_oplog.h similarity index 62% rename from kvs_inc_log.h rename to kvs_oplog.h index e0833e7..19ca6f2 100644 --- a/kvs_inc_log.h +++ b/kvs_oplog.h @@ -7,8 +7,8 @@ int init_cmd_log(const char *file, int *logfd); int destroy_cmd_log(int logfd); -int kvs_save_cmd_to_logfile(const uint8_t *cmd, size_t len, int logfd); -int kvs_replay_log(const char *logfile, int logfd); +int kvs_oplog_append(const uint8_t *cmd, size_t len, int logfd); +int kvs_replay_log(int logfd); int ksv_clear_log(int logfd); #endif \ No newline at end of file diff --git a/kvs_protocol_resp.c b/kvs_protocol_resp.c index d3fb400..37a90ba 100644 --- a/kvs_protocol_resp.c +++ b/kvs_protocol_resp.c @@ -432,6 +432,7 @@ int resp_dispatch(const resp_cmd_t *cmd, resp_value_t *out_value) { cmd->argv[1].ptr, cmd->argv[1].len, cmd->argv[2].ptr, cmd->argv[2].len); if (r < 0) { *out_value = resp_error("ERR internal error"); return 0; } + else if (r == 1) { *out_value = resp_error("ERR key has exist"); return 0; } *out_value = resp_simple("OK"); return 0; } @@ -485,6 +486,7 @@ int resp_dispatch(const resp_cmd_t *cmd, resp_value_t *out_value) { cmd->argv[1].ptr, cmd->argv[1].len, cmd->argv[2].ptr, cmd->argv[2].len); if (r < 0) { *out_value = resp_error("ERR internal error"); return 0; } + else if (r == 1) { *out_value = resp_error("ERR key has exist"); return 0; } *out_value = resp_simple("OK"); return 0; } diff --git a/kvs_rbtree_bin.c b/kvs_rbtree_bin.c index cb13357..4cf35f0 100644 --- a/kvs_rbtree_bin.c +++ b/kvs_rbtree_bin.c @@ -1,15 +1,10 @@ -#include -#include -#include - - #include "kvstore.h" #include "kvs_rw_tools.h" #include "mem_pool/mem_pool.h" -#include +#include "diskuring/diskuring.h" int kvs_keycmp(const uint8_t *a, uint32_t alen, const uint8_t *b, uint32_t blen) { @@ -469,31 +464,59 @@ int kvs_rbtree_exist(kvs_rbtree_t *inst, const void *key, uint32_t key_len) { return 0; } -static int kvs_rbtree_save_node(FILE *fp, kvs_rbtree_t *inst, rbtree_node *node) { - if (!fp || !inst || !node) return -1; +static int kvs_rbtree_save_node(int fd, off_t *current_off, kvs_rbtree_t *inst, rbtree_node *node) { + if (!current_off || !inst || !node) return -1; if (node == inst->nil) return 0; int rc = 0; - rc = kvs_rbtree_save_node(fp, inst, node->left); + rc = kvs_rbtree_save_node(fd, current_off, inst, node->left); if (rc < 0) return rc; - uint32_t klen_n = htonl(node->key_len); - uint32_t vlen_n = htonl(node->value_len); + uint32_t klen = htonl(node->key_len); + uint32_t vlen = htonl(node->value_len); - if (kvs_write_file(fp, &klen_n, sizeof(klen_n)) < 0) return -1; - if (kvs_write_file(fp, &vlen_n, sizeof(vlen_n)) < 0) return -1; + void *bufs[4]; + size_t lens[4]; + int count = 0; - if (node->key_len) { - if (!node->key) return -1; - if (kvs_write_file(fp, node->key, node->key_len) < 0) return -1; - } - if (node->value_len) { - if (!node->value) return -1; - if (kvs_write_file(fp, node->value, node->value_len) < 0) return -1; - } + bufs[count] = &klen; + lens[count] = sizeof(klen); + count++; - rc = kvs_rbtree_save_node(fp, inst, node->right); + bufs[count] = &vlen; + lens[count] = sizeof(vlen); + count++; + + if (node->key_len > 0) { + bufs[count] = node->key; + lens[count] = node->key_len; + count++; + } + + if (node->value_len > 0) { + bufs[count] = node->value; + lens[count] = node->value_len; + count++; + } + + size_t total = 0; + for (int i = 0; i < count; i++) total += lens[i]; + + task_t *t = submit_write(&global_uring_ctx, fd, bufs, lens, count, *current_off); + + + if (!t) { return -4; } + int res = task_wait(t); + task_destroy(t); + + if (res < 0) { + return -5; + } + + *current_off += (off_t) total; + + rc = kvs_rbtree_save_node(fd, current_off, inst, node->right); if (rc < 0) return rc; return 0; @@ -503,14 +526,14 @@ static int kvs_rbtree_save_node(FILE *fp, kvs_rbtree_t *inst, rbtree_node *node) int kvs_rbtree_save(kvs_rbtree_t *inst, const char* filename){ if (!inst || !filename) return -1; - FILE *fp = fopen(filename, "wb"); - if (!fp) return -2; + int fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, 0644); + if(fd < 0) return -2; + off_t current_off = 0; - int rc = kvs_rbtree_save_node(fp, inst, inst->root); - if (fflush(fp) != 0) rc = -3; + int rc = kvs_rbtree_save_node(fd, ¤t_off, inst, inst->root); - fclose(fp); + close(fd); return rc; } diff --git a/kvs_rw_tools.c b/kvs_rw_tools.c index a7e5a4c..dd72283 100644 --- a/kvs_rw_tools.c +++ b/kvs_rw_tools.c @@ -1,6 +1,6 @@ #include "kvs_rw_tools.h" #include "mem_pool/mem_pool.h" -#include "kvs_inc_log.h" +#include "kvs_oplog.h" #include #include #include @@ -62,67 +62,6 @@ int read_full(int fd, void *buf, size_t n) return 1; } -// 0 suc, -1 err -int kvs_need(const uint8_t *p, const uint8_t *end, size_t n) { - return (p + n <= end) ? 0 : -1; -} - -// 注意u8类型不需要ntoh或者hton -// 0 suc, -1 err -int kvs_read_u8(const uint8_t **pp, const uint8_t *end, uint8_t *out) { - const uint8_t *p = *pp; - if (kvs_need(p, end, 1) < 0) return -1; - *out = *p; - *pp = p + 1; - return 0; -} - -int kvs_read_u16(const uint8_t **pp, const uint8_t *end, uint16_t *out) { - const uint8_t *p = *pp; - if (kvs_need(p, end, 2) < 0) return -1; - uint16_t v; - memcpy(&v, p, 2); - *out = ntohs(v); - *pp = p + 2; - return 0; -} - -int kvs_read_u32(const uint8_t **pp, const uint8_t *end, uint32_t *out) { - const uint8_t *p = *pp; - if (kvs_need(p, end, 4) < 0) return -1; - uint32_t v; - memcpy(&v, p, 4); - *out = ntohl(v); - *pp = p + 4; - return 0; -} - -int kvs_write_u8(uint8_t **pp, const uint8_t *end, uint8_t v) { - uint8_t *p = *pp; - if (kvs_need(p, end, 1) < 0) return -1; - *p = v; - *pp = p + 1; - return 0; -} - -int kvs_write_u16(uint8_t **pp, const uint8_t *end, uint16_t v) { - uint8_t *p = *pp; - if (kvs_need(p, end, 2) < 0) return -1; - uint16_t be = htons(v); - memcpy(p, &be, 2); - *pp = p + 2; - return 0; -} - -int kvs_write_u32(uint8_t **pp, const uint8_t *end, uint32_t v) { - uint8_t *p = *pp; - if (kvs_need(p, end, 4) < 0) return -1; - uint32_t be = htonl(v); - memcpy(p, &be, 4); - *pp = p + 4; - return 0; -} - // -1 err, 0 suc int kvs_write_file(FILE *fp, const void *buf, size_t n) { const uint8_t *p = (const uint8_t *)buf; @@ -147,320 +86,3 @@ int kvs_read_file(FILE *fp, void *buf, size_t n){ return 0; } - - -// // return: -1 fail, 0 half, >0 consumed -// int kvs_parse_one_cmd(const uint8_t *request, int request_length, kvs_req_t *req_out){ -// if (!request || request_length <= 0 || !req_out) return -1; - -// req_out->op = KVS_CMD_COUNT; -// req_out->argc = 0; -// req_out->args = NULL; - -// const uint8_t *p = request; -// const uint8_t *end = request + (size_t)request_length; - -// // OP + ARGC -// if (kvs_need(p, end, 2)) { -// return 0; // NEED_MORE -// } - -// uint8_t op = 0, argc = 0; -// if (kvs_read_u8(&p, end, &op) < 0) return -1; -// if (kvs_read_u8(&p, end, &argc) < 0) return -1; - -// if (argc > KVS_MAX_ARGC) return -1; - -// // 先扫描一遍确认整条命令数据都在 buffer 里 -// const uint8_t *scan = p; -// uint32_t lens[KVS_MAX_ARGC]; -// if (argc > 0) { -// for (uint8_t i = 0; i < argc; i++) { -// if (kvs_need(scan, end, 4)) { -// return 0; // NEED_MORE -// } -// uint32_t alen = 0; -// if (kvs_read_u32(&scan, end, &alen) < 0) return -1; - -// // 防御:单个参数长度限制 -// if (alen > KVS_MAX_ARGLEN) return -1; - -// // 防御:scan + alen 越界 / 半包 -// if (kvs_need(scan, end, (size_t)alen)) { -// return 0; // NEED_MORE -// } -// lens[i] = alen; -// scan += alen; -// } -// } - -// size_t total_len = (size_t)(scan - request); -// if (total_len > KVS_MAX_CMD_BYTES) return -1; - -// req_out->op = op; -// req_out->argc = argc; - -// if (argc == 0) { -// return (int)total_len; -// } - -// kvs_arg_t *args = (kvs_arg_t *)kvs_malloc((size_t)argc * sizeof(kvs_arg_t)); -// if (!args) { -// kvs_free_request(req_out); -// return -1; -// } -// memset(args, 0, (size_t)argc * sizeof(kvs_arg_t)); - -// for (uint8_t i = 0; i < argc; i++) { -// uint32_t alen = 0; -// if (kvs_read_u32(&p, end, &alen) < 0) { -// kvs_free(args); -// kvs_free_request(req_out); -// return -1; -// } - -// // alen 与 lens[i] 应当一致(扫描时读过),不一致说明解析器/输入异常 -// if (alen != lens[i]) { -// kvs_free(args); -// kvs_free_request(req_out); -// return -1; -// } - -// args[i].len = alen; -// args[i].data = p; // 直接指向输入 buffer(零拷贝) -// p += alen; -// } - - -// req_out->args = args; - -// return (int)(p - request); -// } - -// void kvs_free_request(kvs_req_t *req) { -// if (!req) return; -// if (req->args) { -// kvs_free(req->args); -// req->args = NULL; -// } -// req->op = KVS_CMD_COUNT; -// req->argc = 0; -// } - -// /** -// * 输入:req -// * 输出:rsp -// * 返回:-1 失败,参数错误,0 成功 -// */ -// int kvs_execute_one_cmd(const kvs_req_t *req, kvs_rsp_t *rsp_out) { -// if(!req || !rsp_out) return -1; -// rsp_out->op = req->op; -// rsp_out->status = KVS_STATUS_ERROR; -// rsp_out->data = NULL; -// rsp_out->dlen = 0; - -// int argc = req->argc; -// kvs_cmd_t op = req->op; -// kvs_arg_t *argv = req->args; - -// uint32_t key_len = 0; -// const void *key = NULL; -// uint32_t value_len = 0; -// const void *val = NULL; - -// if(argc == 1){ -// key_len = argv[0].len; -// key = argv[0].data; -// }else if(argc == 2){ -// key_len = argv[0].len; -// key = argv[0].data; -// value_len = argv[1].len; -// val = argv[1].data; -// } - -// // 基本参数校验(按你原有命令语义) -// switch (op) { -// case KVS_CMD_SET: -// case KVS_CMD_MOD: -// case KVS_CMD_RSET: -// case KVS_CMD_RMOD: -// case KVS_CMD_HSET: -// case KVS_CMD_HMOD: -// if (argc != 2 || !key || !val) { rsp_out->status = KVS_STATUS_BADREQ; return -1; } -// break; -// case KVS_CMD_GET: -// case KVS_CMD_DEL: -// case KVS_CMD_EXIST: -// case KVS_CMD_RGET: -// case KVS_CMD_RDEL: -// case KVS_CMD_REXIST: -// case KVS_CMD_HGET: -// case KVS_CMD_HDEL: -// case KVS_CMD_HEXIST: -// case KVS_CMD_PSYNC: -// if (argc != 1 || !key) { rsp_out->status = KVS_STATUS_BADREQ; return -1; } -// break; -// case KVS_CMD_SAVE: -// if(argc != 0) { rsp_out->status = KVS_STATUS_BADREQ; return -1; } -// break; -// default: -// rsp_out->status = KVS_STATUS_BADREQ; -// return -1; -// } - -// int ret = 0; -// const char *result = NULL; - -// switch (op) { -// #if ENABLE_ARRAY -// case KVS_CMD_SET: -// ret = kvs_array_set_bin(&global_array, key, key_len, val, value_len); -// if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; -// else if (ret == 0) rsp_out->status = KVS_STATUS_OK; -// else rsp_out->status = KVS_STATUS_EXIST; -// return 0; - -// case KVS_CMD_GET: -// result = kvs_array_get_bin(&global_array, key, key_len, &value_len); -// if (!result) { rsp_out->status = KVS_STATUS_NO_EXIST; return 0; } -// rsp_out->status = KVS_STATUS_OK; -// rsp_out->data = (uint8_t*)result; -// rsp_out->dlen = (uint32_t)value_len; -// return 0; - -// case KVS_CMD_DEL: -// ret = kvs_array_del_bin(&global_array, key, key_len); -// if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; -// else if (ret == 0) rsp_out->status = KVS_STATUS_OK; -// else rsp_out->status = KVS_STATUS_NO_EXIST; -// return 0; - -// case KVS_CMD_MOD: -// ret = kvs_array_mod_bin(&global_array, key, key_len, val, value_len); -// if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; -// else if (ret == 0) rsp_out->status = KVS_STATUS_OK; -// else rsp_out->status = KVS_STATUS_NO_EXIST; -// return 0; - -// case KVS_CMD_EXIST: -// ret = kvs_array_exist_bin(&global_array, key, key_len); -// rsp_out->status = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST; -// return 0; -// #endif - -// #if ENABLE_RBTREE -// case KVS_CMD_RSET: -// ret = kvs_rbtree_set(&global_rbtree, key, key_len, val, value_len); -// if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; -// else if (ret == 0) rsp_out->status = KVS_STATUS_OK; -// else rsp_out->status = KVS_STATUS_EXIST; -// return 0; - -// case KVS_CMD_RGET: -// result = kvs_rbtree_get(&global_rbtree, key, key_len, &value_len); -// if (!result) { rsp_out->status = KVS_STATUS_NO_EXIST; return 0; } -// rsp_out->status = KVS_STATUS_OK; -// rsp_out->data = (uint8_t*)result; -// rsp_out->dlen = (uint32_t)value_len; -// return 0; - -// case KVS_CMD_RDEL: -// ret = kvs_rbtree_del(&global_rbtree, key, key_len); -// if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; -// else if (ret == 0) rsp_out->status = KVS_STATUS_OK; -// else rsp_out->status = KVS_STATUS_NO_EXIST; -// return 0; - -// case KVS_CMD_RMOD: -// ret = kvs_rbtree_mod(&global_rbtree, key, key_len, val, value_len); -// if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; -// else if (ret == 0) rsp_out->status = KVS_STATUS_OK; -// else rsp_out->status = KVS_STATUS_NO_EXIST; -// return 0; - -// case KVS_CMD_REXIST: -// ret = kvs_rbtree_exist(&global_rbtree, key, key_len); -// rsp_out->status = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST; -// return 0; -// #endif - -// #if ENABLE_HASH -// case KVS_CMD_HSET: -// ret = kvs_hash_set_bin(&global_hash, key, key_len, val, value_len); -// if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; -// else if (ret == 0) rsp_out->status = KVS_STATUS_OK; -// else rsp_out->status = KVS_STATUS_EXIST; -// return 0; - -// case KVS_CMD_HGET: -// result = kvs_hash_get_bin(&global_hash, key, key_len, &value_len); -// if (!result) { rsp_out->status = KVS_STATUS_NO_EXIST; return 0; } -// rsp_out->status = KVS_STATUS_OK; -// rsp_out->data = (uint8_t*)result; -// rsp_out->dlen = (uint32_t)value_len; -// return 0; - -// case KVS_CMD_HDEL: -// ret = kvs_hash_del_bin(&global_hash, key, key_len); -// if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; -// else if (ret == 0) rsp_out->status = KVS_STATUS_OK; -// else rsp_out->status = KVS_STATUS_NO_EXIST; -// return 0; - -// case KVS_CMD_HMOD: -// ret = kvs_hash_mod_bin(&global_hash, key, key_len, val, value_len); -// if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; -// else if (ret == 0) rsp_out->status = KVS_STATUS_OK; -// else rsp_out->status = KVS_STATUS_NO_EXIST; -// return 0; - -// case KVS_CMD_HEXIST: -// ret = kvs_hash_exist_bin(&global_hash, key, key_len); -// rsp_out->status = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST; -// return 0; -// #endif -// case KVS_CMD_SAVE: -// ret = kvs_save_to_file(); -// if(ret == 0) rsp_out->status = KVS_STATUS_OK; -// else rsp_out->status = KVS_STATUS_ERROR; -// return 0; -// case KVS_CMD_PSYNC: -// rsp_out->op = req->op; -// rsp_out->status = KVS_STATUS_OK; -// return 0; -// default: -// rsp_out->status = KVS_STATUS_BADREQ; -// return -1; -// } - -// return -1; -// } - -// /** -// * 构建单条响应 -// * 返回:-1 失败,>=0 响应长度 -// */ -// int kvs_build_one_rsp(const kvs_rsp_t *results, uint8_t *response, size_t response_cap){ -// if (!results || !response) return -1; - -// const uint8_t *end = response + response_cap; -// uint8_t *p = response; - -// // 计算所需长度:1 + 1 + 4 + dlen -// // 注意防止 size_t 溢出 -// size_t need = 1u + 1u + 4u + (size_t)results->dlen; -// if (need > response_cap) return -1; - -// if (kvs_write_u8(&p, end, (uint8_t)results->op) < 0) return -1; -// if (kvs_write_u8(&p, end, results->status) < 0) return -1; -// if (kvs_write_u32(&p, end, results->dlen) < 0) return -1; - -// if (results->dlen > 0) { -// if (!results->data) return -1; // 有长度却没指针,视为错误 -// if (kvs_need(p, end, (size_t)results->dlen) < 0) return -1; -// memcpy(p, results->data, results->dlen); -// p += results->dlen; -// } - -// return (int)(p - response); -// } \ No newline at end of file diff --git a/kvs_rw_tools.h b/kvs_rw_tools.h index 1b23468..1f8cfe4 100644 --- a/kvs_rw_tools.h +++ b/kvs_rw_tools.h @@ -3,15 +3,10 @@ #include #include -// -int kvs_need(const uint8_t *p, const uint8_t *end, size_t n); -int kvs_read_u8(const uint8_t **pp, const uint8_t *end, uint8_t *out); -int kvs_read_u16(const uint8_t **pp, const uint8_t *end, uint16_t *out); -int kvs_read_u32(const uint8_t **pp, const uint8_t *end, uint32_t *out); - -int kvs_write_u8(uint8_t **pp, const uint8_t *end, uint8_t v); -int kvs_write_u16(uint8_t **pp, const uint8_t *end, uint16_t v); -int kvs_write_u32(uint8_t **pp, const uint8_t *end, uint32_t v); +#include +#include +#include +#include int kvs_write_file(FILE *fp, const void *buf, size_t n); int kvs_read_file(FILE *fp, void *buf, size_t n); @@ -90,7 +85,7 @@ int read_full(int fd, void *buf, size_t n); // int kvs_execute_one_cmd(const kvs_req_t *req, kvs_rsp_t *rsp_out); // int kvs_build_one_rsp(const kvs_rsp_t *results, uint8_t *response, size_t response_cap); -// int kvs_save_cmd_to_logfile(const uint8_t *cmd, size_t len, int logfd); +// int kvs_oplog_append(const uint8_t *cmd, size_t len, int logfd); // int kvs_replay_log(const char *logfile, int logfd); // int ksv_clear_log(int logfd); diff --git a/kvstore.c b/kvstore.c index 2255aa3..61e371c 100644 --- a/kvstore.c +++ b/kvstore.c @@ -4,9 +4,11 @@ #include "kvstore.h" #include "kvs_rw_tools.h" #include "kvs_protocol_resp.h" -#include "kvs_inc_log.h" +#include "kvs_oplog.h" #include "mem_pool/mem_pool.h" #include "common/config.h" +#include "diskuring/diskuring.h" + #include #include #include @@ -35,111 +37,14 @@ extern mp_pool_t global_mempool; AppConfig global_cfg; -int global_cmd_log_fd = -1; +extern int global_cmd_log_fd; +extern iouring_ctx_t global_uring_ctx; char global_oplog_file[256] = "kvs_oplog.default.db"; char global_array_file[256] = "kvs_array.default.db"; char global_rbtree_file[256] = "kvs_rbtree.default.db"; char global_hash_file[256] = "kvs_hash.default.db"; -int is_update_cmd(kvs_cmd_t op){ - if(op == KVS_CMD_SET || op == KVS_CMD_RSET || op == KVS_CMD_HSET - || op == KVS_CMD_MOD || op == KVS_CMD_RMOD || op == KVS_CMD_HMOD - || op == KVS_CMD_DEL || op == KVS_CMD_RDEL || op == KVS_CMD_HDEL){ - return 1; - } - return 0; -} - -/** - * input : request request_length - * output : response response_length - * return : -1 error, =0 半包, 1 成功 - */ - - -// int kvs_protocol(char *request, int request_length, char *response, int *response_length){ -// int kvs_protocol(struct conn* conn){ -// if (!conn) return -1; -// char *request = conn->rbuffer; -// int request_length = conn->rlength; -// char *response = conn->wbuffer; -// int *response_length = &conn->wlength; - -// if (!request || request_length <= 0 || !response || !response_length) return -1; -// int consumed = 0; -// int out_len = 0; - -// static int i = 0; -// while(consumed < request_length ){ -// if(i > 33){ -// i = i+1; -// i = i-1; -// } -// if(i == 47) i = 0; -// ++i; - -// kvs_req_t req; -// memset(&req, 0, sizeof(kvs_req_t)); - -// const uint8_t *p = request+consumed; -// int remain = request_length - consumed; - -// int len = kvs_parse_one_cmd(p, remain, &req); -// if(len < 0){ -// // 解析失败 -// kvs_free_request(&req); -// *response_length = out_len; -// return -1; -// } -// else if(len == 0){ -// // 半包 -// kvs_free_request(&req); -// break; -// } - -// kvs_rsp_t rsp; -// memset(&rsp, 0, sizeof(kvs_rsp_t)); - -// // 执行失败 -// if (kvs_execute_one_cmd(&req, &rsp) < 0){ -// kvs_free_request(&req); -// *response_length = out_len; -// return -1; -// }else{ -// // 执行成功,在这里保存到日志中。 -// if(rsp.status == KVS_STATUS_OK){ -// if(is_update_cmd(req.op)){ -// kvs_save_cmd_to_logfile(p, len, global_cmd_log_fd); -// } -// } -// } - -// if(req.op == KVS_CMD_PSYNC){ -// build_thread_to_sync(req.args->data, conn); -// } - -// int resp_len = kvs_build_one_rsp(&rsp, (uint8_t *)response+out_len, KVS_MAX_RESPONSE-out_len); -// // 构建响应 <0 构建失败 -// kvs_free_request(&req); -// if (resp_len < 0) { -// *response_length = out_len; -// return -1; -// } - -// out_len += resp_len; -// consumed += len; -// } - -// // slave 暂时不需要回报,或者回一个new_offset -// if(conn->is_from_master){ -// conn->wlength = 0; -// return consumed; -// } -// *response_length = out_len; -// return consumed; -// } - int kvs_protocol(struct conn* conn){ if (!conn) return -1; char *request = conn->rbuffer; @@ -190,38 +95,30 @@ int kvs_protocol(struct conn* conn){ val = resp_error("ERR dispatch failed"); } } else { - // persist into o o + // persist into oplog if(global_cfg.persistence == PERSIST_INCREMENTAL){ /* 执行成功:在这里保存到日志中(只记录更新类命令) */ if (cmd.argc > 0 && cmd.argv[0].ptr) { - /* 仅当返回 OK 时记录 */ - int is_ok = (val.type == RESP_T_SIMPLE_STR && - val.bulk.ptr && val.bulk.len == 2 && - ((val.bulk.ptr[0] == 'O' || val.bulk.ptr[0] == 'o') && - (val.bulk.ptr[1] == 'K' || val.bulk.ptr[1] == 'k'))); - - if (is_ok) { - /* 更新类命令:SET/DEL/MOD/RSET/RDEL/RMOD/HSET/HDEL/HMOD/SAVE */ - const resp_slice_t *c0 = &cmd.argv[0]; - int is_update = 0; - if (c0->ptr && c0->len) { - if (ascii_casecmp(c0->ptr, c0->len, "SET") == 0 || - ascii_casecmp(c0->ptr, c0->len, "DEL") == 0 || - ascii_casecmp(c0->ptr, c0->len, "MOD") == 0 || - ascii_casecmp(c0->ptr, c0->len, "RSET") == 0 || - ascii_casecmp(c0->ptr, c0->len, "RDEL") == 0 || - ascii_casecmp(c0->ptr, c0->len, "RMOD") == 0 || - ascii_casecmp(c0->ptr, c0->len, "HSET") == 0 || - ascii_casecmp(c0->ptr, c0->len, "HDEL") == 0 || - ascii_casecmp(c0->ptr, c0->len, "HMOD") == 0) { - is_update = 1; - } + /* 更新类命令:SET/DEL/MOD/RSET/RDEL/RMOD/HSET/HDEL/HMOD/SAVE */ + const resp_slice_t *c0 = &cmd.argv[0]; + int is_update = 0; + if (c0->ptr && c0->len) { + if (ascii_casecmp(c0->ptr, c0->len, "SET") == 0 || + ascii_casecmp(c0->ptr, c0->len, "DEL") == 0 || + ascii_casecmp(c0->ptr, c0->len, "MOD") == 0 || + ascii_casecmp(c0->ptr, c0->len, "RSET") == 0 || + ascii_casecmp(c0->ptr, c0->len, "RDEL") == 0 || + ascii_casecmp(c0->ptr, c0->len, "RMOD") == 0 || + ascii_casecmp(c0->ptr, c0->len, "HSET") == 0 || + ascii_casecmp(c0->ptr, c0->len, "HDEL") == 0 || + ascii_casecmp(c0->ptr, c0->len, "HMOD") == 0) { + is_update = 1; } + } - if (is_update) { - kvs_save_cmd_to_logfile(p, len, global_cmd_log_fd); - } + if (is_update) { + kvs_oplog_append(p, len, global_cmd_log_fd); } } } @@ -453,7 +350,7 @@ int init_kvengine(void) { if(global_cfg.persistence == PERSIST_INCREMENTAL){ init_cmd_log(global_oplog_file, &global_cmd_log_fd); - kvs_replay_log(global_oplog_file, global_cmd_log_fd); + kvs_replay_log(global_cmd_log_fd); } return 0; @@ -550,6 +447,10 @@ int init_config(AppConfig *cfg){ return 0; } +void init_disk_uring(iouring_ctx_t *uring_ctx){ + iouring_init(uring_ctx, 256); +} + int main(int argc, char *argv[]) { if(-1 == init_config(&global_cfg)){ @@ -558,6 +459,8 @@ int main(int argc, char *argv[]) { } init_data_file(&global_cfg); + + init_disk_uring(&global_uring_ctx); int port = global_cfg.port; @@ -575,7 +478,7 @@ int main(int argc, char *argv[]) { init_memory_pool(); init_kvengine(); - + #if (NETWORK_SELECT == NETWORK_REACTOR) reactor_start(port, kvs_protocol, master_ip, master_port); //