Files
ldb/kvstore.c
2026-01-19 10:37:32 +00:00

385 lines
9.2 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#include "kvstore.h"
#include "kvs_rw_tools.h"
#include "mem_pool/mem_pool.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
#include <errno.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <libxml/parser.h>
#include "common/config.h"
#if ENABLE_ARRAY
extern kvs_array_t global_array;
#endif
#if ENABLE_RBTREE
extern kvs_rbtree_t global_rbtree;
#endif
#if ENABLE_HASH
extern kvs_hash_t global_hash;
#endif
#if MEMORY_SELECT_MALLOC == MEMORY_USE_MYMALLOC
extern mp_pool_t global_mempool;
#endif
AppConfig cfg;
ServerMode mode = MODE_MASTER;
int global_cmd_log_fd = -1;
int is_update_cmd(kvs_cmd_t op){
if(op == KVS_CMD_SET || op == KVS_CMD_RSET || op == KVS_CMD_HSET
|| op == KVS_CMD_MOD || op == KVS_CMD_RMOD || op == KVS_CMD_HMOD
|| op == KVS_CMD_DEL || op == KVS_CMD_RDEL || op == KVS_CMD_HDEL){
return 1;
}
return 0;
}
/**
* input : request request_length
* output : response response_length
* return : -1 error, =0 半包, 1 成功
*/
// int kvs_protocol(char *request, int request_length, char *response, int *response_length){
int kvs_protocol(struct conn* conn){
if (!conn) return -1;
char *request = conn->rbuffer;
int request_length = conn->rlength;
char *response = conn->wbuffer;
int *response_length = &conn->wlength;
if (!request || request_length <= 0 || !response || !response_length) return -1;
int consumed = 0;
int out_len = 0;
static int i = 0;
while(consumed < request_length ){
if(i > 33){
i = i+1;
i = i-1;
}
if(i == 47) i = 0;
++i;
kvs_req_t req;
memset(&req, 0, sizeof(kvs_req_t));
const uint8_t *p = request+consumed;
int remain = request_length - consumed;
int len = kvs_parse_one_cmd(p, remain, &req);
if(len < 0){
// 解析失败
kvs_free_request(&req);
*response_length = out_len;
return -1;
}
else if(len == 0){
// 半包
kvs_free_request(&req);
break;
}
kvs_rsp_t rsp;
memset(&rsp, 0, sizeof(kvs_rsp_t));
// 执行失败
if (kvs_execute_one_cmd(&req, &rsp) < 0){
kvs_free_request(&req);
*response_length = out_len;
return -1;
}else{
// 执行成功,在这里保存到日志中。
if(rsp.status == KVS_STATUS_OK){
if(is_update_cmd(req.op)){
kvs_save_cmd_to_logfile(p, len, global_cmd_log_fd);
}
}
}
if(req.op == KVS_CMD_PSYNC){
build_thread_to_sync(req.args->data, conn);
}
int resp_len = kvs_build_one_rsp(&rsp, (uint8_t *)response+out_len, KVS_MAX_RESPONSE-out_len);
// 构建响应 <0 构建失败
kvs_free_request(&req);
if (resp_len < 0) {
*response_length = out_len;
return -1;
}
out_len += resp_len;
consumed += len;
}
// slave 暂时不需要回报或者回一个new_offset
if(conn->is_from_master){
conn->wlength = 0;
return consumed;
}
*response_length = out_len;
return consumed;
}
extern void sync_wakeup(int fd);
static int g_slavefd = -1;
static uint64_t g_offset = 0;
static void *sync_thread_main(void *arg) {
struct conn *conn = (struct conn*) arg;
int logfd = open(KVS_CMD_LOG_FILE, O_RDONLY);
if (logfd < 0) {
printf("open replaylog failed: %s\n", strerror(errno));
return NULL;
}
pthread_mutex_lock(&conn->g_sync_lock);
uint64_t off = g_offset;
pthread_mutex_unlock(&conn->g_sync_lock);
while (1) {
// 单槽位:等 reactor 发完再填
pthread_mutex_lock(&conn->g_sync_lock);
int busy = (conn->wlength > 0);
pthread_mutex_unlock(&conn->g_sync_lock);
if (busy) { usleep(10 * 1000); continue; }
size_t filled = 0;
int records = 0;
// 试图攒一批
while (filled < (size_t)KVS_MAX_RESPONSE && records < 128) {
// 读 len 头
uint32_t nlen = 0;
ssize_t r = pread(logfd, &nlen, sizeof(nlen), (off_t)off);
if (r == 0) {
// EOF文件当前没更多数据
break;
}
if (r < 0) {
if (errno == EINTR) continue;
printf("pread len error: %s\n", strerror(errno));
close(logfd);
return NULL;
}
if (r < (ssize_t)sizeof(nlen)) {
// 半截 lenwriter 还没写完头
break;
}
uint32_t len = ntohl(nlen);
if(len <= 0) {
printf("sync error\n");
}
// 这一条放不进本批次,就先发已有的
if (filled + len > (size_t)KVS_MAX_RESPONSE) {
break;
}
// 读 payload(cmd)
ssize_t pr = pread(logfd, conn->wbuffer + filled, len,
(off_t)(off + sizeof(nlen)));
if (pr == 0) {
// payload 还没写到
break;
}
if (pr < 0) {
if (errno == EINTR) continue;
printf("pread payload error: %s\n", strerror(errno));
close(logfd);
return NULL;
}
if (pr < (ssize_t)len) {
// 半截 payloadwriter 还没写完这一条
break;
}
// 成功拿到一条完整记录:推进
off += sizeof(nlen) + (uint64_t)len;
filled += (size_t)len;
records++;
}
if (filled > 0) {
// 提交给 reactor 发送
pthread_mutex_lock(&conn->g_sync_lock);
conn->wlength = (int)filled;
g_offset = off;
pthread_mutex_unlock(&conn->g_sync_lock);
// 唤醒 reactor 发
sync_wakeup(conn->fd); // 或 g_slavefd
continue;
}
// 没攒到任何完整记录:说明真到末尾/半条记录,等一会儿
usleep(10*1000);
}
close(logfd);
return NULL;
}
void build_thread_to_sync(const uint8_t *offset, struct conn* conn){
uint64_t off64 = 0;
memcpy(&off64, offset, 8);
pthread_mutex_lock(&conn->g_sync_lock);
g_slavefd = conn->fd;
g_offset = (uint64_t)off64;
printf("offset:%ld\n", off64);
conn->wlength = 0;
pthread_mutex_unlock(&conn->g_sync_lock);
pthread_t tid;
int rc = pthread_create(&tid, NULL, sync_thread_main, conn);
if (rc != 0) {
printf("pthread_create failed: %s\n", strerror(rc));
return;
}
pthread_detach(tid);
}
int init_kvengine(void) {
#if ENABLE_ARRAY
memset(&global_array, 0, sizeof(kvs_array_t));
kvs_array_create(&global_array);
kvs_array_load(&global_array, KVS_ARRAY_FILE);
#endif
#if ENABLE_RBTREE
memset(&global_rbtree, 0, sizeof(kvs_rbtree_t));
kvs_rbtree_create(&global_rbtree);
kvs_rbtree_load(&global_rbtree, KVS_RBTREE_FILE);
#endif
#if ENABLE_HASH
memset(&global_hash, 0, sizeof(kvs_hash_t));
kvs_hash_create(&global_hash);
kvs_hash_load(&global_hash, KVS_HASH_FILE);
#endif
init_cmd_log(KVS_CMD_LOG_FILE, &global_cmd_log_fd);
kvs_replay_log(KVS_CMD_LOG_FILE, global_cmd_log_fd);
return 0;
}
void dest_kvengine(void) {
#if ENABLE_ARRAY
kvs_array_destroy(&global_array);
#endif
#if ENABLE_RBTREE
kvs_rbtree_destroy(&global_rbtree);
#endif
#if ENABLE_HASH
kvs_hash_destroy(&global_hash);
#endif
destroy_cmd_log(global_cmd_log_fd);
}
void init_memory_pool(void){
#if MEMORY_SELECT_MALLOC == MEMORY_USE_MYMALLOC
mp_init(&global_mempool);
#endif
}
void dest_memory_pool(void){
#if MEMORY_SELECT_MALLOC == MEMORY_USE_MYMALLOC
mp_destroy(&global_mempool);
#endif
}
int init_config(AppConfig *cfg){
xmlInitParser();
if (config_load("config/config.xml", cfg) != 0) {
fprintf(stderr, "Failed to load config/config.xml\n");
xmlCleanupParser();
return -1;
}
printf("=== Config ===\n");
printf("IP : %s\n", cfg->ip);
printf("Port : %d\n", cfg->port);
printf("Log level : %s\n", log_level_to_string(cfg->log_level));
printf("Master IP : %s\n", cfg->master_ip);
printf("Master Port : %d\n", cfg->master_port);
printf("Mode : %s\n", server_mode_to_string(cfg->mode));
printf("Persistence : %s\n", persistence_to_string(cfg->persistence));
printf("Allocator : %s\n", allocator_to_string(cfg->allocator));
printf("=== Config ===\n");
xmlCleanupParser();
return 0;
}
int main(int argc, char *argv[]) {
if(-1 == init_config(&cfg)){
printf("Init Config error");
return -1;
}
int port = cfg.port;
mode = cfg.mode;
char *master_ip = NULL;
int master_port = -1;
if(mode == MODE_SLAVE){
master_ip = cfg.master_ip;
master_port = cfg.master_port;
}else if(mode == MODE_MASTER){
}
init_memory_pool();
init_kvengine();
#if (NETWORK_SELECT == NETWORK_REACTOR)
reactor_start(port, kvs_protocol, master_ip, master_port); //
#elif (NETWORK_SELECT == NETWORK_PROACTOR)
proactor_start(port, kvs_protocol);
#elif (NETWORK_SELECT == NETWORK_NTYCO)
ntyco_start(port, kvs_protocol);
#endif
dest_kvengine();
dest_memory_pool();
}