Files
ldb/kvstore.c
2026-01-20 11:51:38 +00:00

593 lines
16 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#include "kvstore.h"
#include "kvs_rw_tools.h"
#include "kvs_protocol_resp.h"
#include "kvs_inc_log.h"
#include "mem_pool/mem_pool.h"
#include "common/config.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
#include <errno.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <libxml/parser.h>
#if ENABLE_ARRAY
extern kvs_array_t global_array;
#endif
#if ENABLE_RBTREE
extern kvs_rbtree_t global_rbtree;
#endif
#if ENABLE_HASH
extern kvs_hash_t global_hash;
#endif
#if MEMORY_SELECT_MALLOC == MEMORY_USE_MYMALLOC
extern mp_pool_t global_mempool;
#endif
AppConfig global_cfg;
int global_cmd_log_fd = -1;
char global_oplog_file[256] = "kvs_oplog.default.db";
char global_array_file[256] = "kvs_array.default.db";
char global_rbtree_file[256] = "kvs_rbtree.default.db";
char global_hash_file[256] = "kvs_hash.default.db";
int is_update_cmd(kvs_cmd_t op){
if(op == KVS_CMD_SET || op == KVS_CMD_RSET || op == KVS_CMD_HSET
|| op == KVS_CMD_MOD || op == KVS_CMD_RMOD || op == KVS_CMD_HMOD
|| op == KVS_CMD_DEL || op == KVS_CMD_RDEL || op == KVS_CMD_HDEL){
return 1;
}
return 0;
}
/**
* input : request request_length
* output : response response_length
* return : -1 error, =0 半包, 1 成功
*/
// int kvs_protocol(char *request, int request_length, char *response, int *response_length){
// int kvs_protocol(struct conn* conn){
// if (!conn) return -1;
// char *request = conn->rbuffer;
// int request_length = conn->rlength;
// char *response = conn->wbuffer;
// int *response_length = &conn->wlength;
// if (!request || request_length <= 0 || !response || !response_length) return -1;
// int consumed = 0;
// int out_len = 0;
// static int i = 0;
// while(consumed < request_length ){
// if(i > 33){
// i = i+1;
// i = i-1;
// }
// if(i == 47) i = 0;
// ++i;
// kvs_req_t req;
// memset(&req, 0, sizeof(kvs_req_t));
// const uint8_t *p = request+consumed;
// int remain = request_length - consumed;
// int len = kvs_parse_one_cmd(p, remain, &req);
// if(len < 0){
// // 解析失败
// kvs_free_request(&req);
// *response_length = out_len;
// return -1;
// }
// else if(len == 0){
// // 半包
// kvs_free_request(&req);
// break;
// }
// kvs_rsp_t rsp;
// memset(&rsp, 0, sizeof(kvs_rsp_t));
// // 执行失败
// if (kvs_execute_one_cmd(&req, &rsp) < 0){
// kvs_free_request(&req);
// *response_length = out_len;
// return -1;
// }else{
// // 执行成功,在这里保存到日志中。
// if(rsp.status == KVS_STATUS_OK){
// if(is_update_cmd(req.op)){
// kvs_save_cmd_to_logfile(p, len, global_cmd_log_fd);
// }
// }
// }
// if(req.op == KVS_CMD_PSYNC){
// build_thread_to_sync(req.args->data, conn);
// }
// int resp_len = kvs_build_one_rsp(&rsp, (uint8_t *)response+out_len, KVS_MAX_RESPONSE-out_len);
// // 构建响应 <0 构建失败
// kvs_free_request(&req);
// if (resp_len < 0) {
// *response_length = out_len;
// return -1;
// }
// out_len += resp_len;
// consumed += len;
// }
// // slave 暂时不需要回报或者回一个new_offset
// if(conn->is_from_master){
// conn->wlength = 0;
// return consumed;
// }
// *response_length = out_len;
// return consumed;
// }
int kvs_protocol(struct conn* conn){
if (!conn) return -1;
char *request = conn->rbuffer;
int request_length = conn->rlength;
char *response = conn->wbuffer;
int *response_length = &conn->wlength;
if (!request || request_length <= 0 || !response || !response_length) return -1;
int consumed = 0;
int out_len = 0;
while(consumed < request_length ){
const uint8_t *p = request+consumed;
int remain = request_length - consumed;
resp_cmd_t cmd;
memset(&cmd, 0, sizeof(cmd));
int len = resp_parse_one_cmd(p, remain, &cmd);
if(len < 0){
/* 协议错误:直接返回,已构建的响应仍可写回 */
*response_length = out_len;
return -1;
}
else if(len == 0){
// 半包
break;
}
resp_value_t val;
memset(&val, 0, sizeof(val));
int dr = resp_dispatch(&cmd, &val);
/*
* 语义建议:
* - resp_dispatch() 即使返回 -1比如 unknown command / wrong argc
* 一般也已经把 out_value 设置成了 RESP error这样客户端能收到错误响应。
* - 如果 dr < 0 但 val.type 没被正确设置,兜底回一个通用错误。
*/
if(dr < 0){
if (val.type != RESP_T_SIMPLE_STR &&
val.type != RESP_T_ERROR &&
val.type != RESP_T_INTEGER &&
val.type != RESP_T_BULK_STR &&
val.type != RESP_T_NIL) {
val = resp_error("ERR dispatch failed");
}
} else {
// persist into o o
if(global_cfg.persistence == PERSIST_INCREMENTAL){
/* 执行成功:在这里保存到日志中(只记录更新类命令) */
if (cmd.argc > 0 && cmd.argv[0].ptr) {
/* 仅当返回 OK 时记录 */
int is_ok = (val.type == RESP_T_SIMPLE_STR &&
val.bulk.ptr && val.bulk.len == 2 &&
((val.bulk.ptr[0] == 'O' || val.bulk.ptr[0] == 'o') &&
(val.bulk.ptr[1] == 'K' || val.bulk.ptr[1] == 'k')));
if (is_ok) {
/* 更新类命令SET/DEL/MOD/RSET/RDEL/RMOD/HSET/HDEL/HMOD/SAVE */
const resp_slice_t *c0 = &cmd.argv[0];
int is_update = 0;
if (c0->ptr && c0->len) {
if (ascii_casecmp(c0->ptr, c0->len, "SET") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "DEL") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "MOD") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "RSET") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "RDEL") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "RMOD") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "HSET") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "HDEL") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "HMOD") == 0) {
is_update = 1;
}
}
if (is_update) {
kvs_save_cmd_to_logfile(p, len, global_cmd_log_fd);
}
}
}
}
}
/* PSYNC触发同步线程按你原来逻辑从 argv[1] 取参数) */
if (cmd.argc > 0 && cmd.argv[0].ptr &&
ascii_casecmp(cmd.argv[0].ptr, cmd.argv[0].len, "PSYNC") == 0) {
if (cmd.argc >= 2 && cmd.argv[1].ptr) {
build_thread_to_sync((const char *)cmd.argv[1].ptr, conn);
} else {
/* 如果你希望 PSYNC 无参也能触发,可以传 NULL 或空串 */
build_thread_to_sync(NULL, conn);
}
}
/* 构建响应 */
int cap = KVS_MAX_RESPONSE - out_len;
if (cap <= 0) {
*response_length = out_len;
return -1;
}
int resp_len = resp_build_value(&val, response + out_len, (size_t)cap);
if (resp_len < 0) {
*response_length = out_len;
return -1;
}
out_len += resp_len;
consumed += len;
}
// slave 暂时不需要回报或者回一个new_offset
if(conn->is_from_master){
conn->wlength = 0;
return consumed;
}
*response_length = out_len;
return consumed;
}
int kvs_save_to_file(){
int ret = 0;
int rc = 0;
#if ENABLE_ARRAY
rc = kvs_array_save(&global_array, global_array_file);
if(rc < 0){
printf("kvs_engine_array save error\n");
ret = -1;
}
#endif
#if ENABLE_RBTREE
rc = kvs_rbtree_save(&global_rbtree, global_rbtree_file);
if(rc < 0){
printf("kvs_engine_rbtree save error\n");
ret = -1;
}
#endif
#if ENABLE_HASH
rc = kvs_hash_save(&global_hash, global_hash_file);
if(rc < 0){
printf("kvs_engine_hash save error\n");
ret = -1;
}
#endif
ksv_clear_log(global_cmd_log_fd);
return ret;
}
extern void sync_wakeup(int fd);
static int g_slavefd = -1;
static uint64_t g_offset = 0;
static void *sync_thread_main(void *arg) {
struct conn *conn = (struct conn*) arg;
int logfd = open(global_oplog_file, O_RDONLY);
if (logfd < 0) {
printf("open replaylog failed: %s\n", strerror(errno));
return NULL;
}
pthread_mutex_lock(&conn->g_sync_lock);
uint64_t off = g_offset;
pthread_mutex_unlock(&conn->g_sync_lock);
while (1) {
// 单槽位:等 reactor 发完再填
pthread_mutex_lock(&conn->g_sync_lock);
int busy = (conn->wlength > 0);
pthread_mutex_unlock(&conn->g_sync_lock);
if (busy) { usleep(10 * 1000); continue; }
size_t filled = 0;
int records = 0;
// 试图攒一批
while (filled < (size_t)KVS_MAX_RESPONSE && records < 128) {
// 读 len 头
uint32_t nlen = 0;
ssize_t r = pread(logfd, &nlen, sizeof(nlen), (off_t)off);
if (r == 0) {
// EOF文件当前没更多数据
break;
}
if (r < 0) {
if (errno == EINTR) continue;
printf("pread len error: %s\n", strerror(errno));
close(logfd);
return NULL;
}
if (r < (ssize_t)sizeof(nlen)) {
// 半截 lenwriter 还没写完头
break;
}
uint32_t len = ntohl(nlen);
if(len <= 0) {
printf("sync error\n");
}
// 这一条放不进本批次,就先发已有的
if (filled + len > (size_t)KVS_MAX_RESPONSE) {
break;
}
// 读 payload(cmd)
ssize_t pr = pread(logfd, conn->wbuffer + filled, len,
(off_t)(off + sizeof(nlen)));
if (pr == 0) {
// payload 还没写到
break;
}
if (pr < 0) {
if (errno == EINTR) continue;
printf("pread payload error: %s\n", strerror(errno));
close(logfd);
return NULL;
}
if (pr < (ssize_t)len) {
// 半截 payloadwriter 还没写完这一条
break;
}
// 成功拿到一条完整记录:推进
off += sizeof(nlen) + (uint64_t)len;
filled += (size_t)len;
records++;
}
if (filled > 0) {
// 提交给 reactor 发送
pthread_mutex_lock(&conn->g_sync_lock);
conn->wlength = (int)filled;
g_offset = off;
pthread_mutex_unlock(&conn->g_sync_lock);
// 唤醒 reactor 发
sync_wakeup(conn->fd); // 或 g_slavefd
continue;
}
// 没攒到任何完整记录:说明真到末尾/半条记录,等一会儿
usleep(10*1000);
}
close(logfd);
return NULL;
}
void build_thread_to_sync(const uint8_t *offset, struct conn* conn){
uint64_t off64 = 0;
memcpy(&off64, offset, 8);
pthread_mutex_lock(&conn->g_sync_lock);
g_slavefd = conn->fd;
g_offset = (uint64_t)off64;
printf("offset:%ld\n", off64);
conn->wlength = 0;
pthread_mutex_unlock(&conn->g_sync_lock);
pthread_t tid;
int rc = pthread_create(&tid, NULL, sync_thread_main, conn);
if (rc != 0) {
printf("pthread_create failed: %s\n", strerror(rc));
return;
}
pthread_detach(tid);
}
int init_kvengine(void) {
#if ENABLE_ARRAY
memset(&global_array, 0, sizeof(kvs_array_t));
kvs_array_create(&global_array);
kvs_array_load(&global_array, global_array_file);
#endif
#if ENABLE_RBTREE
memset(&global_rbtree, 0, sizeof(kvs_rbtree_t));
kvs_rbtree_create(&global_rbtree);
kvs_rbtree_load(&global_rbtree, global_rbtree_file);
#endif
#if ENABLE_HASH
memset(&global_hash, 0, sizeof(kvs_hash_t));
kvs_hash_create(&global_hash);
kvs_hash_load(&global_hash, global_hash_file);
#endif
if(global_cfg.persistence == PERSIST_INCREMENTAL){
init_cmd_log(global_oplog_file, &global_cmd_log_fd);
kvs_replay_log(global_oplog_file, global_cmd_log_fd);
}
return 0;
}
void dest_kvengine(void) {
#if ENABLE_ARRAY
kvs_array_destroy(&global_array);
#endif
#if ENABLE_RBTREE
kvs_rbtree_destroy(&global_rbtree);
#endif
#if ENABLE_HASH
kvs_hash_destroy(&global_hash);
#endif
destroy_cmd_log(global_cmd_log_fd);
}
void init_memory_pool(void){
#if MEMORY_SELECT_MALLOC == MEMORY_USE_MYMALLOC
mp_init(&global_mempool);
#endif
}
void dest_memory_pool(void){
#if MEMORY_SELECT_MALLOC == MEMORY_USE_MYMALLOC
mp_destroy(&global_mempool);
#endif
}
static int ensure_dir_exists(const char *dir)
{
struct stat st;
if (stat(dir, &st) == 0) {
return S_ISDIR(st.st_mode) ? 0 : -2; // 存在但不是目录
}
if (mkdir(dir, 0755) == 0) return 0;
if (errno == EEXIST) return 0;
return -1;
}
static int join_path(char *out, size_t out_sz, const char *dir, const char *file)
{
if (!out || out_sz == 0 || !dir || !file) return -1;
size_t dlen = strlen(dir);
if (dlen == 0) return -1;
int need_slash = (dir[dlen - 1] != '/');
int n = snprintf(out, out_sz, need_slash ? "%s/%s" : "%s%s", dir, file);
if (n < 0 || (size_t)n >= out_sz) return -2; // 截断了
return 0;
}
void init_data_file(AppConfig *cfg){
ensure_dir_exists(cfg->persist_dir);
join_path(global_oplog_file, sizeof(global_oplog_file), cfg->persist_dir, cfg->oplog_file);
join_path(global_array_file, sizeof(global_array_file), cfg->persist_dir, cfg->array_file);
join_path(global_rbtree_file, sizeof(global_rbtree_file), cfg->persist_dir, cfg->rbtree_file);
join_path(global_hash_file, sizeof(global_hash_file), cfg->persist_dir, cfg->hash_file);
}
int init_config(AppConfig *cfg){
xmlInitParser();
if (config_load("config/config.xml", cfg) != 0) {
fprintf(stderr, "Failed to load config/config.xml\n");
xmlCleanupParser();
return -1;
}
printf("=============== Config ===============\n");
printf("IP : %s\n", cfg->ip);
printf("Port : %d\n", cfg->port);
printf("Mode : %s\n", server_mode_to_string(cfg->mode));
printf("|—— Master IP : %s\n", cfg->master_ip);
printf("|—— Master Port : %d\n", cfg->master_port);
printf("Persistence : %s\n", persistence_to_string(cfg->persistence));
printf("|—— Persist-dir : %s\n", cfg->persist_dir);
printf("|—— Persist-oplog : %s\n", cfg->oplog_file);
printf("|—— Persist-array : %s\n", cfg->array_file);
printf("|—— Persist-rbtree : %s\n", cfg->rbtree_file);
printf("|—— Persist-hash : %s\n", cfg->hash_file);
printf("Log level : %s\n", log_level_to_string(cfg->log_level));
printf("Allocator : %s\n", allocator_to_string(cfg->allocator));
printf("=============== Config ===============\n");
xmlCleanupParser();
return 0;
}
int main(int argc, char *argv[]) {
if(-1 == init_config(&global_cfg)){
printf("Init Config error");
return -1;
}
init_data_file(&global_cfg);
int port = global_cfg.port;
char *master_ip = NULL;
int master_port = -1;
if(global_cfg.mode == MODE_SLAVE){
master_ip = global_cfg.master_ip;
master_port = global_cfg.master_port;
}else if(global_cfg.mode == MODE_MASTER){
}
init_memory_pool();
init_kvengine();
#if (NETWORK_SELECT == NETWORK_REACTOR)
reactor_start(port, kvs_protocol, master_ip, master_port); //
#elif (NETWORK_SELECT == NETWORK_PROACTOR)
proactor_start(port, kvs_protocol);
#elif (NETWORK_SELECT == NETWORK_NTYCO)
ntyco_start(port, kvs_protocol);
#endif
dest_kvengine();
dest_memory_pool();
}