From f031e107b5a016d08c6152ebbcb24aa6af032e70 Mon Sep 17 00:00:00 2001 From: 1iaan <139833683+1iaan@users.noreply.github.com> Date: Tue, 20 Jan 2026 11:51:38 +0000 Subject: [PATCH] =?UTF-8?q?resp=E5=8D=8F=E8=AE=AE=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E5=92=8C=E4=BD=BF=E7=94=A8hiredis=E8=BF=9B=E8=A1=8C=E6=B5=8B?= =?UTF-8?q?=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Makefile | 22 +- README.md | 3 + common/config.c | 67 +++- common/config.h | 7 + config/config.xml | 12 +- kvs_cmd_log.c => kvs_inc_log.c | 47 +-- kvs_inc_log.h | 14 + kvs_protocol_resp.c | 4 +- kvs_protocol_resp.h | 2 + kvs_rw_tools.c | 571 ++++++++++++++++----------------- kvs_rw_tools.h | 107 +++--- kvstore.c | 338 +++++++++++++++---- kvstore.h | 16 +- reactor.c | 83 +++-- test-redis/test.c | 215 +++++++++++++ 15 files changed, 1010 insertions(+), 498 deletions(-) rename kvs_cmd_log.c => kvs_inc_log.c (53%) create mode 100644 kvs_inc_log.h create mode 100644 test-redis/test.c diff --git a/Makefile b/Makefile index e476c1e..bc295ad 100644 --- a/Makefile +++ b/Makefile @@ -1,21 +1,24 @@ CC = gcc -# FLAGS = -g -DJEMALLOC_NO_DEMANGLE -I ./NtyCo/core/ -L ./NtyCo/ -lntyco -lpthread -luring -ldl -ljemalloc -FLAGS = -g -DJEMALLOC_NO_DEMANGLE -I./NtyCo/core/ -I/usr/include/libxml2 -L ./NtyCo/ -lntyco -lpthread -luring -ldl -lxml2 -# SRCS = kvstore.c ntyco.c proactor.c reactor.c kvs_array.c kvs_rbtree.c kvs_hash.c kvs_rw_tools.c -SRCS = kvstore.c ntyco.c proactor.c reactor.c kvs_array_bin.c kvs_rbtree_bin.c kvs_hash_bin.c kvs_rw_tools.c kvs_cmd_log.c ./mem_pool/mem_pool.c kvs_slave.c ./common/config.c -TESTCASE_SRCS = testcase.c +FLAGS = -g -DJEMALLOC_NO_DEMANGLE -I./NtyCo/core/ -I/usr/include/libxml2 -L ./NtyCo/ +LDFLAGS = -lntyco -lpthread -luring -ldl -lxml2 +SRCS = kvstore.c ntyco.c proactor.c reactor.c kvs_array_bin.c kvs_rbtree_bin.c kvs_hash_bin.c kvs_rw_tools.c kvs_protocol_resp.c kvs_inc_log.c kvs_slave.c ./mem_pool/mem_pool.c ./common/config.c TARGET = kvstore SUBDIR = ./NtyCo/ + TESTCASE = testcase +TESTCASE_SRCS = testcase.c TESTCASE2 = ./test/testcase TESTCASE2_SRCS = ./test/testcase.c ./test/test_client.c +TEST_REDIS = ./test-redis/testcase +TEST_REDIS_SRCS = ./test-redis/test.c +TEST_REDIS_LDFLAGS = -lhiredis OBJS = $(SRCS:.c=.o) -all: $(SUBDIR) $(TARGET) $(TESTCASE) $(TESTCASE2) +all: $(SUBDIR) $(TARGET) $(TESTCASE) $(TESTCASE2) $(TEST_REDIS) $(SUBDIR): ECHO make -C $@ @@ -24,7 +27,7 @@ ECHO: @echo $(SUBDIR) $(TARGET): $(OBJS) - $(CC) -o $@ $^ $(FLAGS) + $(CC) -o $@ $^ $(FLAGS) $(LDFLAGS) $(TESTCASE): $(TESTCASE_SRCS) $(CC) -g -o $@ $^ @@ -32,10 +35,13 @@ $(TESTCASE): $(TESTCASE_SRCS) $(TESTCASE2): $(TESTCASE2_SRCS) $(CC) -g -o $@ $^ +$(TEST_REDIS): $(TEST_REDIS_SRCS) + $(CC) -g -o $@ $^ $(TEST_REDIS_LDFLAGS) + %.o: %.c $(CC) $(FLAGS) -c $^ -g -o $@ clean: - rm -rf $(OBJS) $(TARGET) $(TESTCASE) $(TESTCASE2) + rm -rf $(OBJS) $(TARGET) $(TESTCASE) $(TESTCASE2) $(TEST_REDIS) diff --git a/README.md b/README.md index c6fe511..6335b4e 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,10 @@ ## 环境安装 ```shell +# xml sudo apt install libxml2 libxml2-dev +# hiredis client +sudo apt install -y libhiredis-dev ``` ## 需求 diff --git a/common/config.c b/common/config.c index 184c9e5..d0da03e 100644 --- a/common/config.c +++ b/common/config.c @@ -30,6 +30,21 @@ static void set_default_config(AppConfig *cfg) strncpy(cfg->master_ip, "127.0.0.1", sizeof(cfg->master_ip) - 1); cfg->master_ip[sizeof(cfg->master_ip) - 1] = '\0'; + strncpy(cfg->persist_dir, "data.default", sizeof(cfg->persist_dir) - 1); + cfg->persist_dir[sizeof(cfg->persist_dir) - 1] = '\0'; + + strncpy(cfg->oplog_file, "kvs_oplog.default.db", sizeof(cfg->oplog_file) - 1); + cfg->oplog_file[sizeof(cfg->oplog_file) - 1] = '\0'; + + strncpy(cfg->array_file, "kvs_array.default.db", sizeof(cfg->array_file) - 1); + cfg->array_file[sizeof(cfg->array_file) - 1] = '\0'; + + strncpy(cfg->rbtree_file, "kvs_rbtree.default.db", sizeof(cfg->rbtree_file) - 1); + cfg->rbtree_file[sizeof(cfg->rbtree_file) - 1] = '\0'; + + strncpy(cfg->hash_file, "kvs_hash.default.db", sizeof(cfg->hash_file) - 1); + cfg->hash_file[sizeof(cfg->hash_file) - 1] = '\0'; + cfg->port = 8888; cfg->log_level = LOG_LEVEL_INFO; cfg->mode = MODE_MASTER; @@ -182,7 +197,7 @@ int config_load(const char *filename, AppConfig *out_cfg) xmlNodePtr mport_node = find_child(master, "port"); if (mport_node) { - xmlChar *txt = xmlNodeGetContent(port_node); + xmlChar *txt = xmlNodeGetContent(mport_node); if (txt) { out_cfg->master_port = atoi((char *)txt); xmlFree(txt); @@ -215,6 +230,56 @@ int config_load(const char *filename, AppConfig *out_cfg) xmlFree(txt); } } + + xmlNodePtr dir_node = find_child(pers, "dir"); + if (dir_node) { + xmlChar *txt = xmlNodeGetContent(dir_node); + if (txt) { + strncpy(out_cfg->persist_dir, (char *)txt, sizeof(out_cfg->persist_dir) - 1); + out_cfg->persist_dir[sizeof(out_cfg->persist_dir) - 1] = '\0'; + xmlFree(txt); + } + } + + xmlNodePtr wal_node = find_child(pers, "wal"); + if (wal_node) { + xmlChar *txt = xmlNodeGetContent(wal_node); + if (txt) { + strncpy(out_cfg->oplog_file, (char *)txt, sizeof(out_cfg->oplog_file) - 1); + out_cfg->oplog_file[sizeof(out_cfg->oplog_file) - 1] = '\0'; + xmlFree(txt); + } + } + + xmlNodePtr array_node = find_child(pers, "array"); + if (array_node) { + xmlChar *txt = xmlNodeGetContent(array_node); + if (txt) { + strncpy(out_cfg->array_file, (char *)txt, sizeof(out_cfg->array_file) - 1); + out_cfg->array_file[sizeof(out_cfg->array_file) - 1] = '\0'; + xmlFree(txt); + } + } + + xmlNodePtr rbtree_node = find_child(pers, "rbtree"); + if (rbtree_node) { + xmlChar *txt = xmlNodeGetContent(rbtree_node); + if (txt) { + strncpy(out_cfg->rbtree_file, (char *)txt, sizeof(out_cfg->rbtree_file) - 1); + out_cfg->rbtree_file[sizeof(out_cfg->rbtree_file) - 1] = '\0'; + xmlFree(txt); + } + } + + xmlNodePtr hash_node = find_child(pers, "hash"); + if (hash_node) { + xmlChar *txt = xmlNodeGetContent(hash_node); + if (txt) { + strncpy(out_cfg->hash_file, (char *)txt, sizeof(out_cfg->hash_file) - 1); + out_cfg->hash_file[sizeof(out_cfg->hash_file) - 1] = '\0'; + xmlFree(txt); + } + } } /* memory 部分 */ diff --git a/common/config.h b/common/config.h index 2261530..15f01da 100644 --- a/common/config.h +++ b/common/config.h @@ -34,7 +34,14 @@ typedef struct { int master_port; // slave 才需要 LogLevel log_level; + PersistenceType persistence; + char persist_dir[256]; + char oplog_file[256]; + char array_file[256]; + char rbtree_file[256]; + char hash_file[256]; + AllocatorType allocator; } AppConfig; diff --git a/config/config.xml b/config/config.xml index b17cbe9..7830ec5 100644 --- a/config/config.xml +++ b/config/config.xml @@ -3,7 +3,7 @@ 127.0.0.1 8888 - master + master @@ -13,11 +13,17 @@ - INFO + INFO - none + incremental + data + + kvs_oplog.db + kvs_array.db + kvs_rbtree.db + kvs_hash.db diff --git a/kvs_cmd_log.c b/kvs_inc_log.c similarity index 53% rename from kvs_cmd_log.c rename to kvs_inc_log.c index 1c0155d..5f559d9 100644 --- a/kvs_cmd_log.c +++ b/kvs_inc_log.c @@ -1,6 +1,7 @@ #include "kvstore.h" #include "kvs_rw_tools.h" #include "mem_pool/mem_pool.h" +#include "kvs_protocol_resp.h" #include #include #include @@ -20,8 +21,6 @@ int destroy_cmd_log(int logfd){ return 0; } - - int kvs_save_cmd_to_logfile(const uint8_t *cmd, size_t len, int logfd){ if (logfd < 0 || !cmd || len == 0) return -1; @@ -47,42 +46,48 @@ int kvs_replay_log(const char *logfile, int logfd){ uint32_t nlen = 0; int hr = read_full(logfd, &nlen, sizeof(nlen)); - if (hr == 0) break; /* EOF:正常结束 */ - if (hr < 0) { return -2; } /* 半截头 */ + if (hr == 0) break; /* EOF:正常结束 */ + if (hr < 0) { return -2; } /* 半截头 */ uint32_t len = ntohl(nlen); if (len == 0) { return -3; } - uint8_t *cmd = (uint8_t *)kvs_malloc(len); - if (!cmd) { return -5; } + uint8_t *cmd_bytes = (uint8_t *)kvs_malloc(len); + if (!cmd_bytes ) { return -5; } - int pr = read_full(logfd, cmd, len); - if (pr <= 0) { /* 半截 payload */ - kvs_free(cmd); + int pr = read_full(logfd, cmd_bytes, len); + if (pr <= 0) { /* 半截 payload */ + kvs_free(cmd_bytes ); return -6; } - kvs_req_t req; - memset(&req, 0, sizeof(req)); + /* -------- RESP parse -------- */ + resp_cmd_t cmd; + memset(&cmd, 0, sizeof(cmd)); + + int clen = resp_parse_one_cmd(cmd_bytes, (int)len, &cmd); - int clen = kvs_parse_one_cmd(cmd, (int)len, &req); if (clen <= 0 || clen != (int)len) { - kvs_free_request(&req); - kvs_free(cmd); + /* clen==0: need more data,但日志记录必须是一条完整命令,所以视为坏日志 */ + kvs_free(cmd_bytes); return -7; } - kvs_rsp_t rsp; - memset(&rsp, 0, sizeof(rsp)); + /* -------- execute -------- */ + resp_value_t outvalue; + memset(&outvalue, 0, sizeof(outvalue)); - if (kvs_execute_one_cmd(&req, &rsp) < 0) { - kvs_free_request(&req); - kvs_free(cmd); + int dr = resp_dispatch(&cmd, &outvalue); + if (dr < 0) { + kvs_free(cmd_bytes); return -8; } - kvs_free_request(&req); - kvs_free(cmd); + /* 注意: + * outv 可能引用存储内存,但我们不 build response,因此无需处理。 + * cmd_bytes 可以释放,因为 cmd slice 指向 cmd_bytes,仅在 dispatch 期间使用。 + * */ + kvs_free(cmd_bytes); } return 0; diff --git a/kvs_inc_log.h b/kvs_inc_log.h new file mode 100644 index 0000000..e0833e7 --- /dev/null +++ b/kvs_inc_log.h @@ -0,0 +1,14 @@ +#ifndef __KVS_INC_LOG_H__ +#define __KVS_INC_LOG_H__ + +#include +#include + +int init_cmd_log(const char *file, int *logfd); +int destroy_cmd_log(int logfd); + +int kvs_save_cmd_to_logfile(const uint8_t *cmd, size_t len, int logfd); +int kvs_replay_log(const char *logfile, int logfd); +int ksv_clear_log(int logfd); + +#endif \ No newline at end of file diff --git a/kvs_protocol_resp.c b/kvs_protocol_resp.c index ac65b55..d3fb400 100644 --- a/kvs_protocol_resp.c +++ b/kvs_protocol_resp.c @@ -1,5 +1,5 @@ #include "kvs_protocol_resp.h" - +#include "kvs_rw_tools.h" #if ENABLE_ARRAY extern kvs_array_t global_array; @@ -58,7 +58,7 @@ static int parse_i64(const uint8_t *p, const uint8_t *line_end, int64_t *out) { } // 字符串比对 -static int ascii_casecmp(const uint8_t *a, uint32_t alen, const char *b) { +int ascii_casecmp(const uint8_t *a, uint32_t alen, const char *b) { size_t blen = strlen(b); if (alen != (uint32_t)blen) return -1; for (uint32_t i = 0; i < alen; i++) { diff --git a/kvs_protocol_resp.h b/kvs_protocol_resp.h index 20412ed..27a2141 100644 --- a/kvs_protocol_resp.h +++ b/kvs_protocol_resp.h @@ -9,6 +9,8 @@ #define RESP_MAX_LINE 1024 #define RESP_MAX_BULK (16u * 1024u * 1024u) /* 16MB default */ +int ascii_casecmp(const uint8_t *a, uint32_t alen, const char *b); + /** * SET */ diff --git a/kvs_rw_tools.c b/kvs_rw_tools.c index d3daf5c..a7e5a4c 100644 --- a/kvs_rw_tools.c +++ b/kvs_rw_tools.c @@ -1,8 +1,9 @@ -#include "kvstore.h" #include "kvs_rw_tools.h" #include "mem_pool/mem_pool.h" +#include "kvs_inc_log.h" #include #include +#include #if ENABLE_ARRAY extern kvs_array_t global_array; @@ -148,350 +149,318 @@ int kvs_read_file(FILE *fp, void *buf, size_t n){ -// return: -1 fail, 0 half, >0 consumed -int kvs_parse_one_cmd(const uint8_t *request, int request_length, kvs_req_t *req_out){ - if (!request || request_length <= 0 || !req_out) return -1; +// // return: -1 fail, 0 half, >0 consumed +// int kvs_parse_one_cmd(const uint8_t *request, int request_length, kvs_req_t *req_out){ +// if (!request || request_length <= 0 || !req_out) return -1; - req_out->op = KVS_CMD_COUNT; - req_out->argc = 0; - req_out->args = NULL; +// req_out->op = KVS_CMD_COUNT; +// req_out->argc = 0; +// req_out->args = NULL; - const uint8_t *p = request; - const uint8_t *end = request + (size_t)request_length; +// const uint8_t *p = request; +// const uint8_t *end = request + (size_t)request_length; - // OP + ARGC - if (kvs_need(p, end, 2)) { - return 0; // NEED_MORE - } +// // OP + ARGC +// if (kvs_need(p, end, 2)) { +// return 0; // NEED_MORE +// } - uint8_t op = 0, argc = 0; - if (kvs_read_u8(&p, end, &op) < 0) return -1; - if (kvs_read_u8(&p, end, &argc) < 0) return -1; +// uint8_t op = 0, argc = 0; +// if (kvs_read_u8(&p, end, &op) < 0) return -1; +// if (kvs_read_u8(&p, end, &argc) < 0) return -1; - if (argc > KVS_MAX_ARGC) return -1; +// if (argc > KVS_MAX_ARGC) return -1; - // 先扫描一遍确认整条命令数据都在 buffer 里 - const uint8_t *scan = p; - uint32_t lens[KVS_MAX_ARGC]; - if (argc > 0) { - for (uint8_t i = 0; i < argc; i++) { - if (kvs_need(scan, end, 4)) { - return 0; // NEED_MORE - } - uint32_t alen = 0; - if (kvs_read_u32(&scan, end, &alen) < 0) return -1; +// // 先扫描一遍确认整条命令数据都在 buffer 里 +// const uint8_t *scan = p; +// uint32_t lens[KVS_MAX_ARGC]; +// if (argc > 0) { +// for (uint8_t i = 0; i < argc; i++) { +// if (kvs_need(scan, end, 4)) { +// return 0; // NEED_MORE +// } +// uint32_t alen = 0; +// if (kvs_read_u32(&scan, end, &alen) < 0) return -1; - // 防御:单个参数长度限制 - if (alen > KVS_MAX_ARGLEN) return -1; +// // 防御:单个参数长度限制 +// if (alen > KVS_MAX_ARGLEN) return -1; - // 防御:scan + alen 越界 / 半包 - if (kvs_need(scan, end, (size_t)alen)) { - return 0; // NEED_MORE - } - lens[i] = alen; - scan += alen; - } - } +// // 防御:scan + alen 越界 / 半包 +// if (kvs_need(scan, end, (size_t)alen)) { +// return 0; // NEED_MORE +// } +// lens[i] = alen; +// scan += alen; +// } +// } - size_t total_len = (size_t)(scan - request); - if (total_len > KVS_MAX_CMD_BYTES) return -1; +// size_t total_len = (size_t)(scan - request); +// if (total_len > KVS_MAX_CMD_BYTES) return -1; - req_out->op = op; - req_out->argc = argc; +// req_out->op = op; +// req_out->argc = argc; - if (argc == 0) { - return (int)total_len; - } +// if (argc == 0) { +// return (int)total_len; +// } - kvs_arg_t *args = (kvs_arg_t *)kvs_malloc((size_t)argc * sizeof(kvs_arg_t)); - if (!args) { - kvs_free_request(req_out); - return -1; - } - memset(args, 0, (size_t)argc * sizeof(kvs_arg_t)); +// kvs_arg_t *args = (kvs_arg_t *)kvs_malloc((size_t)argc * sizeof(kvs_arg_t)); +// if (!args) { +// kvs_free_request(req_out); +// return -1; +// } +// memset(args, 0, (size_t)argc * sizeof(kvs_arg_t)); - for (uint8_t i = 0; i < argc; i++) { - uint32_t alen = 0; - if (kvs_read_u32(&p, end, &alen) < 0) { - kvs_free(args); - kvs_free_request(req_out); - return -1; - } +// for (uint8_t i = 0; i < argc; i++) { +// uint32_t alen = 0; +// if (kvs_read_u32(&p, end, &alen) < 0) { +// kvs_free(args); +// kvs_free_request(req_out); +// return -1; +// } - // alen 与 lens[i] 应当一致(扫描时读过),不一致说明解析器/输入异常 - if (alen != lens[i]) { - kvs_free(args); - kvs_free_request(req_out); - return -1; - } +// // alen 与 lens[i] 应当一致(扫描时读过),不一致说明解析器/输入异常 +// if (alen != lens[i]) { +// kvs_free(args); +// kvs_free_request(req_out); +// return -1; +// } - args[i].len = alen; - args[i].data = p; // 直接指向输入 buffer(零拷贝) - p += alen; - } +// args[i].len = alen; +// args[i].data = p; // 直接指向输入 buffer(零拷贝) +// p += alen; +// } - req_out->args = args; +// req_out->args = args; - return (int)(p - request); -} +// return (int)(p - request); +// } -void kvs_free_request(kvs_req_t *req) { - if (!req) return; - if (req->args) { - kvs_free(req->args); - req->args = NULL; - } - req->op = KVS_CMD_COUNT; - req->argc = 0; -} +// void kvs_free_request(kvs_req_t *req) { +// if (!req) return; +// if (req->args) { +// kvs_free(req->args); +// req->args = NULL; +// } +// req->op = KVS_CMD_COUNT; +// req->argc = 0; +// } -/** - * 输入:req - * 输出:rsp - * 返回:-1 失败,参数错误,0 成功 - */ - int kvs_execute_one_cmd(const kvs_req_t *req, kvs_rsp_t *rsp_out) { - if(!req || !rsp_out) return -1; - rsp_out->op = req->op; - rsp_out->status = KVS_STATUS_ERROR; - rsp_out->data = NULL; - rsp_out->dlen = 0; +// /** +// * 输入:req +// * 输出:rsp +// * 返回:-1 失败,参数错误,0 成功 +// */ +// int kvs_execute_one_cmd(const kvs_req_t *req, kvs_rsp_t *rsp_out) { +// if(!req || !rsp_out) return -1; +// rsp_out->op = req->op; +// rsp_out->status = KVS_STATUS_ERROR; +// rsp_out->data = NULL; +// rsp_out->dlen = 0; - int argc = req->argc; - kvs_cmd_t op = req->op; - kvs_arg_t *argv = req->args; +// int argc = req->argc; +// kvs_cmd_t op = req->op; +// kvs_arg_t *argv = req->args; - uint32_t key_len = 0; - const void *key = NULL; - uint32_t value_len = 0; - const void *val = NULL; +// uint32_t key_len = 0; +// const void *key = NULL; +// uint32_t value_len = 0; +// const void *val = NULL; - if(argc == 1){ - key_len = argv[0].len; - key = argv[0].data; - }else if(argc == 2){ - key_len = argv[0].len; - key = argv[0].data; - value_len = argv[1].len; - val = argv[1].data; - } +// if(argc == 1){ +// key_len = argv[0].len; +// key = argv[0].data; +// }else if(argc == 2){ +// key_len = argv[0].len; +// key = argv[0].data; +// value_len = argv[1].len; +// val = argv[1].data; +// } - // 基本参数校验(按你原有命令语义) - switch (op) { - case KVS_CMD_SET: - case KVS_CMD_MOD: - case KVS_CMD_RSET: - case KVS_CMD_RMOD: - case KVS_CMD_HSET: - case KVS_CMD_HMOD: - if (argc != 2 || !key || !val) { rsp_out->status = KVS_STATUS_BADREQ; return -1; } - break; - case KVS_CMD_GET: - case KVS_CMD_DEL: - case KVS_CMD_EXIST: - case KVS_CMD_RGET: - case KVS_CMD_RDEL: - case KVS_CMD_REXIST: - case KVS_CMD_HGET: - case KVS_CMD_HDEL: - case KVS_CMD_HEXIST: - case KVS_CMD_PSYNC: - if (argc != 1 || !key) { rsp_out->status = KVS_STATUS_BADREQ; return -1; } - break; - case KVS_CMD_SAVE: - if(argc != 0) { rsp_out->status = KVS_STATUS_BADREQ; return -1; } - break; - default: - rsp_out->status = KVS_STATUS_BADREQ; - return -1; - } +// // 基本参数校验(按你原有命令语义) +// switch (op) { +// case KVS_CMD_SET: +// case KVS_CMD_MOD: +// case KVS_CMD_RSET: +// case KVS_CMD_RMOD: +// case KVS_CMD_HSET: +// case KVS_CMD_HMOD: +// if (argc != 2 || !key || !val) { rsp_out->status = KVS_STATUS_BADREQ; return -1; } +// break; +// case KVS_CMD_GET: +// case KVS_CMD_DEL: +// case KVS_CMD_EXIST: +// case KVS_CMD_RGET: +// case KVS_CMD_RDEL: +// case KVS_CMD_REXIST: +// case KVS_CMD_HGET: +// case KVS_CMD_HDEL: +// case KVS_CMD_HEXIST: +// case KVS_CMD_PSYNC: +// if (argc != 1 || !key) { rsp_out->status = KVS_STATUS_BADREQ; return -1; } +// break; +// case KVS_CMD_SAVE: +// if(argc != 0) { rsp_out->status = KVS_STATUS_BADREQ; return -1; } +// break; +// default: +// rsp_out->status = KVS_STATUS_BADREQ; +// return -1; +// } - int ret = 0; - const char *result = NULL; +// int ret = 0; +// const char *result = NULL; - switch (op) { -#if ENABLE_ARRAY - case KVS_CMD_SET: - ret = kvs_array_set_bin(&global_array, key, key_len, val, value_len); - if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; - else if (ret == 0) rsp_out->status = KVS_STATUS_OK; - else rsp_out->status = KVS_STATUS_EXIST; - return 0; +// switch (op) { +// #if ENABLE_ARRAY +// case KVS_CMD_SET: +// ret = kvs_array_set_bin(&global_array, key, key_len, val, value_len); +// if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; +// else if (ret == 0) rsp_out->status = KVS_STATUS_OK; +// else rsp_out->status = KVS_STATUS_EXIST; +// return 0; - case KVS_CMD_GET: - result = kvs_array_get_bin(&global_array, key, key_len, &value_len); - if (!result) { rsp_out->status = KVS_STATUS_NO_EXIST; return 0; } - rsp_out->status = KVS_STATUS_OK; - rsp_out->data = (uint8_t*)result; - rsp_out->dlen = (uint32_t)value_len; - return 0; +// case KVS_CMD_GET: +// result = kvs_array_get_bin(&global_array, key, key_len, &value_len); +// if (!result) { rsp_out->status = KVS_STATUS_NO_EXIST; return 0; } +// rsp_out->status = KVS_STATUS_OK; +// rsp_out->data = (uint8_t*)result; +// rsp_out->dlen = (uint32_t)value_len; +// return 0; - case KVS_CMD_DEL: - ret = kvs_array_del_bin(&global_array, key, key_len); - if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; - else if (ret == 0) rsp_out->status = KVS_STATUS_OK; - else rsp_out->status = KVS_STATUS_NO_EXIST; - return 0; +// case KVS_CMD_DEL: +// ret = kvs_array_del_bin(&global_array, key, key_len); +// if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; +// else if (ret == 0) rsp_out->status = KVS_STATUS_OK; +// else rsp_out->status = KVS_STATUS_NO_EXIST; +// return 0; - case KVS_CMD_MOD: - ret = kvs_array_mod_bin(&global_array, key, key_len, val, value_len); - if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; - else if (ret == 0) rsp_out->status = KVS_STATUS_OK; - else rsp_out->status = KVS_STATUS_NO_EXIST; - return 0; +// case KVS_CMD_MOD: +// ret = kvs_array_mod_bin(&global_array, key, key_len, val, value_len); +// if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; +// else if (ret == 0) rsp_out->status = KVS_STATUS_OK; +// else rsp_out->status = KVS_STATUS_NO_EXIST; +// return 0; - case KVS_CMD_EXIST: - ret = kvs_array_exist_bin(&global_array, key, key_len); - rsp_out->status = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST; - return 0; -#endif +// case KVS_CMD_EXIST: +// ret = kvs_array_exist_bin(&global_array, key, key_len); +// rsp_out->status = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST; +// return 0; +// #endif -#if ENABLE_RBTREE - case KVS_CMD_RSET: - ret = kvs_rbtree_set(&global_rbtree, key, key_len, val, value_len); - if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; - else if (ret == 0) rsp_out->status = KVS_STATUS_OK; - else rsp_out->status = KVS_STATUS_EXIST; - return 0; +// #if ENABLE_RBTREE +// case KVS_CMD_RSET: +// ret = kvs_rbtree_set(&global_rbtree, key, key_len, val, value_len); +// if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; +// else if (ret == 0) rsp_out->status = KVS_STATUS_OK; +// else rsp_out->status = KVS_STATUS_EXIST; +// return 0; - case KVS_CMD_RGET: - result = kvs_rbtree_get(&global_rbtree, key, key_len, &value_len); - if (!result) { rsp_out->status = KVS_STATUS_NO_EXIST; return 0; } - rsp_out->status = KVS_STATUS_OK; - rsp_out->data = (uint8_t*)result; - rsp_out->dlen = (uint32_t)value_len; - return 0; +// case KVS_CMD_RGET: +// result = kvs_rbtree_get(&global_rbtree, key, key_len, &value_len); +// if (!result) { rsp_out->status = KVS_STATUS_NO_EXIST; return 0; } +// rsp_out->status = KVS_STATUS_OK; +// rsp_out->data = (uint8_t*)result; +// rsp_out->dlen = (uint32_t)value_len; +// return 0; - case KVS_CMD_RDEL: - ret = kvs_rbtree_del(&global_rbtree, key, key_len); - if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; - else if (ret == 0) rsp_out->status = KVS_STATUS_OK; - else rsp_out->status = KVS_STATUS_NO_EXIST; - return 0; +// case KVS_CMD_RDEL: +// ret = kvs_rbtree_del(&global_rbtree, key, key_len); +// if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; +// else if (ret == 0) rsp_out->status = KVS_STATUS_OK; +// else rsp_out->status = KVS_STATUS_NO_EXIST; +// return 0; - case KVS_CMD_RMOD: - ret = kvs_rbtree_mod(&global_rbtree, key, key_len, val, value_len); - if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; - else if (ret == 0) rsp_out->status = KVS_STATUS_OK; - else rsp_out->status = KVS_STATUS_NO_EXIST; - return 0; +// case KVS_CMD_RMOD: +// ret = kvs_rbtree_mod(&global_rbtree, key, key_len, val, value_len); +// if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; +// else if (ret == 0) rsp_out->status = KVS_STATUS_OK; +// else rsp_out->status = KVS_STATUS_NO_EXIST; +// return 0; - case KVS_CMD_REXIST: - ret = kvs_rbtree_exist(&global_rbtree, key, key_len); - rsp_out->status = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST; - return 0; -#endif +// case KVS_CMD_REXIST: +// ret = kvs_rbtree_exist(&global_rbtree, key, key_len); +// rsp_out->status = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST; +// return 0; +// #endif -#if ENABLE_HASH - case KVS_CMD_HSET: - ret = kvs_hash_set_bin(&global_hash, key, key_len, val, value_len); - if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; - else if (ret == 0) rsp_out->status = KVS_STATUS_OK; - else rsp_out->status = KVS_STATUS_EXIST; - return 0; +// #if ENABLE_HASH +// case KVS_CMD_HSET: +// ret = kvs_hash_set_bin(&global_hash, key, key_len, val, value_len); +// if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; +// else if (ret == 0) rsp_out->status = KVS_STATUS_OK; +// else rsp_out->status = KVS_STATUS_EXIST; +// return 0; - case KVS_CMD_HGET: - result = kvs_hash_get_bin(&global_hash, key, key_len, &value_len); - if (!result) { rsp_out->status = KVS_STATUS_NO_EXIST; return 0; } - rsp_out->status = KVS_STATUS_OK; - rsp_out->data = (uint8_t*)result; - rsp_out->dlen = (uint32_t)value_len; - return 0; +// case KVS_CMD_HGET: +// result = kvs_hash_get_bin(&global_hash, key, key_len, &value_len); +// if (!result) { rsp_out->status = KVS_STATUS_NO_EXIST; return 0; } +// rsp_out->status = KVS_STATUS_OK; +// rsp_out->data = (uint8_t*)result; +// rsp_out->dlen = (uint32_t)value_len; +// return 0; - case KVS_CMD_HDEL: - ret = kvs_hash_del_bin(&global_hash, key, key_len); - if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; - else if (ret == 0) rsp_out->status = KVS_STATUS_OK; - else rsp_out->status = KVS_STATUS_NO_EXIST; - return 0; +// case KVS_CMD_HDEL: +// ret = kvs_hash_del_bin(&global_hash, key, key_len); +// if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; +// else if (ret == 0) rsp_out->status = KVS_STATUS_OK; +// else rsp_out->status = KVS_STATUS_NO_EXIST; +// return 0; - case KVS_CMD_HMOD: - ret = kvs_hash_mod_bin(&global_hash, key, key_len, val, value_len); - if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; - else if (ret == 0) rsp_out->status = KVS_STATUS_OK; - else rsp_out->status = KVS_STATUS_NO_EXIST; - return 0; +// case KVS_CMD_HMOD: +// ret = kvs_hash_mod_bin(&global_hash, key, key_len, val, value_len); +// if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; +// else if (ret == 0) rsp_out->status = KVS_STATUS_OK; +// else rsp_out->status = KVS_STATUS_NO_EXIST; +// return 0; - case KVS_CMD_HEXIST: - ret = kvs_hash_exist_bin(&global_hash, key, key_len); - rsp_out->status = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST; - return 0; -#endif - case KVS_CMD_SAVE: - ret = kvs_save_to_file(); - if(ret == 0) rsp_out->status = KVS_STATUS_OK; - else rsp_out->status = KVS_STATUS_ERROR; - return 0; - case KVS_CMD_PSYNC: - rsp_out->op = req->op; - rsp_out->status = KVS_STATUS_OK; - return 0; - default: - rsp_out->status = KVS_STATUS_BADREQ; - return -1; - } +// case KVS_CMD_HEXIST: +// ret = kvs_hash_exist_bin(&global_hash, key, key_len); +// rsp_out->status = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST; +// return 0; +// #endif +// case KVS_CMD_SAVE: +// ret = kvs_save_to_file(); +// if(ret == 0) rsp_out->status = KVS_STATUS_OK; +// else rsp_out->status = KVS_STATUS_ERROR; +// return 0; +// case KVS_CMD_PSYNC: +// rsp_out->op = req->op; +// rsp_out->status = KVS_STATUS_OK; +// return 0; +// default: +// rsp_out->status = KVS_STATUS_BADREQ; +// return -1; +// } - return -1; -} +// return -1; +// } -/** - * 构建单条响应 - * 返回:-1 失败,>=0 响应长度 - */ -int kvs_build_one_rsp(const kvs_rsp_t *results, uint8_t *response, size_t response_cap){ - if (!results || !response) return -1; +// /** +// * 构建单条响应 +// * 返回:-1 失败,>=0 响应长度 +// */ +// int kvs_build_one_rsp(const kvs_rsp_t *results, uint8_t *response, size_t response_cap){ +// if (!results || !response) return -1; - const uint8_t *end = response + response_cap; - uint8_t *p = response; +// const uint8_t *end = response + response_cap; +// uint8_t *p = response; - // 计算所需长度:1 + 1 + 4 + dlen - // 注意防止 size_t 溢出 - size_t need = 1u + 1u + 4u + (size_t)results->dlen; - if (need > response_cap) return -1; +// // 计算所需长度:1 + 1 + 4 + dlen +// // 注意防止 size_t 溢出 +// size_t need = 1u + 1u + 4u + (size_t)results->dlen; +// if (need > response_cap) return -1; - if (kvs_write_u8(&p, end, (uint8_t)results->op) < 0) return -1; - if (kvs_write_u8(&p, end, results->status) < 0) return -1; - if (kvs_write_u32(&p, end, results->dlen) < 0) return -1; +// if (kvs_write_u8(&p, end, (uint8_t)results->op) < 0) return -1; +// if (kvs_write_u8(&p, end, results->status) < 0) return -1; +// if (kvs_write_u32(&p, end, results->dlen) < 0) return -1; - if (results->dlen > 0) { - if (!results->data) return -1; // 有长度却没指针,视为错误 - if (kvs_need(p, end, (size_t)results->dlen) < 0) return -1; - memcpy(p, results->data, results->dlen); - p += results->dlen; - } +// if (results->dlen > 0) { +// if (!results->data) return -1; // 有长度却没指针,视为错误 +// if (kvs_need(p, end, (size_t)results->dlen) < 0) return -1; +// memcpy(p, results->data, results->dlen); +// p += results->dlen; +// } - return (int)(p - response); -} - -int kvs_save_to_file(){ - int ret = 0; - int rc = 0; - #if ENABLE_ARRAY - rc = kvs_array_save(&global_array, KVS_ARRAY_FILE); - if(rc < 0){ - printf("kvs_engine_array save error\n"); - ret = -1; - } - #endif - - #if ENABLE_RBTREE - rc = kvs_rbtree_save(&global_rbtree, KVS_RBTREE_FILE); - if(rc < 0){ - printf("kvs_engine_rbtree save error\n"); - ret = -1; - } - #endif - - #if ENABLE_HASH - rc = kvs_hash_save(&global_hash, KVS_HASH_FILE); - if(rc < 0){ - printf("kvs_engine_hash save error\n"); - ret = -1; - } - #endif - - ksv_clear_log(global_cmd_log_fd); - - return ret; -} \ No newline at end of file +// return (int)(p - response); +// } \ No newline at end of file diff --git a/kvs_rw_tools.h b/kvs_rw_tools.h index bf80a68..1b23468 100644 --- a/kvs_rw_tools.h +++ b/kvs_rw_tools.h @@ -33,66 +33,65 @@ int read_full(int fd, void *buf, size_t n); #define KVS_MAX_CMD_BYTES (1024) #define KVS_MAX_ARGC 3 -enum { - KVS_STATUS_OK = 0, - KVS_STATUS_ERROR = 1, - KVS_STATUS_NO_EXIST = 2, - KVS_STATUS_EXIST = 3, - KVS_STATUS_BADREQ = 4 -}; +// enum { +// KVS_STATUS_OK = 0, +// KVS_STATUS_ERROR = 1, +// KVS_STATUS_NO_EXIST = 2, +// KVS_STATUS_EXIST = 3, +// KVS_STATUS_BADREQ = 4 +// }; -typedef enum { - KVS_CMD_START = 0, - // array - KVS_CMD_SET = KVS_CMD_START, - KVS_CMD_GET, - KVS_CMD_DEL, - KVS_CMD_MOD, - KVS_CMD_EXIST, - // rbtree - KVS_CMD_RSET, - KVS_CMD_RGET, - KVS_CMD_RDEL, - KVS_CMD_RMOD, - KVS_CMD_REXIST, - // hash - KVS_CMD_HSET, - KVS_CMD_HGET, - KVS_CMD_HDEL, - KVS_CMD_HMOD, - KVS_CMD_HEXIST, +// typedef enum { +// KVS_CMD_START = 0, +// // array +// KVS_CMD_SET = KVS_CMD_START, +// KVS_CMD_GET, +// KVS_CMD_DEL, +// KVS_CMD_MOD, +// KVS_CMD_EXIST, +// // rbtree +// KVS_CMD_RSET, +// KVS_CMD_RGET, +// KVS_CMD_RDEL, +// KVS_CMD_RMOD, +// KVS_CMD_REXIST, +// // hash +// KVS_CMD_HSET, +// KVS_CMD_HGET, +// KVS_CMD_HDEL, +// KVS_CMD_HMOD, +// KVS_CMD_HEXIST, - KVS_CMD_PSYNC, - KVS_CMD_SAVE, - KVS_CMD_COUNT, -}kvs_cmd_t; +// KVS_CMD_PSYNC, +// KVS_CMD_SAVE, +// KVS_CMD_COUNT, +// }kvs_cmd_t; -typedef struct kvs_arg_s{ - uint32_t len; - const uint8_t *data; -} kvs_arg_t; +// typedef struct kvs_arg_s{ +// uint32_t len; +// const uint8_t *data; +// } kvs_arg_t; -typedef struct kvs_req_s{ - kvs_cmd_t op; - uint8_t argc; - kvs_arg_t *args; -}kvs_req_t; +// typedef struct kvs_req_s{ +// kvs_cmd_t op; +// uint8_t argc; +// kvs_arg_t *args; +// }kvs_req_t; -typedef struct kvs_rsp_s{ - kvs_cmd_t op; - uint8_t status; - uint32_t dlen; - const uint8_t *data; -} kvs_rsp_t; +// typedef struct kvs_rsp_s{ +// kvs_cmd_t op; +// uint8_t status; +// uint32_t dlen; +// const uint8_t *data; +// } kvs_rsp_t; -int kvs_parse_one_cmd(const uint8_t *request, int request_length, kvs_req_t *req_out); -void kvs_free_request(kvs_req_t *req); -int kvs_execute_one_cmd(const kvs_req_t *req, kvs_rsp_t *rsp_out); -int kvs_build_one_rsp(const kvs_rsp_t *results, uint8_t *response, size_t response_cap); -int kvs_save_to_file(); +// int kvs_parse_one_cmd(const uint8_t *request, int request_length, kvs_req_t *req_out); +// void kvs_free_request(kvs_req_t *req); +// int kvs_execute_one_cmd(const kvs_req_t *req, kvs_rsp_t *rsp_out); +// int kvs_build_one_rsp(const kvs_rsp_t *results, uint8_t *response, size_t response_cap); -int kvs_save_cmd_to_logfile(const uint8_t *cmd, size_t len, int logfd); -int kvs_replay_log(const char *logfile, int logfd); -int ksv_clear_log(int logfd); +// int kvs_save_cmd_to_logfile(const uint8_t *cmd, size_t len, int logfd); +// int kvs_replay_log(const char *logfile, int logfd); +// int ksv_clear_log(int logfd); #endif diff --git a/kvstore.c b/kvstore.c index 20ba029..2255aa3 100644 --- a/kvstore.c +++ b/kvstore.c @@ -3,7 +3,10 @@ #include "kvstore.h" #include "kvs_rw_tools.h" +#include "kvs_protocol_resp.h" +#include "kvs_inc_log.h" #include "mem_pool/mem_pool.h" +#include "common/config.h" #include #include #include @@ -13,7 +16,6 @@ #include #include #include -#include "common/config.h" #if ENABLE_ARRAY extern kvs_array_t global_array; @@ -31,13 +33,14 @@ extern kvs_hash_t global_hash; extern mp_pool_t global_mempool; #endif -AppConfig cfg; - -ServerMode mode = MODE_MASTER; +AppConfig global_cfg; int global_cmd_log_fd = -1; - +char global_oplog_file[256] = "kvs_oplog.default.db"; +char global_array_file[256] = "kvs_array.default.db"; +char global_rbtree_file[256] = "kvs_rbtree.default.db"; +char global_hash_file[256] = "kvs_hash.default.db"; int is_update_cmd(kvs_cmd_t op){ if(op == KVS_CMD_SET || op == KVS_CMD_RSET || op == KVS_CMD_HSET @@ -56,6 +59,87 @@ int is_update_cmd(kvs_cmd_t op){ // int kvs_protocol(char *request, int request_length, char *response, int *response_length){ +// int kvs_protocol(struct conn* conn){ +// if (!conn) return -1; +// char *request = conn->rbuffer; +// int request_length = conn->rlength; +// char *response = conn->wbuffer; +// int *response_length = &conn->wlength; + +// if (!request || request_length <= 0 || !response || !response_length) return -1; +// int consumed = 0; +// int out_len = 0; + +// static int i = 0; +// while(consumed < request_length ){ +// if(i > 33){ +// i = i+1; +// i = i-1; +// } +// if(i == 47) i = 0; +// ++i; + +// kvs_req_t req; +// memset(&req, 0, sizeof(kvs_req_t)); + +// const uint8_t *p = request+consumed; +// int remain = request_length - consumed; + +// int len = kvs_parse_one_cmd(p, remain, &req); +// if(len < 0){ +// // 解析失败 +// kvs_free_request(&req); +// *response_length = out_len; +// return -1; +// } +// else if(len == 0){ +// // 半包 +// kvs_free_request(&req); +// break; +// } + +// kvs_rsp_t rsp; +// memset(&rsp, 0, sizeof(kvs_rsp_t)); + +// // 执行失败 +// if (kvs_execute_one_cmd(&req, &rsp) < 0){ +// kvs_free_request(&req); +// *response_length = out_len; +// return -1; +// }else{ +// // 执行成功,在这里保存到日志中。 +// if(rsp.status == KVS_STATUS_OK){ +// if(is_update_cmd(req.op)){ +// kvs_save_cmd_to_logfile(p, len, global_cmd_log_fd); +// } +// } +// } + +// if(req.op == KVS_CMD_PSYNC){ +// build_thread_to_sync(req.args->data, conn); +// } + +// int resp_len = kvs_build_one_rsp(&rsp, (uint8_t *)response+out_len, KVS_MAX_RESPONSE-out_len); +// // 构建响应 <0 构建失败 +// kvs_free_request(&req); +// if (resp_len < 0) { +// *response_length = out_len; +// return -1; +// } + +// out_len += resp_len; +// consumed += len; +// } + +// // slave 暂时不需要回报,或者回一个new_offset +// if(conn->is_from_master){ +// conn->wlength = 0; +// return consumed; +// } +// *response_length = out_len; +// return consumed; +// } + int kvs_protocol(struct conn* conn){ if (!conn) return -1; char *request = conn->rbuffer; @@ -64,65 +148,109 @@ int kvs_protocol(struct conn* conn){ int *response_length = &conn->wlength; if (!request || request_length <= 0 || !response || !response_length) return -1; + int consumed = 0; int out_len = 0; - static int i = 0; while(consumed < request_length ){ - if(i > 33){ - i = i+1; - i = i-1; - } - if(i == 47) i = 0; - ++i; - - kvs_req_t req; - memset(&req, 0, sizeof(kvs_req_t)); - const uint8_t *p = request+consumed; int remain = request_length - consumed; + + resp_cmd_t cmd; + memset(&cmd, 0, sizeof(cmd)); - int len = kvs_parse_one_cmd(p, remain, &req); + int len = resp_parse_one_cmd(p, remain, &cmd); if(len < 0){ - // 解析失败 - kvs_free_request(&req); + /* 协议错误:直接返回,已构建的响应仍可写回 */ *response_length = out_len; return -1; } else if(len == 0){ - // 半包 - kvs_free_request(&req); + // 半包 break; } - kvs_rsp_t rsp; - memset(&rsp, 0, sizeof(kvs_rsp_t)); + resp_value_t val; + memset(&val, 0, sizeof(val)); - // 执行失败 - if (kvs_execute_one_cmd(&req, &rsp) < 0){ - kvs_free_request(&req); - *response_length = out_len; - return -1; - }else{ -// 执行成功,在这里保存到日志中。 - if(rsp.status == KVS_STATUS_OK){ - if(is_update_cmd(req.op)){ - kvs_save_cmd_to_logfile(p, len, global_cmd_log_fd); + int dr = resp_dispatch(&cmd, &val); + + /* + * 语义建议: + * - resp_dispatch() 即使返回 -1(比如 unknown command / wrong argc), + * 一般也已经把 out_value 设置成了 RESP error,这样客户端能收到错误响应。 + * - 如果 dr < 0 但 val.type 没被正确设置,兜底回一个通用错误。 + */ + if(dr < 0){ + if (val.type != RESP_T_SIMPLE_STR && + val.type != RESP_T_ERROR && + val.type != RESP_T_INTEGER && + val.type != RESP_T_BULK_STR && + val.type != RESP_T_NIL) { + val = resp_error("ERR dispatch failed"); + } + } else { + // persist into o o + if(global_cfg.persistence == PERSIST_INCREMENTAL){ + + /* 执行成功:在这里保存到日志中(只记录更新类命令) */ + if (cmd.argc > 0 && cmd.argv[0].ptr) { + /* 仅当返回 OK 时记录 */ + int is_ok = (val.type == RESP_T_SIMPLE_STR && + val.bulk.ptr && val.bulk.len == 2 && + ((val.bulk.ptr[0] == 'O' || val.bulk.ptr[0] == 'o') && + (val.bulk.ptr[1] == 'K' || val.bulk.ptr[1] == 'k'))); + + if (is_ok) { + /* 更新类命令:SET/DEL/MOD/RSET/RDEL/RMOD/HSET/HDEL/HMOD/SAVE */ + const resp_slice_t *c0 = &cmd.argv[0]; + int is_update = 0; + if (c0->ptr && c0->len) { + if (ascii_casecmp(c0->ptr, c0->len, "SET") == 0 || + ascii_casecmp(c0->ptr, c0->len, "DEL") == 0 || + ascii_casecmp(c0->ptr, c0->len, "MOD") == 0 || + ascii_casecmp(c0->ptr, c0->len, "RSET") == 0 || + ascii_casecmp(c0->ptr, c0->len, "RDEL") == 0 || + ascii_casecmp(c0->ptr, c0->len, "RMOD") == 0 || + ascii_casecmp(c0->ptr, c0->len, "HSET") == 0 || + ascii_casecmp(c0->ptr, c0->len, "HDEL") == 0 || + ascii_casecmp(c0->ptr, c0->len, "HMOD") == 0) { + is_update = 1; + } + } + + if (is_update) { + kvs_save_cmd_to_logfile(p, len, global_cmd_log_fd); + } + } } } } - if(req.op == KVS_CMD_PSYNC){ - build_thread_to_sync(req.args->data, conn); - } - int resp_len = kvs_build_one_rsp(&rsp, (uint8_t *)response+out_len, KVS_MAX_RESPONSE-out_len); - // 构建响应 <0 构建失败 - kvs_free_request(&req); - if (resp_len < 0) { + /* PSYNC:触发同步线程(按你原来逻辑:从 argv[1] 取参数) */ + if (cmd.argc > 0 && cmd.argv[0].ptr && + ascii_casecmp(cmd.argv[0].ptr, cmd.argv[0].len, "PSYNC") == 0) { + if (cmd.argc >= 2 && cmd.argv[1].ptr) { + build_thread_to_sync((const char *)cmd.argv[1].ptr, conn); + } else { + /* 如果你希望 PSYNC 无参也能触发,可以传 NULL 或空串 */ + build_thread_to_sync(NULL, conn); + } + } + + /* 构建响应 */ + int cap = KVS_MAX_RESPONSE - out_len; + if (cap <= 0) { *response_length = out_len; - return -1; - } + return -1; + } + + int resp_len = resp_build_value(&val, response + out_len, (size_t)cap); + if (resp_len < 0) { + *response_length = out_len; + return -1; + } out_len += resp_len; consumed += len; @@ -133,10 +261,44 @@ int kvs_protocol(struct conn* conn){ conn->wlength = 0; return consumed; } + *response_length = out_len; return consumed; } +int kvs_save_to_file(){ + int ret = 0; + int rc = 0; + #if ENABLE_ARRAY + rc = kvs_array_save(&global_array, global_array_file); + if(rc < 0){ + printf("kvs_engine_array save error\n"); + ret = -1; + } + #endif + + #if ENABLE_RBTREE + rc = kvs_rbtree_save(&global_rbtree, global_rbtree_file); + if(rc < 0){ + printf("kvs_engine_rbtree save error\n"); + ret = -1; + } + #endif + + #if ENABLE_HASH + rc = kvs_hash_save(&global_hash, global_hash_file); + if(rc < 0){ + printf("kvs_engine_hash save error\n"); + ret = -1; + } + #endif + + ksv_clear_log(global_cmd_log_fd); + + return ret; +} + + extern void sync_wakeup(int fd); static int g_slavefd = -1; static uint64_t g_offset = 0; @@ -144,7 +306,7 @@ static uint64_t g_offset = 0; static void *sync_thread_main(void *arg) { struct conn *conn = (struct conn*) arg; - int logfd = open(KVS_CMD_LOG_FILE, O_RDONLY); + int logfd = open(global_oplog_file, O_RDONLY); if (logfd < 0) { printf("open replaylog failed: %s\n", strerror(errno)); return NULL; @@ -272,25 +434,27 @@ int init_kvengine(void) { memset(&global_array, 0, sizeof(kvs_array_t)); kvs_array_create(&global_array); - kvs_array_load(&global_array, KVS_ARRAY_FILE); + kvs_array_load(&global_array, global_array_file); #endif #if ENABLE_RBTREE memset(&global_rbtree, 0, sizeof(kvs_rbtree_t)); kvs_rbtree_create(&global_rbtree); - kvs_rbtree_load(&global_rbtree, KVS_RBTREE_FILE); + kvs_rbtree_load(&global_rbtree, global_rbtree_file); #endif #if ENABLE_HASH memset(&global_hash, 0, sizeof(kvs_hash_t)); kvs_hash_create(&global_hash); - kvs_hash_load(&global_hash, KVS_HASH_FILE); + kvs_hash_load(&global_hash, global_hash_file); #endif - init_cmd_log(KVS_CMD_LOG_FILE, &global_cmd_log_fd); - kvs_replay_log(KVS_CMD_LOG_FILE, global_cmd_log_fd); + if(global_cfg.persistence == PERSIST_INCREMENTAL){ + init_cmd_log(global_oplog_file, &global_cmd_log_fd); + kvs_replay_log(global_oplog_file, global_cmd_log_fd); + } return 0; } @@ -321,6 +485,39 @@ void dest_memory_pool(void){ #endif } +static int ensure_dir_exists(const char *dir) +{ + struct stat st; + if (stat(dir, &st) == 0) { + return S_ISDIR(st.st_mode) ? 0 : -2; // 存在但不是目录 + } + if (mkdir(dir, 0755) == 0) return 0; + if (errno == EEXIST) return 0; + return -1; +} + +static int join_path(char *out, size_t out_sz, const char *dir, const char *file) +{ + if (!out || out_sz == 0 || !dir || !file) return -1; + + size_t dlen = strlen(dir); + if (dlen == 0) return -1; + + int need_slash = (dir[dlen - 1] != '/'); + + int n = snprintf(out, out_sz, need_slash ? "%s/%s" : "%s%s", dir, file); + if (n < 0 || (size_t)n >= out_sz) return -2; // 截断了 + return 0; +} + +void init_data_file(AppConfig *cfg){ + ensure_dir_exists(cfg->persist_dir); + join_path(global_oplog_file, sizeof(global_oplog_file), cfg->persist_dir, cfg->oplog_file); + join_path(global_array_file, sizeof(global_array_file), cfg->persist_dir, cfg->array_file); + join_path(global_rbtree_file, sizeof(global_rbtree_file), cfg->persist_dir, cfg->rbtree_file); + join_path(global_hash_file, sizeof(global_hash_file), cfg->persist_dir, cfg->hash_file); +} + int init_config(AppConfig *cfg){ xmlInitParser(); @@ -330,16 +527,24 @@ int init_config(AppConfig *cfg){ return -1; } - printf("=== Config ===\n"); - printf("IP : %s\n", cfg->ip); - printf("Port : %d\n", cfg->port); - printf("Log level : %s\n", log_level_to_string(cfg->log_level)); - printf("Master IP : %s\n", cfg->master_ip); - printf("Master Port : %d\n", cfg->master_port); - printf("Mode : %s\n", server_mode_to_string(cfg->mode)); - printf("Persistence : %s\n", persistence_to_string(cfg->persistence)); - printf("Allocator : %s\n", allocator_to_string(cfg->allocator)); - printf("=== Config ===\n"); + printf("=============== Config ===============\n"); + printf("IP : %s\n", cfg->ip); + printf("Port : %d\n", cfg->port); + + printf("Mode : %s\n", server_mode_to_string(cfg->mode)); + printf("|—— Master IP : %s\n", cfg->master_ip); + printf("|—— Master Port : %d\n", cfg->master_port); + + printf("Persistence : %s\n", persistence_to_string(cfg->persistence)); + printf("|—— Persist-dir : %s\n", cfg->persist_dir); + printf("|—— Persist-oplog : %s\n", cfg->oplog_file); + printf("|—— Persist-array : %s\n", cfg->array_file); + printf("|—— Persist-rbtree : %s\n", cfg->rbtree_file); + printf("|—— Persist-hash : %s\n", cfg->hash_file); + + printf("Log level : %s\n", log_level_to_string(cfg->log_level)); + printf("Allocator : %s\n", allocator_to_string(cfg->allocator)); + printf("=============== Config ===============\n"); xmlCleanupParser(); return 0; @@ -347,24 +552,27 @@ int init_config(AppConfig *cfg){ int main(int argc, char *argv[]) { - if(-1 == init_config(&cfg)){ + if(-1 == init_config(&global_cfg)){ printf("Init Config error"); return -1; } - int port = cfg.port; - mode = cfg.mode; + init_data_file(&global_cfg); + + int port = global_cfg.port; char *master_ip = NULL; int master_port = -1; - if(mode == MODE_SLAVE){ - master_ip = cfg.master_ip; - master_port = cfg.master_port; - }else if(mode == MODE_MASTER){ + if(global_cfg.mode == MODE_SLAVE){ + master_ip = global_cfg.master_ip; + master_port = global_cfg.master_port; + }else if(global_cfg.mode == MODE_MASTER){ } + + init_memory_pool(); init_kvengine(); diff --git a/kvstore.h b/kvstore.h index cfaa28e..a74cec8 100644 --- a/kvstore.h +++ b/kvstore.h @@ -31,10 +31,10 @@ #define BIN_SAFE 1 -#define KVS_CMD_LOG_FILE "kvs_cmd_log.db" -#define KVS_ARRAY_FILE "kvs_snap_array.db" -#define KVS_RBTREE_FILE "kvs_snap_rbtree.db" -#define KVS_HASH_FILE "kvs_snap_hash.db" +// #define KVS_CMD_LOG_FILE "kvs_wal.db" +// #define KVS_ARRAY_FILE "kvs_array.db" +// #define KVS_RBTREE_FILE "kvs_rbtree.db" +// #define KVS_HASH_FILE "kvs_hash.db" @@ -46,12 +46,6 @@ extern int reactor_start(unsigned short port, msg_handler handler, const char *m extern int proactor_start(unsigned short port, msg_handler handler); extern int ntyco_start(unsigned short port, msg_handler handler); -extern int init_cmd_log(const char *file, int *logfd); -extern int destroy_cmd_log(int logfd); -extern int kvs_save_cmd_to_logfile(const uint8_t *cmd, size_t len, int logfd); -extern int kvs_replay_log(const char *logfile, int logfd); -extern int ksv_clear_log(int logfd); - extern int try_connect_master(char *ip, int port); void build_thread_to_sync(const uint8_t *offset, struct conn* conn); @@ -275,7 +269,7 @@ int kvs_hash_exist(kvs_hash_t *hash, char *key); #endif - +int kvs_save_to_file(); #endif diff --git a/reactor.c b/reactor.c index c099693..60368b5 100644 --- a/reactor.c +++ b/reactor.c @@ -238,43 +238,62 @@ int send_cb(int fd) { #endif - int count = 0; + struct conn *c = &conn_list[fd]; + int sent_total = 0; -#if 0 - if (conn_list[fd].status == 1) { - //printf("SEND: %s\n", conn_list[fd].wbuffer); - count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0); - set_event(fd, EPOLLOUT, 0); - } else if (conn_list[fd].status == 2) { - set_event(fd, EPOLLOUT, 0); - } else if (conn_list[fd].status == 0) { + pthread_mutex_lock(&c->g_sync_lock); - if (conn_list[fd].wlength != 0) { - count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0); - } - - set_event(fd, EPOLLIN, 0); + while (c->wlength > 0) { + ssize_t n = send(fd, c->wbuffer, (size_t)c->wlength, MSG_NOSIGNAL); + if (n > 0) { + sent_total += (int)n; + + if (n == c->wlength) { + /* 全部发完 */ + c->wlength = 0; + break; + } + + /* 只发了一部分:把剩余数据搬到 buffer 头部 */ + int left = c->wlength - (int)n; + memmove(c->wbuffer, c->wbuffer + n, (size_t)left); + c->wlength = left; + + /* 不要在这里死循环占用 CPU,交给下一次 EPOLLOUT */ + break; + } + + if (n < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + /* 暂时发不出去,等下一次可写事件 */ + pthread_mutex_unlock(&c->g_sync_lock); + set_event(fd, EPOLLOUT, 0); + return sent_total; + } + + /* 对端断开 / 其他错误 */ + int e = errno; + pthread_mutex_unlock(&c->g_sync_lock); + + printf("send fd=%d errno=%d %s\n", fd, e, strerror(e)); + close(fd); + epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); + return 0; + } + + break; } -#else - // printf("wlength: %d\n", conn_list[fd].wlength); + pthread_mutex_unlock(&c->g_sync_lock); - pthread_mutex_lock(&conn_list[fd].g_sync_lock); - if (conn_list[fd].wlength != 0) { - // for(int i = 0;i < conn_list[fd].wlength; ++i){ - // printf("%02x", conn_list[fd].wbuffer[i]); - // } - // printf("\n"); - count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0); - conn_list[fd].wlength = 0; - } - pthread_mutex_unlock(&conn_list[fd].g_sync_lock); - - set_event(fd, EPOLLIN, 0); + if (c->wlength > 0) { + /* 还有没发完,继续监听可写 */ + set_event(fd, EPOLLOUT, 0); + } else { + /* 发完了,回到读 */ + set_event(fd, EPOLLIN, 0); + } -#endif - //set_event(fd, EPOLLOUT, 0); - - return count; + return sent_total; } // wakup fd diff --git a/test-redis/test.c b/test-redis/test.c new file mode 100644 index 0000000..c71ebb1 --- /dev/null +++ b/test-redis/test.c @@ -0,0 +1,215 @@ +#include +#include +#include +#include +#include + +static void die(redisContext *c, const char *msg) { + fprintf(stderr, "%s: %s\n", msg, c && c->err ? c->errstr : "unknown"); + exit(1); +} + +static void must_ok(redisReply *r, const char *what) { + if (!r) { fprintf(stderr, "%s: reply null\n", what); exit(1); } + if (!(r->type == REDIS_REPLY_STATUS && r->str && strcasecmp(r->str, "OK") == 0)) { + fprintf(stderr, "%s: expect +OK, got type=%d str=%s\n", + what, r->type, r->str ? r->str : "(null)"); + freeReplyObject(r); + exit(1); + } + freeReplyObject(r); +} + +static void must_int(redisReply *r, long long expect, const char *what) { + if (!r) { fprintf(stderr, "%s: reply null\n", what); exit(1); } + if (r->type != REDIS_REPLY_INTEGER || r->integer != expect) { + fprintf(stderr, "%s: expect :%lld, got type=%d int=%lld\n", + what, expect, r->type, (long long)r->integer); + freeReplyObject(r); + exit(1); + } + freeReplyObject(r); +} + +static void must_bulk_eq(redisReply *r, const void *buf, size_t n, const char *what) { + if (!r) { fprintf(stderr, "%s: reply null\n", what); exit(1); } + if (r->type != REDIS_REPLY_STRING || r->len != n || memcmp(r->str, buf, n) != 0) { + fprintf(stderr, "%s: bulk mismatch. type=%d len=%zu\n", what, r->type, r->len); + freeReplyObject(r); + exit(1); + } + freeReplyObject(r); +} + +static void must_nil(redisReply *r, const char *what) { + if (!r) { fprintf(stderr, "%s: reply null\n", what); exit(1); } + if (r->type != REDIS_REPLY_NIL) { + fprintf(stderr, "%s: expect nil, got type=%d\n", what, r->type); + freeReplyObject(r); + exit(1); + } + freeReplyObject(r); +} + +void basic_command_test(redisContext *c){ + /* ---------- 1) 基本命令测试 ---------- */ + const char *k = "k1"; + const char *v1 = "v1"; + const char *v2 = "v2"; + + must_ok((redisReply*)redisCommand(c, "SET %b %b", k, strlen(k), v1, strlen(v1)), "SET"); + must_bulk_eq((redisReply*)redisCommand(c, "GET %b", k, strlen(k)), v1, strlen(v1), "GET"); + must_int((redisReply*)redisCommand(c, "EXIST %b", k, strlen(k)), 1, "EXIST=1"); + + must_ok((redisReply*)redisCommand(c, "MOD %b %b", k, strlen(k), v2, strlen(v2)), "MOD"); + must_bulk_eq((redisReply*)redisCommand(c, "GET %b", k, strlen(k)), v2, strlen(v2), "GET after MOD"); + + must_int((redisReply*)redisCommand(c, "DEL %b", k, strlen(k)), 1, "DEL=1"); + must_int((redisReply*)redisCommand(c, "EXIST %b", k, strlen(k)), 0, "EXIST=0"); + must_nil((redisReply*)redisCommand(c, "GET %b", k, strlen(k)), "GET nil"); + + printf("[OK] basic SET/GET/MOD/DEL/EXIST\n"); +} + +void special_char_test(redisContext *c){ + /* ---------- 2) 特殊字符/二进制测试 ---------- */ + uint8_t key2[] = { 'b','i','n',':',0x00,'K','\r','\n',0xFF }; + uint8_t val2[] = { 0x00,'A','\r','\n','B',0x00,0xFF }; + + must_ok((redisReply*)redisCommand(c, "SET %b %b", key2, sizeof(key2), val2, sizeof(val2)), + "SET binary"); + must_bulk_eq((redisReply*)redisCommand(c, "GET %b", key2, sizeof(key2)), + val2, sizeof(val2), "GET binary"); + (void)redisCommand(c, "DEL %b", key2, sizeof(key2)); /* 不强制检查返回 */ + + printf("[OK] binary/special chars\n"); +} + +void save(redisContext *c){ + must_ok((redisReply*)redisCommand(c, "SAVE"), "SET binary"); + + printf("[OK] SAVE\n"); +} + +void pipline_set_test(redisContext *c, int start, const char *op){ + /* ---------- 3) Pipeline 批处理测试 ---------- */ + const int N = 1000; + + /* 一次塞 N 个 SET */ + int end = start + N; + for (int i = start; i < end; i++) { + char kk[64], vv[64]; + int kn = snprintf(kk, sizeof(kk), "p:%d", i); + int vn = snprintf(vv, sizeof(vv), "v:%d", i); + if (redisAppendCommand( c, "%s %b %b", + op, + kk, (size_t)kn, + vv, (size_t)vn) != REDIS_OK) { + die(c, "redisAppendCommand SET failed"); + } + } + /* 再一次性把 N 个回复读出来 */ + for (int i = start; i < end; i++) { + redisReply *r = NULL; + if (redisGetReply(c, (void**)&r) != REDIS_OK || !r) die(c, "redisGetReply SET failed"); + must_ok(r, "pipeline SET reply"); + } + + printf("[OK] SET pipeline batch %d\n", N); +} + +void pipline_get_test(redisContext *c, int start, const char *op){ + const int N = 1000; + + /* pipeline GET + 校验 */ + int end = start + N; + for (int i = start; i < end; i++) { + char kk[64]; + int kn = snprintf(kk, sizeof(kk), "p:%d", i); + if (redisAppendCommand( c, "%s %b", + op, + kk, (size_t)kn) != REDIS_OK) { + die(c, "redisAppendCommand GET failed"); + } + } + for (int i = start; i < end; i++) { + redisReply *r = NULL; + if (redisGetReply(c, (void**)&r) != REDIS_OK || !r) die(c, "redisGetReply GET failed"); + char expect[64]; + int en = snprintf(expect, sizeof(expect), "v:%d", i); + must_bulk_eq(r, expect, (size_t)en, "pipeline GET reply"); + } + + printf("[OK] GET pipeline batch %d\n", N); +} + +void pipline_del_test(redisContext *c, int start, const char *op){ + const int N = 1000; + + /* cleanup:pipeline DEL */ + int end = start + N; + for (int i = start; i < end; i++) { + char kk[64]; + int kn = snprintf(kk, sizeof(kk), "p:%d", i); + if (redisAppendCommand( c, "%s %b", + op, + kk, (size_t)kn) != REDIS_OK) { + die(c, "redisAppendCommand DEL failed"); + } + } + for (int i = start; i < end; i++) { + redisReply *r = NULL; + if (redisGetReply(c, (void**)&r) != REDIS_OK || !r) die(c, "redisGetReply DEL failed"); + freeReplyObject(r); /* DEL 返回 int,这里不强制检查 */ + } + + printf("[OK] DEL pipeline batch %d\n", N); +} + + +int main(int argc, char **argv) { + if(argc < 4) { + printf("invalid input\n"); + return -1; + } + + const char *host = argv[1]; + int port = atoi(argv[2]); + int mode = atoi(argv[3]); + + redisContext *c = redisConnect(host, port); + if (!c || c->err) die(c, "connect failed"); + + printf("Connected to %s:%d\n", host, port); + + if(mode == 0){ + save(c); + }else if(mode == 1){ + basic_command_test(c); + + }else if(mode == 10){ + pipline_set_test(c, 0, "SET"); + }else if(mode == 11){ + pipline_get_test(c, 0, "GET"); + }else if(mode == 12){ + pipline_del_test(c, 0, "DEL"); + + }else if(mode == 20){ + pipline_set_test(c, 0, "RSET"); + }else if(mode == 21){ + pipline_get_test(c, 0, "RGET"); + }else if(mode == 22){ + pipline_del_test(c, 0, "RDEL"); + + }else if(mode == 30){ + pipline_set_test(c, 0, "HSET"); + }else if(mode == 31){ + pipline_get_test(c, 0, "HGET"); + }else if(mode == 32){ + pipline_del_test(c, 0, "HDEL"); + } + + redisFree(c); + printf("ALL TESTS PASSED.\n"); + return 0; +}