实现全量持久化:save操作落盘,启动时读取到内存

增量持久化:执行修改操作时将cmd追加到log中,启动时逐条取出顺序执行
This commit is contained in:
2026-01-07 18:43:28 +08:00
parent cb0134a852
commit 3cc97b9454
9 changed files with 373 additions and 74 deletions

View File

@@ -1,6 +1,7 @@
#include "kvstore.h"
#include "kvs_rw_tools.h"
#include <arpa/inet.h>
#include <unistd.h>
#if ENABLE_ARRAY
extern kvs_array_t global_array;
@@ -14,11 +15,13 @@ extern kvs_rbtree_t global_rbtree;
extern kvs_hash_t global_hash;
#endif
// 0 suc, -1 err
int kvs_need(const uint8_t *p, const uint8_t *end, size_t n) {
return (p + n <= end) ? 0 : -1;
}
// 注意u8类型不需要ntoh或者hton
// 0 suc, -1 err
int kvs_read_u8(const uint8_t **pp, const uint8_t *end, uint8_t *out) {
const uint8_t *p = *pp;
if (kvs_need(p, end, 1) < 0) return -1;
@@ -73,6 +76,31 @@ int kvs_write_u32(uint8_t **pp, const uint8_t *end, uint32_t v) {
return 0;
}
// -1 err, 0 suc
int kvs_write_file(FILE *fp, const void *buf, size_t n) {
const uint8_t *p = (const uint8_t *)buf;
while (n > 0) {
size_t w = fwrite(p, 1, n, fp);
if (w == 0) return -1;
p += w;
n -= w;
}
return 0;
}
// -1 err, 0 suc
int kvs_read_file(FILE *fp, void *buf, size_t n){
uint8_t *p = (uint8_t *)buf;
while (n > 0) {
size_t r = fread(p, 1, n, fp);
if (r == 0) return -1; // EOF or error
p += r;
n -= r;
}
return 0;
}
// return: -1 fail, 0 half, >0 consumed
int kvs_parse_one_cmd(const uint8_t *request, int request_length, kvs_req_t *req_out){
if (!request || request_length <= 0 || !req_out) return -1;
@@ -186,9 +214,9 @@ void kvs_free_request(kvs_req_t *req) {
int op = req->op;
kvs_arg_t *argv = req->args;
size_t key_len = 0;
uint32_t key_len = 0;
const void *key = NULL;
size_t value_len = 0;
uint32_t value_len = 0;
const void *val = NULL;
if(argc == 1){
@@ -222,6 +250,9 @@ void kvs_free_request(kvs_req_t *req) {
case KVS_CMD_HEXIST:
if (argc != 1 || !key) { rsp_out->status = KVS_STATUS_BADREQ; return -1; }
break;
case KVS_CMD_SAVE:
if(argc != 0) { rsp_out->status = KVS_STATUS_BADREQ; return -1; }
break;
default:
rsp_out->status = KVS_STATUS_BADREQ;
return -1;
@@ -338,6 +369,11 @@ void kvs_free_request(kvs_req_t *req) {
rsp_out->status = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST;
return 0;
#endif
case KVS_CMD_SAVE:
ret = kvs_save_to_file();
if(ret == 0) rsp_out->status = KVS_STATUS_OK;
else rsp_out->status = KVS_STATUS_ERROR;
return 0;
default:
rsp_out->status = KVS_STATUS_BADREQ;
return -1;
@@ -371,3 +407,130 @@ int kvs_build_one_rsp(const kvs_rsp_t *results, uint8_t *response, size_t respon
return (int)(p - response);
}
int kvs_save_to_file(){
#if ENABLE_ARRAY
int ret = kvs_array_save(&global_array, KVS_ARRAY_FILE);
#endif
#if ENABLE_RBTREE
#endif
#if ENABLE_HASH
#endif
return ret;
}
#include <errno.h>
int write_full(int fd, const void *buf, size_t len)
{
const uint8_t *p = buf;
while (len > 0) {
ssize_t n = write(fd, p, len);
if (n < 0) {
if (errno == EINTR)
continue;
return -1;
}
p += n;
len -= n;
}
return 0;
}
// 1 read n suc, 0 eof, -1 error
int read_full(int fd, void *buf, size_t n)
{
uint8_t *p = (uint8_t *)buf;
size_t got = 0;
while (got < n) {
ssize_t r = read(fd, p + got, n - got);
if (r > 0) {
got += (size_t)r;
continue;
}
if (r == 0) {
return (got == 0) ? 0 : -1;
}
/* r < 0 */
if (errno == EINTR) {
continue;
}
return -1;
}
return 1;
}
int kvs_save_cmd_to_logfile(const uint8_t *cmd, size_t len, int logfd){
if (logfd < 0 || !cmd || len == 0)
return -1;
if (len > UINT32_MAX)
return -2;
uint32_t nlen = htonl((uint32_t)len);
if (write_full(logfd, &nlen, sizeof(nlen)) < 0)
return -3;
if (write_full(logfd, cmd, len) < 0)
return -4;
if (fsync(logfd) < 0)
return -5;
return 0;
}
int kvs_replay_log(const char *logfile, int logfd){
if (!logfile|| logfd<0) return -1;
for (;;) {
uint32_t nlen = 0;
int hr = read_full(logfd, &nlen, sizeof(nlen));
if (hr == 0) break; /* EOF正常结束 */
if (hr < 0) { return -2; } /* 半截头 */
uint32_t len = ntohl(nlen);
if (len == 0) { return -3; }
uint8_t *cmd = (uint8_t *)kvs_malloc(len);
if (!cmd) { return -5; }
int pr = read_full(logfd, cmd, len);
if (pr <= 0) { /* 半截 payload */
kvs_free(cmd);
return -6;
}
kvs_req_t req;
memset(&req, 0, sizeof(req));
int clen = kvs_parse_one_cmd(cmd, (int)len, &req);
if (clen <= 0 || clen != (int)len) {
kvs_free_request(&req);
kvs_free(cmd);
return -7;
}
kvs_rsp_t rsp;
memset(&rsp, 0, sizeof(rsp));
if (kvs_execute_one_cmd(&req, &rsp) < 0) {
kvs_free_request(&req);
kvs_free(cmd);
return -8;
}
kvs_free_request(&req);
kvs_free(cmd);
}
return 0;
}