rbtree和hash的全量持久化操作。rbtree的二进制安全。

粗略测试。
This commit is contained in:
2026-01-08 21:42:20 +08:00
parent de21fe94ec
commit 4b4e06b33d
16 changed files with 1997 additions and 1450 deletions

View File

@@ -17,6 +17,49 @@ extern kvs_hash_t global_hash;
extern int global_cmd_log_fd;
#include <errno.h>
int write_full(int fd, const void *buf, size_t len)
{
const uint8_t *p = buf;
while (len > 0) {
ssize_t n = write(fd, p, len);
if (n < 0) {
if (errno == EINTR)
continue;
return -1;
}
p += n;
len -= n;
}
return 0;
}
// 1 read n suc, 0 eof, -1 error
int read_full(int fd, void *buf, size_t n)
{
uint8_t *p = (uint8_t *)buf;
size_t got = 0;
while (got < n) {
ssize_t r = read(fd, p + got, n - got);
if (r > 0) {
got += (size_t)r;
continue;
}
if (r == 0) {
return (got == 0) ? 0 : -1;
}
/* r < 0 */
if (errno == EINTR) {
continue;
}
return -1;
}
return 1;
}
// 0 suc, -1 err
int kvs_need(const uint8_t *p, const uint8_t *end, size_t n) {
return (p + n <= end) ? 0 : -1;
@@ -101,455 +144,3 @@ int kvs_read_file(FILE *fp, void *buf, size_t n){
}
return 0;
}
// return: -1 fail, 0 half, >0 consumed
int kvs_parse_one_cmd(const uint8_t *request, int request_length, kvs_req_t *req_out){
if (!request || request_length <= 0 || !req_out) return -1;
req_out->op = 0;
req_out->argc = 0;
req_out->args = NULL;
const uint8_t *p = request;
const uint8_t *end = request + (size_t)request_length;
// OP + ARGC
if (kvs_need(p, end, 2)) {
return 0; // NEED_MORE
}
uint8_t op = 0, argc = 0;
if (kvs_read_u8(&p, end, &op) < 0) return -1;
if (kvs_read_u8(&p, end, &argc) < 0) return -1;
if (argc > KVS_MAX_ARGC) return -1;
// 先扫描一遍确认整条命令数据都在 buffer 里
const uint8_t *scan = p;
uint32_t lens[KVS_MAX_ARGC];
if (argc > 0) {
for (uint8_t i = 0; i < argc; i++) {
if (kvs_need(scan, end, 4)) {
return 0; // NEED_MORE
}
uint32_t alen = 0;
if (kvs_read_u32(&scan, end, &alen) < 0) return -1;
// 防御:单个参数长度限制
if (alen > KVS_MAX_ARGLEN) return -1;
// 防御scan + alen 越界 / 半包
if (kvs_need(scan, end, (size_t)alen)) {
return 0; // NEED_MORE
}
lens[i] = alen;
scan += alen;
}
}
size_t total_len = (size_t)(scan - request);
if (total_len > KVS_MAX_CMD_BYTES) return -1;
req_out->op = op;
req_out->argc = argc;
if (argc == 0) {
return (int)total_len;
}
kvs_arg_t *args = (kvs_arg_t *)kvs_malloc((size_t)argc * sizeof(kvs_arg_t));
if (!args) {
kvs_free_request(req_out);
return -1;
}
memset(args, 0, (size_t)argc * sizeof(kvs_arg_t));
for (uint8_t i = 0; i < argc; i++) {
uint32_t alen = 0;
if (kvs_read_u32(&p, end, &alen) < 0) {
kvs_free(args);
kvs_free_request(req_out);
return -1;
}
// alen 与 lens[i] 应当一致(扫描时读过),不一致说明解析器/输入异常
if (alen != lens[i]) {
kvs_free(args);
kvs_free_request(req_out);
return -1;
}
args[i].len = alen;
args[i].data = p; // 直接指向输入 buffer零拷贝
p += alen;
}
req_out->args = args;
return (int)(p - request);
}
void kvs_free_request(kvs_req_t *req) {
if (!req) return;
if (req->args) {
kvs_free(req->args);
req->args = NULL;
}
req->op = 0;
req->argc = 0;
}
/**
* 输入req
* 输出rsp
* 返回:-1 失败参数错误0 成功
*/
int kvs_execute_one_cmd(const kvs_req_t *req, kvs_rsp_t *rsp_out) {
if(!req || !rsp_out) return -1;
rsp_out->op = req->op;
rsp_out->status = KVS_STATUS_ERROR;
rsp_out->data = NULL;
rsp_out->dlen = 0;
int argc = req->argc;
int op = req->op;
kvs_arg_t *argv = req->args;
uint32_t key_len = 0;
const void *key = NULL;
uint32_t value_len = 0;
const void *val = NULL;
if(argc == 1){
key_len = argv[0].len;
key = argv[0].data;
}else if(argc == 2){
key_len = argv[0].len;
key = argv[0].data;
value_len = argv[1].len;
val = argv[1].data;
}
// 基本参数校验(按你原有命令语义)
switch (op) {
case KVS_CMD_SET:
case KVS_CMD_MOD:
case KVS_CMD_RSET:
case KVS_CMD_RMOD:
case KVS_CMD_HSET:
case KVS_CMD_HMOD:
if (argc != 2 || !key || !val) { rsp_out->status = KVS_STATUS_BADREQ; return -1; }
break;
case KVS_CMD_GET:
case KVS_CMD_DEL:
case KVS_CMD_EXIST:
case KVS_CMD_RGET:
case KVS_CMD_RDEL:
case KVS_CMD_REXIST:
case KVS_CMD_HGET:
case KVS_CMD_HDEL:
case KVS_CMD_HEXIST:
if (argc != 1 || !key) { rsp_out->status = KVS_STATUS_BADREQ; return -1; }
break;
case KVS_CMD_SAVE:
if(argc != 0) { rsp_out->status = KVS_STATUS_BADREQ; return -1; }
break;
default:
rsp_out->status = KVS_STATUS_BADREQ;
return -1;
}
int ret = 0;
const char *result = NULL;
switch (op) {
#if ENABLE_ARRAY
case KVS_CMD_SET:
ret = kvs_array_set_bin(&global_array, key, key_len, val, value_len);
if (ret < 0) rsp_out->status = KVS_STATUS_ERROR;
else if (ret == 0) rsp_out->status = KVS_STATUS_OK;
else rsp_out->status = KVS_STATUS_EXIST;
return 0;
case KVS_CMD_GET:
result = kvs_array_get_bin(&global_array, key, key_len, &value_len);
if (!result) { rsp_out->status = KVS_STATUS_NO_EXIST; return 0; }
rsp_out->status = KVS_STATUS_OK;
rsp_out->data = result;
rsp_out->dlen = (uint32_t)value_len;
return 0;
case KVS_CMD_DEL:
ret = kvs_array_del_bin(&global_array, key, key_len);
if (ret < 0) rsp_out->status = KVS_STATUS_ERROR;
else if (ret == 0) rsp_out->status = KVS_STATUS_OK;
else rsp_out->status = KVS_STATUS_NO_EXIST;
return 0;
case KVS_CMD_MOD:
ret = kvs_array_mod_bin(&global_array, key, key_len, val, value_len);
if (ret < 0) rsp_out->status = KVS_STATUS_ERROR;
else if (ret == 0) rsp_out->status = KVS_STATUS_OK;
else rsp_out->status = KVS_STATUS_NO_EXIST;
return 0;
case KVS_CMD_EXIST:
ret = kvs_array_exist_bin(&global_array, key, key_len);
rsp_out->status = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST;
return 0;
#endif
#if ENABLE_RBTREE
case KVS_CMD_RSET:
ret = kvs_rbtree_set(&global_rbtree, (char*)key, (char*)val);
if (ret < 0) rsp_out->status = KVS_STATUS_ERROR;
else if (ret == 0) rsp_out->status = KVS_STATUS_OK;
else rsp_out->status = KVS_STATUS_EXIST;
return 0;
case KVS_CMD_RGET:
result = kvs_rbtree_get(&global_rbtree, (char*)key);
if (!result) { rsp_out->status = KVS_STATUS_NO_EXIST; return 0; }
rsp_out->status = KVS_STATUS_OK;
rsp_out->data = result;
rsp_out->dlen = (uint32_t)strlen(result);
return 0;
case KVS_CMD_RDEL:
ret = kvs_rbtree_del(&global_rbtree, (char*)key);
if (ret < 0) rsp_out->status = KVS_STATUS_ERROR;
else if (ret == 0) rsp_out->status = KVS_STATUS_OK;
else rsp_out->status = KVS_STATUS_NO_EXIST;
return 0;
case KVS_CMD_RMOD:
ret = kvs_rbtree_mod(&global_rbtree, (char*)key, (char*)val);
if (ret < 0) rsp_out->status = KVS_STATUS_ERROR;
else if (ret == 0) rsp_out->status = KVS_STATUS_OK;
else rsp_out->status = KVS_STATUS_NO_EXIST;
return 0;
case KVS_CMD_REXIST:
ret = kvs_rbtree_exist(&global_rbtree, (char*)key);
rsp_out->status = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST;
return 0;
#endif
#if ENABLE_HASH
case KVS_CMD_HSET:
ret = kvs_hash_set_bin(&global_hash, key, key_len, val, value_len);
if (ret < 0) rsp_out->status = KVS_STATUS_ERROR;
else if (ret == 0) rsp_out->status = KVS_STATUS_OK;
else rsp_out->status = KVS_STATUS_EXIST;
return 0;
case KVS_CMD_HGET:
result = kvs_hash_get_bin(&global_hash, key, key_len, &value_len);
if (!result) { rsp_out->status = KVS_STATUS_NO_EXIST; return 0; }
rsp_out->status = KVS_STATUS_OK;
rsp_out->data = result;
rsp_out->dlen = (uint32_t)value_len;
return 0;
case KVS_CMD_HDEL:
ret = kvs_hash_del_bin(&global_hash, key, key_len);
if (ret < 0) rsp_out->status = KVS_STATUS_ERROR;
else if (ret == 0) rsp_out->status = KVS_STATUS_OK;
else rsp_out->status = KVS_STATUS_NO_EXIST;
return 0;
case KVS_CMD_HMOD:
ret = kvs_hash_mod_bin(&global_hash, key, key_len, val, value_len);
if (ret < 0) rsp_out->status = KVS_STATUS_ERROR;
else if (ret == 0) rsp_out->status = KVS_STATUS_OK;
else rsp_out->status = KVS_STATUS_NO_EXIST;
return 0;
case KVS_CMD_HEXIST:
ret = kvs_hash_exist_bin(&global_hash, key, key_len);
rsp_out->status = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST;
return 0;
#endif
case KVS_CMD_SAVE:
ret = kvs_save_to_file();
if(ret == 0) rsp_out->status = KVS_STATUS_OK;
else rsp_out->status = KVS_STATUS_ERROR;
return 0;
default:
rsp_out->status = KVS_STATUS_BADREQ;
return -1;
}
return -1;
}
/**
* 构建单条响应
* 返回:-1 失败,>=0 响应长度
*/
int kvs_build_one_rsp(const kvs_rsp_t *results, uint8_t *response, size_t response_cap){
if (!results || !response) return -1;
const uint8_t *end = response + response_cap;
uint8_t *p = response;
// 计算所需长度1 + 1 + 4 + dlen
// 注意防止 size_t 溢出
size_t need = 1u + 1u + 4u + (size_t)results->dlen;
if (need > response_cap) return -1;
if (kvs_write_u8(&p, end, results->op) < 0) return -1;
if (kvs_write_u8(&p, end, results->status) < 0) return -1;
if (kvs_write_u32(&p, end, results->dlen) < 0) return -1;
if (results->dlen > 0) {
if (!results->data) return -1; // 有长度却没指针,视为错误
if (kvs_need(p, end, (size_t)results->dlen) < 0) return -1;
memcpy(p, results->data, results->dlen);
p += results->dlen;
}
return (int)(p - response);
}
int kvs_save_to_file(){
#if ENABLE_ARRAY
int ret = kvs_array_save(&global_array, KVS_ARRAY_FILE);
#endif
#if ENABLE_RBTREE
#endif
#if ENABLE_HASH
#endif
ksv_clear_log(global_cmd_log_fd);
return ret;
}
#include <errno.h>
int write_full(int fd, const void *buf, size_t len)
{
const uint8_t *p = buf;
while (len > 0) {
ssize_t n = write(fd, p, len);
if (n < 0) {
if (errno == EINTR)
continue;
return -1;
}
p += n;
len -= n;
}
return 0;
}
// 1 read n suc, 0 eof, -1 error
int read_full(int fd, void *buf, size_t n)
{
uint8_t *p = (uint8_t *)buf;
size_t got = 0;
while (got < n) {
ssize_t r = read(fd, p + got, n - got);
if (r > 0) {
got += (size_t)r;
continue;
}
if (r == 0) {
return (got == 0) ? 0 : -1;
}
/* r < 0 */
if (errno == EINTR) {
continue;
}
return -1;
}
return 1;
}
int kvs_save_cmd_to_logfile(const uint8_t *cmd, size_t len, int logfd){
if (logfd < 0 || !cmd || len == 0)
return -1;
if (len > UINT32_MAX)
return -2;
uint32_t nlen = htonl((uint32_t)len);
if (write_full(logfd, &nlen, sizeof(nlen)) < 0)
return -3;
if (write_full(logfd, cmd, len) < 0)
return -4;
if (fsync(logfd) < 0)
return -5;
return 0;
}
int kvs_replay_log(const char *logfile, int logfd){
if (!logfile|| logfd<0) return -1;
for (;;) {
uint32_t nlen = 0;
int hr = read_full(logfd, &nlen, sizeof(nlen));
if (hr == 0) break; /* EOF正常结束 */
if (hr < 0) { return -2; } /* 半截头 */
uint32_t len = ntohl(nlen);
if (len == 0) { return -3; }
uint8_t *cmd = (uint8_t *)kvs_malloc(len);
if (!cmd) { return -5; }
int pr = read_full(logfd, cmd, len);
if (pr <= 0) { /* 半截 payload */
kvs_free(cmd);
return -6;
}
kvs_req_t req;
memset(&req, 0, sizeof(req));
int clen = kvs_parse_one_cmd(cmd, (int)len, &req);
if (clen <= 0 || clen != (int)len) {
kvs_free_request(&req);
kvs_free(cmd);
return -7;
}
kvs_rsp_t rsp;
memset(&rsp, 0, sizeof(rsp));
if (kvs_execute_one_cmd(&req, &rsp) < 0) {
kvs_free_request(&req);
kvs_free(cmd);
return -8;
}
kvs_free_request(&req);
kvs_free(cmd);
}
return 0;
}
/**
* clear log file not close
*/
int ksv_clear_log(int logfd){
if(logfd < 0) return -1;
ftruncate(logfd, 0);
lseek(logfd, 0, SEEK_SET);
return 0;
}