Files
ldb/kvstore.c
1iaan de21fe94ec bugfix: reactor网络模型的半包解析错误问题。
全量持久化时清除增量持久化的记录。
2026-01-08 16:20:00 +08:00

428 lines
9.1 KiB
C

#include "kvstore.h"
#include "kvs_rw_tools.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
/*
 * Storage-engine instances used by this file. They are only declared here
 * (extern); each declaration is compiled in only when the matching ENABLE_*
 * switch evaluates to non-zero.
 */
#if ENABLE_ARRAY
extern kvs_array_t global_array;
#endif
#if ENABLE_RBTREE
extern kvs_rbtree_t global_rbtree;
#endif
#if ENABLE_HASH
extern kvs_hash_t global_hash;
#endif
/*
 * Descriptor of the command log file. Starts at -1; init_kvengine() fills it
 * in through init_cmd_log(), and dest_kvengine() hands it to destroy_cmd_log().
 */
int global_cmd_log_fd = -1;
/*
 * Allocation hook: hands the request straight to the C library allocator.
 * Returns whatever malloc() returns for `size` bytes (NULL on failure).
 */
void *kvs_malloc(size_t size) {
    void *mem = malloc(size);
    return mem;
}
/*
 * Release hook: forwards `ptr` to free(). Passing NULL is allowed (free(NULL)
 * is a no-op).
 *
 * The call is a plain statement: a `return <expression>;` is not permitted in
 * a function returning void in ISO C, even when the expression itself is void.
 */
void kvs_free(void *ptr) {
    free(ptr);
}
/*
 * Command names recognised by the text protocol. kvs_filter_protocol() compares
 * tokens[0] against command[cmd] and then switches on cmd using the KVS_CMD_*
 * constants, so the order of the entries has to line up with the values of
 * those constants (presumably declared in kvstore.h -- confirm when editing).
 */
const char *command[] = {
"SET", "GET", "DEL", "MOD", "EXIST",
"RSET", "RGET", "RDEL", "RMOD", "REXIST",
"HSET", "HGET", "HDEL", "HMOD", "HEXIST"
};
/*
 * Empty table; nothing in the visible part of this file reads it (the
 * functions below take a parameter named `response` that shadows it).
 * NOTE(review): an empty initializer list for an array of unknown size is not
 * valid ISO C and is only accepted as a compiler extension -- confirm whether
 * this array is still needed.
 */
const char *response[] = {
};
#ifndef KVS_MAX_TOKENS
/* Fallback only: kvs_protocol() in this file sizes its token array with
 * KVS_MAX_TOKENS, so the project headers are expected to define it already. */
#define KVS_MAX_TOKENS 128
#endif
/**
 * Split `msg` in place into space-separated tokens.
 *
 * Each space that ends a token is overwritten with '\0' by strtok(), and the
 * start of every token is stored in `tokens`. Runs of spaces produce no empty
 * tokens.
 *
 * @param msg     NUL-terminated, writable request text
 * @param tokens  output array with room for at least KVS_MAX_TOKENS pointers
 * @return number of tokens stored (0 for an empty/blank message),
 *         or -1 when `msg` or `tokens` is NULL
 *
 * At most KVS_MAX_TOKENS tokens are stored; anything after that is ignored so
 * that an over-long request cannot write past the end of the caller's array.
 * strtok() keeps hidden static state, so this function is not reentrant.
 */
int kvs_split_token(char *msg, char *tokens[]) {
    if (msg == NULL || tokens == NULL) return -1;

    int count = 0;
    for (char *tok = strtok(msg, " ");
         tok != NULL && count < KVS_MAX_TOKENS;
         tok = strtok(NULL, " ")) {
        tokens[count++] = tok;
    }
    return count;
}
// SET Key Value
// tokens[0] : SET
// tokens[1] : Key
// tokens[2] : Value
#if !BIN_SAFE
/* Reply for a set/del/mod style result: <0 -> ERROR, 0 -> OK, >0 -> `other`. */
static int kvs_reply_status(char *response, int ret, const char *other) {
    if (ret < 0) return sprintf(response, "ERROR\r\n");
    if (ret == 0) return sprintf(response, "OK\r\n");
    return sprintf(response, "%s\r\n", other);
}

/* Reply for a lookup: the stored value, or NO EXIST when `result` is NULL. */
static int kvs_reply_value(char *response, const char *result) {
    if (result == NULL) return sprintf(response, "NO EXIST\r\n");
    return sprintf(response, "%s\r\n", result);
}

/* Reply for an existence probe: 0 -> EXIST, anything else -> NO EXIST. */
static int kvs_reply_exist(char *response, int ret) {
    if (ret == 0) return sprintf(response, "EXIST\r\n");
    return sprintf(response, "NO EXIST\r\n");
}

/**
 * Execute one tokenized text command and write the reply into `response`.
 *
 * tokens[0] is the command name (looked up in command[]), tokens[1] the key
 * and tokens[2] the value. Which commands are available depends on the
 * ENABLE_ARRAY / ENABLE_RBTREE / ENABLE_HASH switches.
 *
 * @param tokens    token array produced by kvs_split_token()
 * @param count     number of valid entries in `tokens`
 * @param response  output buffer for the "\r\n"-terminated reply
 * @return length of the reply written, or -1 when the arguments are unusable
 *         (NULL pointers, no tokens)
 *
 * An unknown command, a command whose engine is compiled out, or a command
 * missing its key/value is answered with "ERROR\r\n" instead of aborting the
 * process, because the tokens come straight from a client.
 *
 * NOTE: the reply is formatted with sprintf() because the interface carries no
 * buffer size; the caller must supply a buffer large enough for the longest
 * stored value plus "\r\n".
 */
int kvs_filter_protocol(char **tokens, int count, char *response) {
    if (tokens == NULL || response == NULL || count <= 0 || tokens[0] == NULL) return -1;

    int cmd;
    for (cmd = KVS_CMD_START; cmd < KVS_CMD_COUNT; cmd++) {
        if (strcmp(tokens[0], command[cmd]) == 0) {
            break;
        }
    }

    /* Only look at slots the tokenizer actually filled. */
    char *key = (count > 1) ? tokens[1] : NULL;
    char *value = (count > 2) ? tokens[2] : NULL;

    /* Every successful case returns directly; `break` means "reply ERROR". */
    switch (cmd) {
#if ENABLE_ARRAY
    case KVS_CMD_SET:
        if (key == NULL || value == NULL) break;
        return kvs_reply_status(response, kvs_array_set(&global_array, key, value), "EXIST");
    case KVS_CMD_GET:
        if (key == NULL) break;
        return kvs_reply_value(response, kvs_array_get(&global_array, key));
    case KVS_CMD_DEL:
        if (key == NULL) break;
        return kvs_reply_status(response, kvs_array_del(&global_array, key), "NO EXIST");
    case KVS_CMD_MOD:
        if (key == NULL || value == NULL) break;
        return kvs_reply_status(response, kvs_array_mod(&global_array, key, value), "NO EXIST");
    case KVS_CMD_EXIST:
        if (key == NULL) break;
        return kvs_reply_exist(response, kvs_array_exist(&global_array, key));
#endif
    // rbtree
#if ENABLE_RBTREE
    case KVS_CMD_RSET:
        if (key == NULL || value == NULL) break;
        return kvs_reply_status(response, kvs_rbtree_set(&global_rbtree, key, value), "EXIST");
    case KVS_CMD_RGET:
        if (key == NULL) break;
        return kvs_reply_value(response, kvs_rbtree_get(&global_rbtree, key));
    case KVS_CMD_RDEL:
        if (key == NULL) break;
        return kvs_reply_status(response, kvs_rbtree_del(&global_rbtree, key), "NO EXIST");
    case KVS_CMD_RMOD:
        if (key == NULL || value == NULL) break;
        return kvs_reply_status(response, kvs_rbtree_mod(&global_rbtree, key, value), "NO EXIST");
    case KVS_CMD_REXIST:
        if (key == NULL) break;
        return kvs_reply_exist(response, kvs_rbtree_exist(&global_rbtree, key));
#endif
    // hash
#if ENABLE_HASH
    case KVS_CMD_HSET:
        if (key == NULL || value == NULL) break;
        return kvs_reply_status(response, kvs_hash_set(&global_hash, key, value), "EXIST");
    case KVS_CMD_HGET:
        if (key == NULL) break;
        return kvs_reply_value(response, kvs_hash_get(&global_hash, key));
    case KVS_CMD_HDEL:
        if (key == NULL) break;
        return kvs_reply_status(response, kvs_hash_del(&global_hash, key), "NO EXIST");
    case KVS_CMD_HMOD:
        if (key == NULL || value == NULL) break;
        return kvs_reply_status(response, kvs_hash_mod(&global_hash, key, value), "NO EXIST");
    case KVS_CMD_HEXIST:
        if (key == NULL) break;
        return kvs_reply_exist(response, kvs_hash_exist(&global_hash, key));
#endif
    default:
        break;
    }

    /* Unknown command, disabled engine, or missing key/value. */
    return sprintf(response, "ERROR\r\n");
}
#endif
#if NEW_KVSTORE
/**
 * Parse and execute every complete command contained in a request buffer.
 *
 * Commands are processed one after another: each is parsed with
 * kvs_parse_one_cmd(), executed with kvs_execute_one_cmd(), and its reply is
 * appended to `response` by kvs_build_one_rsp(). Processing stops at the first
 * incomplete command (half packet), leaving those bytes unconsumed.
 *
 * @param request          received bytes
 * @param request_length   number of valid bytes in `request`
 * @param response         output buffer; KVS_MAX_RESPONSE minus the bytes
 *                         already written is passed to kvs_build_one_rsp() as
 *                         the remaining capacity
 * @param response_length  out: number of bytes written to `response`
 * @return number of request bytes consumed (0 when the buffer holds only an
 *         incomplete command), or -1 on invalid arguments, a parse failure,
 *         an execution failure or a reply-building failure. Except for the
 *         invalid-argument case, *response_length is still set on -1 and
 *         covers the replies built for the commands handled before the error.
 */
int kvs_protocol(char *request, int request_length, char *response, int *response_length){
    if (!request || request_length <= 0 || !response || !response_length) return -1;

    int consumed = 0;
    int out_len = 0;

    while (consumed < request_length) {
        kvs_req_t req;
        memset(&req, 0, sizeof(kvs_req_t));

        const uint8_t *p = (const uint8_t *)request + consumed;
        int remain = request_length - consumed;

        int len = kvs_parse_one_cmd(p, remain, &req);
        if (len < 0) {
            // Parse failure.
            kvs_free_request(&req);
            *response_length = out_len;
            return -1;
        }
        if (len == 0) {
            // Half packet: wait for more data before consuming anything else.
            kvs_free_request(&req);
            break;
        }

        kvs_rsp_t rsp;
        memset(&rsp, 0, sizeof(kvs_rsp_t));

        if (kvs_execute_one_cmd(&req, &rsp) < 0) {
            // Execution failure.
            kvs_free_request(&req);
            *response_length = out_len;
            return -1;
        }

        // Executed successfully: record state-changing commands in the command log.
        if (rsp.status == KVS_STATUS_OK &&
            (req.op == KVS_CMD_SET || req.op == KVS_CMD_MOD || req.op == KVS_CMD_DEL)) {
            kvs_save_cmd_to_logfile(p, len, global_cmd_log_fd);
        }

        // Build the reply before releasing the request, as the original code does.
        int resp_len = kvs_build_one_rsp(&rsp, (uint8_t *)response + out_len, KVS_MAX_RESPONSE - out_len);
        kvs_free_request(&req);
        if (resp_len < 0) {
            // Reply could not be built.
            *response_length = out_len;
            return -1;
        }

        out_len += resp_len;
        consumed += len;
    }

    *response_length = out_len;
    return consumed;
}
#else
/*
 * Text-protocol entry point: tokenizes one request and dispatches it.
 * Accepted requests look like "SET Key Value", "GET Key", "DEL Key".
 *
 * msg      : request message (split in place by kvs_split_token)
 * length   : length of request message
 * response : buffer that receives the reply to send
 * @return  : length of response, or -1 on bad arguments / tokenizer failure
 */
int kvs_protocol(char *msg, int length, char *response) {
    int args_ok = (msg != NULL) && (length > 0) && (response != NULL);
    if (!args_ok) {
        return -1;
    }

    char *parts[KVS_MAX_TOKENS] = {0};
    int part_count = kvs_split_token(msg, parts);
    if (part_count == -1) {
        return -1;
    }

    return kvs_filter_protocol(parts, part_count, response);
}
#endif
/**
 * Open (creating it if necessary) the command log file in read/write append
 * mode with permissions 0644.
 *
 * @param file   path of the log file
 * @param logfd  out: receives the opened descriptor; left untouched on failure
 * @return 0 on success, -1 when `file` or `logfd` is NULL, -2 when open() fails
 */
int init_cmd_log(const char *file, int *logfd){
    /* Validate both pointers before opening so no descriptor can leak. */
    if (!file || !logfd) return -1;

    int fd = open(file, O_RDWR | O_CREAT | O_APPEND, 0644);
    if (fd < 0) return -2;

    *logfd = fd;
    return 0;
}
/**
 * Close the command log descriptor.
 *
 * @param logfd  descriptor previously obtained from init_cmd_log()
 * @return 0 when the descriptor was closed, -1 when `logfd` is negative
 *         (never opened) or close() reports an error
 */
int destroy_cmd_log(int logfd){
    if (logfd < 0) return -1;
    return (close(logfd) == 0) ? 0 : -1;
}
/**
 * Bring up every enabled storage engine, then open and replay the command log.
 *
 * Each enabled engine's global instance is zeroed and created; the array
 * engine is additionally loaded from KVS_ARRAY_FILE. The command log
 * KVS_CMD_LOG_FILE is then opened into global_cmd_log_fd and handed to
 * kvs_replay_log().
 *
 * @return 0 on success, -1 when the command log could not be opened. The
 *         replay call is still made in that case, so the sequence of side
 *         effects is the same as before; only the reported status differs.
 */
int init_kvengine(void) {
    int status = 0;
#if ENABLE_ARRAY
    memset(&global_array, 0, sizeof(kvs_array_t));
    kvs_array_create(&global_array);
    kvs_array_load(&global_array, KVS_ARRAY_FILE);
#endif
#if ENABLE_RBTREE
    memset(&global_rbtree, 0, sizeof(kvs_rbtree_t));
    kvs_rbtree_create(&global_rbtree);
#endif
#if ENABLE_HASH
    memset(&global_hash, 0, sizeof(kvs_hash_t));
    kvs_hash_create(&global_hash);
#endif
    /* Surface a log-open failure to the caller instead of discarding it;
     * global_cmd_log_fd keeps its previous value when the open fails. */
    if (init_cmd_log(KVS_CMD_LOG_FILE, &global_cmd_log_fd) != 0) {
        status = -1;
    }
    kvs_replay_log(KVS_CMD_LOG_FILE, global_cmd_log_fd);
    return status;
}
/**
 * Tear down every enabled storage engine and close the command log.
 *
 * After closing, global_cmd_log_fd is reset to -1 so that a second call (or
 * any later use of the global) cannot act on a stale descriptor number.
 */
void dest_kvengine(void) {
#if ENABLE_ARRAY
    kvs_array_destroy(&global_array);
#endif
#if ENABLE_RBTREE
    kvs_rbtree_destroy(&global_rbtree);
#endif
#if ENABLE_HASH
    kvs_hash_destroy(&global_hash);
#endif
    if (global_cmd_log_fd >= 0) {
        destroy_cmd_log(global_cmd_log_fd);
        global_cmd_log_fd = -1;
    }
}
/**
 * Program entry point: `<program> <port>`.
 *
 * Validates the port argument, initialises the storage engines, runs the
 * network loop selected by NETWORK_SELECT with kvs_protocol as the request
 * handler, and tears the engines down once that loop returns.
 *
 * @return 0 after a normal shutdown, -1 on a usage error or an invalid port
 */
int main(int argc, char *argv[]) {
    if (argc != 2) {
        fprintf(stderr, "Usage: %s <port>\n", (argc > 0 && argv[0]) ? argv[0] : "kvstore");
        return -1;
    }

    /* strtol + end-pointer check rejects non-numeric input and trailing junk;
     * the range check also covers the LONG_MIN/LONG_MAX overflow results. */
    char *end = NULL;
    long parsed = strtol(argv[1], &end, 10);
    if (end == argv[1] || *end != '\0' || parsed < 1 || parsed > 65535) {
        fprintf(stderr, "Invalid port: %s\n", argv[1]);
        return -1;
    }
    int port = (int)parsed;

    /* Keep serving even if initialisation reports a problem, but say so. */
    if (init_kvengine() != 0) {
        fprintf(stderr, "Warning: storage engine initialisation reported an error\n");
    }

#if (NETWORK_SELECT == NETWORK_REACTOR)
    reactor_start(port, kvs_protocol);
#elif (NETWORK_SELECT == NETWORK_PROACTOR)
    proactor_start(port, kvs_protocol);
#elif (NETWORK_SELECT == NETWORK_NTYCO)
    ntyco_start(port, kvs_protocol);
#endif

    dest_kvengine();
    return 0;
}