Files
ldb/kvstore.c

312 lines
8.4 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#include "kvstore.h"
#include "kvs_rw_tools.h"
#include "kvs_protocol_resp.h"
#include "dump/kvs_dump.h"
#include "memory/alloc_dispatch.h"
#include "common/config.h"
#include "diskuring/diskuring.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
#include <errno.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <libxml/parser.h>
extern int slave_bootstrap(const char *listen_ip, int listen_port, const char *master_ip, int master_port);
#if MEMORY_SELECT_MALLOC == MEMORY_USE_MYMALLOC
extern mp_pool_t global_mempool;
#endif
AppConfig global_cfg;
extern int global_oplog_fd;
extern iouring_ctx_t global_uring_ctx;
int kvs_protocol(struct conn* conn){
if (!conn) return -1;
char *request = conn->rbuffer;
int request_length = conn->rlength;
char *response = conn->wbuffer;
int *response_length = &conn->wlength;
if (!request || request_length <= 0 || !response || !response_length) return -1;
int consumed = 0;
int out_len = 0;
while(consumed < request_length ){
const uint8_t *p = request+consumed;
int remain = request_length - consumed;
resp_cmd_t cmd;
memset(&cmd, 0, sizeof(cmd));
int len = resp_parse_one_cmd(p, remain, &cmd);
if(len < 0){
/* 协议错误:直接返回,已构建的响应仍可写回 */
*response_length = out_len;
return -1;
}
else if(len == 0){
// 半包
break;
}
resp_value_t val;
memset(&val, 0, sizeof(val));
int dr = resp_dispatch(&cmd, &val);
/*
* 语义建议:
* - resp_dispatch() 即使返回 -1比如 unknown command / wrong argc
* 一般也已经把 out_value 设置成了 RESP error这样客户端能收到错误响应。
* - 如果 dr < 0 但 val.type 没被正确设置,兜底回一个通用错误。
*/
if(dr < 0){
if (val.type != RESP_T_SIMPLE_STR &&
val.type != RESP_T_ERROR &&
val.type != RESP_T_INTEGER &&
val.type != RESP_T_BULK_STR &&
val.type != RESP_T_NIL) {
val = resp_error("ERR dispatch failed");
}
} else {
// persist into oplog
if(global_cfg.persistence == PERSIST_INCREMENTAL){
/* 执行成功:在这里保存到日志中(只记录更新类命令) */
if (cmd.argc > 0 && cmd.argv[0].ptr) {
/* 更新类命令SET/DEL/MOD/RSET/RDEL/RMOD/HSET/HDEL/HMOD/SAVE */
const resp_slice_t *c0 = &cmd.argv[0];
int is_update = 0;
if (c0->ptr && c0->len) {
if (ascii_casecmp(c0->ptr, c0->len, "SET") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "DEL") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "MOD") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "RSET") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "RDEL") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "RMOD") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "HSET") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "HDEL") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "HMOD") == 0) {
is_update = 1;
}
}
if (is_update) {
kvs_oplog_append(p, len, global_oplog_fd);
}
}
}
}
/* 构建响应 */
int cap = KVS_MAX_RESPONSE - out_len;
if (cap <= 0) {
*response_length = out_len;
return consumed;
}
int resp_len = resp_build_value(&val, response + out_len, (size_t)cap);
if (resp_len < 0) {
*response_length = out_len;
return consumed;
}
out_len += resp_len;
consumed += len;
}
*response_length = out_len;
return consumed;
}
int init_kvengine(void) {
#if ENABLE_ARRAY
memset(&global_array, 0, sizeof(kvs_array_t));
kvs_array_create(&global_array);
kvs_array_load(&global_array, global_array_file);
#endif
#if ENABLE_RBTREE
memset(&global_rbtree, 0, sizeof(kvs_rbtree_t));
kvs_rbtree_create(&global_rbtree);
kvs_rbtree_load(&global_rbtree, global_rbtree_file);
#endif
#if ENABLE_HASH
memset(&global_hash, 0, sizeof(kvs_hash_t));
kvs_hash_create(&global_hash);
kvs_hash_load(&global_hash, global_hash_file);
#endif
if(global_cfg.persistence == PERSIST_INCREMENTAL){
init_cmd_log(global_oplog_file, &global_oplog_fd);
kvs_replay_log(global_oplog_fd);
}
printf("kvengine init complete\n");
return 0;
}
void dest_kvengine(void) {
#if ENABLE_ARRAY
kvs_array_destroy(&global_array);
#endif
#if ENABLE_RBTREE
kvs_rbtree_destroy(&global_rbtree);
#endif
#if ENABLE_HASH
kvs_hash_destroy(&global_hash);
#endif
destroy_cmd_log(global_oplog_fd);
}
void init_memory_pool(AppConfig *cfg){
#if MEMORY_SELECT_MALLOC == MEMORY_USE_MYMALLOC
mp_create(&global_mempool);
#endif
kvs_set_memleak_detect(cfg->leak_mode);
kvs_set_alloc_type(cfg->allocator);
printf("mempool init complete\n");
}
void dest_memory_pool(void){
#if MEMORY_SELECT_MALLOC == MEMORY_USE_MYMALLOC
mp_destroy(&global_mempool);
#endif
}
static int ensure_dir_exists(const char *dir)
{
struct stat st;
if (stat(dir, &st) == 0) {
return S_ISDIR(st.st_mode) ? 0 : -2; // 存在但不是目录
}
if (mkdir(dir, 0755) == 0) return 0;
if (errno == EEXIST) return 0;
return -1;
}
static int join_path(char *out, size_t out_sz, const char *dir, const char *file)
{
if (!out || out_sz == 0 || !dir || !file) return -1;
size_t dlen = strlen(dir);
if (dlen == 0) return -1;
int need_slash = (dir[dlen - 1] != '/');
int n = snprintf(out, out_sz, need_slash ? "%s/%s" : "%s%s", dir, file);
if (n < 0 || (size_t)n >= out_sz) return -2; // 截断了
return 0;
}
void init_data_file(AppConfig *cfg){
ensure_dir_exists(cfg->persist_dir);
join_path(global_oplog_file, sizeof(global_oplog_file), cfg->persist_dir, cfg->oplog_file);
join_path(global_array_file, sizeof(global_array_file), cfg->persist_dir, cfg->array_file);
join_path(global_rbtree_file, sizeof(global_rbtree_file), cfg->persist_dir, cfg->rbtree_file);
join_path(global_hash_file, sizeof(global_hash_file), cfg->persist_dir, cfg->hash_file);
}
int init_config(AppConfig *cfg){
xmlInitParser();
if (config_load("config/config.xml", cfg) != 0) {
fprintf(stderr, "Failed to load config/config.xml\n");
xmlCleanupParser();
return -1;
}
printf("=============== Config ===============\n");
printf("IP : %s\n", cfg->ip);
printf("Port : %d\n", cfg->port);
printf("Mode : %s\n", server_mode_to_string(cfg->mode));
printf("|—— Master IP : %s\n", cfg->master_ip);
printf("|—— Master Port : %d\n", cfg->master_port);
printf("Persistence : %s\n", persistence_to_string(cfg->persistence));
printf("|—— Persist-dir : %s\n", cfg->persist_dir);
printf("|—— Persist-oplog : %s\n", cfg->oplog_file);
printf("|—— Persist-array : %s\n", cfg->array_file);
printf("|—— Persist-rbtree : %s\n", cfg->rbtree_file);
printf("|—— Persist-hash : %s\n", cfg->hash_file);
printf("Log level : %s\n", log_level_to_string(cfg->log_level));
printf("Memory : \n");
printf("|——Allocator : %s\n", allocator_to_string(cfg->allocator));
printf("|——MemLeakDetectMode : %s\n", leakage_to_string(cfg->leak_mode));
printf("=============== Config ===============\n");
xmlCleanupParser();
return 0;
}
void init_disk_uring(iouring_ctx_t *uring_ctx){
iouring_init(uring_ctx, 256);
}
int main(int argc, char *argv[]) {
if(-1 == init_config(&global_cfg)){
printf("Init Config error");
return -1;
}
init_data_file(&global_cfg);
init_disk_uring(&global_uring_ctx);
int port = global_cfg.port;
char *master_ip = NULL;
int master_port = -1;
if(global_cfg.mode == MODE_SLAVE){
master_ip = global_cfg.master_ip;
master_port = global_cfg.master_port;
slave_bootstrap(global_cfg.ip, port, master_ip, master_port);
}else if(global_cfg.mode == MODE_MASTER){
}
init_memory_pool(&global_cfg);
init_kvengine();
#if (NETWORK_SELECT == NETWORK_REACTOR)
reactor_start(port, kvs_protocol); //
#elif (NETWORK_SELECT == NETWORK_PROACTOR)
proactor_start(port, kvs_protocol);
#elif (NETWORK_SELECT == NETWORK_NTYCO)
ntyco_start(port, kvs_protocol);
#endif
dest_kvengine();
dest_memory_pool();
}