/*
 * ldb/kvstore.c
 *
 * Repository web-viewer residue, kept here as a comment so the file compiles:
 *   snapshot 2026-03-06 11:54:30 +00:00, 495 lines, 14 KiB, C.
 *   The viewer warned that this file contains ambiguous Unicode characters
 *   that might be confused with other characters.
 */
#include "kvstore.h"
#include "kvs_rw_tools.h"
#include "kvs_protocol_resp.h"
#include "dump/kvs_dump.h"
#include "memory/alloc_dispatch.h"
#include "common/config.h"
#include "diskuring/diskuring.h"
#include "replica_shm.h"
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <libxml/parser.h>
/* Set to 1 to compile in the gettimeofday()-based timing report inside kvs_protocol(). */
#define TIME_COLLECT 0
/* Defined in another translation unit; main() calls it when the configured mode is MODE_SLAVE. */
extern int slave_bootstrap(const char *listen_ip, int listen_port, const char *master_ip, int master_port);
/* Memory pool defined elsewhere; created in init_memory_pool() and destroyed in dest_memory_pool(). */
extern mp_pool_t global_mempool;
/* Process-wide configuration, filled in by init_config() from main(). */
AppConfig global_cfg;
/* Disk io_uring context, set up by init_disk_uring() from main(). */
iouring_ctx_t global_uring_ctx;
/* Sequence number handed to replica_shm_append(); incremented after each successful append. */
unsigned long long global_seq;
/* Oplog file descriptor defined elsewhere; set through init_cmd_log() in init_kvengine(). */
extern int global_oplog_fd;
/* Replication shared-memory handle, opened by kvs_replica_init(). */
replica_shm_t g_rep_shm;
/*
 * Intentionally empty, never-inlined function that receives a command buffer,
 * its length and a sequence number and does nothing with them.
 * NOTE(review): presumably an attach point for an external tracer/probe —
 * confirm. The call sites visible in this file are currently commented out.
 */
__attribute__((noinline))
void __completed_cmd(const uint8_t *cmd, size_t len, unsigned long long seq){
/* Empty asm with a "memory" clobber: a compiler-level barrier only. */
asm volatile("" ::: "memory");
}
#include <sys/time.h>
/*
 * Elapsed time (tv1 - tv2) between two struct timeval values.
 *   TIME_SUB_MS: milliseconds (the microsecond delta is integer-divided by 1000)
 *   TIME_SUB_US: microseconds
 * Arguments are parenthesised so that expressions such as *ptr can be passed.
 * Each argument is evaluated twice, so avoid arguments with side effects.
 */
#define TIME_SUB_MS(tv1, tv2) \
    (((tv1).tv_sec - (tv2).tv_sec) * 1000 + ((tv1).tv_usec - (tv2).tv_usec) / 1000)
#define TIME_SUB_US(tv1, tv2) \
    (((tv1).tv_sec - (tv2).tv_sec) * 1000000 + ((tv1).tv_usec - (tv2).tv_usec))
/*
 * Overflow-checked size_t addition.
 * Stores a + b in *out and returns 0; returns -1 (leaving *out untouched)
 * when out is NULL or the sum does not fit in size_t.
 */
static int checked_size_add(size_t a, size_t b, size_t *out) {
    if (out == NULL) {
        return -1;
    }

    /* Unsigned addition wraps; a wrapped result is smaller than either operand. */
    const size_t sum = a + b;
    if (sum < a) {
        return -1;
    }

    *out = sum;
    return 0;
}
/*
 * Compute how many bytes the RESP wire encoding of *v occupies.
 *
 * Handles simple strings, errors, integers, nil and bulk strings; any other
 * type is rejected. On success writes the byte count to *out_len and returns
 * 0. Returns -1 (leaving *out_len untouched) on NULL arguments, an
 * unsupported type, a bulk string with non-zero length but NULL pointer, a
 * formatting failure, or size_t overflow.
 */
static int resp_value_encoded_len(const resp_value_t *v, size_t *out_len) {
    size_t total = 0;
    char digits[64];
    int ndigits;

    if (v == NULL || out_len == NULL) {
        return -1;
    }

    switch (v->type) {
    case RESP_T_SIMPLE_STR:
    case RESP_T_ERROR:
        /* one type byte + payload + "\r\n" */
        if (checked_size_add(1, (size_t)v->bulk.len, &total) < 0) {
            return -1;
        }
        if (checked_size_add(total, 2, &total) < 0) {
            return -1;
        }
        break;

    case RESP_T_INTEGER:
        /* one type byte + decimal digits (with sign) + "\r\n" */
        ndigits = snprintf(digits, sizeof(digits), "%lld", (long long)v->i64);
        if (ndigits <= 0) {
            return -1;
        }
        if (checked_size_add(1, (size_t)ndigits, &total) < 0) {
            return -1;
        }
        if (checked_size_add(total, 2, &total) < 0) {
            return -1;
        }
        break;

    case RESP_T_NIL:
        total = 5; /* "$-1\r\n" */
        break;

    case RESP_T_BULK_STR:
        if (v->bulk.len > 0 && !v->bulk.ptr) {
            return -1;
        }
        ndigits = snprintf(digits, sizeof(digits), "%u", (unsigned)v->bulk.len);
        if (ndigits <= 0) {
            return -1;
        }
        /* '$' + length digits */
        if (checked_size_add(1, (size_t)ndigits, &total) < 0) {
            return -1;
        }
        /* "\r\n" after the length header */
        if (checked_size_add(total, 2, &total) < 0) {
            return -1;
        }
        /* payload bytes */
        if (checked_size_add(total, (size_t)v->bulk.len, &total) < 0) {
            return -1;
        }
        /* trailing "\r\n" */
        if (checked_size_add(total, 2, &total) < 0) {
            return -1;
        }
        break;

    default:
        return -1;
    }

    *out_len = total;
    return 0;
}
/*
 * Move the *out_len bytes staged in buf onto conn's write buffer.
 *
 * Does nothing (and returns 0) when nothing is staged. On a successful append
 * *out_len is reset to 0. Returns -1 on NULL arguments or when
 * chain_buffer_append() fails, in which case *out_len is left unchanged.
 */
static int flush_pending_response(struct conn *conn, uint8_t *buf, size_t *out_len) {
    if (conn == NULL || buf == NULL || out_len == NULL) {
        return -1;
    }

    const size_t pending = *out_len;
    if (pending > 0) {
        if (chain_buffer_append(&conn->wbuf, buf, pending) < 0) {
            return -1;
        }
        *out_len = 0;
    }
    return 0;
}
/*
 * Tell whether a parsed command is one of the mutating commands
 * (SET/DEL/MOD and their R- and H-prefixed variants), compared with
 * ascii_casecmp() against the command name in argv[0].
 *
 * Returns 1 for a match, 0 otherwise (including NULL or empty commands).
 */
static int is_update_cmd(const resp_cmd_t *cmd) {
    static const char *const update_names[] = {
        "SET",  "DEL",  "MOD",
        "RSET", "RDEL", "RMOD",
        "HSET", "HDEL", "HMOD",
    };
    const resp_slice_t *name;
    size_t i;

    if (!cmd || cmd->argc == 0 || !cmd->argv[0].ptr || cmd->argv[0].len == 0) {
        return 0;
    }

    name = &cmd->argv[0];
    for (i = 0; i < sizeof(update_names) / sizeof(update_names[0]); i++) {
        if (ascii_casecmp(name->ptr, name->len, update_names[i]) == 0) {
            return 1;
        }
    }
    return 0;
}
/*
 * Protocol callback handed to the network layer (see main()).
 *
 * Linearizes conn->rbuf, then parses and dispatches as many complete RESP
 * commands as the buffer holds. For each command it:
 *   1. executes it through resp_dispatch();
 *   2. appends update commands (see is_update_cmd()) to the oplog when
 *      incremental persistence is configured;
 *   3. appends update commands to the replication shared memory when
 *      replication is enabled;
 *   4. encodes the reply and queues it on conn->wbuf, batching replies in a
 *      stack buffer and flushing it when full.
 *
 * Returns the number of request bytes consumed (0 when there is no data or
 * only an incomplete command), or -1 on a NULL conn, an oversized request, a
 * protocol error, or any oplog / allocation / buffer failure.
 */
int kvs_protocol(struct conn* conn){
#if TIME_COLLECT == 1
    struct timeval func_start;
    gettimeofday(&func_start, NULL);
    long total_oplog_us = 0;
#endif
    if (!conn) return -1;

    size_t request_size = 0;
    const uint8_t *request = chain_buffer_linearize(&conn->rbuf, &request_size);
    if (!request || request_size == 0) return 0;
    /* The parser works with int lengths; refuse anything that would not fit. */
    if (request_size > (size_t)INT_MAX) return -1;
    int request_length = (int)request_size;

    uint8_t response[KVS_MAX_RESPONSE];
    int consumed = 0;
    size_t out_len = 0;  /* bytes of encoded replies currently staged in response[] */

    while(consumed < request_length ){
        const uint8_t *p = request+consumed;
        int remain = request_length - consumed;

        resp_cmd_t cmd;
        memset(&cmd, 0, sizeof(cmd));
        int len = resp_parse_one_cmd(p, remain, &cmd);
        if(len < 0){
            /* Protocol error: give up immediately. */
            return -1;
        }
        else if(len == 0){
            /* Incomplete command: wait for more data. */
            break;
        }

        resp_value_t val;
        memset(&val, 0, sizeof(val));
        int dr = resp_dispatch(&cmd, &val);
        /*
         * Semantics note (translated from the original author's comment):
         * - even when resp_dispatch() returns -1 (unknown command, wrong
         *   argc, ...) it has normally already set the output value to a RESP
         *   error, so the client still receives an error reply;
         * - a generic fallback error for "dr < 0 but val.type not set" was
         *   suggested but is not implemented here, so dr is ignored.
         */
        (void)dr;

#if TIME_COLLECT == 1
        struct timeval oplog_start, oplog_end;
        gettimeofday(&oplog_start, NULL);
#endif
        int need_persist = is_update_cmd(&cmd);
        if(global_cfg.persistence == PERSIST_INCREMENTAL && need_persist){
            int ar = kvs_oplog_buffer_append(p, (size_t)len, global_oplog_fd);
            if (ar < 0) {
                return -1;
            }
            /* Oplog buffer reported full: flush it now (second argument 0). */
            if (ar == KVS_OPLOG_BUF_FULL && kvs_oplog_flush(global_oplog_fd, 0) < 0) {
                return -1;
            }
        }
        // __completed_cmd(p, len, global_seq);
        // global_seq ++;
        if (global_cfg.replica_mode == REPLICA_ENABLE && need_persist) {
            uint32_t off = 0;
            int ar = replica_shm_append(&g_rep_shm, global_seq, p, (uint32_t)len, &off);
            if (ar == 0) {
                // __replica_notify(global_seq, off, (uint32_t)len);
                global_seq++;
            } else {
                /*
                 * Shared memory full or failing: replication of this command
                 * is skipped (only logged) so the main path is not affected.
                 */
                fprintf(stderr, "replica_shm_append failed %d\n", ar);
            }
        }
#if TIME_COLLECT == 1
        gettimeofday(&oplog_end, NULL);
        total_oplog_us += (oplog_end.tv_sec - oplog_start.tv_sec) * 1000000 +
                          (oplog_end.tv_usec - oplog_start.tv_usec);
#endif

        /* Encode the reply after whatever is already staged. */
        int resp_len = resp_build_value(&val, response + out_len, sizeof(response) - out_len);
        if (resp_len < 0) {
            /*
             * Treated as "not enough room left in this batch": push the
             * staged replies to the send queue and retry with the whole
             * stack buffer.
             */
            if (flush_pending_response(conn, response, &out_len) < 0) {
                return -1;
            }
            resp_len = resp_build_value(&val, response, sizeof(response));
            if (resp_len < 0) {
                /* Still failing: encode into an exactly-sized heap buffer. */
                size_t resp_need = 0;
                uint8_t *resp_heap = NULL;
                if (resp_value_encoded_len(&val, &resp_need) < 0) {
                    return -1;
                }
                /*
                 * Plain malloc() on purpose: this scratch buffer is released
                 * with free() below. The original took it from kvs_malloc(),
                 * whose allocator is selected at runtime (possibly the memory
                 * pool), and passing such a pointer to free() is invalid.
                 */
                resp_heap = malloc(resp_need);
                if (!resp_heap) {
                    return -1;
                }
                resp_len = resp_build_value(&val, resp_heap, resp_need);
                if (resp_len < 0 ||
                    chain_buffer_append(&conn->wbuf, resp_heap, (size_t)resp_len) < 0) {
                    free(resp_heap);
                    return -1;
                }
                free(resp_heap);
                /* Already queued directly; nothing was added to response[]. */
                resp_len = 0;
            }
        }
        out_len += (size_t)resp_len;
        // __completed_cmd(request, consumed, 0);
        consumed += len;
    }

    if (global_cfg.persistence == PERSIST_INCREMENTAL) {
        /* End-of-batch oplog flush (second argument 1). */
        if (kvs_oplog_flush(global_oplog_fd, 1) < 0) {
            return -1;
        }
    }
#if TIME_COLLECT == 1
    struct timeval func_end;
    gettimeofday(&func_end, NULL);
    long func_us = (func_end.tv_sec - func_start.tv_sec) * 1000000 +
                   (func_end.tv_usec - func_start.tv_usec);
    fprintf(stderr, "kvs_protocol: total %ld us, oplog %ld us\n", func_us, total_oplog_us);
#endif
    if (flush_pending_response(conn, response, &out_len) < 0) {
        return -1;
    }
    return consumed;
}
/*
 * Bring up every storage engine that is compiled in (array, rbtree, hash):
 * zero the global instance, create it, then load its data file. When
 * incremental persistence is configured, the oplog is also opened and
 * replayed. Always returns 0; results of the individual steps are not checked.
 */
int init_kvengine(void) {
    const int use_oplog = (global_cfg.persistence == PERSIST_INCREMENTAL);

#if ENABLE_ARRAY
    memset(&global_array, 0, sizeof(kvs_array_t));
    kvs_array_create(&global_array);
    kvs_array_load(&global_array, global_array_file);
#endif

#if ENABLE_RBTREE
    memset(&global_rbtree, 0, sizeof(kvs_rbtree_t));
    kvs_rbtree_create(&global_rbtree);
    kvs_rbtree_load(&global_rbtree, global_rbtree_file);
#endif

#if ENABLE_HASH
    memset(&global_hash, 0, sizeof(kvs_hash_t));
    kvs_hash_create(&global_hash);
    kvs_hash_load(&global_hash, global_hash_file);
#endif

    /* The oplog is replayed after the engine data files have been loaded. */
    if (use_oplog) {
        init_cmd_log(global_oplog_file, &global_oplog_fd);
        kvs_replay_log(global_oplog_fd);
    }

    printf("kvengine init complete\n");
    return 0;
}
/*
 * Tear down every compiled-in storage engine, then the command log.
 * NOTE(review): destroy_cmd_log() is called regardless of the persistence
 * mode, while init_kvengine() only opens the log for PERSIST_INCREMENTAL —
 * confirm destroy_cmd_log() tolerates a descriptor that was never opened.
 */
void dest_kvengine(void)
{
#if ENABLE_ARRAY
    kvs_array_destroy(&global_array);
#endif

#if ENABLE_RBTREE
    kvs_rbtree_destroy(&global_rbtree);
#endif

#if ENABLE_HASH
    kvs_hash_destroy(&global_hash);
#endif

    destroy_cmd_log(global_oplog_fd);
}
/*
 * Configure the allocation layer from cfg: create the global memory pool
 * when the pool allocator is selected, then set the leak-detection mode and
 * the allocator type.
 */
void init_memory_pool(AppConfig *cfg)
{
    const int wants_pool = (cfg->allocator == ALLOC_MYPOOL);

    if (wants_pool) {
        mp_create(&global_mempool);
    }

    kvs_set_memleak_detect(cfg->leak_mode);
    kvs_set_alloc_type(cfg->allocator);

    printf("mempool init complete\n");
}
/*
 * Destroy the global memory pool.
 * NOTE(review): this runs unconditionally, whereas init_memory_pool() only
 * creates the pool for ALLOC_MYPOOL — confirm mp_destroy() is safe on a pool
 * that was never created.
 */
void dest_memory_pool(void)
{
    mp_destroy(&global_mempool);
}
/*
 * Make sure `dir` exists as a directory, creating it (mode 0755) if needed.
 *
 * Returns  0 when the directory exists or was created,
 *         -2 when the path exists but is not a directory,
 *         -1 when it could not be created.
 */
static int ensure_dir_exists(const char *dir)
{
    struct stat info;

    if (stat(dir, &info) != 0) {
        /* Nothing usable at that path yet: try to create it. EEXIST means
         * it appeared between stat() and mkdir(), which counts as success. */
        if (mkdir(dir, 0755) != 0 && errno != EEXIST) {
            return -1;
        }
        return 0;
    }

    if (!S_ISDIR(info.st_mode)) {
        return -2; /* exists, but is not a directory */
    }
    return 0;
}
/*
 * Write "<dir>/<file>" into out, inserting the '/' only when dir does not
 * already end with one.
 *
 * Returns  0 on success,
 *         -1 on a NULL argument, a zero-sized buffer or an empty dir,
 *         -2 when formatting failed or the result did not fit (truncated).
 */
static int join_path(char *out, size_t out_sz, const char *dir, const char *file)
{
    if (out == NULL || out_sz == 0 || dir == NULL || file == NULL) {
        return -1;
    }

    const size_t dir_len = strlen(dir);
    if (dir_len == 0) {
        return -1;
    }

    const char *sep = (dir[dir_len - 1] == '/') ? "" : "/";
    const int written = snprintf(out, out_sz, "%s%s%s", dir, sep, file);
    if (written < 0) {
        return -2;
    }
    if ((size_t)written >= out_sz) {
        return -2; /* truncated */
    }
    return 0;
}
/*
 * Prepare the persistence directory and build the full paths of the oplog
 * and of the array / rbtree / hash data files into their global buffers.
 *
 * Still best-effort (it returns nothing and never aborts), but failures of
 * ensure_dir_exists() and join_path() are reported on stderr instead of being
 * silently dropped. A join_path() result of -2 means the path was truncated.
 */
void init_data_file(AppConfig *cfg){
    const struct {
        char *dst;
        size_t cap;
        const char *file;
    } targets[] = {
        { global_oplog_file,  sizeof(global_oplog_file),  cfg->oplog_file  },
        { global_array_file,  sizeof(global_array_file),  cfg->array_file  },
        { global_rbtree_file, sizeof(global_rbtree_file), cfg->rbtree_file },
        { global_hash_file,   sizeof(global_hash_file),   cfg->hash_file   },
    };

    int rc = ensure_dir_exists(cfg->persist_dir);
    if (rc != 0) {
        fprintf(stderr, "init_data_file: cannot use persist dir '%s' (rc=%d)\n",
                cfg->persist_dir, rc);
    }

    for (size_t i = 0; i < sizeof(targets) / sizeof(targets[0]); i++) {
        rc = join_path(targets[i].dst, targets[i].cap, cfg->persist_dir, targets[i].file);
        if (rc != 0) {
            fprintf(stderr, "init_data_file: cannot build path for '%s' (rc=%d)\n",
                    targets[i].file, rc);
        }
    }
}
/*
 * Load config/config.xml into *cfg and print the resulting settings.
 *
 * The libxml2 parser is initialised before loading and cleaned up on every
 * path. Returns 0 on success, -1 when config_load() fails.
 */
int init_config(AppConfig *cfg)
{
    int status = -1;

    xmlInitParser();

    if (config_load("config/config.xml", cfg) != 0) {
        fprintf(stderr, "Failed to load config/config.xml\n");
    } else {
        printf("=============== Config ===============\n");
        printf("IP : %s\n", cfg->ip);
        printf("Port : %d\n", cfg->port);

        /* replication / role */
        printf("Replica-Mode : %s\n", replica_to_string(cfg->replica_mode));
        printf("Mode : %s\n", server_mode_to_string(cfg->mode));
        printf("|—— Master IP : %s\n", cfg->master_ip);
        printf("|—— Master Port : %d\n", cfg->master_port);

        /* persistence */
        printf("Persistence : %s\n", persistence_to_string(cfg->persistence));
        printf("|—— Persist-dir : %s\n", cfg->persist_dir);
        printf("|—— Persist-oplog : %s\n", cfg->oplog_file);
        printf("|—— Oplog-sync : %s\n", oplog_sync_mode_to_string(cfg->oplog_sync_mode));
        printf("|—— Persist-array : %s\n", cfg->array_file);
        printf("|—— Persist-rbtree : %s\n", cfg->rbtree_file);
        printf("|—— Persist-hash : %s\n", cfg->hash_file);

        /* logging and memory */
        printf("Log level : %s\n", log_level_to_string(cfg->log_level));
        printf("Memory : \n");
        printf("|——Allocator : %s\n", allocator_to_string(cfg->allocator));
        printf("|——MemLeakDetectMode : %s\n", leakage_to_string(cfg->leak_mode));
        printf("=============== Config ===============\n");

        status = 0;
    }

    xmlCleanupParser();
    return status;
}
/*
 * Initialise the disk io_uring context, passing 8192 as the second argument
 * to iouring_init() (a 4096 variant was tried earlier and left commented out
 * by the original author).
 */
void init_disk_uring(iouring_ctx_t *uring_ctx)
{
    // iouring_init(uring_ctx, 4096);
    iouring_init(uring_ctx, (8 * 1024));
}
/* Shut down the disk io_uring context set up by init_disk_uring(). */
void dest_disk_uring(iouring_ctx_t *uring_ctx)
{
    iouring_shutdown(uring_ctx);
}
/*
 * Open (creating it if needed) the replication shared-memory region when
 * replication is enabled in the global configuration.
 *
 * Returns 0 when replication is disabled or the region was opened; otherwise
 * logs and returns the non-zero code from replica_shm_open().
 */
int kvs_replica_init(void)
{
    if (global_cfg.replica_mode != REPLICA_ENABLE) {
        return 0;
    }

    const int status = replica_shm_open(&g_rep_shm, REPLICA_SHM_NAME, REPLICA_SHM_SIZE, /*create=*/ 1);
    if (status == 0) {
        return 0;
    }

    fprintf(stderr, "replica_shm_open failed rc=%d\n", status);
    return status;
}
/*
 * Server entry point.
 *
 * Start-up order: configuration, replication shared memory, allocator, data
 * file paths, disk io_uring, optional slave bootstrap, storage engines, then
 * the network loop selected at compile time by NETWORK_SELECT, with
 * kvs_protocol() as the protocol callback. After the network loop returns,
 * the engines, the io_uring context and the memory pool are torn down.
 *
 * Returns 0 on normal shutdown, -1 when the configuration cannot be loaded or
 * the replication shared memory cannot be opened.
 */
int main(int argc, char *argv[]) {
    (void)argc;
    (void)argv;

    if(-1 == init_config(&global_cfg)){
        printf("Init Config error");
        return -1;
    }

    global_seq = 0;
    /*
     * kvs_protocol() appends to g_rep_shm whenever replication is enabled, so
     * starting up with an unopened region is not safe; kvs_replica_init() has
     * already logged the reason.
     */
    if (kvs_replica_init() != 0) {
        return -1;
    }

    init_memory_pool(&global_cfg);
    init_data_file(&global_cfg);
    init_disk_uring(&global_uring_ctx);

    int port = global_cfg.port;
    if(global_cfg.mode == MODE_SLAVE){
        /* NOTE(review): the result of slave_bootstrap() is ignored, as in the
         * original; its failure semantics are not visible from this file. */
        slave_bootstrap(global_cfg.ip, port, global_cfg.master_ip, global_cfg.master_port);
    }

    init_kvengine();

#if (NETWORK_SELECT == NETWORK_REACTOR)
    reactor_start(port, kvs_protocol);
#elif (NETWORK_SELECT == NETWORK_PROACTOR)
    proactor_start(port, kvs_protocol);
#elif (NETWORK_SELECT == NETWORK_NTYCO)
    ntyco_start(port, kvs_protocol);
#endif

    dest_kvengine();
    /* Counterpart of init_disk_uring(); done after the engines and the oplog
     * are closed and before the allocator goes away. */
    dest_disk_uring(&global_uring_ctx);
    dest_memory_pool();
    return 0;
}