n*spsc uring_pool

This commit is contained in:
2026-03-03 12:56:07 +00:00
parent ff924b033c
commit 2ec61bdf85
9 changed files with 747 additions and 616 deletions

212
kvstore.c
View File

@@ -41,14 +41,120 @@ void __completed_cmd(const uint8_t *cmd, size_t len, unsigned long long seq){
}
#include <sys/time.h>
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
#define TIME_SUB_US(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000000 + (tv1.tv_usec - tv2.tv_usec))
#include <sys/time.h>
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
#define TIME_SUB_US(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000000 + (tv1.tv_usec - tv2.tv_usec))
static int checked_size_add(size_t a, size_t b, size_t *out) {
if (!out || a > SIZE_MAX - b) {
return -1;
}
*out = a + b;
return 0;
}
static int resp_value_encoded_len(const resp_value_t *v, size_t *out_len) {
size_t len = 0;
if (!v || !out_len) {
return -1;
}
switch (v->type) {
case RESP_T_SIMPLE_STR:
case RESP_T_ERROR:
if (checked_size_add(1, (size_t)v->bulk.len, &len) < 0 ||
checked_size_add(len, 2, &len) < 0) {
return -1;
}
break;
case RESP_T_INTEGER: {
char tmp[64];
int n = snprintf(tmp, sizeof(tmp), "%lld", (long long)v->i64);
if (n <= 0) {
return -1;
}
if (checked_size_add(1, (size_t)n, &len) < 0 ||
checked_size_add(len, 2, &len) < 0) {
return -1;
}
break;
}
case RESP_T_NIL:
len = 5; /* "$-1\r\n" */
break;
case RESP_T_BULK_STR: {
char tmp[32];
int n;
size_t t;
if (v->bulk.len > 0 && !v->bulk.ptr) {
return -1;
}
n = snprintf(tmp, sizeof(tmp), "%u", (unsigned)v->bulk.len);
if (n <= 0) {
return -1;
}
if (checked_size_add(1, (size_t)n, &t) < 0 || /* '$' + len digits */
checked_size_add(t, 2, &t) < 0 || /* \r\n */
checked_size_add(t, (size_t)v->bulk.len, &t) < 0 ||
checked_size_add(t, 2, &len) < 0) { /* trailing \r\n */
return -1;
}
break;
}
default:
return -1;
}
*out_len = len;
return 0;
}
static int flush_pending_response(struct conn *conn, uint8_t *buf, size_t *out_len) {
if (!conn || !buf || !out_len) {
return -1;
}
if (*out_len == 0) {
return 0;
}
if (chain_buffer_append(&conn->wbuf, buf, *out_len) < 0) {
return -1;
}
*out_len = 0;
return 0;
}
static int is_update_cmd(const resp_cmd_t *cmd) {
const resp_slice_t *c0;
if (!cmd || cmd->argc == 0 || !cmd->argv[0].ptr || cmd->argv[0].len == 0) {
return 0;
}
c0 = &cmd->argv[0];
return ascii_casecmp(c0->ptr, c0->len, "SET") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "DEL") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "MOD") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "RSET") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "RDEL") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "RMOD") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "HSET") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "HDEL") == 0 ||
ascii_casecmp(c0->ptr, c0->len, "HMOD") == 0;
}
int kvs_protocol(struct conn* conn){
#if TIME_COLLECT == 1
struct timeval func_start;
gettimeofday(&func_start, NULL);
long total_oplog_us = 0;
#if TIME_COLLECT == 1
struct timeval func_start;
gettimeofday(&func_start, NULL);
long total_oplog_us = 0;
#endif
if (!conn) return -1;
@@ -62,11 +168,11 @@ int kvs_protocol(struct conn* conn){
uint8_t response[KVS_MAX_RESPONSE];
int consumed = 0;
int out_len = 0;
while(consumed < request_length ){
const uint8_t *p = request+consumed;
int remain = request_length - consumed;
size_t out_len = 0;
while(consumed < request_length ){
const uint8_t *p = request+consumed;
int remain = request_length - consumed;
resp_cmd_t cmd;
memset(&cmd, 0, sizeof(cmd));
@@ -155,13 +261,15 @@ int kvs_protocol(struct conn* conn){
// }
// }
if(global_cfg.persistence == PERSIST_INCREMENTAL){
kvs_oplog_append(p, len, global_oplog_fd);
}
int need_persist = is_update_cmd(&cmd);
if(global_cfg.persistence == PERSIST_INCREMENTAL && need_persist){
kvs_oplog_append(p, len, global_oplog_fd);
}
// __completed_cmd(p, len, global_seq);
// global_seq ++;
if (global_cfg.replica_mode == REPLICA_ENABLE) {
if (global_cfg.replica_mode == REPLICA_ENABLE && need_persist) {
uint32_t off = 0;
int ar = replica_shm_append(&g_rep_shm, global_seq, p, (uint32_t)len, &off);
if (ar == 0) {
@@ -178,38 +286,60 @@ int kvs_protocol(struct conn* conn){
gettimeofday(&oplog_end, NULL);
total_oplog_us += (oplog_end.tv_sec - oplog_start.tv_sec) * 1000000 +
(oplog_end.tv_usec - oplog_start.tv_usec);
#endif
/* 构建响应 */
int cap = KVS_MAX_RESPONSE - out_len;
if (cap <= 0) {
return consumed;
#endif
/* 构建响应 */
int resp_len = resp_build_value(&val, response + out_len, sizeof(response) - out_len);
if (resp_len < 0) {
/* 当前批次剩余空间不够,先把已拼好的刷到发送队列再重试 */
if (flush_pending_response(conn, response, &out_len) < 0) {
return -1;
}
resp_len = resp_build_value(&val, response, sizeof(response));
if (resp_len < 0) {
size_t resp_need = 0;
uint8_t *resp_heap = NULL;
if (resp_value_encoded_len(&val, &resp_need) < 0) {
return -1;
}
resp_heap = (uint8_t *)malloc(resp_need);
if (!resp_heap) {
return -1;
}
resp_len = resp_build_value(&val, resp_heap, resp_need);
if (resp_len < 0 ||
chain_buffer_append(&conn->wbuf, resp_heap, (size_t)resp_len) < 0) {
free(resp_heap);
return -1;
}
free(resp_heap);
resp_len = 0;
}
}
int resp_len = resp_build_value(&val, response + out_len, (size_t)cap);
if (resp_len < 0) {
return consumed;
}
out_len += (size_t)resp_len;
__completed_cmd(request, consumed, 0);
consumed += len;
}
__completed_cmd(request, consumed, 0);
out_len += resp_len;
consumed += len;
}
#if TIME_COLLECT == 1
#if TIME_COLLECT == 1
struct timeval func_end;
gettimeofday(&func_end, NULL);
long func_us = (func_end.tv_sec - func_start.tv_sec) * 1000000 +
(func_end.tv_usec - func_start.tv_usec);
fprintf(stderr, "kvs_protocol: total %ld us, oplog %ld us\n", func_us, total_oplog_us);
#endif
if (out_len > 0) {
if (chain_buffer_append(&conn->wbuf, response, (size_t)out_len) < 0) {
return -1;
}
}
fprintf(stderr, "kvs_protocol: total %ld us, oplog %ld us\n", func_us, total_oplog_us);
#endif
if (flush_pending_response(conn, response, &out_len) < 0) {
return -1;
}
return consumed;
}