#include "kvstore.h"
#include "kvs_rw_tools.h"
#include "mem_pool/mem_pool.h"

/*
 * NOTE(review): the eight system header names were missing from the copy of
 * this file that was reviewed. The list below is reconstructed from the
 * symbols this file uses (printf, atoi, memcpy/strerror, pread/usleep, open,
 * errno, pthread_*, ntohl) -- confirm against the original.
 */
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#if ENABLE_ARRAY
extern kvs_array_t global_array;
#endif

#if ENABLE_RBTREE
extern kvs_rbtree_t global_rbtree;
#endif

#if ENABLE_HASH
extern kvs_hash_t global_hash;
#endif

#if MEMORY_SELECT_MALLOC == MEMORY_USE_MYMALLOC
extern mp_pool_t global_mempool;
#endif

#define KVS_SERVER_NOT_INIT 0
#define KVS_SERVER_MASETER  1   /* (sic) spelling kept: the macro name is part of the existing code */
#define KVS_SERVER_SLAVE    2

int kvs_server_role = KVS_SERVER_NOT_INIT;
int global_cmd_log_fd = -1;

const char *command[] = {
    "SET", "GET", "DEL", "MOD", "EXIST",
    "RSET", "RGET", "RDEL", "RMOD", "REXIST",
    "HSET", "HGET", "HDEL", "HMOD", "HEXIST"
};

const char *response[] = {

};

/**
 * Parse one binary command from the front of `request`.
 *
 * Layout as consumed here: op (u8), argc (u8), then argc records of
 * { length (u32), `length` payload bytes }.
 *
 * The input is scanned twice: the first pass only verifies that the whole
 * command is present and within limits, so nothing is allocated for a
 * partial command; the second pass fills the argument table.
 *
 * On success req_out->args[i].data points INTO `request` (zero-copy), so the
 * request buffer must stay valid while `req_out` is in use. The argument
 * table itself is allocated with kvs_malloc() and must be released with
 * kvs_free_request().
 *
 * @return -1 on malformed input or allocation failure,
 *          0 when more bytes are needed (incomplete command),
 *         >0 the number of bytes consumed by this command.
 */
int kvs_parse_one_cmd(const uint8_t *request, int request_length, kvs_req_t *req_out)
{
    if (!request || request_length <= 0 || !req_out) return -1;

    req_out->op = KVS_CMD_COUNT;
    req_out->argc = 0;
    req_out->args = NULL;

    const uint8_t *p = request;
    const uint8_t *end = request + (size_t)request_length;

    /* OP + ARGC */
    if (kvs_need(p, end, 2)) {
        return 0; /* need more data */
    }

    uint8_t op = 0, argc = 0;
    if (kvs_read_u8(&p, end, &op) < 0) return -1;
    if (kvs_read_u8(&p, end, &argc) < 0) return -1;

    if (argc > KVS_MAX_ARGC) return -1;

    /* Pass 1: make sure the complete command is already in the buffer. */
    const uint8_t *scan = p;
    uint32_t lens[KVS_MAX_ARGC];

    for (uint8_t i = 0; i < argc; i++) {
        if (kvs_need(scan, end, 4)) {
            return 0; /* need more data */
        }
        uint32_t alen = 0;
        if (kvs_read_u32(&scan, end, &alen) < 0) return -1;

        /* Defensive: cap the size of a single argument. */
        if (alen > KVS_MAX_ARGLEN) return -1;

        /* Defensive: payload must not run past the buffer (partial command). */
        if (kvs_need(scan, end, (size_t)alen)) {
            return 0; /* need more data */
        }
        lens[i] = alen;
        scan += alen;
    }

    size_t total_len = (size_t)(scan - request);
    if (total_len > KVS_MAX_CMD_BYTES) return -1;

    req_out->op = op;
    req_out->argc = argc;

    if (argc == 0) {
        return (int)total_len;
    }

    /* Pass 2: build the argument table. */
    kvs_arg_t *args = (kvs_arg_t *)kvs_malloc((size_t)argc * sizeof(kvs_arg_t));
    if (!args) {
        kvs_free_request(req_out);
        return -1;
    }
    memset(args, 0, (size_t)argc * sizeof(kvs_arg_t));

    for (uint8_t i = 0; i < argc; i++) {
        uint32_t alen = 0;
        if (kvs_read_u32(&p, end, &alen) < 0) {
            kvs_free(args);
            kvs_free_request(req_out);
            return -1;
        }

        /* Must match what pass 1 read; a mismatch means parser/input trouble. */
        if (alen != lens[i]) {
            kvs_free(args);
            kvs_free_request(req_out);
            return -1;
        }

        args[i].len = alen;
        args[i].data = p; /* points straight into the input buffer (zero-copy) */
        p += alen;
    }

    req_out->args = args;
    return (int)(p - request);
}

/**
 * Release the argument table of a parsed request and reset it to the
 * "no command" state (op = KVS_CMD_COUNT, argc = 0). NULL is accepted.
 */
void kvs_free_request(kvs_req_t *req)
{
    if (!req) return;
    if (req->args) {
        kvs_free(req->args);
        req->args = NULL;
    }
    req->op = KVS_CMD_COUNT;
    req->argc = 0;
}

#if ENABLE_ARRAY || ENABLE_RBTREE || ENABLE_HASH
/* Map an engine "set" result to a status: <0 error, 0 stored, >0 key already present. */
static int kvs_status_for_set(int ret)
{
    if (ret < 0) return KVS_STATUS_ERROR;
    return (ret == 0) ? KVS_STATUS_OK : KVS_STATUS_EXIST;
}

/* Map an engine "del"/"mod" result to a status: <0 error, 0 done, >0 key missing. */
static int kvs_status_for_change(int ret)
{
    if (ret < 0) return KVS_STATUS_ERROR;
    return (ret == 0) ? KVS_STATUS_OK : KVS_STATUS_NO_EXIST;
}

/* Map an engine "exist" result to a status: 0 means the key is present. */
static int kvs_status_for_exist(int ret)
{
    return (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST;
}
#endif

/**
 * Execute one parsed command against the compiled-in storage engines.
 *
 * @param req      parsed request (see kvs_parse_one_cmd)
 * @param rsp_out  receives op, status and, for the GET family, a pointer to
 *                 the value returned by the engine (not copied here)
 * @return -1 on invalid arguments or an unsupported/ill-formed command
 *            (rsp_out->status is KVS_STATUS_BADREQ when rsp_out is usable),
 *          0 when the command was dispatched; the outcome is in
 *            rsp_out->status.
 */
int kvs_execute_one_cmd(const kvs_req_t *req, kvs_rsp_t *rsp_out)
{
    if (!req || !rsp_out) return -1;

    rsp_out->op = req->op;
    rsp_out->status = KVS_STATUS_ERROR;
    rsp_out->data = NULL;
    rsp_out->dlen = 0;

    int argc = req->argc;
    kvs_cmd_t op = req->op;
    kvs_arg_t *argv = req->args;

    /* A request that claims arguments but carries no table cannot be executed. */
    if (argc > 0 && !argv) {
        rsp_out->status = KVS_STATUS_BADREQ;
        return -1;
    }

    uint32_t key_len = 0;
    const void *key = NULL;
    uint32_t value_len = 0;
    const void *val = NULL;

    if (argc == 1) {
        key_len = argv[0].len;
        key = argv[0].data;
    } else if (argc == 2) {
        key_len = argv[0].len;
        key = argv[0].data;
        value_len = argv[1].len;
        val = argv[1].data;
    }

    /* Arity check per command family. */
    switch (op) {
    case KVS_CMD_SET:
    case KVS_CMD_MOD:
    case KVS_CMD_RSET:
    case KVS_CMD_RMOD:
    case KVS_CMD_HSET:
    case KVS_CMD_HMOD:
        if (argc != 2 || !key || !val) { rsp_out->status = KVS_STATUS_BADREQ; return -1; }
        break;

    case KVS_CMD_GET:
    case KVS_CMD_DEL:
    case KVS_CMD_EXIST:
    case KVS_CMD_RGET:
    case KVS_CMD_RDEL:
    case KVS_CMD_REXIST:
    case KVS_CMD_HGET:
    case KVS_CMD_HDEL:
    case KVS_CMD_HEXIST:
    case KVS_CMD_PSYNC:
        if (argc != 1 || !key) { rsp_out->status = KVS_STATUS_BADREQ; return -1; }
        break;

    case KVS_CMD_SAVE:
        if (argc != 0) { rsp_out->status = KVS_STATUS_BADREQ; return -1; }
        break;

    default:
        rsp_out->status = KVS_STATUS_BADREQ;
        return -1;
    }

    int ret = 0;
    const char *result = NULL;

    switch (op) {
#if ENABLE_ARRAY
    case KVS_CMD_SET:
        ret = kvs_array_set_bin(&global_array, key, key_len, val, value_len);
        rsp_out->status = kvs_status_for_set(ret);
        return 0;

    case KVS_CMD_GET:
        result = kvs_array_get_bin(&global_array, key, key_len, &value_len);
        if (!result) { rsp_out->status = KVS_STATUS_NO_EXIST; return 0; }
        rsp_out->status = KVS_STATUS_OK;
        rsp_out->data = (uint8_t*)result;
        rsp_out->dlen = (uint32_t)value_len;
        return 0;

    case KVS_CMD_DEL:
        ret = kvs_array_del_bin(&global_array, key, key_len);
        rsp_out->status = kvs_status_for_change(ret);
        return 0;

    case KVS_CMD_MOD:
        ret = kvs_array_mod_bin(&global_array, key, key_len, val, value_len);
        rsp_out->status = kvs_status_for_change(ret);
        return 0;

    case KVS_CMD_EXIST:
        ret = kvs_array_exist_bin(&global_array, key, key_len);
        rsp_out->status = kvs_status_for_exist(ret);
        return 0;
#endif

#if ENABLE_RBTREE
    case KVS_CMD_RSET:
        ret = kvs_rbtree_set(&global_rbtree, key, key_len, val, value_len);
        rsp_out->status = kvs_status_for_set(ret);
        return 0;

    case KVS_CMD_RGET:
        result = kvs_rbtree_get(&global_rbtree, key, key_len, &value_len);
        if (!result) { rsp_out->status = KVS_STATUS_NO_EXIST; return 0; }
        rsp_out->status = KVS_STATUS_OK;
        rsp_out->data = (uint8_t*)result;
        rsp_out->dlen = (uint32_t)value_len;
        return 0;

    case KVS_CMD_RDEL:
        ret = kvs_rbtree_del(&global_rbtree, key, key_len);
        rsp_out->status = kvs_status_for_change(ret);
        return 0;

    case KVS_CMD_RMOD:
        ret = kvs_rbtree_mod(&global_rbtree, key, key_len, val, value_len);
        rsp_out->status = kvs_status_for_change(ret);
        return 0;

    case KVS_CMD_REXIST:
        ret = kvs_rbtree_exist(&global_rbtree, key, key_len);
        rsp_out->status = kvs_status_for_exist(ret);
        return 0;
#endif

#if ENABLE_HASH
    case KVS_CMD_HSET:
        ret = kvs_hash_set_bin(&global_hash, key, key_len, val, value_len);
        rsp_out->status = kvs_status_for_set(ret);
        return 0;

    case KVS_CMD_HGET:
        result = kvs_hash_get_bin(&global_hash, key, key_len, &value_len);
        if (!result) { rsp_out->status = KVS_STATUS_NO_EXIST; return 0; }
        rsp_out->status = KVS_STATUS_OK;
        rsp_out->data = (uint8_t*)result;
        rsp_out->dlen = (uint32_t)value_len;
        return 0;

    case KVS_CMD_HDEL:
        ret = kvs_hash_del_bin(&global_hash, key, key_len);
        rsp_out->status = kvs_status_for_change(ret);
        return 0;

    case KVS_CMD_HMOD:
        ret = kvs_hash_mod_bin(&global_hash, key, key_len, val, value_len);
        rsp_out->status = kvs_status_for_change(ret);
        return 0;

    case KVS_CMD_HEXIST:
        ret = kvs_hash_exist_bin(&global_hash, key, key_len);
        rsp_out->status = kvs_status_for_exist(ret);
        return 0;
#endif

    case KVS_CMD_SAVE:
        ret = kvs_save_to_file();
        rsp_out->status = (ret == 0) ? KVS_STATUS_OK : KVS_STATUS_ERROR;
        return 0;

    case KVS_CMD_PSYNC:
        /* Only acknowledged here; the sync thread is started by kvs_protocol(). */
        rsp_out->op = req->op;
        rsp_out->status = KVS_STATUS_OK;
        return 0;

    default:
        /* Reached for commands whose engine is compiled out. */
        rsp_out->status = KVS_STATUS_BADREQ;
        return -1;
    }

    return -1;
}

/**
 * Serialize one response into `response`.
 *
 * Layout written: op (u8), status (u8), dlen (u32), then dlen data bytes.
 *
 * @return -1 on invalid arguments or insufficient capacity,
 *         otherwise the number of bytes written.
 */
int kvs_build_one_rsp(const kvs_rsp_t *results, uint8_t *response, size_t response_cap)
{
    if (!results || !response) return -1;

    const uint8_t *end = response + response_cap;
    uint8_t *p = response;

    /* Required size: 1 + 1 + 4 + dlen (computed in size_t to avoid wrap-around). */
    size_t need = 1u + 1u + 4u + (size_t)results->dlen;
    if (need > response_cap) return -1;

    if (kvs_write_u8(&p, end, (uint8_t)results->op) < 0) return -1;
    if (kvs_write_u8(&p, end, results->status) < 0) return -1;
    if (kvs_write_u32(&p, end, results->dlen) < 0) return -1;

    if (results->dlen > 0) {
        if (!results->data) return -1; /* a length without a pointer is an error */
        /* The parser treats any non-zero kvs_need() result as "not enough
         * room", so test for non-zero here as well rather than only "< 0". */
        if (kvs_need(p, end, (size_t)results->dlen) != 0) return -1;
        memcpy(p, results->data, results->dlen);
        p += results->dlen;
    }

    return (int)(p - response);
}

/**
 * Snapshot every compiled-in engine to its file.
 *
 * The command log is cleared only when ALL snapshots succeeded; if any
 * snapshot failed the log is kept, because it is then the only remaining
 * record of the updates that were not persisted.
 *
 * @return 0 on success, -1 if any engine failed to save.
 */
int kvs_save_to_file(void)
{
    int ret = 0;
    int rc = 0;
#if ENABLE_ARRAY
    rc = kvs_array_save(&global_array, KVS_ARRAY_FILE);
    if (rc < 0) {
        printf("kvs_engine_array save error\n");
        ret = -1;
    }
#endif

#if ENABLE_RBTREE
    rc = kvs_rbtree_save(&global_rbtree, KVS_RBTREE_FILE);
    if (rc < 0) {
        printf("kvs_engine_rbtree save error\n");
        ret = -1;
    }
#endif

#if ENABLE_HASH
    rc = kvs_hash_save(&global_hash, KVS_HASH_FILE);
    if (rc < 0) {
        printf("kvs_engine_hash save error\n");
        ret = -1;
    }
#endif

    (void)rc; /* unused when no engine is compiled in */

    if (ret == 0) {
        ksv_clear_log(global_cmd_log_fd);
    }

    return ret;
}

/**
 * @return 1 if `op` modifies stored data (SET/MOD/DEL of any engine), else 0.
 */
int is_update_cmd(kvs_cmd_t op)
{
    switch (op) {
    case KVS_CMD_SET:
    case KVS_CMD_RSET:
    case KVS_CMD_HSET:
    case KVS_CMD_MOD:
    case KVS_CMD_RMOD:
    case KVS_CMD_HMOD:
    case KVS_CMD_DEL:
    case KVS_CMD_RDEL:
    case KVS_CMD_HDEL:
        return 1;
    default:
        return 0;
    }
}

/**
 * Protocol entry point: parse and execute every complete command found in
 * conn->rbuffer and append the responses to conn->wbuffer.
 *
 * Successful update commands are appended to the command log. A PSYNC
 * command additionally starts the replication thread for this connection.
 *
 * @return -1 on error (bad arguments, unparsable command, failed execution
 *            or response build); conn->wlength then covers the responses
 *            built before the failure,
 *         otherwise the number of request bytes consumed. Parsing stops at
 *         the first incomplete command, so this may be less than
 *         conn->rlength (including 0).
 *
 * NOTE(review): after PSYNC the sync thread also writes conn->wbuffer and
 * conn->wlength, while this function updates them without taking
 * conn->g_sync_lock -- this looks racy and should be confirmed/fixed
 * together with the reactor code.
 */
int kvs_protocol(struct conn* conn)
{
    if (!conn) return -1;

    char *request = conn->rbuffer;
    int request_length = conn->rlength;
    char *response = conn->wbuffer;

    if (!request || request_length <= 0 || !response) return -1;

    int consumed = 0;
    int out_len = 0;

    while (consumed < request_length) {
        const uint8_t *cmd = (const uint8_t *)request + consumed;
        int remain = request_length - consumed;

        kvs_req_t req;
        memset(&req, 0, sizeof(kvs_req_t));

        int len = kvs_parse_one_cmd(cmd, remain, &req);
        if (len < 0) {
            /* parse failure */
            kvs_free_request(&req);
            conn->wlength = out_len;
            return -1;
        }
        if (len == 0) {
            /* incomplete command: wait for more data */
            kvs_free_request(&req);
            break;
        }

        kvs_rsp_t rsp;
        memset(&rsp, 0, sizeof(kvs_rsp_t));

        if (kvs_execute_one_cmd(&req, &rsp) < 0) {
            /* execution failure */
            kvs_free_request(&req);
            conn->wlength = out_len;
            return -1;
        }

        /* Persist successful updates to the command log. */
        if (rsp.status == KVS_STATUS_OK && is_update_cmd(req.op)) {
            kvs_save_cmd_to_logfile(cmd, len, global_cmd_log_fd);
        }

        if (req.op == KVS_CMD_PSYNC) {
            /* build_thread_to_sync() copies a 64-bit offset out of the
             * argument; reject a shorter argument instead of reading past
             * it. Handled like any other bad request. */
            if (req.args[0].len < sizeof(uint64_t)) {
                kvs_free_request(&req);
                conn->wlength = out_len;
                return -1;
            }
            build_thread_to_sync(req.args[0].data, conn);
        }

        int resp_len = kvs_build_one_rsp(&rsp, (uint8_t *)response + out_len,
                                         KVS_MAX_RESPONSE - out_len);
        kvs_free_request(&req);
        if (resp_len < 0) {
            /* response build failure */
            conn->wlength = out_len;
            return -1;
        }

        out_len += resp_len;
        consumed += len;
    }

    /* Data replayed from the master is not answered for now
     * (a new offset could be reported here later). */
    if (conn->is_from_master) {
        conn->wlength = 0;
        return consumed;
    }

    conn->wlength = out_len;
    return consumed;
}

extern void sync_wakeup(int fd);

static int g_slavefd = -1;
static uint64_t g_offset = 0;

/**
 * Replication thread: streams records from the command log file into
 * conn->wbuffer, starting at g_offset, and wakes the reactor to send them.
 *
 * Record layout as read here: a 32-bit length converted with ntohl(),
 * followed by that many payload bytes; only the payload is forwarded.
 *
 * The write buffer is used as a single slot: a new batch is filled only
 * while conn->wlength == 0, i.e. after the previous batch has been sent.
 *
 * The thread exits on a read error or when it meets a record that can never
 * be forwarded (zero length or larger than the write buffer).
 *
 * NOTE(review): nothing visible here stops this thread when the connection
 * is closed -- confirm the lifetime of `conn` against the reactor code.
 */
static void *sync_thread_main(void *arg)
{
    struct conn *conn = (struct conn*) arg;

    int logfd = open(KVS_CMD_LOG_FILE, O_RDONLY);
    if (logfd < 0) {
        printf("open replaylog failed: %s\n", strerror(errno));
        return NULL;
    }

    pthread_mutex_lock(&conn->g_sync_lock);
    uint64_t off = g_offset;
    pthread_mutex_unlock(&conn->g_sync_lock);

    for (;;) {
        /* Single slot: wait until the reactor has sent the previous batch. */
        pthread_mutex_lock(&conn->g_sync_lock);
        int busy = (conn->wlength > 0);
        pthread_mutex_unlock(&conn->g_sync_lock);

        if (busy) {
            usleep(10 * 1000);
            continue;
        }

        size_t filled = 0;
        int records = 0;

        /* Try to collect a batch. */
        while (filled < (size_t)KVS_MAX_RESPONSE && records < 128) {
            /* Read the length header. */
            uint32_t nlen = 0;
            ssize_t r = pread(logfd, &nlen, sizeof(nlen), (off_t)off);
            if (r < 0) {
                if (errno == EINTR) continue;
                printf("pread len error: %s\n", strerror(errno));
                goto out;
            }
            if (r < (ssize_t)sizeof(nlen)) {
                /* EOF, or the writer has not finished the header yet. */
                break;
            }

            uint32_t len = ntohl(nlen);
            if (len == 0 || len > (size_t)KVS_MAX_RESPONSE) {
                /* Such a record can never be forwarded. Send what is already
                 * batched first; the record is met again with an empty batch
                 * and the thread then stops instead of spinning forever. */
                if (filled > 0) break;
                printf("sync error\n");
                goto out;
            }

            /* Does not fit into this batch: send what we have first. */
            if (filled + len > (size_t)KVS_MAX_RESPONSE) {
                break;
            }

            /* Read the payload (the command bytes). */
            ssize_t pr = pread(logfd, conn->wbuffer + filled, len,
                               (off_t)(off + sizeof(nlen)));
            if (pr < 0) {
                if (errno == EINTR) continue;
                printf("pread payload error: %s\n", strerror(errno));
                goto out;
            }
            if (pr < (ssize_t)len) {
                /* Payload missing or partial: the writer is not done yet. */
                break;
            }

            /* One complete record collected: advance. */
            off += sizeof(nlen) + (uint64_t)len;
            filled += (size_t)len;
            records++;
        }

        if (filled > 0) {
            /* Hand the batch to the reactor. */
            pthread_mutex_lock(&conn->g_sync_lock);
            conn->wlength = (int)filled;
            g_offset = off;
            pthread_mutex_unlock(&conn->g_sync_lock);

            /* Wake the reactor so it sends the batch. */
            sync_wakeup(conn->fd);
            continue;
        }

        /* Nothing complete available (end of log or partial record): wait. */
        usleep(10 * 1000);
    }

out:
    close(logfd);
    return NULL;
}

/**
 * Record the replication start offset for `conn` and start a detached
 * sync thread for it.
 *
 * @param offset  points to at least 8 bytes holding the start offset; the
 *                bytes are copied as-is into a uint64_t (no byte-order
 *                conversion is done here). The caller must guarantee the
 *                8 bytes are readable.
 * @param conn    connection of the requesting slave.
 */
void build_thread_to_sync(const uint8_t *offset, struct conn* conn)
{
    if (!offset || !conn) return;

    uint64_t off64 = 0;
    memcpy(&off64, offset, sizeof(off64));

    pthread_mutex_lock(&conn->g_sync_lock);
    g_slavefd = conn->fd;
    g_offset = off64;
    conn->wlength = 0;
    pthread_mutex_unlock(&conn->g_sync_lock);

    pthread_t tid;
    int rc = pthread_create(&tid, NULL, sync_thread_main, conn);
    if (rc != 0) {
        printf("pthread_create failed: %s\n", strerror(rc));
        return;
    }
    pthread_detach(tid);
}

/**
 * Create every compiled-in engine, load its snapshot file, then open the
 * command log and replay it. Always returns 0.
 */
int init_kvengine(void)
{
#if ENABLE_ARRAY
    memset(&global_array, 0, sizeof(kvs_array_t));
    kvs_array_create(&global_array);
    kvs_array_load(&global_array, KVS_ARRAY_FILE);
#endif

#if ENABLE_RBTREE
    memset(&global_rbtree, 0, sizeof(kvs_rbtree_t));
    kvs_rbtree_create(&global_rbtree);
    kvs_rbtree_load(&global_rbtree, KVS_RBTREE_FILE);
#endif

#if ENABLE_HASH
    memset(&global_hash, 0, sizeof(kvs_hash_t));
    kvs_hash_create(&global_hash);
    kvs_hash_load(&global_hash, KVS_HASH_FILE);
#endif

    init_cmd_log(KVS_CMD_LOG_FILE, &global_cmd_log_fd);
    kvs_replay_log(KVS_CMD_LOG_FILE, global_cmd_log_fd);

    return 0;
}

/** Destroy every compiled-in engine and close the command log. */
void dest_kvengine(void)
{
#if ENABLE_ARRAY
    kvs_array_destroy(&global_array);
#endif
#if ENABLE_RBTREE
    kvs_rbtree_destroy(&global_rbtree);
#endif
#if ENABLE_HASH
    kvs_hash_destroy(&global_hash);
#endif
    destroy_cmd_log(global_cmd_log_fd);
}

/** Initialise the global memory pool when the custom allocator is selected. */
void init_memory_pool(void)
{
#if MEMORY_SELECT_MALLOC == MEMORY_USE_MYMALLOC
    mp_init(&global_mempool);
#endif
}

/** Destroy the global memory pool when the custom allocator is selected. */
void dest_memory_pool(void)
{
#if MEMORY_SELECT_MALLOC == MEMORY_USE_MYMALLOC
    mp_destroy(&global_mempool);
#endif
}

/**
 * Usage:
 *   ./kvstore [role] [port]
 *   ./kvstore master 8888
 *
 *   ./kvstore [role] [port] [masterip] [masterport]
 *   ./kvstore slave 7000 192.168.10.129 8888
 */
int main(int argc, char *argv[])
{
    if (argc < 3) return -1;

    char *role = argv[1];
    int port = atoi(argv[2]);
    char *master_ip = NULL;
    int master_port = -1;

    /* "master" -> master role, "slave" -> slave role (a slave additionally
     * needs the master's address). Any other role string leaves
     * kvs_server_role at KVS_SERVER_NOT_INIT, as before. */
    if (strcmp(role, "master") == 0) {
        kvs_server_role = KVS_SERVER_MASETER;
    } else if (strcmp(role, "slave") == 0) {
        kvs_server_role = KVS_SERVER_SLAVE;
        if (argc < 5) return -1;
        master_ip = argv[3];
        master_port = atoi(argv[4]);
    }

    init_memory_pool();
    init_kvengine();

#if (NETWORK_SELECT == NETWORK_REACTOR)
    reactor_start(port, kvs_protocol, master_ip, master_port);
#elif (NETWORK_SELECT == NETWORK_PROACTOR)
    proactor_start(port, kvs_protocol);
#elif (NETWORK_SELECT == NETWORK_NTYCO)
    ntyco_start(port, kvs_protocol);
#endif

    dest_kvengine();
    dest_memory_pool();

    return 0;
}