Compare commits

...

2 Commits

Author SHA1 Message Date
1iaan
8fdefc2100 feat: add SCANPREFIX/KEYSPREFIX/SCANRANGE/KEYSRANGE 2026-04-07 12:39:55 +08:00
1iaan
aee84df665 容器化 2026-04-07 11:46:41 +08:00
8 changed files with 372 additions and 48 deletions

11
.dockerignore Normal file
View File

@@ -0,0 +1,11 @@
.git
.gitignore
data
mem_leak
*.o
kvstore
test-redis/testcase
test-redis/bench
libbpf-bootstrap
ebpf
img

47
Dockerfile Normal file
View File

@@ -0,0 +1,47 @@
ARG UBUNTU_VERSION=22.04
ARG APT_MIRROR=mirrors.aliyun.com
FROM ubuntu:${UBUNTU_VERSION} AS builder
ARG APT_MIRROR
ENV DEBIAN_FRONTEND=noninteractive
RUN sed -i "s|http://archive.ubuntu.com/ubuntu/|http://${APT_MIRROR}/ubuntu/|g; s|http://security.ubuntu.com/ubuntu/|http://${APT_MIRROR}/ubuntu/|g" /etc/apt/sources.list \
&& apt-get update \
&& apt-get install -y --no-install-recommends \
build-essential \
ca-certificates \
libhiredis-dev \
liburing-dev \
libxml2-dev \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY . .
RUN make kvstore CFLAGS="-g -D_DEFAULT_SOURCE -DJEMALLOC_NO_DEMANGLE"
FROM ubuntu:${UBUNTU_VERSION}
ARG APT_MIRROR
ENV DEBIAN_FRONTEND=noninteractive
RUN sed -i "s|http://archive.ubuntu.com/ubuntu/|http://${APT_MIRROR}/ubuntu/|g; s|http://security.ubuntu.com/ubuntu/|http://${APT_MIRROR}/ubuntu/|g" /etc/apt/sources.list \
&& apt-get update \
&& apt-get install -y --no-install-recommends \
ca-certificates \
liburing2 \
libxml2 \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY --from=builder /app/kvstore /app/kvstore
RUN mkdir -p /app/config /app/data
EXPOSE 8888
VOLUME ["/app/data"]
CMD ["./kvstore"]

View File

@@ -1,7 +1,7 @@
<?xml version='1.0' encoding='UTF-8'?> <?xml version='1.0' encoding='UTF-8'?>
<config> <config>
<server> <server>
<ip>127.0.0.1</ip> <ip>0.0.0.0</ip>
<port>8888</port> <port>8888</port>
<mode>master</mode> <mode>master</mode>
@@ -18,7 +18,7 @@
</log> </log>
<persistence> <persistence>
<type>none</type> <type>incremental</type>
<dir>data</dir> <dir>data</dir>
<wal>kvs_oplog.db</wal> <wal>kvs_oplog.db</wal>

16
docker-compose.yml Normal file
View File

@@ -0,0 +1,16 @@
services:
kvstore:
build:
context: .
args:
APT_MIRROR: ${APT_MIRROR:-mirrors.aliyun.com}
image: kvstore:single
container_name: kvstore
restart: unless-stopped
security_opt:
- seccomp=unconfined
ports:
- "8888:8888"
volumes:
- ./config/config.xml:/app/config/config.xml:ro
- ./data:/app/data

View File

@@ -275,6 +275,21 @@ int resp_build_value(const resp_value_t *v, uint8_t *out, size_t cap) {
if (write_crlf(&p, end) < 0) return -1; if (write_crlf(&p, end) < 0) return -1;
break; break;
case RESP_T_ARRAY:
if (write_bytes(&p, end, "*", 1) < 0) return -1;
if (write_u32_ascii(&p, end, v->array.count) < 0) return -1;
if (write_crlf(&p, end) < 0) return -1;
for (uint32_t i = 0; i < v->array.count; ++i) {
if (write_bytes(&p, end, "$", 1) < 0) return -1;
if (write_u32_ascii(&p, end, v->array.items[i].len) < 0) return -1;
if (write_crlf(&p, end) < 0) return -1;
if (v->array.items[i].len > 0 && v->array.items[i].ptr) {
if (write_bytes(&p, end, v->array.items[i].ptr, v->array.items[i].len) < 0) return -1;
}
if (write_crlf(&p, end) < 0) return -1;
}
break;
default: default:
return -1; return -1;
} }
@@ -285,8 +300,8 @@ int resp_build_value(const resp_value_t *v, uint8_t *out, size_t cap) {
/* helpers */ /* helpers */
resp_value_t resp_simple(const char *s) { resp_value_t resp_simple(const char *s) {
resp_value_t v; resp_value_t v;
memset(&v, 0, sizeof(v));
v.type = RESP_T_SIMPLE_STR; v.type = RESP_T_SIMPLE_STR;
v.i64 = 0;
v.bulk.ptr = (const uint8_t*)s; v.bulk.ptr = (const uint8_t*)s;
v.bulk.len = (uint32_t)strlen(s); v.bulk.len = (uint32_t)strlen(s);
return v; return v;
@@ -294,8 +309,8 @@ resp_value_t resp_simple(const char *s) {
resp_value_t resp_error(const char *s) { resp_value_t resp_error(const char *s) {
resp_value_t v; resp_value_t v;
memset(&v, 0, sizeof(v));
v.type = RESP_T_ERROR; v.type = RESP_T_ERROR;
v.i64 = 0;
v.bulk.ptr = (const uint8_t*)s; v.bulk.ptr = (const uint8_t*)s;
v.bulk.len = (uint32_t)strlen(s); v.bulk.len = (uint32_t)strlen(s);
return v; return v;
@@ -303,17 +318,16 @@ resp_value_t resp_error(const char *s) {
resp_value_t resp_int(int64_t x) { resp_value_t resp_int(int64_t x) {
resp_value_t v; resp_value_t v;
memset(&v, 0, sizeof(v));
v.type = RESP_T_INTEGER; v.type = RESP_T_INTEGER;
v.i64 = x; v.i64 = x;
v.bulk.ptr = NULL;
v.bulk.len = 0;
return v; return v;
} }
resp_value_t resp_bulk(const uint8_t *p, uint32_t n) { resp_value_t resp_bulk(const uint8_t *p, uint32_t n) {
resp_value_t v; resp_value_t v;
memset(&v, 0, sizeof(v));
v.type = RESP_T_BULK_STR; v.type = RESP_T_BULK_STR;
v.i64 = 0;
v.bulk.ptr = p; v.bulk.ptr = p;
v.bulk.len = n; v.bulk.len = n;
return v; return v;
@@ -321,13 +335,31 @@ resp_value_t resp_bulk(const uint8_t *p, uint32_t n) {
resp_value_t resp_nil(void) { resp_value_t resp_nil(void) {
resp_value_t v; resp_value_t v;
memset(&v, 0, sizeof(v));
v.type = RESP_T_NIL; v.type = RESP_T_NIL;
v.i64 = 0;
v.bulk.ptr = NULL;
v.bulk.len = 0;
return v; return v;
} }
resp_value_t resp_array(resp_slice_t *items, uint32_t count, int owns_items) {
resp_value_t v;
memset(&v, 0, sizeof(v));
v.type = RESP_T_ARRAY;
v.array.items = items;
v.array.count = count;
v.array.owns_items = owns_items ? 1 : 0;
return v;
}
void resp_value_release(resp_value_t *v) {
if (!v) return;
if (v->type == RESP_T_ARRAY && v->array.owns_items && v->array.items) {
kvs_free(v->array.items);
}
memset(v, 0, sizeof(*v));
v->type = RESP_T_NIL;
return;
}
/* ----------------- dispatcher (minimal) ----------------- */ /* ----------------- dispatcher (minimal) ----------------- */
static int expect_argv(const resp_cmd_t *cmd, uint32_t n) { static int expect_argv(const resp_cmd_t *cmd, uint32_t n) {
@@ -337,7 +369,7 @@ static int expect_argv(const resp_cmd_t *cmd, uint32_t n) {
const char *command[] = { const char *command[] = {
"PING", "AUTH", "PING", "AUTH",
"SET", "GET", "DEL", "SETEX", "MOD", "EXIST", "SET", "GET", "DEL", "SETEX", "MOD", "EXIST", "SCANPREFIX", "KEYSPREFIX", "SCANRANGE", "KEYSRANGE",
"RSET", "RGET", "RDEL", "RMOD", "REXIST", "RSET", "RGET", "RDEL", "RMOD", "REXIST",
"HSET", "HGET", "HDEL", "HMOD", "HEXIST", "HSET", "HGET", "HDEL", "HMOD", "HEXIST",
"SAVE", "SSYNC", "SREADY", "MEMPRINT" "SAVE", "SSYNC", "SREADY", "MEMPRINT"
@@ -382,6 +414,120 @@ static int match_auth_password(const resp_slice_t *password) {
return memcmp(global_cfg.redis_auth_password, password->ptr, expect_len) == 0; return memcmp(global_cfg.redis_auth_password, password->ptr, expect_len) == 0;
} }
static int prefix_match(const uint8_t *key, uint32_t key_len,
const uint8_t *prefix, uint32_t prefix_len) {
if (!key || !prefix) return 0;
if (key_len < prefix_len) return 0;
return memcmp(key, prefix, prefix_len) == 0;
}
static rbtree_node *rbtree_lower_bound(rbtree *T, const uint8_t *key, uint32_t keylen) {
rbtree_node *cur;
rbtree_node *best;
if (!T || !key) return NULL;
cur = T->root;
best = T->nil;
while (cur != T->nil) {
const uint8_t *cur_key = kvs_rbtree_node_key_ptr(cur);
int cmp = kvs_keycmp(cur_key, cur->key_len, key, keylen);
if (cmp < 0) {
cur = cur->right;
} else {
best = cur;
cur = cur->left;
}
}
return best;
}
static int append_scan_item(resp_slice_t **items, uint32_t *count, uint32_t *cap,
const uint8_t *ptr, uint32_t len) {
resp_slice_t *new_items;
uint32_t new_cap;
if (!items || !count || !cap) return -1;
if (*count >= *cap) {
new_cap = (*cap == 0) ? 16u : (*cap * 2u);
new_items = (resp_slice_t *)kvs_malloc(sizeof(resp_slice_t) * (size_t)new_cap);
if (!new_items) return -1;
if (*items && *count > 0) {
memcpy(new_items, *items, sizeof(resp_slice_t) * (size_t)(*count));
kvs_free(*items);
}
*items = new_items;
*cap = new_cap;
}
(*items)[*count].ptr = ptr;
(*items)[*count].len = len;
(*count)++;
return 0;
}
static int build_rbtree_scan_response(const uint8_t *begin, uint32_t begin_len,
const uint8_t *end, uint32_t end_len,
int prefix_mode, int keys_only,
resp_value_t *out_value) {
resp_slice_t *items = NULL;
uint32_t count = 0;
uint32_t cap = 0;
rbtree_node *node;
uint64_t now_ms;
if (!out_value) return -1;
now_ms = kvs_now_ms();
if (begin && begin_len > 0) {
node = rbtree_lower_bound(&global_rbtree, begin, begin_len);
} else if (global_rbtree.root != global_rbtree.nil) {
node = rbtree_mini(&global_rbtree, global_rbtree.root);
} else {
node = global_rbtree.nil;
}
while (node && node != global_rbtree.nil) {
const uint8_t *key = kvs_rbtree_node_key_ptr(node);
const uint8_t *value = kvs_rbtree_node_value_ptr(node);
rbtree_node *next = rbtree_successor(&global_rbtree, node);
if (prefix_mode) {
if (begin && begin_len > 0 && !prefix_match(key, node->key_len, begin, begin_len)) {
break;
}
} else if (end && end_len > 0) {
int end_cmp = kvs_keycmp(key, node->key_len, end, end_len);
if (end_cmp > 0) {
break;
}
}
if (node->expire_at_ms != 0 && node->expire_at_ms <= now_ms) {
(void)ttl_delete_schedule(key, node->key_len);
node = next;
continue;
}
if (append_scan_item(&items, &count, &cap, key, node->key_len) < 0) {
if (items) kvs_free(items);
return -1;
}
if (!keys_only &&
append_scan_item(&items, &count, &cap, value, node->value_len) < 0) {
if (items) kvs_free(items);
return -1;
}
node = next;
}
*out_value = resp_array(items, count, 1);
return 0;
}
/** /**
* 输入cmd * 输入cmd
@@ -462,6 +608,44 @@ int resp_dispatch(const resp_cmd_t *cmd, resp_value_t *out_value) {
return 0; return 0;
} }
case KVS_CMD_SCANPREFIX: {
if (cmd->argc != 2) { *out_value = resp_error("ERR wrong number of arguments for 'scanprefix'"); return -1; }
if (build_rbtree_scan_response(cmd->argv[1].ptr, cmd->argv[1].len,
NULL, 0, 1, 0, out_value) < 0) {
*out_value = resp_error("ERR internal error");
}
return 0;
}
case KVS_CMD_KEYSPREFIX: {
if (cmd->argc != 2) { *out_value = resp_error("ERR wrong number of arguments for 'keysprefix'"); return -1; }
if (build_rbtree_scan_response(cmd->argv[1].ptr, cmd->argv[1].len,
NULL, 0, 1, 1, out_value) < 0) {
*out_value = resp_error("ERR internal error");
}
return 0;
}
case KVS_CMD_SCANRANGE: {
if (cmd->argc != 3) { *out_value = resp_error("ERR wrong number of arguments for 'scanrange'"); return -1; }
if (build_rbtree_scan_response(cmd->argv[1].ptr, cmd->argv[1].len,
cmd->argv[2].ptr, cmd->argv[2].len,
0, 0, out_value) < 0) {
*out_value = resp_error("ERR internal error");
}
return 0;
}
case KVS_CMD_KEYSRANGE: {
if (cmd->argc != 3) { *out_value = resp_error("ERR wrong number of arguments for 'keysrange'"); return -1; }
if (build_rbtree_scan_response(cmd->argv[1].ptr, cmd->argv[1].len,
cmd->argv[2].ptr, cmd->argv[2].len,
0, 1, out_value) < 0) {
*out_value = resp_error("ERR internal error");
}
return 0;
}
case KVS_CMD_DEL: { case KVS_CMD_DEL: {
if (cmd->argc != 2) { *out_value = resp_error("ERR wrong number of arguments for 'del'"); return -1; } if (cmd->argc != 2) { *out_value = resp_error("ERR wrong number of arguments for 'del'"); return -1; }
int r = kvs_rbtree_del(&global_rbtree, cmd->argv[1].ptr, cmd->argv[1].len); int r = kvs_rbtree_del(&global_rbtree, cmd->argv[1].ptr, cmd->argv[1].len);

View File

@@ -26,6 +26,10 @@ typedef enum {
KVS_CMD_SETEX, KVS_CMD_SETEX,
KVS_CMD_MOD, KVS_CMD_MOD,
KVS_CMD_EXIST, KVS_CMD_EXIST,
KVS_CMD_SCANPREFIX,
KVS_CMD_KEYSPREFIX,
KVS_CMD_SCANRANGE,
KVS_CMD_KEYSRANGE,
// rbtree // rbtree
KVS_CMD_RSET, KVS_CMD_RSET,
KVS_CMD_RGET, KVS_CMD_RGET,
@@ -62,12 +66,20 @@ typedef enum resp_type_e {
RESP_T_INTEGER, RESP_T_INTEGER,
RESP_T_BULK_STR, RESP_T_BULK_STR,
RESP_T_NIL, /* nil bulk string ($-1\r\n) */ RESP_T_NIL, /* nil bulk string ($-1\r\n) */
RESP_T_ARRAY,
} resp_type_t; } resp_type_t;
typedef struct resp_array_s {
resp_slice_t *items;
uint32_t count;
int owns_items;
} resp_array_t;
typedef struct resp_value_s { typedef struct resp_value_s {
resp_type_t type; resp_type_t type;
int64_t i64; /* for integer */ int64_t i64; /* for integer */
resp_slice_t bulk; /* for simple/error/bulk string (bytes) */ resp_slice_t bulk; /* for simple/error/bulk string (bytes) */
resp_array_t array; /* for arrays */
} resp_value_t; } resp_value_t;
/* ----------------- parsing ----------------- */ /* ----------------- parsing ----------------- */
@@ -86,6 +98,8 @@ resp_value_t resp_error(const char *s); /* -s\r\n */
resp_value_t resp_int(int64_t x); /* :x\r\n */ resp_value_t resp_int(int64_t x); /* :x\r\n */
resp_value_t resp_bulk(const uint8_t *p, uint32_t n); /* $n\r\n...\r\n */ resp_value_t resp_bulk(const uint8_t *p, uint32_t n); /* $n\r\n...\r\n */
resp_value_t resp_nil(void); /* $-1\r\n */ resp_value_t resp_nil(void); /* $-1\r\n */
resp_value_t resp_array(resp_slice_t *items, uint32_t count, int owns_items);
void resp_value_release(resp_value_t *v);
/* ----------------- dispatcher ----------------- */ /* ----------------- dispatcher ----------------- */
/* Execute one command and return a RESP value. /* Execute one command and return a RESP value.

View File

@@ -186,6 +186,51 @@ static int resp_value_encoded_len(const resp_value_t *v, size_t *out_len) {
break; break;
} }
case RESP_T_ARRAY: {
char tmp[32];
int n;
size_t t = 0;
if (v->array.count > 0 && !v->array.items) {
return -1;
}
n = snprintf(tmp, sizeof(tmp), "%u", (unsigned)v->array.count);
if (n <= 0) {
return -1;
}
if (checked_size_add(1, (size_t)n, &t) < 0 ||
checked_size_add(t, 2, &t) < 0) {
return -1;
}
for (uint32_t i = 0; i < v->array.count; ++i) {
size_t item_len = 0;
int item_digits;
if (v->array.items[i].len > 0 && !v->array.items[i].ptr) {
return -1;
}
item_digits = snprintf(tmp, sizeof(tmp), "%u", (unsigned)v->array.items[i].len);
if (item_digits <= 0) {
return -1;
}
if (checked_size_add(1, (size_t)item_digits, &item_len) < 0 ||
checked_size_add(item_len, 2, &item_len) < 0 ||
checked_size_add(item_len, (size_t)v->array.items[i].len, &item_len) < 0 ||
checked_size_add(item_len, 2, &item_len) < 0 ||
checked_size_add(t, item_len, &t) < 0) {
return -1;
}
}
len = t;
break;
}
default: default:
return -1; return -1;
} }
@@ -328,11 +373,13 @@ int kvs_protocol(struct conn* conn){
uint8_t *resp_heap = NULL; uint8_t *resp_heap = NULL;
if (resp_value_encoded_len(&val, &resp_need) < 0) { if (resp_value_encoded_len(&val, &resp_need) < 0) {
resp_value_release(&val);
return -1; return -1;
} }
resp_heap = (uint8_t *)kvs_malloc(resp_need); resp_heap = (uint8_t *)kvs_malloc(resp_need);
if (!resp_heap) { if (!resp_heap) {
resp_value_release(&val);
return -1; return -1;
} }
@@ -340,6 +387,7 @@ int kvs_protocol(struct conn* conn){
if (resp_len < 0 || if (resp_len < 0 ||
chain_buffer_append(&conn->wbuf, resp_heap, (size_t)resp_len) < 0) { chain_buffer_append(&conn->wbuf, resp_heap, (size_t)resp_len) < 0) {
free(resp_heap); free(resp_heap);
resp_value_release(&val);
return -1; return -1;
} }
@@ -349,6 +397,7 @@ int kvs_protocol(struct conn* conn){
} }
out_len += (size_t)resp_len; out_len += (size_t)resp_len;
resp_value_release(&val);
// __completed_cmd(request, consumed, 0); // __completed_cmd(request, consumed, 0);

View File

@@ -165,8 +165,11 @@ rbtree_node *rbtree_search(rbtree *T, const uint8_t *key, uint32_t keylen);
typedef int (*kvs_rbtree_visit_cb)(rbtree_node *node, void *arg); typedef int (*kvs_rbtree_visit_cb)(rbtree_node *node, void *arg);
rbtree_node *rbtree_search_with_visit(rbtree *T, const uint8_t *key, uint32_t keylen, rbtree_node *rbtree_search_with_visit(rbtree *T, const uint8_t *key, uint32_t keylen,
kvs_rbtree_visit_cb cb, void *arg); kvs_rbtree_visit_cb cb, void *arg);
rbtree_node *rbtree_mini(rbtree *T, rbtree_node *x);
rbtree_node *rbtree_successor(rbtree *T, rbtree_node *x);
const uint8_t *kvs_rbtree_node_key_ptr(const rbtree_node *node); const uint8_t *kvs_rbtree_node_key_ptr(const rbtree_node *node);
const uint8_t *kvs_rbtree_node_value_ptr(const rbtree_node *node); const uint8_t *kvs_rbtree_node_value_ptr(const rbtree_node *node);
int kvs_keycmp(const uint8_t *a, uint32_t alen, const uint8_t *b, uint32_t blen);
int kvs_rbtree_save(iouring_ctx_t *uring, kvs_rbtree_t *inst, const char* filename); int kvs_rbtree_save(iouring_ctx_t *uring, kvs_rbtree_t *inst, const char* filename);
int kvs_rbtree_load(kvs_rbtree_t *inst, const char* filename); int kvs_rbtree_load(kvs_rbtree_t *inst, const char* filename);