From fbdcff6878d4bafc3e90ec02e207c8cedca430d7 Mon Sep 17 00:00:00 2001
From: 1iaan <139833683+1iaan@users.noreply.github.com>
Date: Fri, 30 Jan 2026 16:00:06 +0000
Subject: [PATCH] =?UTF-8?q?ebpf=E7=9A=84=E4=B8=BB=E4=BB=8E=E5=90=8C?=
=?UTF-8?q?=E6=AD=A5=E5=AE=9E=E7=8E=B0=EF=BC=8CQPS=E6=B5=8B=E8=AF=95?=
=?UTF-8?q?=E4=B8=8E=E5=86=85=E5=AD=98=E6=B1=A0QPS=E6=B5=8B=E8=AF=95?=
=?UTF-8?q?=E3=80=82?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
Makefile | 8 +-
README.md | 193 ++++++++++++++++++++-
config/config.xml | 4 +-
diskuring/diskuring.c | 7 +
dump/kvs_dump.h | 1 -
dump/kvs_oplog.c | 4 +-
dump/kvs_snapshot.c | 8 +-
ebpf/c/replica.bpf.c | 111 +++++--------
ebpf/c/replica.c | 359 ++++++++++++++++++++--------------------
ebpf/c/replica.h | 51 +++---
kvs_array_bin.c | 12 +-
kvs_hash_bin.c | 12 +-
kvs_protocol_resp.c | 19 +++
kvs_protocol_resp.h | 3 +
kvs_rbtree_bin.c | 8 +-
kvs_slave.c | 18 +-
kvstore.c | 25 ++-
kvstore.h | 3 +-
memory/alloc_dispatch.c | 44 ++---
memory/mempool.c | 2 +-
memory/mempool.h | 7 +-
reactor.c | 6 +
test-redis/test.c | 77 +++++----
23 files changed, 599 insertions(+), 383 deletions(-)
diff --git a/Makefile b/Makefile
index 1366e3b..02c5d99 100644
--- a/Makefile
+++ b/Makefile
@@ -12,7 +12,8 @@ SRCS = $(NET_SRCS) $(KV_SRCS) $(MEM_SRCS) $(COMMON_SRCS) $(DUMP_SRCS)
INC = -I./NtyCo/core/ -I/usr/include/libxml2 -I./
LIBDIRS = -L./NtyCo/
-LIBS = -lntyco -lpthread -luring -ldl -lxml2 -ljemalloc
+# LIBS = -lntyco -lpthread -luring -ldl -lxml2 -ljemalloc
+LIBS = -lntyco -lpthread -luring -ldl -lxml2
TARGET = kvstore
SUBDIR = ./NtyCo/
@@ -42,8 +43,11 @@ $(TEST_REDIS): $(TEST_REDIS_SRCS)
%.o: %.c
$(CC) $(CFLAGS) $(INC) -c $^ -g -o $@
-clean: clmem
+clean: clmem cldata
rm -rf $(OBJS) $(TARGET) $(TEST_REDIS)
clmem:
rm -rf mem_leak/*
+
+cldata:
+ rm -rf data/*
diff --git a/README.md b/README.md
index 2cdc6d7..630afcd 100644
--- a/README.md
+++ b/README.md
@@ -3,14 +3,14 @@
## 需求
1. ntyco需要作为kvstore的submodule,通过git clone一次下载。 **完成**。
2. README需要包含编译步骤,测试方案与可行性,性能数据。 **完成**。
-3. 全量持久化保存数据集。 **BUG FIX,完成**。
+3. 全量持久化保存数据集。 **BUG FIX**。
4. 持久化的性能数据。
5. 特殊字符,可以解决redis的resp协议。 **完成**。
6. 实现配置文件,把日志级别,端口ip,主从模式,持久化方案。 **完成**。
7. 持久化落盘用io_uring,加载配置文件用mmap。 **完成**。
-8. 主从同步的性能,开启与关闭性能做到。
-9. 主从同步600w条,出现的coredump。
-10. 主从同步用ebpf实现。
+8. 主从同步的性能,开启与关闭性能做到5%?。
+9. 主从同步600w条,出现的coredump。 **完成**。
+10. 主从同步用ebpf实现。 **BUG FIX**。
11. 内存池测试qps与虚拟内存,物理内存。
12. 实现一个内存泄露检测组件。 **完成**。
@@ -34,6 +34,191 @@ make
```
## 测试
+### 测试1:性能测试
+测试条件:
+1. 不启用持久化。
+2. 不启用主从同步。
+2. pipeline:
+ 1. RSET 100w 条, p:i v:i -> +OK
+ 2. RGET 100w 条, p:i -> +v:i
+ 3. RDEL 100w 条。 p:i -> +OK
+3. 重复 15 次.
+4. 本机发送请求。
+
+内存分配: malloc
+```bash
+lian@ubuntu:~/share/9.1-kvstore$ ./test-redis/testcase 192.168.10.129 8888 3
+Connected to 192.168.10.129:8888
+BATCH (N=3000000) --> time_used=3294 ms, qps=910746
+BATCH (N=3000000) --> time_used=3501 ms, qps=856898
+BATCH (N=3000000) --> time_used=3457 ms, qps=867804
+BATCH (N=3000000) --> time_used=3351 ms, qps=895255
+BATCH (N=3000000) --> time_used=3320 ms, qps=903614
+BATCH (N=3000000) --> time_used=3551 ms, qps=844832
+BATCH (N=3000000) --> time_used=3354 ms, qps=894454
+BATCH (N=3000000) --> time_used=3475 ms, qps=863309
+BATCH (N=3000000) --> time_used=3404 ms, qps=881316
+BATCH (N=3000000) --> time_used=3460 ms, qps=867052
+BATCH (N=3000000) --> time_used=3392 ms, qps=884433
+BATCH (N=3000000) --> time_used=3427 ms, qps=875401
+BATCH (N=3000000) --> time_used=3441 ms, qps=871839
+BATCH (N=3000000) --> time_used=3471 ms, qps=864304
+BATCH (N=3000000) --> time_used=3354 ms, qps=894454
+BATCH (N=3000000) --> time_used=3447 ms, qps=870322
+BATCH (N=3000000) --> time_used=3364 ms, qps=891795
+BATCH (N=3000000) --> time_used=3200 ms, qps=937500
+BATCH (N=3000000) --> time_used=3159 ms, qps=949667
+BATCH (N=3000000) --> time_used=3482 ms, qps=861573
+BATCH (N=3000000) --> time_used=3474 ms, qps=863557
+BATCH (N=3000000) --> time_used=3591 ms, qps=835421
+BATCH (N=3000000) --> time_used=3466 ms, qps=865551
+BATCH (N=3000000) --> time_used=3425 ms, qps=875912
+BATCH (N=3000000) --> time_used=3346 ms, qps=896592
+BATCH (N=3000000) --> time_used=3532 ms, qps=849377
+BATCH (N=3000000) --> time_used=3471 ms, qps=864304
+BATCH (N=3000000) --> time_used=3616 ms, qps=829646
+BATCH (N=3000000) --> time_used=3403 ms, qps=881575
+BATCH (N=3000000) --> time_used=3426 ms, qps=875656
+BATCH (N=3000000) --> time_used=3493 ms, qps=858860
+BATCH (N=3000000) --> time_used=3411 ms, qps=879507
+BATCH (N=3000000) --> time_used=3422 ms, qps=876680
+BATCH (N=3000000) --> time_used=3556 ms, qps=843644
+BATCH (N=3000000) --> time_used=3285 ms, qps=913242
+BATCH (N=3000000) --> time_used=3486 ms, qps=860585
+BATCH (N=3000000) --> time_used=3427 ms, qps=875401
+BATCH (N=3000000) --> time_used=3563 ms, qps=841987
+BATCH (N=3000000) --> time_used=3304 ms, qps=907990
+BATCH (N=3000000) --> time_used=3582 ms, qps=837520
+BATCH (N=3000000) --> time_used=3468 ms, qps=865051
+BATCH (N=3000000) --> time_used=3360 ms, qps=892857
+BATCH (N=3000000) --> time_used=3426 ms, qps=875656
+BATCH (N=3000000) --> time_used=3186 ms, qps=941619
+BATCH (N=3000000) --> time_used=3251 ms, qps=922792
+BATCH (N=3000000) --> time_used=3400 ms, qps=882352
+BATCH (N=3000000) --> time_used=3446 ms, qps=870574
+BATCH (N=3000000) --> time_used=3302 ms, qps=908540
+BATCH (N=3000000) --> time_used=3072 ms, qps=976562
+BATCH (N=3000000) --> time_used=3458 ms, qps=867553
+average qps:880462
+ALL TESTS PASSED.
+```
+
+内存分配: 自实现内存池
+```bash
+lian@ubuntu:~/share/9.1-kvstore$ ./test-redis/testcase 192.168.10.129 8888 3
+Connected to 192.168.10.129:8888
+BATCH (N=3000000) --> time_used=3552 ms, qps=844594
+BATCH (N=3000000) --> time_used=3404 ms, qps=881316
+BATCH (N=3000000) --> time_used=3083 ms, qps=973078
+BATCH (N=3000000) --> time_used=3315 ms, qps=904977
+BATCH (N=3000000) --> time_used=3462 ms, qps=866551
+BATCH (N=3000000) --> time_used=3334 ms, qps=899820
+BATCH (N=3000000) --> time_used=3358 ms, qps=893388
+BATCH (N=3000000) --> time_used=3423 ms, qps=876424
+BATCH (N=3000000) --> time_used=3319 ms, qps=903886
+BATCH (N=3000000) --> time_used=3327 ms, qps=901713
+BATCH (N=3000000) --> time_used=3157 ms, qps=950269
+BATCH (N=3000000) --> time_used=3241 ms, qps=925640
+BATCH (N=3000000) --> time_used=3301 ms, qps=908815
+BATCH (N=3000000) --> time_used=3345 ms, qps=896860
+BATCH (N=3000000) --> time_used=3319 ms, qps=903886
+BATCH (N=3000000) --> time_used=3312 ms, qps=905797
+BATCH (N=3000000) --> time_used=3337 ms, qps=899011
+BATCH (N=3000000) --> time_used=3309 ms, qps=906618
+BATCH (N=3000000) --> time_used=3385 ms, qps=886262
+BATCH (N=3000000) --> time_used=3328 ms, qps=901442
+BATCH (N=3000000) --> time_used=3194 ms, qps=939261
+BATCH (N=3000000) --> time_used=3309 ms, qps=906618
+BATCH (N=3000000) --> time_used=3262 ms, qps=919681
+BATCH (N=3000000) --> time_used=3314 ms, qps=905250
+BATCH (N=3000000) --> time_used=3382 ms, qps=887049
+BATCH (N=3000000) --> time_used=3296 ms, qps=910194
+BATCH (N=3000000) --> time_used=3331 ms, qps=900630
+BATCH (N=3000000) --> time_used=3279 ms, qps=914913
+BATCH (N=3000000) --> time_used=2996 ms, qps=1001335
+BATCH (N=3000000) --> time_used=3387 ms, qps=885739
+BATCH (N=3000000) --> time_used=3346 ms, qps=896592
+BATCH (N=3000000) --> time_used=3241 ms, qps=925640
+BATCH (N=3000000) --> time_used=3353 ms, qps=894721
+BATCH (N=3000000) --> time_used=3366 ms, qps=891265
+BATCH (N=3000000) --> time_used=3387 ms, qps=885739
+BATCH (N=3000000) --> time_used=3382 ms, qps=887049
+BATCH (N=3000000) --> time_used=3358 ms, qps=893388
+BATCH (N=3000000) --> time_used=3372 ms, qps=889679
+BATCH (N=3000000) --> time_used=3467 ms, qps=865301
+BATCH (N=3000000) --> time_used=3243 ms, qps=925069
+BATCH (N=3000000) --> time_used=3191 ms, qps=940144
+BATCH (N=3000000) --> time_used=3365 ms, qps=891530
+BATCH (N=3000000) --> time_used=3218 ms, qps=932256
+BATCH (N=3000000) --> time_used=3332 ms, qps=900360
+BATCH (N=3000000) --> time_used=3172 ms, qps=945775
+BATCH (N=3000000) --> time_used=3211 ms, qps=934288
+BATCH (N=3000000) --> time_used=3124 ms, qps=960307
+BATCH (N=3000000) --> time_used=3043 ms, qps=985869
+BATCH (N=3000000) --> time_used=3086 ms, qps=972132
+BATCH (N=3000000) --> time_used=3201 ms, qps=937207
+average qps:911106
+ALL TESTS PASSED.
+```
+
+内存分配:jemalloc
+```shell
+lian@ubuntu:~/share/9.1-kvstore$ ./test-redis/testcase 192.168.10.129 8888 3
+Connected to 192.168.10.129:8888
+BATCH (N=3000000) --> time_used=3197 ms, qps=938379
+BATCH (N=3000000) --> time_used=3221 ms, qps=931387
+BATCH (N=3000000) --> time_used=3360 ms, qps=892857
+BATCH (N=3000000) --> time_used=3292 ms, qps=911300
+BATCH (N=3000000) --> time_used=3407 ms, qps=880540
+BATCH (N=3000000) --> time_used=3317 ms, qps=904431
+BATCH (N=3000000) --> time_used=3337 ms, qps=899011
+BATCH (N=3000000) --> time_used=3384 ms, qps=886524
+BATCH (N=3000000) --> time_used=3355 ms, qps=894187
+BATCH (N=3000000) --> time_used=3379 ms, qps=887836
+BATCH (N=3000000) --> time_used=3243 ms, qps=925069
+BATCH (N=3000000) --> time_used=3377 ms, qps=888362
+BATCH (N=3000000) --> time_used=3212 ms, qps=933997
+BATCH (N=3000000) --> time_used=3248 ms, qps=923645
+BATCH (N=3000000) --> time_used=3234 ms, qps=927643
+BATCH (N=3000000) --> time_used=3152 ms, qps=951776
+BATCH (N=3000000) --> time_used=3089 ms, qps=971188
+BATCH (N=3000000) --> time_used=3287 ms, qps=912686
+BATCH (N=3000000) --> time_used=3079 ms, qps=974342
+BATCH (N=3000000) --> time_used=3261 ms, qps=919963
+BATCH (N=3000000) --> time_used=3123 ms, qps=960614
+BATCH (N=3000000) --> time_used=3234 ms, qps=927643
+BATCH (N=3000000) --> time_used=3056 ms, qps=981675
+BATCH (N=3000000) --> time_used=3040 ms, qps=986842
+BATCH (N=3000000) --> time_used=3187 ms, qps=941324
+BATCH (N=3000000) --> time_used=3311 ms, qps=906070
+BATCH (N=3000000) --> time_used=3155 ms, qps=950871
+BATCH (N=3000000) --> time_used=3318 ms, qps=904159
+BATCH (N=3000000) --> time_used=3372 ms, qps=889679
+BATCH (N=3000000) --> time_used=3254 ms, qps=921942
+BATCH (N=3000000) --> time_used=3386 ms, qps=886001
+BATCH (N=3000000) --> time_used=3413 ms, qps=878992
+BATCH (N=3000000) --> time_used=3474 ms, qps=863557
+BATCH (N=3000000) --> time_used=3412 ms, qps=879249
+BATCH (N=3000000) --> time_used=3414 ms, qps=878734
+BATCH (N=3000000) --> time_used=3325 ms, qps=902255
+BATCH (N=3000000) --> time_used=3346 ms, qps=896592
+BATCH (N=3000000) --> time_used=3345 ms, qps=896860
+BATCH (N=3000000) --> time_used=3582 ms, qps=837520
+BATCH (N=3000000) --> time_used=3412 ms, qps=879249
+BATCH (N=3000000) --> time_used=3370 ms, qps=890207
+BATCH (N=3000000) --> time_used=3375 ms, qps=888888
+BATCH (N=3000000) --> time_used=3190 ms, qps=940438
+BATCH (N=3000000) --> time_used=3324 ms, qps=902527
+BATCH (N=3000000) --> time_used=3253 ms, qps=922225
+BATCH (N=3000000) --> time_used=3230 ms, qps=928792
+BATCH (N=3000000) --> time_used=3294 ms, qps=910746
+BATCH (N=3000000) --> time_used=3295 ms, qps=910470
+BATCH (N=3000000) --> time_used=3148 ms, qps=952986
+BATCH (N=3000000) --> time_used=3228 ms, qps=929368
+average qps:914031
+ALL TESTS PASSED.
+```
+
### 面试题
1. 为什么会实现kvstore,使用场景在哪里?
diff --git a/config/config.xml b/config/config.xml
index f1e2e71..bf8a219 100644
--- a/config/config.xml
+++ b/config/config.xml
@@ -17,7 +17,7 @@
- incremental
+ none
data
kvs_oplog.db
@@ -27,7 +27,7 @@
- mypool
+ malloc
disable
diff --git a/diskuring/diskuring.c b/diskuring/diskuring.c
index a0666ce..aafdd46 100644
--- a/diskuring/diskuring.c
+++ b/diskuring/diskuring.c
@@ -110,6 +110,13 @@ static void *worker_main(void *arg)
task_t *done = (task_t *)(uintptr_t)cqe->user_data;
task_finish(done, cqe->res);
+ // todo: 失败应该通知主线程,提供一个链表,把失败的task push进去。主线程用定时器任务定时处理失败信息然后destroy。
+ // 暂时用打印说明失败
+ if(done->res < 0){
+ printf("uring failed: fd:%d, offset:%ld\n", done->fd, done->off);
+ }
+ task_destroy(done);
+
io_uring_cqe_seen(&ctx->ring, cqe);
}
}
diff --git a/dump/kvs_dump.h b/dump/kvs_dump.h
index 95b9a51..bab43b3 100644
--- a/dump/kvs_dump.h
+++ b/dump/kvs_dump.h
@@ -11,7 +11,6 @@ extern char global_hash_file[256];
int kvs_create_snapshot(iouring_ctx_t *uring, const char* array_file, const char* rbtree_file, const char* hash_file);
int kvs_create_snapshot_async(const char *ip, int port);
-void __complete_snapshot(const char *ip, int port, const char *array_file, const char *rbtree_file, const char *hash_file);
extern int global_oplog_fd;
diff --git a/dump/kvs_oplog.c b/dump/kvs_oplog.c
index e6ccef1..6a9f857 100644
--- a/dump/kvs_oplog.c
+++ b/dump/kvs_oplog.c
@@ -54,8 +54,8 @@ int kvs_oplog_append(const uint8_t *cmd, size_t len, int logfd){
return -4;
}
- task_wait(t);
- task_destroy(t);
+ // task_wait(t);
+ // task_destroy(t);
return 0;
}
diff --git a/dump/kvs_snapshot.c b/dump/kvs_snapshot.c
index ba11b85..17d8564 100644
--- a/dump/kvs_snapshot.c
+++ b/dump/kvs_snapshot.c
@@ -1,5 +1,6 @@
#include "kvstore.h"
#include "diskuring/diskuring.h"
+#include "kvs_dump.h"
#include
#include
#include
@@ -39,10 +40,6 @@ int kvs_create_snapshot(iouring_ctx_t *uring, const char* array_file, const char
return ret;
}
-void __complete_snapshot(const char *ip, int port, const char *array_file, const char *rbtree_file, const char *hash_file){
-
-}
-
static int send_file_to_ipport(const char *ip, int port, const char *filename) {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) { perror("socket"); return -1; }
@@ -119,9 +116,6 @@ int kvs_create_snapshot_async(const char *ip, int port){
_exit(1);
}
- // hook 通知 eBPF
- __complete_snapshot(ip, port, tmp_array, tmp_rbtree, tmp_hash);
-
iouring_shutdown(&uring);
_exit(0);
} else {
diff --git a/ebpf/c/replica.bpf.c b/ebpf/c/replica.bpf.c
index b9e43b0..14c6778 100644
--- a/ebpf/c/replica.bpf.c
+++ b/ebpf/c/replica.bpf.c
@@ -1,103 +1,80 @@
// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/* Copyright (c) 2020 Facebook */
-#include
-#include
+#include "vmlinux.h"
#include
#include
+#include
#include "replica.h"
char LICENSE[] SEC("license") = "Dual BSD/GPL";
-int my_pid = 0;
-
struct {
- __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
- __uint(key_size, sizeof(int));
- __uint(value_size, sizeof(int));
-} channel SEC(".maps");
+ __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
+ __uint(key_size, sizeof(int));
+ __uint(value_size, sizeof(int));
+} events SEC(".maps");
-SEC("uprobe/kvs_create_snapshot_async")
-int uprobe_create_snapshot_async(struct pt_regs *ctx)
+/* __completed_cmd(const uint8_t *cmd, size_t len, unsigned long long seq); */
+SEC("uprobe//home/lian/share/9.1-kvstore/kvstore:__completed_cmd")
+int BPF_KPROBE(handle_completed_cmd,
+ const __u8 *cmd, size_t len, __u64 seq)
{
- struct event ev;
- __builtin_memset(&ev, 0, sizeof(ev));
+ struct replica_event evt = {};
+ __u32 copy_len;
- const char *ip;
- __u32 port;
+ evt.type = EVENT_COMPLETED_CMD;
+ evt.complete.seq = seq;
- ev.type = EVENT_CREATE_SNAPSHOT_ASYNC;
+ copy_len = len;
+ if (copy_len > MAX_CMD_LEN)
+ copy_len = MAX_CMD_LEN;
- ip = (const char *)PT_REGS_PARM1(ctx);
- port = (__u32)PT_REGS_PARM2(ctx);
+ evt.complete.len = copy_len;
- bpf_probe_read_user_str(ev.data.sync.ip,
- sizeof(ev.data.sync.ip),
- ip);
- ev.data.sync.port = port;
+ if (cmd)
+ bpf_probe_read_user(evt.complete.cmd, copy_len, cmd);
- bpf_perf_event_output(ctx, &channel,
+ bpf_perf_event_output(ctx, &events,
BPF_F_CURRENT_CPU,
- &ev, sizeof(ev));
+ &evt, sizeof(evt));
return 0;
}
-SEC("uprobe/__compeleted_cmd")
-int uprobe_completed_cmd(struct pt_regs *ctx)
+/* __ssync(const uint8_t *ip, uint32_t ip_len, int port, unsigned long long seq); */
+SEC("uprobe//home/lian/share/9.1-kvstore/kvstore:__ssync")
+int BPF_KPROBE(handle_ssync,
+ const __u8 *ip, __u32 ip_len, int port, __u64 seq)
{
- struct event ev;
- __builtin_memset(&ev, 0, sizeof(ev));
+ struct replica_event evt = {};
- const __u8 *cmd;
- __u32 len;
+ evt.type = EVENT_SSYNC;
+ evt.sync.seq = seq;
+ evt.sync.port = port;
- ev.type = EVENT_COMPLETED_CMD;
+ __u32 copy_len = ip_len;
+ if (copy_len > sizeof(evt.sync.ip))
+ copy_len = sizeof(evt.sync.ip);
- cmd = (const __u8 *)PT_REGS_PARM1(ctx);
- len = (__u32)PT_REGS_PARM2(ctx);
+ if (ip)
+ bpf_probe_read_user(evt.sync.ip, copy_len, ip);
- if (len > sizeof(ev.data.cmd.cmd))
- len = sizeof(ev.data.cmd.cmd);
-
- ev.data.cmd.len = len;
-
- bpf_probe_read_user(ev.data.cmd.cmd, len, cmd);
-
- bpf_perf_event_output(ctx, &channel,
+ bpf_perf_event_output(ctx, &events,
BPF_F_CURRENT_CPU,
- &ev, sizeof(ev));
+ &evt, sizeof(evt));
return 0;
}
-
-SEC("uprobe/__create_snapshot_ok")
-int uprobe_create_snapshot_ok(struct pt_regs *ctx)
+/* __sready(void); */
+SEC("uprobe//home/lian/share/9.1-kvstore/kvstore:__sready")
+int BPF_KPROBE(handle_sready)
{
- struct event ev;
- __builtin_memset(&ev, 0, sizeof(ev));
-
- const char *array_file;
- const char *rbtree_file;
- const char *hash_file;
+ struct replica_event evt = {};
- ev.type = EVENT_CREATE_SNAPSHOT_OK;
+ evt.type = EVENT_SREADY;
- array_file = (const char *)PT_REGS_PARM1(ctx);
- rbtree_file = (const char *)PT_REGS_PARM2(ctx);
- hash_file = (const char *)PT_REGS_PARM3(ctx);
-
- bpf_probe_read_user_str(ev.data.ok.array_file,
- sizeof(ev.data.ok.array_file),
- array_file);
- bpf_probe_read_user_str(ev.data.ok.rbtree_file,
- sizeof(ev.data.ok.rbtree_file),
- rbtree_file);
- bpf_probe_read_user_str(ev.data.ok.hash_file,
- sizeof(ev.data.ok.hash_file),
- hash_file);
-
- bpf_perf_event_output(ctx, &channel,
+ bpf_perf_event_output(ctx, &events,
BPF_F_CURRENT_CPU,
- &ev, sizeof(ev));
+ &evt, sizeof(evt));
return 0;
}
\ No newline at end of file
diff --git a/ebpf/c/replica.c b/ebpf/c/replica.c
index c373ab2..87d454c 100644
--- a/ebpf/c/replica.c
+++ b/ebpf/c/replica.c
@@ -2,6 +2,8 @@
/* Copyright (c) 2020 Facebook */
#include
#include
+#include
+#include
#include
#include
#include "replica.skel.h"
@@ -12,10 +14,15 @@
#include "replica.h"
+typedef enum {
+ OFFLINE = 0,
+ ONLINE = 1,
+}replica_state_e ;
struct cmd_node {
+ __u64 seq;
+ __u32 len;
uint8_t *cmd;
- size_t len;
struct cmd_node *next;
};
@@ -25,164 +32,184 @@ struct pending_queue {
int count;
};
-static void queue_init(struct pending_queue *q) {
- q->head = q->tail = NULL;
- q->count = 0;
-}
+/* ================= 全局状态 ================= */
-static void queue_push(struct pending_queue *q, const uint8_t *cmd, size_t len) {
- struct cmd_node *node = malloc(sizeof(*node));
- if (!node) return;
- node->cmd = malloc(len);
- if (!node->cmd) { free(node); return; }
- memcpy(node->cmd, cmd, len);
- node->len = len;
- node->next = NULL;
- if (q->tail) q->tail->next = node;
- else q->head = node;
- q->tail = node;
- q->count++;
-}
+static replica_state_e state = OFFLINE;
+static int sockfd = -1;
-static void queue_send_and_clear(struct pending_queue *q, int sock) {
- struct cmd_node *node = q->head;
- int sent_count = 0;
- while (node) {
- if (send(sock, node->cmd, node->len, 0) > 0) {
- sent_count++;
- }
- struct cmd_node *tmp = node;
- node = node->next;
- free(tmp->cmd);
- free(tmp);
- }
- if (sent_count > 0) {
- printf("[QUEUE] Sent %d commands to slave\n", sent_count);
- }
- queue_init(q);
-}
+static char peer_ip[MAX_IP_LEN];
+static int peer_port;
+static __u64 peer_seq;
-static void queue_free(struct pending_queue *q) {
- struct cmd_node *node = q->head;
- while (node) {
- struct cmd_node *tmp = node;
- node = node->next;
- free(tmp->cmd);
- free(tmp);
- }
- queue_init(q);
-}
+static struct pending_queue pending = {
+ .head = NULL,
+ .tail = NULL,
+ .count = 0,
+};
-static int send_file(int sock, const char *path) {
- FILE *fp = fopen(path, "rb");
- if (!fp) {
- printf("[ERROR] Failed to open file: %s\n", path);
- return -1;
- }
-
- char buf[4096];
- size_t n, total = 0;
- while ((n = fread(buf, 1, sizeof(buf), fp)) > 0) {
- if (send(sock, buf, n, 0) < 0) {
- fclose(fp);
- printf("[ERROR] Failed to send file: %s (sent %zu bytes)\n", path, total);
- return -1;
- }
- total += n;
- }
- fclose(fp);
- printf("[FILE] Sent %s (%zu bytes)\n", path, total);
- return 0;
-}
-
-// 全局状态(单 Slave 简化)
-static enum state current_state = NOSLAVE;
-static char slave_ip[16] = {0};
-static int slave_port = 0;
-static char array_file[128] = {0};
-static char rbtree_file[128] = {0};
-static char hash_file[128] = {0};
-static struct pending_queue pending;
-static int slave_sock = -1; // 连接 Slave 的 socket
-
-// 连接 Slave
-static int connect_slave() {
- int sock = socket(AF_INET, SOCK_STREAM, 0);
- if (sock < 0) return -1;
- struct sockaddr_in addr;
- memset(&addr, 0, sizeof(addr));
- addr.sin_family = AF_INET;
- addr.sin_port = htons(slave_port);
- inet_pton(AF_INET, slave_ip, &addr.sin_addr);
- if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
- close(sock);
- return -1;
- }
- return sock;
-}
-
-static void handle_event(void *ctx, int cpu, void *data, __u32 size) {
- struct event *ev = (struct event *)data;
-
- switch (ev->type) {
- case EVENT_CREATE_SNAPSHOT_ASYNC:
- printf("[EVENT] Type: CREATE_SNAPSHOT_ASYNC\n");
- printf("[EVENT] Slave IP: %s, Port: %u\n", ev->data.sync.ip, ev->data.sync.port);
-
- if (current_state == NOSLAVE) {
- current_state = START;
- strncpy(slave_ip, ev->data.sync.ip, sizeof(slave_ip));
- slave_port = ev->data.sync.port;
- queue_init(&pending);
- slave_sock = connect_slave(); // 连接 Slave
- if (slave_sock < 0) {
- printf("Failed to connect to Slave %s:%d\n", slave_ip, slave_port);
- current_state = NOSLAVE;
- }
- }
- break;
- case EVENT_COMPLETED_CMD:
- printf("[EVENT] Type: COMPLETED_CMD\n");
- printf("[EVENT] Command length: %llu bytes\n", ev->data.cmd.len);
-
- if (current_state != NOSLAVE) {
- queue_push(&pending, ev->data.cmd.cmd, ev->data.cmd.len);
- }
- break;
- case EVENT_CREATE_SNAPSHOT_OK:
- printf("[EVENT] Type: CREATE_SNAPSHOT_OK\n");
- printf("[EVENT] Array file: %s\n", ev->data.ok.array_file);
- printf("[EVENT] RBTree file: %s\n", ev->data.ok.rbtree_file);
- printf("[EVENT] Hash file: %s\n", ev->data.ok.hash_file);
-
- if (current_state == START) {
- current_state = DONE;
- strncpy(array_file, ev->data.ok.array_file, sizeof(array_file));
- strncpy(rbtree_file, ev->data.ok.rbtree_file, sizeof(rbtree_file));
- strncpy(hash_file, ev->data.ok.hash_file, sizeof(hash_file));
- }
- break;
- }
-}
-
-
-static void lost_event(void *ctx, int cpu, __u64 cnt) {
- printf("Lost %llu events\n", cnt);
-}
-
-static int libbpf_print_fn(enum libbpf_print_level level, const char *format, va_list args)
+/* ================= pending 队列操作 ================= */
+static void pending_free()
{
- return vfprintf(stderr, format, args);
+ struct pending_queue *q = &pending;
+ struct cmd_node *cur = q->head;
+ while (cur) {
+ struct cmd_node *tmp = cur;
+ cur = cur->next;
+ free(tmp->cmd);
+ free(tmp);
+ }
+ q->head = q->tail = NULL;
+ q->count = 0;
}
+static void pending_push(__u64 seq, __u32 len, const uint8_t *cmd) /* append a private copy of cmd[0..len) tagged with seq to the tail of the global pending queue */
+{
+ struct cmd_node *node = malloc(sizeof(*node));
+ if (!node)
+ return;
+ /* The node keeps its own heap copy of the command bytes; if either allocation fails the command is dropped without being queued. */
+ node->cmd = malloc(len);
+ if (!node->cmd) {
+ free(node);
+ return;
+ }
+
+ memcpy(node->cmd, cmd, len);
+ node->seq = seq;
+ node->len = len;
+ node->next = NULL;
+ /* Link at the tail: an empty queue gets head == tail == node, otherwise the old tail points at the new node. */
+ if (!pending.tail) {
+ pending.head = pending.tail = node;
+ } else {
+ pending.tail->next = node;
+ pending.tail = node;
+ }
+
+ pending.count++;
+}
+/* Frees nodes from the head of the pending queue while their seq is below min_seq, prints how many were freed, then re-links head/tail. */
+static void pending_gc(__u64 min_seq)
+{
+ struct cmd_node *cur = pending.head;
+ /* n remembers the count before trimming so the number of freed nodes can be printed. */
+ int n = pending.count;
+ while (cur && cur->seq < min_seq) {
+ struct cmd_node *tmp = cur;
+ cur = cur->next;
+
+ free(tmp->cmd);
+ free(tmp);
+ pending.count--;
+ }
+
+ printf("gc:%d\n", n-pending.count);
+ /* cur is now the first surviving node, or NULL when the whole queue was freed. */
+ pending.head = cur;
+ if (!cur)
+ pending.tail = NULL;
+}
+
+static void pending_send_one(struct cmd_node *node){
+ int rt = send(sockfd, node->cmd, node->len, 0);
+ printf("send seq:%lld, rt=%d\n", node->seq, rt);
+}
+/* Walks the pending queue from head to tail and sends every node with pending_send_one(); the nodes stay queued afterwards. */
+static void pending_send_all(void)
+{
+ struct cmd_node *cur = pending.head;
+
+ while (cur) {
+ pending_send_one(cur);
+ cur = cur->next;
+ }
+}
+
+/* ================= network logic ================= */
+static void try_connect(void)
+{
+    /* Connect to peer_ip:peer_port (up to 10 attempts, 1s apart); on success switch to ONLINE and send the queued backlog. */
+    struct sockaddr_in addr = {0};
+    /* -1 is the "no socket" sentinel and fd 0 is a valid descriptor, so test >= 0 rather than > 0. */
+    if (sockfd >= 0) {
+        close(sockfd);
+        sockfd = -1;
+    }
+
+    addr.sin_family = AF_INET;
+    addr.sin_port = htons(peer_port);
+    /* inet_pton() returns 1 only for a well-formed IPv4 string; do not retry against a zeroed address. */
+    if (inet_pton(AF_INET, peer_ip, &addr.sin_addr) != 1) {
+        fprintf(stderr, "invalid peer address: %s\n", peer_ip);
+        return;
+    }
+
+    for (int i = 0; i < 10; ++i) {
+        sockfd = socket(AF_INET, SOCK_STREAM, 0);
+        if (sockfd < 0) {
+            perror("socket");
+            return;
+        }
+        printf("connect try %d...\n", i + 1);
+        if (connect(sockfd, (struct sockaddr *)&addr, sizeof(addr)) == 0) {
+            printf("connect success: %s:%d\n", peer_ip, peer_port);
+            state = ONLINE;
+            pending_send_all();
+            return;
+        }
+        perror("connect");
+        close(sockfd);
+        sockfd = -1;
+        sleep(1);
+    }
+    printf("connect failed after 10 retries\n");
+}
+
+
+/* perf buffer callback: SSYNC records the peer and trims the backlog, COMPLETED_CMD queues a command (and forwards it when ONLINE), SREADY connects. */
+static void handle_event(void *ctx, int cpu, void *data, __u32 size)
+{
+    struct replica_event *evt = data;
+
+    switch (evt->type) {
+    case EVENT_SSYNC:
+        /* evt->sync.ip may fill all MAX_IP_LEN bytes with no NUL, so copy one byte less and terminate explicitly. */
+        strncpy(peer_ip, evt->sync.ip, sizeof(peer_ip) - 1);
+        peer_ip[sizeof(peer_ip) - 1] = '\0';
+        peer_port = evt->sync.port;
+        peer_seq = evt->sync.seq;
+        printf("SSYNC [seq:%llu], [%s:%d]\n", (unsigned long long)peer_seq, peer_ip, peer_port);
+        state = OFFLINE;
+        pending_gc(peer_seq);
+        break;
+
+    case EVENT_COMPLETED_CMD:
+        /* Always queue the command; forward it immediately only while the peer connection is ONLINE. */
+        pending_push(evt->complete.seq,
+                     evt->complete.len,
+                     evt->complete.cmd);
+
+        if (state == ONLINE && pending.tail) {
+            struct cmd_node *n = pending.tail;
+            pending_send_one(n);
+        }
+        break;
+
+    case EVENT_SREADY:
+        printf("SREADY \n");
+        if (state == OFFLINE)
+            try_connect();
+        break;
+    }
+}
+
+/* ================= main ================= */
int main(int argc, char **argv)
{
struct replica_bpf *skel;
+ struct perf_buffer *pb = NULL;
int err;
- /* Set up libbpf errors and debug info callback */
- libbpf_set_print(libbpf_print_fn);
-
/* Open BPF application */
skel = replica_bpf__open();
if (!skel) {
@@ -190,9 +217,6 @@ int main(int argc, char **argv)
return 1;
}
- /* ensure BPF program only handles write() syscalls from our process */
- skel->bss->my_pid = getpid();
-
/* Load & verify BPF programs */
err = replica_bpf__load(skel);
if (err) {
@@ -211,47 +235,22 @@ int main(int argc, char **argv)
"to see output of the BPF programs.\n");
- struct perf_buffer *pb = perf_buffer__new(bpf_map__fd(skel->maps.channel), 8, handle_event, lost_event, NULL, NULL);
+ pb = perf_buffer__new(bpf_map__fd(skel->maps.events), 8,
+ handle_event, NULL, NULL, NULL);
+
if(!pb){
goto cleanup;
}
-#if 0
- while(1){
- perf_buffer__poll(pb, 1000);
- }
-#else
while (1) {
perf_buffer__poll(pb, 1000); // 处理事件
-
- // 循环中检查状态并发送
- if (current_state == DONE && slave_sock >= 0) {
- // 发送快照文件
- if (send_file(slave_sock, array_file) == 0 &&
- send_file(slave_sock, rbtree_file) == 0 &&
- send_file(slave_sock, hash_file) == 0) {
- current_state = ONLINE;
- printf("Snapshot sent, state to ONLINE\n");
- } else {
- printf("Failed to send snapshot\n");
- current_state = NOSLAVE;
- close(slave_sock);
- slave_sock = -1;
- }
- }
-
- if (current_state == ONLINE && slave_sock >= 0) {
- // 发送 pending
- queue_send_and_clear(&pending, slave_sock);
- }
}
-#endif
perf_buffer__free(pb);
cleanup:
- queue_free(&pending);
- if (slave_sock >= 0) close(slave_sock);
+ pending_free();
+ if (sockfd >= 0) close(sockfd);
replica_bpf__destroy(skel);
return -err;
}
diff --git a/ebpf/c/replica.h b/ebpf/c/replica.h
index 9cf3c1c..130cd0c 100644
--- a/ebpf/c/replica.h
+++ b/ebpf/c/replica.h
@@ -1,36 +1,37 @@
#ifndef __REPLICA_H__
#define __REPLICA_H__
+
+#define MAX_CMD_LEN 256
+#define MAX_IP_LEN 64
+
enum event_type {
- EVENT_CREATE_SNAPSHOT_ASYNC,
- EVENT_CREATE_SNAPSHOT_OK,
- EVENT_CREATE_SNAPSHOT_READY,
- EVENT_COMPLETED_CMD
+ EVENT_COMPLETED_CMD,
+ EVENT_SSYNC,
+ EVENT_SREADY,
};
-struct event {
- enum event_type type;
+struct complete_cmd_evt {
+ __u64 seq;
+ __u32 len;
+ __u8 cmd[MAX_CMD_LEN];
+};
+
+struct sync_evt {
+ __u64 seq;
+ char ip[MAX_IP_LEN];
+ __s32 port;
+};
+
+struct replica_event {
+ __u32 type;
+ __u32 _pad;
+
union {
- struct {
- char ip[16];
- __u32 port;
- } sync;
- struct {
- __u8 cmd[256];
- __u64 len;
- } cmd;
- struct {
- char array_file[128];
- char rbtree_file[128];
- char hash_file[128];
- } ok;
- } data;
+ struct complete_cmd_evt complete;
+ struct sync_evt sync;
+ };
};
-enum state {
- NOSLAVE,
- PREPARING,
- ONLINE
-};
#endif
\ No newline at end of file
diff --git a/kvs_array_bin.c b/kvs_array_bin.c
index 4a1046d..61e189f 100644
--- a/kvs_array_bin.c
+++ b/kvs_array_bin.c
@@ -237,13 +237,13 @@ int kvs_array_save(iouring_ctx_t *uring, kvs_array_t *inst, const char* filename
task_t *t = submit_write(uring, fd, bufs, lens, count, current_off);
if (!t) { close(fd); return -4; }
- int res = task_wait(t);
- task_destroy(t);
+ // int res = task_wait(t);
+ // task_destroy(t);
- if (res < 0) {
- close(fd);
- return -5;
- }
+ // if (res < 0) {
+ // close(fd);
+ // return -5;
+ // }
current_off += (off_t) total;
}
diff --git a/kvs_hash_bin.c b/kvs_hash_bin.c
index 6a575c8..d99afd6 100755
--- a/kvs_hash_bin.c
+++ b/kvs_hash_bin.c
@@ -310,13 +310,13 @@ int kvs_hash_save(iouring_ctx_t *uring, kvs_hash_t *inst, const char* filename){
task_t *t = submit_write(uring, fd, bufs, lens, count, current_off);
if (!t) { close(fd); return -4; }
- int res = task_wait(t);
- task_destroy(t);
+ // int res = task_wait(t);
+ // task_destroy(t);
- if (res < 0) {
- close(fd);
- return -5;
- }
+ // if (res < 0) {
+ // close(fd);
+ // return -5;
+ // }
current_off += (off_t) total;
}
diff --git a/kvs_protocol_resp.c b/kvs_protocol_resp.c
index 9c8ec7c..3418626 100644
--- a/kvs_protocol_resp.c
+++ b/kvs_protocol_resp.c
@@ -14,6 +14,8 @@ extern kvs_rbtree_t global_rbtree;
extern kvs_hash_t global_hash;
#endif
+extern unsigned long long global_seq;
+
static int need(const uint8_t *p, const uint8_t *end, size_t n) {
return (p + n <= end) ? 0 : -1;
}
@@ -543,10 +545,12 @@ int resp_dispatch(const resp_cmd_t *cmd, resp_value_t *out_value) {
return 0;
}
case KVS_CMD_SSYNC:
+ __ssync(cmd->argv[1].ptr, cmd->argv[1].len, atoi(cmd->argv[2].ptr), global_seq);
kvs_create_snapshot_async(cmd->argv[1].ptr, atoi(cmd->argv[2].ptr));
*out_value = resp_simple("OK");
return 0;
case KVS_CMD_SREADY:
+ __sready();
*out_value = resp_simple("OK");
return 0;
default:
@@ -555,4 +559,19 @@ int resp_dispatch(const resp_cmd_t *cmd, resp_value_t *out_value) {
*out_value = resp_error("ERR unknown command");
return 0;
+}
+
+__attribute__((noinline)) void __ssync(const uint8_t *ip, uint32_t ip_len, int port, unsigned long long seq){
+    // uprobe attach point for the eBPF replica process; it intentionally does no work of its own.
+    // Problem: if recording only started at SSYNC, a command (CmdX) executed right after the snapshot is
+    // created but before the recorder is ready would be in neither the snapshot nor the recorder.
+    // Solution: the master keeps a running seq and the eBPF side records every command from startup:
+    //   __ssync(ip, port, seq)         -> where to send, and from which seq to start sending
+    //   __sready()                     -> sending to (ip, port) may begin
+    //   __completed_cmd(cmd, len, seq) -> record one command
+    __asm__ volatile("" ::: "memory"); // with noinline, keeps calls to this empty function from being optimised away
+}
+/* uprobe attach point: called on SREADY so the eBPF replica process can observe it; intentionally does no work. */
+__attribute__((noinline)) void __sready(void){
+    __asm__ volatile("" ::: "memory"); // with noinline, keeps calls to this empty function from being optimised away
+}
\ No newline at end of file
diff --git a/kvs_protocol_resp.h b/kvs_protocol_resp.h
index b670737..dcbf2b7 100644
--- a/kvs_protocol_resp.h
+++ b/kvs_protocol_resp.h
@@ -90,4 +90,7 @@ resp_value_t resp_nil(void); /* $-1\r\n */
*/
int resp_dispatch(const resp_cmd_t *cmd, resp_value_t *out_value);
+void __ssync(const uint8_t *ip, uint32_t ip_len, int port, unsigned long long seq);
+void __sready();
+
#endif
\ No newline at end of file
diff --git a/kvs_rbtree_bin.c b/kvs_rbtree_bin.c
index 9ee7a06..abe17e1 100644
--- a/kvs_rbtree_bin.c
+++ b/kvs_rbtree_bin.c
@@ -507,12 +507,12 @@ static int kvs_rbtree_save_node(iouring_ctx_t *uring, int fd, off_t *current_off
if (!t) { return -4; }
- int res = task_wait(t);
- task_destroy(t);
+ // int res = task_wait(t);
+ // task_destroy(t); /* worker_main() in diskuring.c now destroys each task on completion; destroying it here as well would free it twice */
- if (res < 0) {
- return -5;
- }
+ // if (res < 0) {
+ // return -5;
+ // }
*current_off += (off_t) total;
diff --git a/kvs_slave.c b/kvs_slave.c
index d0bdc9e..79bc1c3 100644
--- a/kvs_slave.c
+++ b/kvs_slave.c
@@ -192,17 +192,17 @@ int slave_bootstrap(
printf("Sent SREADY to master\n");
// 6. 接收回包+OK\r\n
- if (recv_exact(master_fd, resp, 5) < 0) {
- fprintf(stderr, "Failed to receive final OK from master\n");
- goto cleanup;
- }
+ // if (recv_exact(master_fd, resp, 5) < 0) {
+ // fprintf(stderr, "Failed to receive final OK from master\n");
+ // goto cleanup;
+ // }
- if (memcmp(resp, "+OK\r\n", 5) != 0) {
- fprintf(stderr, "Unexpected final response from master: %.5s\n", resp);
- goto cleanup;
- }
+ // if (memcmp(resp, "+OK\r\n", 5) != 0) {
+ // fprintf(stderr, "Unexpected final response from master: %.5s\n", resp);
+ // goto cleanup;
+ // }
- printf("Received final OK from master, bootstrap complete\n");
+ // printf("Received final OK from master, bootstrap complete\n");
ret = 0;
diff --git a/kvstore.c b/kvstore.c
index a426294..5fcd71e 100644
--- a/kvstore.c
+++ b/kvstore.c
@@ -21,16 +21,15 @@
extern int slave_bootstrap(const char *listen_ip, int listen_port, const char *master_ip, int master_port);
-#if MEMORY_SELECT_MALLOC == MEMORY_USE_MYMALLOC
extern mp_pool_t global_mempool;
-#endif
AppConfig global_cfg;
iouring_ctx_t global_uring_ctx;
+unsigned long long global_seq;
extern int global_oplog_fd;
-void __completed_cmd(const uint8_t *cmd, size_t len){
+void __completed_cmd(const uint8_t *cmd, size_t len, unsigned long long seq){
}
@@ -69,7 +68,8 @@ int kvs_protocol(struct conn* conn){
int dr = resp_dispatch(&cmd, &val);
- __completed_cmd(p, len);
+ __completed_cmd(p, len, global_seq);
+ global_seq ++;
/*
@@ -186,9 +186,10 @@ void dest_kvengine(void) {
}
void init_memory_pool(AppConfig *cfg){
-#if MEMORY_SELECT_MALLOC == MEMORY_USE_MYMALLOC
- mp_create(&global_mempool);
-#endif
+
+ if(cfg->allocator == ALLOC_MYPOOL)
+ mp_create(&global_mempool);
+
kvs_set_memleak_detect(cfg->leak_mode);
kvs_set_alloc_type(cfg->allocator);
@@ -196,7 +197,7 @@ void init_memory_pool(AppConfig *cfg){
}
void dest_memory_pool(void){
-#if MEMORY_SELECT_MALLOC == MEMORY_USE_MYMALLOC
+#if MEMORY_USE_MYMALLOC
mp_destroy(&global_mempool);
#endif
}
@@ -278,9 +279,10 @@ int main(int argc, char *argv[]) {
printf("Init Config error");
return -1;
}
-
- init_data_file(&global_cfg);
+ global_seq = 0;
+ init_memory_pool(&global_cfg);
+ init_data_file(&global_cfg);
init_disk_uring(&global_uring_ctx);
int port = global_cfg.port;
@@ -296,9 +298,6 @@ int main(int argc, char *argv[]) {
}
-
-
- init_memory_pool(&global_cfg);
init_kvengine();
#if (NETWORK_SELECT == NETWORK_REACTOR)
diff --git a/kvstore.h b/kvstore.h
index 704c457..b0f386c 100644
--- a/kvstore.h
+++ b/kvstore.h
@@ -279,7 +279,8 @@ extern kvs_rbtree_t global_rbtree;
extern kvs_hash_t global_hash;
#endif
-void __completed_cmd(const uint8_t *cmd, size_t len);
+void __completed_cmd(const uint8_t *cmd, size_t len, unsigned long long seq);
+
#endif
diff --git a/memory/alloc_dispatch.c b/memory/alloc_dispatch.c
index ad4b9c9..4a1ee7e 100644
--- a/memory/alloc_dispatch.c
+++ b/memory/alloc_dispatch.c
@@ -9,7 +9,7 @@
mp_pool_t global_mempool;
-static atomic_int g_memory_mode = ATOMIC_VAR_INIT(MEMLEAK_DETECT_OFF);
+static atomic_int g_memory_mode = ATOMIC_VAR_INIT(ALLOC_OTHER);
static atomic_int g_memleak_detect_mode = ATOMIC_VAR_INIT(MEMLEAK_DETECT_OFF);
// 设置内存池类型
@@ -36,7 +36,7 @@ MemLeakDetectMode kvs_get_memleak_detect(void) {
void *kvs_malloc_impl(size_t size){
- switch (atomic_load(&g_memleak_detect_mode)){
+ switch (atomic_load(&g_memory_mode)){
case ALLOC_MALLOC:
return malloc(size);
case ALLOC_MYPOOL:
@@ -51,7 +51,7 @@ void *kvs_malloc_impl(size_t size){
}
void kvs_free_impl(void *ptr) {
- switch (atomic_load(&g_memleak_detect_mode)){
+ switch (atomic_load(&g_memory_mode)){
case ALLOC_MALLOC:
free(ptr);
break;
@@ -75,20 +75,20 @@ void kvs_free_impl(void *ptr) {
void *nMalloc(size_t size, const char * filename, const char *func, int line){
void *ptr = kvs_malloc_impl(size);
- if(atomic_load(&g_memleak_detect_mode) == MEMLEAK_DETECT_ON) {
- char buff[128];
- snprintf(buff, 128, "./mem_leak/%p.mem", ptr);
- FILE* fp = fopen(buff, "w");
- if(!fp){
- kvs_free(ptr);
- return NULL;
- }
+ // if(atomic_load(&g_memleak_detect_mode) == MEMLEAK_DETECT_ON) {
+ // char buff[128];
+ // snprintf(buff, 128, "./mem_leak/%p.mem", ptr);
+ // FILE* fp = fopen(buff, "w");
+ // if(!fp){
+ // kvs_free(ptr);
+ // return NULL;
+ // }
- fprintf(fp, "[+] [%s:%d:%s] [%p:%ld]\n", filename, line, func, ptr, size);
- fflush(fp);
- fclose(fp);
+ // fprintf(fp, "[+] [%s:%d:%s] [%p:%ld]\n", filename, line, func, ptr, size);
+ // fflush(fp);
+ // fclose(fp);
- }
+ // }
return ptr;
}
@@ -97,15 +97,15 @@ void nFree(void *ptr, const char * filename, const char *func, int line){
return ;
}
- if(atomic_load(&g_memleak_detect_mode) == MEMLEAK_DETECT_ON) {
- char buff[128];
- snprintf(buff, 128, "./mem_leak/%p.mem", ptr);
+ // if(atomic_load(&g_memleak_detect_mode) == MEMLEAK_DETECT_ON) {
+ // char buff[128];
+ // snprintf(buff, 128, "./mem_leak/%p.mem", ptr);
- if(unlink(buff) < 0) {
- return ;
- }
+ // if(unlink(buff) < 0) {
+ // return ;
+ // }
- }
+ // }
kvs_free_impl(ptr);
}
\ No newline at end of file
diff --git a/memory/mempool.c b/memory/mempool.c
index c4777d9..5f3dc93 100644
--- a/memory/mempool.c
+++ b/memory/mempool.c
@@ -74,7 +74,7 @@ static mp_page_t* mp_page_create(mp_bucket_t *owner){
pg->prev = NULL;
pg->next = NULL;
- bitmap_clear_all(pg->bitmap, 8);
+ bitmap_clear_all(pg->bitmap, 20);
char *p = (char*)page_payload(pg);
for(uint16_t i = 0;i < cap - 1; ++ i){
diff --git a/memory/mempool.h b/memory/mempool.h
index eae3d57..b3df5c0 100644
--- a/memory/mempool.h
+++ b/memory/mempool.h
@@ -6,11 +6,12 @@
#include
#include
-#define MEMPOOL_PAGE_SIZE 4096
+// #define MEMPOOL_PAGE_SIZE 4096
+#define MEMPOOL_PAGE_SIZE (1024*8)
#define MEMPOOL_BLOCK_MAX_SIZE 512
#define MEMPOOL_ALIGNMENT 8
#define MEMPOOL_NUM_CLASSES (MEMPOOL_BLOCK_MAX_SIZE / MEMPOOL_ALIGNMENT)
-#define MEMPOOL_CACHE_PAGE 2
+#define MEMPOOL_CACHE_PAGE 4
typedef struct mp_page_s mp_page_t;
typedef struct mp_bucket_s mp_bucket_t;
@@ -34,7 +35,7 @@ struct mp_page_s{
uint16_t free_count;
uint16_t capacity;
- uint64_t bitmap[8]; // 最多支持 512 个块 (64*8)
+ uint64_t bitmap[20]; // 最多支持 512/1280 个块 (64*20)
};
struct mp_bucket_s{
diff --git a/reactor.c b/reactor.c
index 1c4e490..4bb7ecd 100644
--- a/reactor.c
+++ b/reactor.c
@@ -336,6 +336,12 @@ int r_init_server(unsigned short port) {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
+ int opt = 1;
+ if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
+ perror("setsockopt");
+ close(sockfd);
+ }
+
struct sockaddr_in servaddr;
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0
diff --git a/test-redis/test.c b/test-redis/test.c
index e44869a..555bb17 100644
--- a/test-redis/test.c
+++ b/test-redis/test.c
@@ -5,7 +5,8 @@
#include
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
-
+// #define PRINT printf
+#define PRINT
static void die(redisContext *c, const char *msg) {
fprintf(stderr, "%s: %s\n", msg, c && c->err ? c->errstr : "unknown");
@@ -38,6 +39,7 @@ static void must_bulk_eq(redisReply *r, const void *buf, size_t n, const char *w
if (!r) { fprintf(stderr, "%s: reply null\n", what); exit(1); }
if (r->type != REDIS_REPLY_STRING || r->len != n || memcmp(r->str, buf, n) != 0) {
fprintf(stderr, "%s: bulk mismatch. type=%d len=%zu\n", what, r->type, r->len);
+ fprintf(stderr, "expect:%s, truely:%s\n", (const char*)buf, r->str);
freeReplyObject(r);
exit(1);
}
@@ -111,17 +113,18 @@ void pipline_set_test(redisContext *c, int start, int countN, const char *op){
die(c, "redisAppendCommand SET failed");
}
- if(i%10000 == 0) printf("%d\n", i);
-
+ if(i%10000 == 0) PRINT("SEND: %d\n", i);
}
/* 再一次性把 N 个回复读出来 */
for (int i = start; i < end; i++) {
redisReply *r = NULL;
if (redisGetReply(c, (void**)&r) != REDIS_OK || !r) die(c, "redisGetReply SET failed");
must_ok(r, "pipeline SET reply");
+
+ if(i%10000 == 0) PRINT("RECV: %d\n", i);
}
- printf("[OK] SET pipeline batch %d\n", N);
+ PRINT("[OK] SET pipeline batch %d\n", N);
}
void pipline_get_test(redisContext *c, int start, int countN, const char *op){
@@ -138,7 +141,7 @@ void pipline_get_test(redisContext *c, int start, int countN, const char *op){
die(c, "redisAppendCommand GET failed");
}
- if(i%10000 == 0) printf("%d\n", i);
+ if(i%10000 == 0) PRINT("SEND: %d\n", i);
}
for (int i = start; i < end; i++) {
redisReply *r = NULL;
@@ -146,9 +149,11 @@ void pipline_get_test(redisContext *c, int start, int countN, const char *op){
char expect[64];
int en = snprintf(expect, sizeof(expect), "v:%d", i);
must_bulk_eq(r, expect, (size_t)en, "pipeline GET reply");
+
+ if(i%10000 == 0) PRINT("RECV: %d\n", i);
}
- printf("[OK] GET pipeline batch %d\n", N);
+ PRINT("[OK] GET pipeline batch %d\n", N);
}
void pipline_del_test(redisContext *c, int start, int countN, const char *op){
@@ -165,17 +170,44 @@ void pipline_del_test(redisContext *c, int start, int countN, const char *op){
die(c, "redisAppendCommand DEL failed");
}
- if(i%10000 == 0) printf("%d\n", i);
+ if(i%10000 == 0) PRINT("SEND: %d\n", i);
}
for (int i = start; i < end; i++) {
redisReply *r = NULL;
if (redisGetReply(c, (void**)&r) != REDIS_OK || !r) die(c, "redisGetReply DEL failed");
freeReplyObject(r); /* DEL 返回 int,这里不强制检查 */
+
+ if(i%10000 == 0) PRINT("RECV: %d\n", i);
}
- printf("[OK] DEL pipeline batch %d\n", N);
+ PRINT("[OK] DEL pipeline batch %d\n", N);
}
+/* QPS benchmark: `rounds` pipelined batches of `batch_size` commands each for RSET, then RGET, then RDEL on `c`. */
+/* Prints one summary line and returns ops per second; elapsed time is clamped to >= 1 ms so the division is safe. */
+long long test_nopersist_noreplica(redisContext *c, int rounds, long long batch_size){
+    struct timeval tv_begin, tv_end;
+    long long total_ops = batch_size*rounds;
+
+    gettimeofday(&tv_begin, NULL);
+    /* Each phase walks the same index range [0, total_ops) one batch at a time. */
+    for(int i = 0;i < total_ops ; i += batch_size){
+        pipline_set_test(c, i, batch_size, "RSET");
+    }
+    for(int i = 0;i < total_ops ; i += batch_size){
+        pipline_get_test(c, i, batch_size, "RGET");
+    }
+    for(int i = 0;i < total_ops ; i += batch_size){
+        pipline_del_test(c, i, batch_size, "RDEL");
+    }
+    gettimeofday(&tv_end, NULL);
+
+    int time_used = TIME_SUB_MS(tv_end, tv_begin);
+    if(time_used <= 0) time_used = 1; /* a run under 1 ms (or a clock step back) would otherwise divide by zero or go negative */
+    long long qps = total_ops *3*1000/time_used;
+    printf("BATCH (N=%lld) --> time_used=%d ms, qps=%lld\n", total_ops *3, time_used, qps);
+    return qps;
+}
int main(int argc, char **argv) {
if(argc < 4) {
@@ -197,29 +229,18 @@ int main(int argc, char **argv) {
}else if(mode == 1){
basic_command_test(c);
}else if(mode == 3){
- struct timeval tv_begin, tv_end;
- gettimeofday(&tv_begin, NULL);
- long long ops = 100000;
-
- pipline_set_test(c, 0, ops, "RSET");
+ int rounds = 10;
+ long long batch_size = 100000;
+ int testrounds = 50;
+ long long total_qps = 0;
- gettimeofday(&tv_end, NULL);
- int time_used = TIME_SUB_MS(tv_end, tv_begin);
- long long qps = ops*1000/time_used;
- printf("BATCH (N=%lld) --> time_used=%d ms, qps=%lld\n",
- ops, time_used, qps);
+ for(int i = 0;i < testrounds; ++ i){
+ total_qps += test_nopersist_noreplica(c, rounds, batch_size);
+ }
+ printf("average qps:%lld\n", total_qps/testrounds);
}else if(mode == 4){
- struct timeval tv_begin, tv_end;
- gettimeofday(&tv_begin, NULL);
- long long ops = 100000;
- pipline_del_test(c, 0, ops, "RDEL");
-
- gettimeofday(&tv_end, NULL);
- int time_used = TIME_SUB_MS(tv_end, tv_begin);
- long long qps = ops*1000/time_used;
- printf("BATCH (N=%lld) --> time_used=%d ms, qps=%lld\n",
- ops, time_used, qps);
+ }else if(mode == 5){
}else if(mode == 10){
pipline_set_test(c, 0, 1000, "SET");