diff --git a/Makefile b/Makefile index 1366e3b..02c5d99 100644 --- a/Makefile +++ b/Makefile @@ -12,7 +12,8 @@ SRCS = $(NET_SRCS) $(KV_SRCS) $(MEM_SRCS) $(COMMON_SRCS) $(DUMP_SRCS) INC = -I./NtyCo/core/ -I/usr/include/libxml2 -I./ LIBDIRS = -L./NtyCo/ -LIBS = -lntyco -lpthread -luring -ldl -lxml2 -ljemalloc +# LIBS = -lntyco -lpthread -luring -ldl -lxml2 -ljemalloc +LIBS = -lntyco -lpthread -luring -ldl -lxml2 TARGET = kvstore SUBDIR = ./NtyCo/ @@ -42,8 +43,11 @@ $(TEST_REDIS): $(TEST_REDIS_SRCS) %.o: %.c $(CC) $(CFLAGS) $(INC) -c $^ -g -o $@ -clean: clmem +clean: clmem cldata rm -rf $(OBJS) $(TARGET) $(TEST_REDIS) clmem: rm -rf mem_leak/* + +cldata: + rm -rf data/* diff --git a/README.md b/README.md index 2cdc6d7..630afcd 100644 --- a/README.md +++ b/README.md @@ -3,14 +3,14 @@ ## 需求 1. ntyco需要作为kvstore的submodule,通过git clone一次下载。 **完成**。 2. README需要包含编译步骤,测试方案与可行性,性能数据。 **完成**。 -3. 全量持久化保存数据集。 **BUG FIX,完成**。 +3. 全量持久化保存数据集。 **BUG FIX**。 4. 持久化的性能数据。 5. 特殊字符,可以解决redis的resp协议。 **完成**。 6. 实现配置文件,把日志级别,端口ip,主从模式,持久化方案。 **完成**。 7. 持久化落盘用io_uring,加载配置文件用mmap。 **完成**。 -8. 主从同步的性能,开启与关闭性能做到。 -9. 主从同步600w条,出现的coredump。 -10. 主从同步用ebpf实现。 +8. 主从同步的性能,开启与关闭性能做到5%?。 +9. 主从同步600w条,出现的coredump。 **完成**。 +10. 主从同步用ebpf实现。 **BUG FIX**。 11. 内存池测试qps与虚拟内存,物理内存。 12. 实现一个内存泄露检测组件。 **完成**。 @@ -34,6 +34,191 @@ make ``` ## 测试 +### 测试1:性能测试 +测试条件: +1. 不启用持久化。 +2. 不启用主从同步。 +2. pipline: + 1. RSET 100w 条, p:i v:i -> +OK + 2. RGET 100w 条, p:i -> +v:i + 3. RDEL 100w 条。 p:i -> +OK +3. 重复 15 次. +4. 本机发送请求。 + +内存分配: malloc +```bash +lian@ubuntu:~/share/9.1-kvstore$ ./test-redis/testcase 192.168.10.129 8888 3 +Connected to 192.168.10.129:8888 +BATCH (N=3000000) --> time_used=3294 ms, qps=910746 +BATCH (N=3000000) --> time_used=3501 ms, qps=856898 +BATCH (N=3000000) --> time_used=3457 ms, qps=867804 +BATCH (N=3000000) --> time_used=3351 ms, qps=895255 +BATCH (N=3000000) --> time_used=3320 ms, qps=903614 +BATCH (N=3000000) --> time_used=3551 ms, qps=844832 +BATCH (N=3000000) --> time_used=3354 ms, qps=894454 +BATCH (N=3000000) --> time_used=3475 ms, qps=863309 +BATCH (N=3000000) --> time_used=3404 ms, qps=881316 +BATCH (N=3000000) --> time_used=3460 ms, qps=867052 +BATCH (N=3000000) --> time_used=3392 ms, qps=884433 +BATCH (N=3000000) --> time_used=3427 ms, qps=875401 +BATCH (N=3000000) --> time_used=3441 ms, qps=871839 +BATCH (N=3000000) --> time_used=3471 ms, qps=864304 +BATCH (N=3000000) --> time_used=3354 ms, qps=894454 +BATCH (N=3000000) --> time_used=3447 ms, qps=870322 +BATCH (N=3000000) --> time_used=3364 ms, qps=891795 +BATCH (N=3000000) --> time_used=3200 ms, qps=937500 +BATCH (N=3000000) --> time_used=3159 ms, qps=949667 +BATCH (N=3000000) --> time_used=3482 ms, qps=861573 +BATCH (N=3000000) --> time_used=3474 ms, qps=863557 +BATCH (N=3000000) --> time_used=3591 ms, qps=835421 +BATCH (N=3000000) --> time_used=3466 ms, qps=865551 +BATCH (N=3000000) --> time_used=3425 ms, qps=875912 +BATCH (N=3000000) --> time_used=3346 ms, qps=896592 +BATCH (N=3000000) --> time_used=3532 ms, qps=849377 +BATCH (N=3000000) --> time_used=3471 ms, qps=864304 +BATCH (N=3000000) --> time_used=3616 ms, qps=829646 +BATCH (N=3000000) --> time_used=3403 ms, qps=881575 +BATCH (N=3000000) --> time_used=3426 ms, qps=875656 +BATCH (N=3000000) --> time_used=3493 ms, qps=858860 +BATCH (N=3000000) --> time_used=3411 ms, qps=879507 +BATCH (N=3000000) --> time_used=3422 ms, qps=876680 +BATCH (N=3000000) --> time_used=3556 ms, qps=843644 +BATCH (N=3000000) --> time_used=3285 ms, qps=913242 +BATCH (N=3000000) --> time_used=3486 ms, qps=860585 +BATCH (N=3000000) --> time_used=3427 ms, qps=875401 +BATCH (N=3000000) --> time_used=3563 ms, qps=841987 +BATCH (N=3000000) --> time_used=3304 ms, qps=907990 +BATCH (N=3000000) --> time_used=3582 ms, qps=837520 +BATCH (N=3000000) --> time_used=3468 ms, qps=865051 +BATCH (N=3000000) --> time_used=3360 ms, qps=892857 +BATCH (N=3000000) --> time_used=3426 ms, qps=875656 +BATCH (N=3000000) --> time_used=3186 ms, qps=941619 +BATCH (N=3000000) --> time_used=3251 ms, qps=922792 +BATCH (N=3000000) --> time_used=3400 ms, qps=882352 +BATCH (N=3000000) --> time_used=3446 ms, qps=870574 +BATCH (N=3000000) --> time_used=3302 ms, qps=908540 +BATCH (N=3000000) --> time_used=3072 ms, qps=976562 +BATCH (N=3000000) --> time_used=3458 ms, qps=867553 +average qps:880462 +ALL TESTS PASSED. +``` + +内存分配: 自实现内存池 +```bash +lian@ubuntu:~/share/9.1-kvstore$ ./test-redis/testcase 192.168.10.129 8888 3 +Connected to 192.168.10.129:8888 +BATCH (N=3000000) --> time_used=3552 ms, qps=844594 +BATCH (N=3000000) --> time_used=3404 ms, qps=881316 +BATCH (N=3000000) --> time_used=3083 ms, qps=973078 +BATCH (N=3000000) --> time_used=3315 ms, qps=904977 +BATCH (N=3000000) --> time_used=3462 ms, qps=866551 +BATCH (N=3000000) --> time_used=3334 ms, qps=899820 +BATCH (N=3000000) --> time_used=3358 ms, qps=893388 +BATCH (N=3000000) --> time_used=3423 ms, qps=876424 +BATCH (N=3000000) --> time_used=3319 ms, qps=903886 +BATCH (N=3000000) --> time_used=3327 ms, qps=901713 +BATCH (N=3000000) --> time_used=3157 ms, qps=950269 +BATCH (N=3000000) --> time_used=3241 ms, qps=925640 +BATCH (N=3000000) --> time_used=3301 ms, qps=908815 +BATCH (N=3000000) --> time_used=3345 ms, qps=896860 +BATCH (N=3000000) --> time_used=3319 ms, qps=903886 +BATCH (N=3000000) --> time_used=3312 ms, qps=905797 +BATCH (N=3000000) --> time_used=3337 ms, qps=899011 +BATCH (N=3000000) --> time_used=3309 ms, qps=906618 +BATCH (N=3000000) --> time_used=3385 ms, qps=886262 +BATCH (N=3000000) --> time_used=3328 ms, qps=901442 +BATCH (N=3000000) --> time_used=3194 ms, qps=939261 +BATCH (N=3000000) --> time_used=3309 ms, qps=906618 +BATCH (N=3000000) --> time_used=3262 ms, qps=919681 +BATCH (N=3000000) --> time_used=3314 ms, qps=905250 +BATCH (N=3000000) --> time_used=3382 ms, qps=887049 +BATCH (N=3000000) --> time_used=3296 ms, qps=910194 +BATCH (N=3000000) --> time_used=3331 ms, qps=900630 +BATCH (N=3000000) --> time_used=3279 ms, qps=914913 +BATCH (N=3000000) --> time_used=2996 ms, qps=1001335 +BATCH (N=3000000) --> time_used=3387 ms, qps=885739 +BATCH (N=3000000) --> time_used=3346 ms, qps=896592 +BATCH (N=3000000) --> time_used=3241 ms, qps=925640 +BATCH (N=3000000) --> time_used=3353 ms, qps=894721 +BATCH (N=3000000) --> time_used=3366 ms, qps=891265 +BATCH (N=3000000) --> time_used=3387 ms, qps=885739 +BATCH (N=3000000) --> time_used=3382 ms, qps=887049 +BATCH (N=3000000) --> time_used=3358 ms, qps=893388 +BATCH (N=3000000) --> time_used=3372 ms, qps=889679 +BATCH (N=3000000) --> time_used=3467 ms, qps=865301 +BATCH (N=3000000) --> time_used=3243 ms, qps=925069 +BATCH (N=3000000) --> time_used=3191 ms, qps=940144 +BATCH (N=3000000) --> time_used=3365 ms, qps=891530 +BATCH (N=3000000) --> time_used=3218 ms, qps=932256 +BATCH (N=3000000) --> time_used=3332 ms, qps=900360 +BATCH (N=3000000) --> time_used=3172 ms, qps=945775 +BATCH (N=3000000) --> time_used=3211 ms, qps=934288 +BATCH (N=3000000) --> time_used=3124 ms, qps=960307 +BATCH (N=3000000) --> time_used=3043 ms, qps=985869 +BATCH (N=3000000) --> time_used=3086 ms, qps=972132 +BATCH (N=3000000) --> time_used=3201 ms, qps=937207 +average qps:911106 +ALL TESTS PASSED. +``` + +内存分配:jemalloc +```shell +lian@ubuntu:~/share/9.1-kvstore$ ./test-redis/testcase 192.168.10.129 8888 3 +Connected to 192.168.10.129:8888 +BATCH (N=3000000) --> time_used=3197 ms, qps=938379 +BATCH (N=3000000) --> time_used=3221 ms, qps=931387 +BATCH (N=3000000) --> time_used=3360 ms, qps=892857 +BATCH (N=3000000) --> time_used=3292 ms, qps=911300 +BATCH (N=3000000) --> time_used=3407 ms, qps=880540 +BATCH (N=3000000) --> time_used=3317 ms, qps=904431 +BATCH (N=3000000) --> time_used=3337 ms, qps=899011 +BATCH (N=3000000) --> time_used=3384 ms, qps=886524 +BATCH (N=3000000) --> time_used=3355 ms, qps=894187 +BATCH (N=3000000) --> time_used=3379 ms, qps=887836 +BATCH (N=3000000) --> time_used=3243 ms, qps=925069 +BATCH (N=3000000) --> time_used=3377 ms, qps=888362 +BATCH (N=3000000) --> time_used=3212 ms, qps=933997 +BATCH (N=3000000) --> time_used=3248 ms, qps=923645 +BATCH (N=3000000) --> time_used=3234 ms, qps=927643 +BATCH (N=3000000) --> time_used=3152 ms, qps=951776 +BATCH (N=3000000) --> time_used=3089 ms, qps=971188 +BATCH (N=3000000) --> time_used=3287 ms, qps=912686 +BATCH (N=3000000) --> time_used=3079 ms, qps=974342 +BATCH (N=3000000) --> time_used=3261 ms, qps=919963 +BATCH (N=3000000) --> time_used=3123 ms, qps=960614 +BATCH (N=3000000) --> time_used=3234 ms, qps=927643 +BATCH (N=3000000) --> time_used=3056 ms, qps=981675 +BATCH (N=3000000) --> time_used=3040 ms, qps=986842 +BATCH (N=3000000) --> time_used=3187 ms, qps=941324 +BATCH (N=3000000) --> time_used=3311 ms, qps=906070 +BATCH (N=3000000) --> time_used=3155 ms, qps=950871 +BATCH (N=3000000) --> time_used=3318 ms, qps=904159 +BATCH (N=3000000) --> time_used=3372 ms, qps=889679 +BATCH (N=3000000) --> time_used=3254 ms, qps=921942 +BATCH (N=3000000) --> time_used=3386 ms, qps=886001 +BATCH (N=3000000) --> time_used=3413 ms, qps=878992 +BATCH (N=3000000) --> time_used=3474 ms, qps=863557 +BATCH (N=3000000) --> time_used=3412 ms, qps=879249 +BATCH (N=3000000) --> time_used=3414 ms, qps=878734 +BATCH (N=3000000) --> time_used=3325 ms, qps=902255 +BATCH (N=3000000) --> time_used=3346 ms, qps=896592 +BATCH (N=3000000) --> time_used=3345 ms, qps=896860 +BATCH (N=3000000) --> time_used=3582 ms, qps=837520 +BATCH (N=3000000) --> time_used=3412 ms, qps=879249 +BATCH (N=3000000) --> time_used=3370 ms, qps=890207 +BATCH (N=3000000) --> time_used=3375 ms, qps=888888 +BATCH (N=3000000) --> time_used=3190 ms, qps=940438 +BATCH (N=3000000) --> time_used=3324 ms, qps=902527 +BATCH (N=3000000) --> time_used=3253 ms, qps=922225 +BATCH (N=3000000) --> time_used=3230 ms, qps=928792 +BATCH (N=3000000) --> time_used=3294 ms, qps=910746 +BATCH (N=3000000) --> time_used=3295 ms, qps=910470 +BATCH (N=3000000) --> time_used=3148 ms, qps=952986 +BATCH (N=3000000) --> time_used=3228 ms, qps=929368 +average qps:914031 +ALL TESTS PASSED. +``` + ### 面试题 1. 为什么会实现kvstore,使用场景在哪里? diff --git a/config/config.xml b/config/config.xml index f1e2e71..bf8a219 100644 --- a/config/config.xml +++ b/config/config.xml @@ -17,7 +17,7 @@ - incremental + none data kvs_oplog.db @@ -27,7 +27,7 @@ - mypool + malloc disable diff --git a/diskuring/diskuring.c b/diskuring/diskuring.c index a0666ce..aafdd46 100644 --- a/diskuring/diskuring.c +++ b/diskuring/diskuring.c @@ -110,6 +110,13 @@ static void *worker_main(void *arg) task_t *done = (task_t *)(uintptr_t)cqe->user_data; task_finish(done, cqe->res); + // todo: 失败应该通知主线程,提供一个链表,把失败的taskpush进去。主线程用定时器任务定时处理失败信息然后destory。 + // 暂时用打印说明失败 + if(done->res < 0){ + printf("uring failed: fd:%d, offset:%ld\n", done->fd, done->off); + } + task_destroy(done); + io_uring_cqe_seen(&ctx->ring, cqe); } } diff --git a/dump/kvs_dump.h b/dump/kvs_dump.h index 95b9a51..bab43b3 100644 --- a/dump/kvs_dump.h +++ b/dump/kvs_dump.h @@ -11,7 +11,6 @@ extern char global_hash_file[256]; int kvs_create_snapshot(iouring_ctx_t *uring, const char* array_file, const char* rbtree_file, const char* hash_file); int kvs_create_snapshot_async(const char *ip, int port); -void __complete_snapshot(const char *ip, int port, const char *array_file, const char *rbtree_file, const char *hash_file); extern int global_oplog_fd; diff --git a/dump/kvs_oplog.c b/dump/kvs_oplog.c index e6ccef1..6a9f857 100644 --- a/dump/kvs_oplog.c +++ b/dump/kvs_oplog.c @@ -54,8 +54,8 @@ int kvs_oplog_append(const uint8_t *cmd, size_t len, int logfd){ return -4; } - task_wait(t); - task_destroy(t); + // task_wait(t); + // task_destroy(t); return 0; } diff --git a/dump/kvs_snapshot.c b/dump/kvs_snapshot.c index ba11b85..17d8564 100644 --- a/dump/kvs_snapshot.c +++ b/dump/kvs_snapshot.c @@ -1,5 +1,6 @@ #include "kvstore.h" #include "diskuring/diskuring.h" +#include "kvs_dump.h" #include #include #include @@ -39,10 +40,6 @@ int kvs_create_snapshot(iouring_ctx_t *uring, const char* array_file, const char return ret; } -void __complete_snapshot(const char *ip, int port, const char *array_file, const char *rbtree_file, const char *hash_file){ - -} - static int send_file_to_ipport(const char *ip, int port, const char *filename) { int sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd < 0) { perror("socket"); return -1; } @@ -119,9 +116,6 @@ int kvs_create_snapshot_async(const char *ip, int port){ _exit(1); } - // hook 通知 eBPF - __complete_snapshot(ip, port, tmp_array, tmp_rbtree, tmp_hash); - iouring_shutdown(&uring); _exit(0); } else { diff --git a/ebpf/c/replica.bpf.c b/ebpf/c/replica.bpf.c index b9e43b0..14c6778 100644 --- a/ebpf/c/replica.bpf.c +++ b/ebpf/c/replica.bpf.c @@ -1,103 +1,80 @@ // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause /* Copyright (c) 2020 Facebook */ -#include -#include +#include "vmlinux.h" #include #include +#include #include "replica.h" char LICENSE[] SEC("license") = "Dual BSD/GPL"; -int my_pid = 0; - struct { - __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); - __uint(key_size, sizeof(int)); - __uint(value_size, sizeof(int)); -} channel SEC(".maps"); + __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); + __uint(key_size, sizeof(int)); + __uint(value_size, sizeof(int)); +} events SEC(".maps"); -SEC("uprobe/kvs_create_snapshot_async") -int uprobe_create_snapshot_async(struct pt_regs *ctx) +/* __completed_cmd(const uint8_t *cmd, size_t len, unsigned long long seq); */ +SEC("uprobe//home/lian/share/9.1-kvstore/kvstore:__completed_cmd") +int BPF_KPROBE(handle_completed_cmd, + const __u8 *cmd, size_t len, __u64 seq) { - struct event ev; - __builtin_memset(&ev, 0, sizeof(ev)); + struct replica_event evt = {}; + __u32 copy_len; - const char *ip; - __u32 port; + evt.type = EVENT_COMPLETED_CMD; + evt.complete.seq = seq; - ev.type = EVENT_CREATE_SNAPSHOT_ASYNC; + copy_len = len; + if (copy_len > MAX_CMD_LEN) + copy_len = MAX_CMD_LEN; - ip = (const char *)PT_REGS_PARM1(ctx); - port = (__u32)PT_REGS_PARM2(ctx); + evt.complete.len = copy_len; - bpf_probe_read_user_str(ev.data.sync.ip, - sizeof(ev.data.sync.ip), - ip); - ev.data.sync.port = port; + if (cmd) + bpf_probe_read_user(evt.complete.cmd, copy_len, cmd); - bpf_perf_event_output(ctx, &channel, + bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, - &ev, sizeof(ev)); + &evt, sizeof(evt)); return 0; } -SEC("uprobe/__compeleted_cmd") -int uprobe_completed_cmd(struct pt_regs *ctx) +/* __ssync(const uint8_t *ip, uint32_t ip_len, int port, unsigned long long seq); */ +SEC("uprobe//home/lian/share/9.1-kvstore/kvstore:__ssync") +int BPF_KPROBE(handle_ssync, + const __u8 *ip, __u32 ip_len, int port, __u64 seq) { - struct event ev; - __builtin_memset(&ev, 0, sizeof(ev)); + struct replica_event evt = {}; - const __u8 *cmd; - __u32 len; + evt.type = EVENT_SSYNC; + evt.sync.seq = seq; + evt.sync.port = port; - ev.type = EVENT_COMPLETED_CMD; + __u32 copy_len = ip_len; + if (copy_len > sizeof(evt.sync.ip)) + copy_len = sizeof(evt.sync.ip); - cmd = (const __u8 *)PT_REGS_PARM1(ctx); - len = (__u32)PT_REGS_PARM2(ctx); + if (ip) + bpf_probe_read_user(evt.sync.ip, copy_len, ip); - if (len > sizeof(ev.data.cmd.cmd)) - len = sizeof(ev.data.cmd.cmd); - - ev.data.cmd.len = len; - - bpf_probe_read_user(ev.data.cmd.cmd, len, cmd); - - bpf_perf_event_output(ctx, &channel, + bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, - &ev, sizeof(ev)); + &evt, sizeof(evt)); return 0; } - -SEC("uprobe/__create_snapshot_ok") -int uprobe_create_snapshot_ok(struct pt_regs *ctx) +/* __sready(void); */ +SEC("uprobe//home/lian/share/9.1-kvstore/kvstore:__sready") +int BPF_KPROBE(handle_sready) { - struct event ev; - __builtin_memset(&ev, 0, sizeof(ev)); - - const char *array_file; - const char *rbtree_file; - const char *hash_file; + struct replica_event evt = {}; - ev.type = EVENT_CREATE_SNAPSHOT_OK; + evt.type = EVENT_SREADY; - array_file = (const char *)PT_REGS_PARM1(ctx); - rbtree_file = (const char *)PT_REGS_PARM2(ctx); - hash_file = (const char *)PT_REGS_PARM3(ctx); - - bpf_probe_read_user_str(ev.data.ok.array_file, - sizeof(ev.data.ok.array_file), - array_file); - bpf_probe_read_user_str(ev.data.ok.rbtree_file, - sizeof(ev.data.ok.rbtree_file), - rbtree_file); - bpf_probe_read_user_str(ev.data.ok.hash_file, - sizeof(ev.data.ok.hash_file), - hash_file); - - bpf_perf_event_output(ctx, &channel, + bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, - &ev, sizeof(ev)); + &evt, sizeof(evt)); return 0; } \ No newline at end of file diff --git a/ebpf/c/replica.c b/ebpf/c/replica.c index c373ab2..87d454c 100644 --- a/ebpf/c/replica.c +++ b/ebpf/c/replica.c @@ -2,6 +2,8 @@ /* Copyright (c) 2020 Facebook */ #include #include +#include +#include #include #include #include "replica.skel.h" @@ -12,10 +14,15 @@ #include "replica.h" +typedef enum { + OFFLINE = 0, + ONLINE = 1, +}replica_state_e ; struct cmd_node { + __u64 seq; + __u32 len; uint8_t *cmd; - size_t len; struct cmd_node *next; }; @@ -25,164 +32,184 @@ struct pending_queue { int count; }; -static void queue_init(struct pending_queue *q) { - q->head = q->tail = NULL; - q->count = 0; -} +/* ================= 全局状态 ================= */ -static void queue_push(struct pending_queue *q, const uint8_t *cmd, size_t len) { - struct cmd_node *node = malloc(sizeof(*node)); - if (!node) return; - node->cmd = malloc(len); - if (!node->cmd) { free(node); return; } - memcpy(node->cmd, cmd, len); - node->len = len; - node->next = NULL; - if (q->tail) q->tail->next = node; - else q->head = node; - q->tail = node; - q->count++; -} +static replica_state_e state = OFFLINE; +static int sockfd = -1; -static void queue_send_and_clear(struct pending_queue *q, int sock) { - struct cmd_node *node = q->head; - int sent_count = 0; - while (node) { - if (send(sock, node->cmd, node->len, 0) > 0) { - sent_count++; - } - struct cmd_node *tmp = node; - node = node->next; - free(tmp->cmd); - free(tmp); - } - if (sent_count > 0) { - printf("[QUEUE] Sent %d commands to slave\n", sent_count); - } - queue_init(q); -} +static char peer_ip[MAX_IP_LEN]; +static int peer_port; +static __u64 peer_seq; -static void queue_free(struct pending_queue *q) { - struct cmd_node *node = q->head; - while (node) { - struct cmd_node *tmp = node; - node = node->next; - free(tmp->cmd); - free(tmp); - } - queue_init(q); -} +static struct pending_queue pending = { + .head = NULL, + .tail = NULL, + .count = 0, +}; -static int send_file(int sock, const char *path) { - FILE *fp = fopen(path, "rb"); - if (!fp) { - printf("[ERROR] Failed to open file: %s\n", path); - return -1; - } - - char buf[4096]; - size_t n, total = 0; - while ((n = fread(buf, 1, sizeof(buf), fp)) > 0) { - if (send(sock, buf, n, 0) < 0) { - fclose(fp); - printf("[ERROR] Failed to send file: %s (sent %zu bytes)\n", path, total); - return -1; - } - total += n; - } - fclose(fp); - printf("[FILE] Sent %s (%zu bytes)\n", path, total); - return 0; -} - -// 全局状态(单 Slave 简化) -static enum state current_state = NOSLAVE; -static char slave_ip[16] = {0}; -static int slave_port = 0; -static char array_file[128] = {0}; -static char rbtree_file[128] = {0}; -static char hash_file[128] = {0}; -static struct pending_queue pending; -static int slave_sock = -1; // 连接 Slave 的 socket - -// 连接 Slave -static int connect_slave() { - int sock = socket(AF_INET, SOCK_STREAM, 0); - if (sock < 0) return -1; - struct sockaddr_in addr; - memset(&addr, 0, sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_port = htons(slave_port); - inet_pton(AF_INET, slave_ip, &addr.sin_addr); - if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) { - close(sock); - return -1; - } - return sock; -} - -static void handle_event(void *ctx, int cpu, void *data, __u32 size) { - struct event *ev = (struct event *)data; - - switch (ev->type) { - case EVENT_CREATE_SNAPSHOT_ASYNC: - printf("[EVENT] Type: CREATE_SNAPSHOT_ASYNC\n"); - printf("[EVENT] Slave IP: %s, Port: %u\n", ev->data.sync.ip, ev->data.sync.port); - - if (current_state == NOSLAVE) { - current_state = START; - strncpy(slave_ip, ev->data.sync.ip, sizeof(slave_ip)); - slave_port = ev->data.sync.port; - queue_init(&pending); - slave_sock = connect_slave(); // 连接 Slave - if (slave_sock < 0) { - printf("Failed to connect to Slave %s:%d\n", slave_ip, slave_port); - current_state = NOSLAVE; - } - } - break; - case EVENT_COMPLETED_CMD: - printf("[EVENT] Type: COMPLETED_CMD\n"); - printf("[EVENT] Command length: %llu bytes\n", ev->data.cmd.len); - - if (current_state != NOSLAVE) { - queue_push(&pending, ev->data.cmd.cmd, ev->data.cmd.len); - } - break; - case EVENT_CREATE_SNAPSHOT_OK: - printf("[EVENT] Type: CREATE_SNAPSHOT_OK\n"); - printf("[EVENT] Array file: %s\n", ev->data.ok.array_file); - printf("[EVENT] RBTree file: %s\n", ev->data.ok.rbtree_file); - printf("[EVENT] Hash file: %s\n", ev->data.ok.hash_file); - - if (current_state == START) { - current_state = DONE; - strncpy(array_file, ev->data.ok.array_file, sizeof(array_file)); - strncpy(rbtree_file, ev->data.ok.rbtree_file, sizeof(rbtree_file)); - strncpy(hash_file, ev->data.ok.hash_file, sizeof(hash_file)); - } - break; - } -} - - -static void lost_event(void *ctx, int cpu, __u64 cnt) { - printf("Lost %llu events\n", cnt); -} - -static int libbpf_print_fn(enum libbpf_print_level level, const char *format, va_list args) +/* ================= pending 队列操作 ================= */ +static void pending_free() { - return vfprintf(stderr, format, args); + struct pending_queue *q = &pending; + struct cmd_node *cur = q->head; + while (cur) { + struct cmd_node *tmp = cur; + cur = cur->next; + free(tmp->cmd); + free(tmp); + } + q->head = q->tail = NULL; + q->count = 0; } +static void pending_push(__u64 seq, __u32 len, const uint8_t *cmd) +{ + struct cmd_node *node = malloc(sizeof(*node)); + if (!node) + return; + + node->cmd = malloc(len); + if (!node->cmd) { + free(node); + return; + } + + memcpy(node->cmd, cmd, len); + node->seq = seq; + node->len = len; + node->next = NULL; + + if (!pending.tail) { + pending.head = pending.tail = node; + } else { + pending.tail->next = node; + pending.tail = node; + } + + pending.count++; +} + +static void pending_gc(__u64 min_seq) +{ + struct cmd_node *cur = pending.head; + + int n = pending.count; + while (cur && cur->seq < min_seq) { + struct cmd_node *tmp = cur; + cur = cur->next; + + free(tmp->cmd); + free(tmp); + pending.count--; + } + + printf("gc:%d\n", n-pending.count); + + pending.head = cur; + if (!cur) + pending.tail = NULL; +} + +static void pending_send_one(struct cmd_node *node){ + int rt = send(sockfd, node->cmd, node->len, 0); + printf("send seq:%lld, rt=%d\n", node->seq, rt); +} + +static void pending_send_all(void) +{ + struct cmd_node *cur = pending.head; + + while (cur) { + pending_send_one(cur); + cur = cur->next; + } +} + +/* ================= 网络逻辑 ================= */ +static void try_connect(void) +{ + if(sockfd > 0){ + close(sockfd); + sockfd = -1; + } + + struct sockaddr_in addr = {}; + int i = 0; + + addr.sin_family = AF_INET; + addr.sin_port = htons(peer_port); + inet_pton(AF_INET, peer_ip, &addr.sin_addr); + + for(i = 0;i < 10; ++ i){ + sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd < 0) { + perror("socket"); + return; + } + + printf("connect try %d...\n", i + 1); + if (connect(sockfd, (struct sockaddr *)&addr, sizeof(addr)) == 0) { + printf("connect success: %s:%d\n", peer_ip, peer_port); + state = ONLINE; + pending_send_all(); + return; + } + + perror("connect"); + close(sockfd); + sockfd = -1; + + sleep(1); + } + + printf("connect failed after 10 retries\n"); +} + + +/* ================= perf buffer 回调 ================= */ +static void handle_event(void *ctx, int cpu, void *data, __u32 size) +{ + struct replica_event *evt = data; + + switch (evt->type) { + + case EVENT_SSYNC: + strncpy(peer_ip, evt->sync.ip, sizeof(peer_ip)); + peer_port = evt->sync.port; + peer_seq = evt->sync.seq; + printf("SSYNC [seq:%lld], [%s:%d]\n", peer_seq, peer_ip, peer_port); + + state = OFFLINE; + pending_gc(peer_seq); + break; + + case EVENT_COMPLETED_CMD: + // printf("CMD [seq:%lld], cmd:\n[\n%s]\n", evt->complete.seq, evt->complete.cmd); + pending_push(evt->complete.seq, + evt->complete.len, + evt->complete.cmd); + + if (state == ONLINE && pending.tail) { + struct cmd_node *n = pending.tail; + pending_send_one(n); + } + break; + + case EVENT_SREADY: + printf("SREADY \n"); + if (state == OFFLINE) + try_connect(); + break; + } +} + +/* ================= main ================= */ int main(int argc, char **argv) { struct replica_bpf *skel; + struct perf_buffer *pb = NULL; int err; - /* Set up libbpf errors and debug info callback */ - libbpf_set_print(libbpf_print_fn); - /* Open BPF application */ skel = replica_bpf__open(); if (!skel) { @@ -190,9 +217,6 @@ int main(int argc, char **argv) return 1; } - /* ensure BPF program only handles write() syscalls from our process */ - skel->bss->my_pid = getpid(); - /* Load & verify BPF programs */ err = replica_bpf__load(skel); if (err) { @@ -211,47 +235,22 @@ int main(int argc, char **argv) "to see output of the BPF programs.\n"); - struct perf_buffer *pb = perf_buffer__new(bpf_map__fd(skel->maps.channel), 8, handle_event, lost_event, NULL, NULL); + pb = perf_buffer__new(bpf_map__fd(skel->maps.events), 8, + handle_event, NULL, NULL, NULL); + if(!pb){ goto cleanup; } -#if 0 - while(1){ - perf_buffer__poll(pb, 1000); - } -#else while (1) { perf_buffer__poll(pb, 1000); // 处理事件 - - // 循环中检查状态并发送 - if (current_state == DONE && slave_sock >= 0) { - // 发送快照文件 - if (send_file(slave_sock, array_file) == 0 && - send_file(slave_sock, rbtree_file) == 0 && - send_file(slave_sock, hash_file) == 0) { - current_state = ONLINE; - printf("Snapshot sent, state to ONLINE\n"); - } else { - printf("Failed to send snapshot\n"); - current_state = NOSLAVE; - close(slave_sock); - slave_sock = -1; - } - } - - if (current_state == ONLINE && slave_sock >= 0) { - // 发送 pending - queue_send_and_clear(&pending, slave_sock); - } } -#endif perf_buffer__free(pb); cleanup: - queue_free(&pending); - if (slave_sock >= 0) close(slave_sock); + pending_free(); + if (sockfd >= 0) close(sockfd); replica_bpf__destroy(skel); return -err; } diff --git a/ebpf/c/replica.h b/ebpf/c/replica.h index 9cf3c1c..130cd0c 100644 --- a/ebpf/c/replica.h +++ b/ebpf/c/replica.h @@ -1,36 +1,37 @@ #ifndef __REPLICA_H__ #define __REPLICA_H__ + +#define MAX_CMD_LEN 256 +#define MAX_IP_LEN 64 + enum event_type { - EVENT_CREATE_SNAPSHOT_ASYNC, - EVENT_CREATE_SNAPSHOT_OK, - EVENT_CREATE_SNAPSHOT_READY, - EVENT_COMPLETED_CMD + EVENT_COMPLETED_CMD, + EVENT_SSYNC, + EVENT_SREADY, }; -struct event { - enum event_type type; +struct complete_cmd_evt { + __u64 seq; + __u32 len; + __u8 cmd[MAX_CMD_LEN]; +}; + +struct sync_evt { + __u64 seq; + char ip[MAX_IP_LEN]; + __s32 port; +}; + +struct replica_event { + __u32 type; + __u32 _pad; + union { - struct { - char ip[16]; - __u32 port; - } sync; - struct { - __u8 cmd[256]; - __u64 len; - } cmd; - struct { - char array_file[128]; - char rbtree_file[128]; - char hash_file[128]; - } ok; - } data; + struct complete_cmd_evt complete; + struct sync_evt sync; + }; }; -enum state { - NOSLAVE, - PREPARING, - ONLINE -}; #endif \ No newline at end of file diff --git a/kvs_array_bin.c b/kvs_array_bin.c index 4a1046d..61e189f 100644 --- a/kvs_array_bin.c +++ b/kvs_array_bin.c @@ -237,13 +237,13 @@ int kvs_array_save(iouring_ctx_t *uring, kvs_array_t *inst, const char* filename task_t *t = submit_write(uring, fd, bufs, lens, count, current_off); if (!t) { close(fd); return -4; } - int res = task_wait(t); - task_destroy(t); + // int res = task_wait(t); + // task_destroy(t); - if (res < 0) { - close(fd); - return -5; - } + // if (res < 0) { + // close(fd); + // return -5; + // } current_off += (off_t) total; } diff --git a/kvs_hash_bin.c b/kvs_hash_bin.c index 6a575c8..d99afd6 100755 --- a/kvs_hash_bin.c +++ b/kvs_hash_bin.c @@ -310,13 +310,13 @@ int kvs_hash_save(iouring_ctx_t *uring, kvs_hash_t *inst, const char* filename){ task_t *t = submit_write(uring, fd, bufs, lens, count, current_off); if (!t) { close(fd); return -4; } - int res = task_wait(t); - task_destroy(t); + // int res = task_wait(t); + // task_destroy(t); - if (res < 0) { - close(fd); - return -5; - } + // if (res < 0) { + // close(fd); + // return -5; + // } current_off += (off_t) total; } diff --git a/kvs_protocol_resp.c b/kvs_protocol_resp.c index 9c8ec7c..3418626 100644 --- a/kvs_protocol_resp.c +++ b/kvs_protocol_resp.c @@ -14,6 +14,8 @@ extern kvs_rbtree_t global_rbtree; extern kvs_hash_t global_hash; #endif +extern unsigned long long global_seq; + static int need(const uint8_t *p, const uint8_t *end, size_t n) { return (p + n <= end) ? 0 : -1; } @@ -543,10 +545,12 @@ int resp_dispatch(const resp_cmd_t *cmd, resp_value_t *out_value) { return 0; } case KVS_CMD_SSYNC: + __ssync(cmd->argv[1].ptr, cmd->argv[1].len, atoi(cmd->argv[2].ptr), global_seq); kvs_create_snapshot_async(cmd->argv[1].ptr, atoi(cmd->argv[2].ptr)); *out_value = resp_simple("OK"); return 0; case KVS_CMD_SREADY: + __sready(); *out_value = resp_simple("OK"); return 0; default: @@ -555,4 +559,19 @@ int resp_dispatch(const resp_cmd_t *cmd, resp_value_t *out_value) { *out_value = resp_error("ERR unknown command"); return 0; +} + +void __ssync(const uint8_t *ip, uint32_t ip_len, int port, unsigned long long seq){ + // hook 在这里,必须等待 ebpf实时同步进程 状态切换为 PREPARING 才允许返回 + // 不这样做的话:快照立刻被创建,执行下一条命令(CmdX),如果此刻 ebpf 还没有被置为 PREPARING,即没有开始记录需要转发的命令 + // 会导致:快照里没有(CmdX),ebpf实时同步进程里也没有(CmdX) + // 要怎么做? + // master 在内部维护seq,ebpf 从启动就一直记录数据。ebpf探测 __ssync 的seq来确定从哪里开始发送。 + // ebpf: __ssync SSYNC ip port seq, 知道往哪里发,从哪里开始发 + // ebpf: __sready SREADY, 可以开始发了,发往(ip,port) + // ebpf: __complete_cmd cmd,len,seq, 记录命令 +} + +void __sready(){ + } \ No newline at end of file diff --git a/kvs_protocol_resp.h b/kvs_protocol_resp.h index b670737..dcbf2b7 100644 --- a/kvs_protocol_resp.h +++ b/kvs_protocol_resp.h @@ -90,4 +90,7 @@ resp_value_t resp_nil(void); /* $-1\r\n */ */ int resp_dispatch(const resp_cmd_t *cmd, resp_value_t *out_value); +void __ssync(const uint8_t *ip, uint32_t ip_len, int port, unsigned long long seq); +void __sready(); + #endif \ No newline at end of file diff --git a/kvs_rbtree_bin.c b/kvs_rbtree_bin.c index 9ee7a06..abe17e1 100644 --- a/kvs_rbtree_bin.c +++ b/kvs_rbtree_bin.c @@ -507,12 +507,12 @@ static int kvs_rbtree_save_node(iouring_ctx_t *uring, int fd, off_t *current_off if (!t) { return -4; } - int res = task_wait(t); + // int res = task_wait(t); task_destroy(t); - if (res < 0) { - return -5; - } + // if (res < 0) { + // return -5; + // } *current_off += (off_t) total; diff --git a/kvs_slave.c b/kvs_slave.c index d0bdc9e..79bc1c3 100644 --- a/kvs_slave.c +++ b/kvs_slave.c @@ -192,17 +192,17 @@ int slave_bootstrap( printf("Sent SREADY to master\n"); // 6. 接收回包+OK\r\n - if (recv_exact(master_fd, resp, 5) < 0) { - fprintf(stderr, "Failed to receive final OK from master\n"); - goto cleanup; - } + // if (recv_exact(master_fd, resp, 5) < 0) { + // fprintf(stderr, "Failed to receive final OK from master\n"); + // goto cleanup; + // } - if (memcmp(resp, "+OK\r\n", 5) != 0) { - fprintf(stderr, "Unexpected final response from master: %.5s\n", resp); - goto cleanup; - } + // if (memcmp(resp, "+OK\r\n", 5) != 0) { + // fprintf(stderr, "Unexpected final response from master: %.5s\n", resp); + // goto cleanup; + // } - printf("Received final OK from master, bootstrap complete\n"); + // printf("Received final OK from master, bootstrap complete\n"); ret = 0; diff --git a/kvstore.c b/kvstore.c index a426294..5fcd71e 100644 --- a/kvstore.c +++ b/kvstore.c @@ -21,16 +21,15 @@ extern int slave_bootstrap(const char *listen_ip, int listen_port, const char *master_ip, int master_port); -#if MEMORY_SELECT_MALLOC == MEMORY_USE_MYMALLOC extern mp_pool_t global_mempool; -#endif AppConfig global_cfg; iouring_ctx_t global_uring_ctx; +unsigned long long global_seq; extern int global_oplog_fd; -void __completed_cmd(const uint8_t *cmd, size_t len){ +void __completed_cmd(const uint8_t *cmd, size_t len, unsigned long long seq){ } @@ -69,7 +68,8 @@ int kvs_protocol(struct conn* conn){ int dr = resp_dispatch(&cmd, &val); - __completed_cmd(p, len); + __completed_cmd(p, len, global_seq); + global_seq ++; /* @@ -186,9 +186,10 @@ void dest_kvengine(void) { } void init_memory_pool(AppConfig *cfg){ -#if MEMORY_SELECT_MALLOC == MEMORY_USE_MYMALLOC - mp_create(&global_mempool); -#endif + + if(cfg->allocator == ALLOC_MYPOOL) + mp_create(&global_mempool); + kvs_set_memleak_detect(cfg->leak_mode); kvs_set_alloc_type(cfg->allocator); @@ -196,7 +197,7 @@ void init_memory_pool(AppConfig *cfg){ } void dest_memory_pool(void){ -#if MEMORY_SELECT_MALLOC == MEMORY_USE_MYMALLOC +#if MEMORY_USE_MYMALLOC mp_destroy(&global_mempool); #endif } @@ -278,9 +279,10 @@ int main(int argc, char *argv[]) { printf("Init Config error"); return -1; } - - init_data_file(&global_cfg); + global_seq = 0; + init_memory_pool(&global_cfg); + init_data_file(&global_cfg); init_disk_uring(&global_uring_ctx); int port = global_cfg.port; @@ -296,9 +298,6 @@ int main(int argc, char *argv[]) { } - - - init_memory_pool(&global_cfg); init_kvengine(); #if (NETWORK_SELECT == NETWORK_REACTOR) diff --git a/kvstore.h b/kvstore.h index 704c457..b0f386c 100644 --- a/kvstore.h +++ b/kvstore.h @@ -279,7 +279,8 @@ extern kvs_rbtree_t global_rbtree; extern kvs_hash_t global_hash; #endif -void __completed_cmd(const uint8_t *cmd, size_t len); +void __completed_cmd(const uint8_t *cmd, size_t len, unsigned long long seq); + #endif diff --git a/memory/alloc_dispatch.c b/memory/alloc_dispatch.c index ad4b9c9..4a1ee7e 100644 --- a/memory/alloc_dispatch.c +++ b/memory/alloc_dispatch.c @@ -9,7 +9,7 @@ mp_pool_t global_mempool; -static atomic_int g_memory_mode = ATOMIC_VAR_INIT(MEMLEAK_DETECT_OFF); +static atomic_int g_memory_mode = ATOMIC_VAR_INIT(ALLOC_OTHER); static atomic_int g_memleak_detect_mode = ATOMIC_VAR_INIT(MEMLEAK_DETECT_OFF); // 设置内存池类型 @@ -36,7 +36,7 @@ MemLeakDetectMode kvs_get_memleak_detect(void) { void *kvs_malloc_impl(size_t size){ - switch (atomic_load(&g_memleak_detect_mode)){ + switch (atomic_load(&g_memory_mode)){ case ALLOC_MALLOC: return malloc(size); case ALLOC_MYPOOL: @@ -51,7 +51,7 @@ void *kvs_malloc_impl(size_t size){ } void kvs_free_impl(void *ptr) { - switch (atomic_load(&g_memleak_detect_mode)){ + switch (atomic_load(&g_memory_mode)){ case ALLOC_MALLOC: free(ptr); break; @@ -75,20 +75,20 @@ void kvs_free_impl(void *ptr) { void *nMalloc(size_t size, const char * filename, const char *func, int line){ void *ptr = kvs_malloc_impl(size); - if(atomic_load(&g_memleak_detect_mode) == MEMLEAK_DETECT_ON) { - char buff[128]; - snprintf(buff, 128, "./mem_leak/%p.mem", ptr); - FILE* fp = fopen(buff, "w"); - if(!fp){ - kvs_free(ptr); - return NULL; - } + // if(atomic_load(&g_memleak_detect_mode) == MEMLEAK_DETECT_ON) { + // char buff[128]; + // snprintf(buff, 128, "./mem_leak/%p.mem", ptr); + // FILE* fp = fopen(buff, "w"); + // if(!fp){ + // kvs_free(ptr); + // return NULL; + // } - fprintf(fp, "[+] [%s:%d:%s] [%p:%ld]\n", filename, line, func, ptr, size); - fflush(fp); - fclose(fp); + // fprintf(fp, "[+] [%s:%d:%s] [%p:%ld]\n", filename, line, func, ptr, size); + // fflush(fp); + // fclose(fp); - } + // } return ptr; } @@ -97,15 +97,15 @@ void nFree(void *ptr, const char * filename, const char *func, int line){ return ; } - if(atomic_load(&g_memleak_detect_mode) == MEMLEAK_DETECT_ON) { - char buff[128]; - snprintf(buff, 128, "./mem_leak/%p.mem", ptr); + // if(atomic_load(&g_memleak_detect_mode) == MEMLEAK_DETECT_ON) { + // char buff[128]; + // snprintf(buff, 128, "./mem_leak/%p.mem", ptr); - if(unlink(buff) < 0) { - return ; - } + // if(unlink(buff) < 0) { + // return ; + // } - } + // } kvs_free_impl(ptr); } \ No newline at end of file diff --git a/memory/mempool.c b/memory/mempool.c index c4777d9..5f3dc93 100644 --- a/memory/mempool.c +++ b/memory/mempool.c @@ -74,7 +74,7 @@ static mp_page_t* mp_page_create(mp_bucket_t *owner){ pg->prev = NULL; pg->next = NULL; - bitmap_clear_all(pg->bitmap, 8); + bitmap_clear_all(pg->bitmap, 20); char *p = (char*)page_payload(pg); for(uint16_t i = 0;i < cap - 1; ++ i){ diff --git a/memory/mempool.h b/memory/mempool.h index eae3d57..b3df5c0 100644 --- a/memory/mempool.h +++ b/memory/mempool.h @@ -6,11 +6,12 @@ #include #include -#define MEMPOOL_PAGE_SIZE 4096 +// #define MEMPOOL_PAGE_SIZE 4096 +#define MEMPOOL_PAGE_SIZE (1024*8) #define MEMPOOL_BLOCK_MAX_SIZE 512 #define MEMPOOL_ALIGNMENT 8 #define MEMPOOL_NUM_CLASSES (MEMPOOL_BLOCK_MAX_SIZE / MEMPOOL_ALIGNMENT) -#define MEMPOOL_CACHE_PAGE 2 +#define MEMPOOL_CACHE_PAGE 4 typedef struct mp_page_s mp_page_t; typedef struct mp_bucket_s mp_bucket_t; @@ -34,7 +35,7 @@ struct mp_page_s{ uint16_t free_count; uint16_t capacity; - uint64_t bitmap[8]; // 最多支持 512 个块 (64*8) + uint64_t bitmap[20]; // 最多支持 512/1280 个块 (64*20) }; struct mp_bucket_s{ diff --git a/reactor.c b/reactor.c index 1c4e490..4bb7ecd 100644 --- a/reactor.c +++ b/reactor.c @@ -336,6 +336,12 @@ int r_init_server(unsigned short port) { int sockfd = socket(AF_INET, SOCK_STREAM, 0); + int opt = 1; + if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) { + perror("setsockopt"); + close(sockfd); + } + struct sockaddr_in servaddr; servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0 diff --git a/test-redis/test.c b/test-redis/test.c index e44869a..555bb17 100644 --- a/test-redis/test.c +++ b/test-redis/test.c @@ -5,7 +5,8 @@ #include #define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000) - +// #define PRINT printf +#define PRINT static void die(redisContext *c, const char *msg) { fprintf(stderr, "%s: %s\n", msg, c && c->err ? c->errstr : "unknown"); @@ -38,6 +39,7 @@ static void must_bulk_eq(redisReply *r, const void *buf, size_t n, const char *w if (!r) { fprintf(stderr, "%s: reply null\n", what); exit(1); } if (r->type != REDIS_REPLY_STRING || r->len != n || memcmp(r->str, buf, n) != 0) { fprintf(stderr, "%s: bulk mismatch. type=%d len=%zu\n", what, r->type, r->len); + fprintf(stderr, "expect:%s, truely:%s\n", (const char*)buf, r->str); freeReplyObject(r); exit(1); } @@ -111,17 +113,18 @@ void pipline_set_test(redisContext *c, int start, int countN, const char *op){ die(c, "redisAppendCommand SET failed"); } - if(i%10000 == 0) printf("%d\n", i); - + if(i%10000 == 0) PRINT("SEND: %d\n", i); } /* 再一次性把 N 个回复读出来 */ for (int i = start; i < end; i++) { redisReply *r = NULL; if (redisGetReply(c, (void**)&r) != REDIS_OK || !r) die(c, "redisGetReply SET failed"); must_ok(r, "pipeline SET reply"); + + if(i%10000 == 0) PRINT("RECV: %d\n", i); } - printf("[OK] SET pipeline batch %d\n", N); + PRINT("[OK] SET pipeline batch %d\n", N); } void pipline_get_test(redisContext *c, int start, int countN, const char *op){ @@ -138,7 +141,7 @@ void pipline_get_test(redisContext *c, int start, int countN, const char *op){ die(c, "redisAppendCommand GET failed"); } - if(i%10000 == 0) printf("%d\n", i); + if(i%10000 == 0) PRINT("SEND: %d\n", i); } for (int i = start; i < end; i++) { redisReply *r = NULL; @@ -146,9 +149,11 @@ void pipline_get_test(redisContext *c, int start, int countN, const char *op){ char expect[64]; int en = snprintf(expect, sizeof(expect), "v:%d", i); must_bulk_eq(r, expect, (size_t)en, "pipeline GET reply"); + + if(i%10000 == 0) PRINT("RECV: %d\n", i); } - printf("[OK] GET pipeline batch %d\n", N); + PRINT("[OK] GET pipeline batch %d\n", N); } void pipline_del_test(redisContext *c, int start, int countN, const char *op){ @@ -165,17 +170,44 @@ void pipline_del_test(redisContext *c, int start, int countN, const char *op){ die(c, "redisAppendCommand DEL failed"); } - if(i%10000 == 0) printf("%d\n", i); + if(i%10000 == 0) PRINT("SEND: %d\n", i); } for (int i = start; i < end; i++) { redisReply *r = NULL; if (redisGetReply(c, (void**)&r) != REDIS_OK || !r) die(c, "redisGetReply DEL failed"); freeReplyObject(r); /* DEL 返回 int,这里不强制检查 */ + + if(i%10000 == 0) PRINT("RECV: %d\n", i); } - printf("[OK] DEL pipeline batch %d\n", N); + PRINT("[OK] DEL pipeline batch %d\n", N); } +long long test_nopersist_noreplica(redisContext *c, int rounds, long long batch_size){ + struct timeval tv_begin, tv_end; + gettimeofday(&tv_begin, NULL); + + long long total_ops = batch_size*rounds; + + for(int i = 0;i < total_ops ; i += batch_size){ + pipline_set_test(c, i, batch_size, "RSET"); + } + + for(int i = 0;i < total_ops ; i += batch_size){ + pipline_get_test(c, i, batch_size, "RGET"); + } + + for(int i = 0;i < total_ops ; i += batch_size){ + pipline_del_test(c, i, batch_size, "RDEL"); + } + + gettimeofday(&tv_end, NULL); + int time_used = TIME_SUB_MS(tv_end, tv_begin); + long long qps = total_ops *3*1000/time_used; + printf("BATCH (N=%lld) --> time_used=%d ms, qps=%lld\n", total_ops *3, time_used, qps); + + return qps; +} int main(int argc, char **argv) { if(argc < 4) { @@ -197,29 +229,18 @@ int main(int argc, char **argv) { }else if(mode == 1){ basic_command_test(c); }else if(mode == 3){ - struct timeval tv_begin, tv_end; - gettimeofday(&tv_begin, NULL); - long long ops = 100000; - - pipline_set_test(c, 0, ops, "RSET"); + int rounds = 10; + long long batch_size = 100000; + int testrounds = 50; + long long total_qps = 0; - gettimeofday(&tv_end, NULL); - int time_used = TIME_SUB_MS(tv_end, tv_begin); - long long qps = ops*1000/time_used; - printf("BATCH (N=%lld) --> time_used=%d ms, qps=%lld\n", - ops, time_used, qps); + for(int i = 0;i < testrounds; ++ i){ + total_qps += test_nopersist_noreplica(c, rounds, batch_size); + } + printf("average qps:%lld\n", total_qps/testrounds); }else if(mode == 4){ - struct timeval tv_begin, tv_end; - gettimeofday(&tv_begin, NULL); - long long ops = 100000; - pipline_del_test(c, 0, ops, "RDEL"); - - gettimeofday(&tv_end, NULL); - int time_used = TIME_SUB_MS(tv_end, tv_begin); - long long qps = ops*1000/time_used; - printf("BATCH (N=%lld) --> time_used=%d ms, qps=%lld\n", - ops, time_used, qps); + }else if(mode == 5){ }else if(mode == 10){ pipline_set_test(c, 0, 1000, "SET");