diff --git a/README.md b/README.md index 630afcd..bd72d58 100644 --- a/README.md +++ b/README.md @@ -4,14 +4,14 @@ 1. ntyco需要作为kvstore的submodule,通过git clone一次下载。 **完成**。 2. README需要包含编译步骤,测试方案与可行性,性能数据。 **完成**。 3. 全量持久化保存数据集。 **BUG FIX**。 -4. 持久化的性能数据。 +4. 持久化的性能数据。 **完成**。 5. 特殊字符,可以解决redis的resp协议。 **完成**。 6. 实现配置文件,把日志级别,端口ip,主从模式,持久化方案。 **完成**。 7. 持久化落盘用io_uring,加载配置文件用mmap。 **完成**。 8. 主从同步的性能,开启与关闭性能做到5%?。 9. 主从同步600w条,出现的coredump。 **完成**。 10. 主从同步用ebpf实现。 **BUG FIX**。 -11. 内存池测试qps与虚拟内存,物理内存。 +11. 内存池测试qps与虚拟内存,物理内存。 **完成**。 12. 实现一个内存泄露检测组件。 **完成**。 @@ -38,14 +38,13 @@ make 测试条件: 1. 不启用持久化。 2. 不启用主从同步。 -2. pipline: +3. pipline: 1. RSET 100w 条, p:i v:i -> +OK 2. RGET 100w 条, p:i -> +v:i 3. RDEL 100w 条。 p:i -> +OK -3. 重复 15 次. 4. 本机发送请求。 -内存分配: malloc +#### 内存分配: malloc ```bash lian@ubuntu:~/share/9.1-kvstore$ ./test-redis/testcase 192.168.10.129 8888 3 Connected to 192.168.10.129:8888 @@ -103,122 +102,210 @@ average qps:880462 ALL TESTS PASSED. ``` -内存分配: 自实现内存池 +#### 内存分配: 自实现内存池 ```bash lian@ubuntu:~/share/9.1-kvstore$ ./test-redis/testcase 192.168.10.129 8888 3 Connected to 192.168.10.129:8888 -BATCH (N=3000000) --> time_used=3552 ms, qps=844594 -BATCH (N=3000000) --> time_used=3404 ms, qps=881316 -BATCH (N=3000000) --> time_used=3083 ms, qps=973078 -BATCH (N=3000000) --> time_used=3315 ms, qps=904977 -BATCH (N=3000000) --> time_used=3462 ms, qps=866551 -BATCH (N=3000000) --> time_used=3334 ms, qps=899820 -BATCH (N=3000000) --> time_used=3358 ms, qps=893388 -BATCH (N=3000000) --> time_used=3423 ms, qps=876424 -BATCH (N=3000000) --> time_used=3319 ms, qps=903886 -BATCH (N=3000000) --> time_used=3327 ms, qps=901713 -BATCH (N=3000000) --> time_used=3157 ms, qps=950269 BATCH (N=3000000) --> time_used=3241 ms, qps=925640 -BATCH (N=3000000) --> time_used=3301 ms, qps=908815 +BATCH (N=3000000) --> time_used=3047 ms, qps=984574 +BATCH (N=3000000) --> time_used=3085 ms, qps=972447 +BATCH (N=3000000) --> time_used=3119 ms, qps=961846 +BATCH (N=3000000) --> time_used=3104 ms, qps=966494 +BATCH (N=3000000) --> time_used=3163 ms, qps=948466 +BATCH (N=3000000) --> time_used=3033 ms, qps=989119 +BATCH (N=3000000) --> time_used=3170 ms, qps=946372 +BATCH (N=3000000) --> time_used=3299 ms, qps=909366 +BATCH (N=3000000) --> time_used=3272 ms, qps=916870 +BATCH (N=3000000) --> time_used=3294 ms, qps=910746 +BATCH (N=3000000) --> time_used=3182 ms, qps=942803 +BATCH (N=3000000) --> time_used=3190 ms, qps=940438 +BATCH (N=3000000) --> time_used=3493 ms, qps=858860 +BATCH (N=3000000) --> time_used=3111 ms, qps=964320 +BATCH (N=3000000) --> time_used=3220 ms, qps=931677 +BATCH (N=3000000) --> time_used=3067 ms, qps=978154 BATCH (N=3000000) --> time_used=3345 ms, qps=896860 -BATCH (N=3000000) --> time_used=3319 ms, qps=903886 -BATCH (N=3000000) --> time_used=3312 ms, qps=905797 -BATCH (N=3000000) --> time_used=3337 ms, qps=899011 -BATCH (N=3000000) --> time_used=3309 ms, qps=906618 -BATCH (N=3000000) --> time_used=3385 ms, qps=886262 -BATCH (N=3000000) --> time_used=3328 ms, qps=901442 -BATCH (N=3000000) --> time_used=3194 ms, qps=939261 -BATCH (N=3000000) --> time_used=3309 ms, qps=906618 -BATCH (N=3000000) --> time_used=3262 ms, qps=919681 -BATCH (N=3000000) --> time_used=3314 ms, qps=905250 -BATCH (N=3000000) --> time_used=3382 ms, qps=887049 +BATCH (N=3000000) --> time_used=3381 ms, qps=887311 +BATCH (N=3000000) --> time_used=3416 ms, qps=878220 +BATCH (N=3000000) --> time_used=3192 ms, qps=939849 +BATCH (N=3000000) --> time_used=3085 ms, qps=972447 +BATCH (N=3000000) --> time_used=3150 ms, qps=952380 BATCH (N=3000000) --> time_used=3296 ms, qps=910194 -BATCH (N=3000000) --> time_used=3331 ms, qps=900630 -BATCH (N=3000000) --> time_used=3279 ms, qps=914913 -BATCH (N=3000000) --> time_used=2996 ms, qps=1001335 -BATCH (N=3000000) --> time_used=3387 ms, qps=885739 -BATCH (N=3000000) --> time_used=3346 ms, qps=896592 -BATCH (N=3000000) --> time_used=3241 ms, qps=925640 -BATCH (N=3000000) --> time_used=3353 ms, qps=894721 -BATCH (N=3000000) --> time_used=3366 ms, qps=891265 -BATCH (N=3000000) --> time_used=3387 ms, qps=885739 -BATCH (N=3000000) --> time_used=3382 ms, qps=887049 -BATCH (N=3000000) --> time_used=3358 ms, qps=893388 -BATCH (N=3000000) --> time_used=3372 ms, qps=889679 -BATCH (N=3000000) --> time_used=3467 ms, qps=865301 -BATCH (N=3000000) --> time_used=3243 ms, qps=925069 -BATCH (N=3000000) --> time_used=3191 ms, qps=940144 -BATCH (N=3000000) --> time_used=3365 ms, qps=891530 -BATCH (N=3000000) --> time_used=3218 ms, qps=932256 -BATCH (N=3000000) --> time_used=3332 ms, qps=900360 -BATCH (N=3000000) --> time_used=3172 ms, qps=945775 -BATCH (N=3000000) --> time_used=3211 ms, qps=934288 +BATCH (N=3000000) --> time_used=3001 ms, qps=999666 +BATCH (N=3000000) --> time_used=3143 ms, qps=954502 +BATCH (N=3000000) --> time_used=3111 ms, qps=964320 +BATCH (N=3000000) --> time_used=3123 ms, qps=960614 +BATCH (N=3000000) --> time_used=3257 ms, qps=921093 +BATCH (N=3000000) --> time_used=3037 ms, qps=987816 +BATCH (N=3000000) --> time_used=3135 ms, qps=956937 BATCH (N=3000000) --> time_used=3124 ms, qps=960307 -BATCH (N=3000000) --> time_used=3043 ms, qps=985869 -BATCH (N=3000000) --> time_used=3086 ms, qps=972132 -BATCH (N=3000000) --> time_used=3201 ms, qps=937207 -average qps:911106 +BATCH (N=3000000) --> time_used=3276 ms, qps=915750 +BATCH (N=3000000) --> time_used=3058 ms, qps=981033 +BATCH (N=3000000) --> time_used=3024 ms, qps=992063 +BATCH (N=3000000) --> time_used=3224 ms, qps=930521 +BATCH (N=3000000) --> time_used=3235 ms, qps=927357 +BATCH (N=3000000) --> time_used=3334 ms, qps=899820 +BATCH (N=3000000) --> time_used=3427 ms, qps=875401 +BATCH (N=3000000) --> time_used=3218 ms, qps=932256 +BATCH (N=3000000) --> time_used=3191 ms, qps=940144 +BATCH (N=3000000) --> time_used=3179 ms, qps=943692 +BATCH (N=3000000) --> time_used=3104 ms, qps=966494 +BATCH (N=3000000) --> time_used=3202 ms, qps=936914 +BATCH (N=3000000) --> time_used=3184 ms, qps=942211 +BATCH (N=3000000) --> time_used=3000 ms, qps=1000000 +BATCH (N=3000000) --> time_used=3280 ms, qps=914634 +BATCH (N=3000000) --> time_used=3141 ms, qps=955109 +BATCH (N=3000000) --> time_used=3198 ms, qps=938086 +BATCH (N=3000000) --> time_used=3126 ms, qps=959692 +average qps:942837 ALL TESTS PASSED. ``` -内存分配:jemalloc +#### 内存分配:jemalloc ```shell lian@ubuntu:~/share/9.1-kvstore$ ./test-redis/testcase 192.168.10.129 8888 3 Connected to 192.168.10.129:8888 -BATCH (N=3000000) --> time_used=3197 ms, qps=938379 -BATCH (N=3000000) --> time_used=3221 ms, qps=931387 -BATCH (N=3000000) --> time_used=3360 ms, qps=892857 -BATCH (N=3000000) --> time_used=3292 ms, qps=911300 -BATCH (N=3000000) --> time_used=3407 ms, qps=880540 -BATCH (N=3000000) --> time_used=3317 ms, qps=904431 -BATCH (N=3000000) --> time_used=3337 ms, qps=899011 -BATCH (N=3000000) --> time_used=3384 ms, qps=886524 -BATCH (N=3000000) --> time_used=3355 ms, qps=894187 -BATCH (N=3000000) --> time_used=3379 ms, qps=887836 -BATCH (N=3000000) --> time_used=3243 ms, qps=925069 -BATCH (N=3000000) --> time_used=3377 ms, qps=888362 -BATCH (N=3000000) --> time_used=3212 ms, qps=933997 -BATCH (N=3000000) --> time_used=3248 ms, qps=923645 +BATCH (N=3000000) --> time_used=3511 ms, qps=854457 +BATCH (N=3000000) --> time_used=3280 ms, qps=914634 +BATCH (N=3000000) --> time_used=3603 ms, qps=832639 +BATCH (N=3000000) --> time_used=3418 ms, qps=877706 +BATCH (N=3000000) --> time_used=3353 ms, qps=894721 +BATCH (N=3000000) --> time_used=3435 ms, qps=873362 +BATCH (N=3000000) --> time_used=3250 ms, qps=923076 +BATCH (N=3000000) --> time_used=3550 ms, qps=845070 +BATCH (N=3000000) --> time_used=3536 ms, qps=848416 +BATCH (N=3000000) --> time_used=3273 ms, qps=916590 +BATCH (N=3000000) --> time_used=3224 ms, qps=930521 +BATCH (N=3000000) --> time_used=3161 ms, qps=949066 +BATCH (N=3000000) --> time_used=3143 ms, qps=954502 +BATCH (N=3000000) --> time_used=3342 ms, qps=897666 +BATCH (N=3000000) --> time_used=3410 ms, qps=879765 +BATCH (N=3000000) --> time_used=3522 ms, qps=851788 +BATCH (N=3000000) --> time_used=3035 ms, qps=988467 +BATCH (N=3000000) --> time_used=3352 ms, qps=894988 +BATCH (N=3000000) --> time_used=3226 ms, qps=929944 +BATCH (N=3000000) --> time_used=3406 ms, qps=880798 +BATCH (N=3000000) --> time_used=3336 ms, qps=899280 +BATCH (N=3000000) --> time_used=3307 ms, qps=907166 +BATCH (N=3000000) --> time_used=3171 ms, qps=946073 +BATCH (N=3000000) --> time_used=3252 ms, qps=922509 +BATCH (N=3000000) --> time_used=3296 ms, qps=910194 +BATCH (N=3000000) --> time_used=3301 ms, qps=908815 +BATCH (N=3000000) --> time_used=3403 ms, qps=881575 BATCH (N=3000000) --> time_used=3234 ms, qps=927643 -BATCH (N=3000000) --> time_used=3152 ms, qps=951776 -BATCH (N=3000000) --> time_used=3089 ms, qps=971188 -BATCH (N=3000000) --> time_used=3287 ms, qps=912686 -BATCH (N=3000000) --> time_used=3079 ms, qps=974342 -BATCH (N=3000000) --> time_used=3261 ms, qps=919963 -BATCH (N=3000000) --> time_used=3123 ms, qps=960614 -BATCH (N=3000000) --> time_used=3234 ms, qps=927643 -BATCH (N=3000000) --> time_used=3056 ms, qps=981675 -BATCH (N=3000000) --> time_used=3040 ms, qps=986842 -BATCH (N=3000000) --> time_used=3187 ms, qps=941324 -BATCH (N=3000000) --> time_used=3311 ms, qps=906070 -BATCH (N=3000000) --> time_used=3155 ms, qps=950871 -BATCH (N=3000000) --> time_used=3318 ms, qps=904159 -BATCH (N=3000000) --> time_used=3372 ms, qps=889679 -BATCH (N=3000000) --> time_used=3254 ms, qps=921942 -BATCH (N=3000000) --> time_used=3386 ms, qps=886001 -BATCH (N=3000000) --> time_used=3413 ms, qps=878992 -BATCH (N=3000000) --> time_used=3474 ms, qps=863557 -BATCH (N=3000000) --> time_used=3412 ms, qps=879249 -BATCH (N=3000000) --> time_used=3414 ms, qps=878734 -BATCH (N=3000000) --> time_used=3325 ms, qps=902255 -BATCH (N=3000000) --> time_used=3346 ms, qps=896592 -BATCH (N=3000000) --> time_used=3345 ms, qps=896860 -BATCH (N=3000000) --> time_used=3582 ms, qps=837520 -BATCH (N=3000000) --> time_used=3412 ms, qps=879249 +BATCH (N=3000000) --> time_used=3348 ms, qps=896057 +BATCH (N=3000000) --> time_used=3517 ms, qps=852999 +BATCH (N=3000000) --> time_used=3354 ms, qps=894454 +BATCH (N=3000000) --> time_used=3529 ms, qps=850099 +BATCH (N=3000000) --> time_used=3473 ms, qps=863806 +BATCH (N=3000000) --> time_used=3521 ms, qps=852030 BATCH (N=3000000) --> time_used=3370 ms, qps=890207 -BATCH (N=3000000) --> time_used=3375 ms, qps=888888 -BATCH (N=3000000) --> time_used=3190 ms, qps=940438 -BATCH (N=3000000) --> time_used=3324 ms, qps=902527 -BATCH (N=3000000) --> time_used=3253 ms, qps=922225 -BATCH (N=3000000) --> time_used=3230 ms, qps=928792 +BATCH (N=3000000) --> time_used=3267 ms, qps=918273 +BATCH (N=3000000) --> time_used=3352 ms, qps=894988 +BATCH (N=3000000) --> time_used=3433 ms, qps=873871 +BATCH (N=3000000) --> time_used=3374 ms, qps=889152 +BATCH (N=3000000) --> time_used=3360 ms, qps=892857 +BATCH (N=3000000) --> time_used=3463 ms, qps=866300 +BATCH (N=3000000) --> time_used=3499 ms, qps=857387 BATCH (N=3000000) --> time_used=3294 ms, qps=910746 -BATCH (N=3000000) --> time_used=3295 ms, qps=910470 -BATCH (N=3000000) --> time_used=3148 ms, qps=952986 -BATCH (N=3000000) --> time_used=3228 ms, qps=929368 -average qps:914031 +BATCH (N=3000000) --> time_used=3311 ms, qps=906070 +BATCH (N=3000000) --> time_used=3443 ms, qps=871333 +BATCH (N=3000000) --> time_used=3381 ms, qps=887311 +BATCH (N=3000000) --> time_used=3422 ms, qps=876680 +BATCH (N=3000000) --> time_used=3421 ms, qps=876936 +BATCH (N=3000000) --> time_used=3322 ms, qps=903070 +BATCH (N=3000000) --> time_used=3494 ms, qps=858614 +average qps:892493 ALL TESTS PASSED. ``` +### 测试2:持久化 +测试条件: +1. 启用持久化。 +2. 不启用主从同步。 +3. pipline: + 1. RSET 100w 条, p:i v:i -> +OK + 2. RGET 100w 条, p:i -> +v:i + 3. RDEL 100w 条。 p:i -> +OK +5. 本机发送请求。 + +```shell +lian@ubuntu:~/share/9.1-kvstore$ ./test-redis/testcase 192.168.10.129 8888 4 +Connected to 192.168.10.129:8888 +BATCH (N=3000000) --> time_used=3500 ms, qps=857142 +BATCH (N=3000000) --> time_used=3322 ms, qps=903070 +BATCH (N=3000000) --> time_used=3424 ms, qps=876168 +BATCH (N=3000000) --> time_used=3483 ms, qps=861326 +BATCH (N=3000000) --> time_used=3421 ms, qps=876936 +BATCH (N=3000000) --> time_used=3519 ms, qps=852514 +BATCH (N=3000000) --> time_used=3597 ms, qps=834028 +BATCH (N=3000000) --> time_used=3504 ms, qps=856164 +BATCH (N=3000000) --> time_used=3281 ms, qps=914355 +BATCH (N=3000000) --> time_used=3446 ms, qps=870574 +average qps:870227 +ALL TESTS PASSED. +``` + +### 测试3:内存 + +#### malloc +```shell +VIRT 58504 +RES 4604 + +插入 20w 删除 10w,重复 10 次,共计插入 200w 删除 100w。 +BATCH (N=3000000) --> time_used=3320 ms, qps=1807228 + +VIRT 208M +RES 155M + +插入 10w 删除 20w,重复 10 次,共计插入 100w 删除 200w。 +BATCH (N=3000000) --> time_used=3097 ms, qps=1937358 + +VIRT 208M +RES 155M +``` +![alt text](image11.png) +![alt text](image12.png) +![alt text](image13.png) + +#### mypool +```shell +VIRT 58504 +RES 4636 + +插入 20w 删除 10w,重复 30 次,共计插入 600w 删除 300w。 +BATCH (N=3000000) --> time_used=3184 ms, qps=1884422 + +VIRT 625M +RES 572M + +插入 10w 删除 20w,重复 10 次,共计插入 100w 删除 200w。 +BATCH (N=3000000) --> time_used=3022 ms, qps=1985440 + +VIRT 122M +RES 71492 +``` +![alt text](image31.png) +![alt text](image32.png) +![alt text](image33.png) + +#### jemalloc +```shell +VIRT 69376 +RES 5408 + +插入 20w 删除 10w,重复 30 次,共计插入 600w 删除 300w。 +BATCH (N=9000000) --> time_used=9436 ms, qps=1907587 + +VIRT 356M +RES 294M + +插入 10w 删除 20w,重复 30 次,共计插入 300w 删除 600w。 +BATCH (N=9000000) --> time_used=9353 ms, qps=1924516 + +VIRT 356M +RES 119M +``` ### 面试题 1. 为什么会实现kvstore,使用场景在哪里? diff --git a/config/config.xml b/config/config.xml index bf8a219..877c906 100644 --- a/config/config.xml +++ b/config/config.xml @@ -17,7 +17,7 @@ - none + none data kvs_oplog.db @@ -27,7 +27,7 @@ - malloc + mypool disable diff --git a/diskuring/diskuring.c b/diskuring/diskuring.c index aafdd46..4071491 100644 --- a/diskuring/diskuring.c +++ b/diskuring/diskuring.c @@ -3,8 +3,16 @@ #include #include +static destroy_queue_t g_destroy_queue = {NULL, PTHREAD_MUTEX_INITIALIZER}; + +static long long push_to_queue = 0; +static long long push_to_sqe = 0; +static long long get_from_cqe = 0; +static long long release_cnt = 0; + void task_init(task_t *t) { + push_to_queue ++; pthread_mutex_init(&t->m, NULL); pthread_cond_init(&t->cv, NULL); t->done = 0; @@ -59,68 +67,196 @@ static void queue_push(iouring_ctx_t *ctx, task_t *t) pthread_mutex_unlock(&ctx->q_m); } +static void queue_push_front(iouring_ctx_t *ctx, task_t *list_head, task_t *list_tail) { + pthread_mutex_lock(&ctx->q_m); + list_tail->next = ctx->q_head; + ctx->q_head = list_head; + if (!ctx->q_tail) { + ctx->q_tail = list_tail; + } + pthread_cond_signal(&ctx->q_cv); + pthread_mutex_unlock(&ctx->q_m); } + static task_t *queue_pop_all(iouring_ctx_t *ctx) { pthread_mutex_lock(&ctx->q_m); - while (!ctx->stop && ctx->q_head == NULL) - { - pthread_cond_wait(&ctx->q_cv, &ctx->q_m); - } task_t *list = ctx->q_head; ctx->q_head = ctx->q_tail = NULL; pthread_mutex_unlock(&ctx->q_m); return list; } +static task_t *queue_pop_n(iouring_ctx_t *ctx, int n) +{ + if (n <= 0) + return NULL; + pthread_mutex_lock(&ctx->q_m); + task_t *head = ctx->q_head; + if (!head) { + pthread_mutex_unlock(&ctx->q_m); + return NULL; + } + task_t *curr = head; + task_t *prev = NULL; + int count = 0; + + while (curr && count < n) { + prev = curr; + curr = curr->next; + count++; + } + + ctx->q_head = curr; + if (!curr) { + // 队列被取空 + ctx->q_tail = NULL; + } + prev->next = NULL; + pthread_mutex_unlock(&ctx->q_m); + return head; +} + +extern void sync_wakeup(); static void *worker_main(void *arg) { iouring_ctx_t *ctx = (iouring_ctx_t *)arg; + const int BATCH_SIZE = 256; // 每次最多准备这么多,防止一次占满 SQ while (!ctx->stop) - { - - // 1. 阻塞等任务(这里睡觉) - task_t *list = queue_pop_all(ctx); - if (!list) - continue; - - // 2. 提交所有任务 - task_t *t; - int submit_cnt = 0; - - for (t = list; t; t = t->next) - { - struct io_uring_sqe *sqe = io_uring_get_sqe(&ctx->ring); - if (!sqe) - break; - - io_uring_prep_writev(sqe, t->fd, t->iovs, t->iovcnt, t->off); - sqe->user_data = (uint64_t)(uintptr_t)t; - submit_cnt++; + { + int cq_count = 0; + + // ========== 1. 疯狂收割 CQE(必须优先做,释放 in_flight 额度)========== + // 使用 while 而不是 if,确保把 CQ 薅干净 + while (true) { + struct io_uring_cqe *cqe; + unsigned head; + + io_uring_for_each_cqe(&ctx->ring, head, cqe) { + task_t *done = (task_t *)(uintptr_t)cqe->user_data; + + // 先减计数(必须在处理前减,否则可能瞬间突破上限) + atomic_fetch_sub(&ctx->in_flight, 1); + + task_finish(done, cqe->res); + + if (cqe->res < 0) { + fprintf(stderr, "write fail: fd=%d res=%d\n", done->fd, cqe->res); + } + + // 加入销毁队列 + pthread_mutex_lock(&g_destroy_queue.lock); + done->next = g_destroy_queue.head; + g_destroy_queue.head = done; + pthread_mutex_unlock(&g_destroy_queue.lock); + + get_from_cqe++; + cq_count++; + } + + if (cq_count > 0) { + io_uring_cq_advance(&ctx->ring, cq_count); + sync_wakeup(); + } + + // 如果这次没收满,说明 CQ 空了,退出收割循环 + if (cq_count == 0) break; + cq_count = 0; // 重置继续薅(可能有新的完成了) } - io_uring_submit(&ctx->ring); + // 检查溢出(保险起见,虽然有了背压不该再溢出) + if (*ctx->ring.sq.kflags & IORING_SQ_CQ_OVERFLOW) { + fprintf(stderr, "FATAL: CQ overflow detected! Backpressure broken!\n"); + abort(); // 直接崩溃,说明逻辑有 bug + } - // 3. 等全部完成 - for (int i = 0; i < submit_cnt; ++i) - { + // ========== 2. 计算还能提交多少 ========== + int current_in_flight = atomic_load(&ctx->in_flight); + int available_slots = ctx->max_in_flight - current_in_flight; + + if (available_slots <= 0) { + // 满了!不能取新任务,必须等待 CQE(忙等或阻塞等) + + // 方案 B:阻塞等 CQE(推荐) struct io_uring_cqe *cqe; - io_uring_wait_cqe(&ctx->ring, &cqe); - - task_t *done = (task_t *)(uintptr_t)cqe->user_data; - task_finish(done, cqe->res); - - // todo: 失败应该通知主线程,提供一个链表,把失败的taskpush进去。主线程用定时器任务定时处理失败信息然后destory。 - // 暂时用打印说明失败 - if(done->res < 0){ - printf("uring failed: fd:%d, offset:%ld\n", done->fd, done->off); + int ret = io_uring_wait_cqe(&ctx->ring, &cqe); + if (ret == 0 && !ctx->stop) { + // 收到一个 CQE,回循环开头处理 + continue; } - task_destroy(done); + continue; + } - io_uring_cqe_seen(&ctx->ring, cqe); + // ========== 3. 从任务队列取任务(只取 available_slots 个)========== + + task_t *task_list = queue_pop_n(ctx, available_slots); + if (!task_list) { + if (!ctx->stop && atomic_load(&ctx->in_flight) > 0) { + int ret = io_uring_submit_and_wait(&ctx->ring, 1); + continue; + } + // 没任务,等待条件变量 + pthread_mutex_lock(&ctx->q_m); + while (ctx->q_head == NULL && !ctx->stop) { + pthread_cond_wait(&ctx->q_cv, &ctx->q_m); + } + pthread_mutex_unlock(&ctx->q_m); + continue; + } + + // ========== 4. 准备 SQE(受限于 available_slots)========== + int batch_count = 0; + task_t *curr = task_list; + task_t *prev = NULL; + task_t *submitted_head = task_list; // 记录这次实际要提交的部分 + task_t *remaining_head = NULL; // 装不下的部分 + + while (curr && batch_count < available_slots) { + struct io_uring_sqe *sqe = io_uring_get_sqe(&ctx->ring); + + if (!sqe) { + // SQ 满了(这种情况在控制 inflight 后很少见,但保险起见) + break; + } + + io_uring_prep_writev(sqe, curr->fd, curr->iovs, curr->iovcnt, curr->off); + sqe->user_data = (uint64_t)(uintptr_t)curr; + + batch_count++; + prev = curr; + curr = curr->next; + } + + // 断开链表:已准备的 和 未准备的 + if (prev) { + prev->next = NULL; // 已提交的部分结尾 + } + remaining_head = curr; // 剩下的部分(如果有) + + // ========== 5. 提交并增加计数 ========== + if (batch_count > 0) { + int submitted = io_uring_submit(&ctx->ring); + if (submitted != batch_count) { + fprintf(stderr, "CRITICAL: prep %d but submit %d\n", batch_count, submitted); + // 这种情况很严重,说明 ring 损坏了,建议 abort + abort(); + } + + atomic_fetch_add(&ctx->in_flight, submitted); + push_to_sqe += submitted; + } + + // ========== 6. 把没提交的任务塞回队列头部(保持顺序)========== + if (remaining_head) { + task_t *tail = remaining_head; + while (tail->next) tail = tail->next; + + queue_push_front(ctx, remaining_head, tail); } } + printf("exit uring, stop: %d, inflight: %d\n", ctx->stop, + atomic_load(&ctx->in_flight)); return NULL; } @@ -135,12 +271,26 @@ int iouring_init(iouring_ctx_t *ctx, unsigned entries) memset(ctx, 0, sizeof(*ctx)); pthread_mutex_init(&ctx->q_m, NULL); pthread_cond_init(&ctx->q_cv, NULL); + ctx->stop = 0; struct io_uring_params params; memset(¶ms, 0, sizeof(params)); + + // params.flags |= IORING_SETUP_CQSIZE; + // params.cq_entries = 256 * 1024; + // params.sq_entries = 128 * 1024; int ret = io_uring_queue_init_params(entries, &ctx->ring, ¶ms); - if (ret < 0) + if (ret < 0) { + fprintf(stderr, "io_uring_queue_init_params failed: %d (%s)\n", + ret, strerror(-ret)); return ret; + } + + unsigned cq_size = *ctx->ring.cq.kring_entries; + printf("Kernel CQ size: %u\n", cq_size); + + ctx->max_in_flight = (cq_size * 8) / 10; + atomic_init(&ctx->in_flight, 0); ret = pthread_create(&ctx->th, NULL, worker_main, ctx); if (ret != 0) @@ -201,3 +351,23 @@ task_t* submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int queue_push(ctx, t); return t; } + +// 主线程定期调用此函数清理 +void cleanup_finished_iouring_tasks() { + pthread_mutex_lock(&g_destroy_queue.lock); + task_t *list = g_destroy_queue.head; + g_destroy_queue.head = NULL; + pthread_mutex_unlock(&g_destroy_queue.lock); + + int cnt = 0; + while (list) { + cnt ++; + task_t *next = list->next; + task_destroy(list); // 在主线程执行销毁 + list = next; + } + // printf("clean: %d\n\n", cnt); + // mp_print(); + release_cnt += cnt; + // printf("push:%lld, sqe:%lld, cqe:%lld, rls:%lld\n", push_to_queue, push_to_sqe, get_from_cqe, release_cnt); +} \ No newline at end of file diff --git a/diskuring/diskuring.h b/diskuring/diskuring.h index 29ba85b..a91cbe9 100644 --- a/diskuring/diskuring.h +++ b/diskuring/diskuring.h @@ -18,6 +18,7 @@ typedef struct task { int fd; off_t off; + int refcount; int res; // cqe->res int done; // 0/1 @@ -39,8 +40,15 @@ typedef struct { task_t *q_head, *q_tail; int stop; + atomic_int in_flight; + int max_in_flight; } iouring_ctx_t; +typedef struct { + task_t *head; + pthread_mutex_t lock; +} destroy_queue_t; + int iouring_register_fd(iouring_ctx_t *ctx, int fd); void task_init(task_t *t); @@ -54,6 +62,8 @@ void iouring_shutdown(iouring_ctx_t *ctx); task_t* submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int count, off_t off); +void cleanup_finished_iouring_tasks(); + extern iouring_ctx_t global_uring_ctx; #endif \ No newline at end of file diff --git a/dump/kvs_oplog.c b/dump/kvs_oplog.c index 6a9f857..a89017e 100644 --- a/dump/kvs_oplog.c +++ b/dump/kvs_oplog.c @@ -54,8 +54,6 @@ int kvs_oplog_append(const uint8_t *cmd, size_t len, int logfd){ return -4; } - // task_wait(t); - // task_destroy(t); return 0; } diff --git a/img/image11.png b/img/image11.png new file mode 100644 index 0000000..b73face Binary files /dev/null and b/img/image11.png differ diff --git a/img/image12.png b/img/image12.png new file mode 100644 index 0000000..66a9a22 Binary files /dev/null and b/img/image12.png differ diff --git a/img/image13.png b/img/image13.png new file mode 100644 index 0000000..66a9a22 Binary files /dev/null and b/img/image13.png differ diff --git a/img/image22.png b/img/image22.png new file mode 100644 index 0000000..f91a2cf Binary files /dev/null and b/img/image22.png differ diff --git a/img/image23.png b/img/image23.png new file mode 100644 index 0000000..094cd27 Binary files /dev/null and b/img/image23.png differ diff --git a/img/image31.png b/img/image31.png new file mode 100644 index 0000000..baf1d8c Binary files /dev/null and b/img/image31.png differ diff --git a/img/image32.png b/img/image32.png new file mode 100644 index 0000000..260bbc1 Binary files /dev/null and b/img/image32.png differ diff --git a/img/image33.png b/img/image33.png new file mode 100644 index 0000000..1b774a0 Binary files /dev/null and b/img/image33.png differ diff --git a/img/批处理1jemalloc.png b/img/批处理1jemalloc.png deleted file mode 100644 index 2f3b61f..0000000 Binary files a/img/批处理1jemalloc.png and /dev/null differ diff --git a/img/批处理1内存池.png b/img/批处理1内存池.png deleted file mode 100644 index 8466845..0000000 Binary files a/img/批处理1内存池.png and /dev/null differ diff --git a/img/批处理1无内存池.png b/img/批处理1无内存池.png deleted file mode 100644 index 8738262..0000000 Binary files a/img/批处理1无内存池.png and /dev/null differ diff --git a/img/未运行状态.png b/img/未运行状态.png deleted file mode 100644 index e0738d7..0000000 Binary files a/img/未运行状态.png and /dev/null differ diff --git a/kvs_array_bin.c b/kvs_array_bin.c index 61e189f..2fa0d27 100644 --- a/kvs_array_bin.c +++ b/kvs_array_bin.c @@ -237,13 +237,7 @@ int kvs_array_save(iouring_ctx_t *uring, kvs_array_t *inst, const char* filename task_t *t = submit_write(uring, fd, bufs, lens, count, current_off); if (!t) { close(fd); return -4; } - // int res = task_wait(t); - // task_destroy(t); - // if (res < 0) { - // close(fd); - // return -5; - // } current_off += (off_t) total; } diff --git a/kvs_hash_bin.c b/kvs_hash_bin.c index d99afd6..6283c21 100755 --- a/kvs_hash_bin.c +++ b/kvs_hash_bin.c @@ -310,13 +310,7 @@ int kvs_hash_save(iouring_ctx_t *uring, kvs_hash_t *inst, const char* filename){ task_t *t = submit_write(uring, fd, bufs, lens, count, current_off); if (!t) { close(fd); return -4; } - // int res = task_wait(t); - // task_destroy(t); - // if (res < 0) { - // close(fd); - // return -5; - // } current_off += (off_t) total; } diff --git a/kvs_protocol_resp.c b/kvs_protocol_resp.c index 3418626..26bf73f 100644 --- a/kvs_protocol_resp.c +++ b/kvs_protocol_resp.c @@ -1,5 +1,6 @@ #include "kvs_protocol_resp.h" #include "kvs_rw_tools.h" +#include "memory/alloc_dispatch.h" #include "dump/kvs_dump.h" #if ENABLE_ARRAY @@ -338,7 +339,7 @@ const char *command[] = { "SET", "GET", "DEL", "MOD", "EXIST", "RSET", "RGET", "RDEL", "RMOD", "REXIST", "HSET", "HGET", "HDEL", "HMOD", "HEXIST", - "SAVE", "SSYNC", "SREADY" + "SAVE", "SSYNC", "SREADY", "MEMPRINT" }; @@ -553,6 +554,12 @@ int resp_dispatch(const resp_cmd_t *cmd, resp_value_t *out_value) { __sready(); *out_value = resp_simple("OK"); return 0; + case KVS_CMD_MEM_PRINT:{ + int ret = kvs_mem_printf(); + printf("ret %d\n", ret); + *out_value = resp_int(ret); + return 0; + } default: break; } diff --git a/kvs_protocol_resp.h b/kvs_protocol_resp.h index dcbf2b7..93fbf24 100644 --- a/kvs_protocol_resp.h +++ b/kvs_protocol_resp.h @@ -39,6 +39,7 @@ typedef enum { KVS_CMD_SAVE, KVS_CMD_SSYNC, KVS_CMD_SREADY, + KVS_CMD_MEM_PRINT, KVS_CMD_COUNT, }kvs_cmd_t; diff --git a/kvs_rbtree_bin.c b/kvs_rbtree_bin.c index abe17e1..64c88b0 100644 --- a/kvs_rbtree_bin.c +++ b/kvs_rbtree_bin.c @@ -507,12 +507,6 @@ static int kvs_rbtree_save_node(iouring_ctx_t *uring, int fd, off_t *current_off if (!t) { return -4; } - // int res = task_wait(t); - task_destroy(t); - - // if (res < 0) { - // return -5; - // } *current_off += (off_t) total; diff --git a/kvstore.c b/kvstore.c index 5fcd71e..ba26a19 100644 --- a/kvstore.c +++ b/kvstore.c @@ -197,9 +197,7 @@ void init_memory_pool(AppConfig *cfg){ } void dest_memory_pool(void){ -#if MEMORY_USE_MYMALLOC mp_destroy(&global_mempool); -#endif } static int ensure_dir_exists(const char *dir) @@ -270,7 +268,7 @@ int init_config(AppConfig *cfg){ } void init_disk_uring(iouring_ctx_t *uring_ctx){ - iouring_init(uring_ctx, 256); + iouring_init(uring_ctx, 2048); } diff --git a/memory/alloc_dispatch.c b/memory/alloc_dispatch.c index 4a1ee7e..4779812 100644 --- a/memory/alloc_dispatch.c +++ b/memory/alloc_dispatch.c @@ -72,6 +72,10 @@ void kvs_free_impl(void *ptr) { } } +int kvs_mem_printf(){ + return mp_print(&global_mempool); +} + void *nMalloc(size_t size, const char * filename, const char *func, int line){ void *ptr = kvs_malloc_impl(size); diff --git a/memory/alloc_dispatch.h b/memory/alloc_dispatch.h index 0bb7f0c..ff3eb82 100644 --- a/memory/alloc_dispatch.h +++ b/memory/alloc_dispatch.h @@ -30,6 +30,7 @@ MemLeakDetectMode kvs_get_memleak_detect(void); void *kvs_malloc_impl(size_t size); void kvs_free_impl(void *ptr); +int kvs_mem_printf(); void *nMalloc(size_t size, const char * filename, const char *func, int line); void nFree(void *ptr, const char * filename, const char *func, int line); diff --git a/memory/mempool.c b/memory/mempool.c index 5f3dc93..368397f 100644 --- a/memory/mempool.c +++ b/memory/mempool.c @@ -74,7 +74,7 @@ static mp_page_t* mp_page_create(mp_bucket_t *owner){ pg->prev = NULL; pg->next = NULL; - bitmap_clear_all(pg->bitmap, 20); + bitmap_clear_all(pg->bitmap, 16); char *p = (char*)page_payload(pg); for(uint16_t i = 0;i < cap - 1; ++ i){ @@ -313,4 +313,18 @@ int mp_free(mp_pool_t *pool, void *ptr){ } return 0; +} + +int mp_print(mp_pool_t *pool){ + int ret = 0; + printf("------\n"); + for(int i = 0; i < MEMPOOL_NUM_CLASSES; i++){ + mp_bucket_t *bucket = &pool->buckets[i]; + if(bucket->page_count) ret += bucket->page_count; + printf("size:%ld, page:%d, empty:%d\n", bucket->block_size, bucket->page_count, bucket->empty_count); + } + printf("------\n"); + + printf("page count: %d\n", ret); + return ret; } \ No newline at end of file diff --git a/memory/mempool.h b/memory/mempool.h index b3df5c0..e44d50e 100644 --- a/memory/mempool.h +++ b/memory/mempool.h @@ -5,13 +5,14 @@ #include #include #include +#include // #define MEMPOOL_PAGE_SIZE 4096 -#define MEMPOOL_PAGE_SIZE (1024*8) +#define MEMPOOL_PAGE_SIZE (4096*2) #define MEMPOOL_BLOCK_MAX_SIZE 512 #define MEMPOOL_ALIGNMENT 8 #define MEMPOOL_NUM_CLASSES (MEMPOOL_BLOCK_MAX_SIZE / MEMPOOL_ALIGNMENT) -#define MEMPOOL_CACHE_PAGE 4 +#define MEMPOOL_CACHE_PAGE 2 typedef struct mp_page_s mp_page_t; typedef struct mp_bucket_s mp_bucket_t; @@ -35,7 +36,7 @@ struct mp_page_s{ uint16_t free_count; uint16_t capacity; - uint64_t bitmap[20]; // 最多支持 512/1280 个块 (64*20) + uint64_t bitmap[16]; // 最多支持 512/1280 个块 (64*20) }; struct mp_bucket_s{ @@ -56,7 +57,7 @@ struct mp_large_s{ struct mp_pool_s{ mp_bucket_t buckets[MEMPOOL_NUM_CLASSES]; - mp_large_t *large_list; + mp_large_t *large_list; }; int mp_create(mp_pool_t *pool); @@ -65,4 +66,6 @@ int mp_destroy(mp_pool_t *pool); void *mp_alloc(mp_pool_t *pool, size_t size); int mp_free(mp_pool_t *pool, void *ptr); +int mp_print(mp_pool_t *pool); + #endif diff --git a/reactor.c b/reactor.c index 4bb7ecd..c38289c 100644 --- a/reactor.c +++ b/reactor.c @@ -1,6 +1,6 @@ - +#define _GNU_SOURCE #include #include #include @@ -13,6 +13,7 @@ #include #include #include +#include #include "server.h" @@ -33,6 +34,9 @@ typedef int (*msg_handler)(struct conn* conn); static msg_handler kvs_handler; +extern void cleanup_finished_iouring_tasks(); + + // 0 need more, -1 error, =1 suc int kvs_request(struct conn *c) { // int consumed_out = kvs_handler(c->rbuffer, c->rlength, c->wbuffer, &c->wlength); @@ -61,7 +65,7 @@ int epfd = 0; struct timeval begin; int wakeup_fd = -1; - +int timer_fd = -1; struct conn conn_list[CONNECTION_SIZE] = {0}; // fd @@ -296,31 +300,6 @@ int send_cb(int fd) { // wakup fd -int handle_wakeup_fd_cb(int fd); - -int init_wakeup_fd(void) { - wakeup_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); - if (wakeup_fd < 0) { - printf("eventfd failed: errno=%d %s\n", errno, strerror(errno)); - return -1; - } - - conn_list[wakeup_fd].fd = wakeup_fd; - conn_list[wakeup_fd].r_action.recv_callback = handle_wakeup_fd_cb; - set_event(wakeup_fd, EPOLLIN, 1); - - return 0; -} - -// EPOLLOUT -void sync_wakeup(int fd) { - if (wakeup_fd < 0) return; - set_event(fd, EPOLLOUT, 0); - - uint64_t one = 1; - ssize_t n = write(wakeup_fd, &one, sizeof(one)); -} - int handle_wakeup_fd_cb(int fd) { uint64_t v; while (1) { @@ -329,9 +308,65 @@ int handle_wakeup_fd_cb(int fd) { if (n < 0 && errno == EAGAIN) break; // 已经读空 break; } + cleanup_finished_iouring_tasks(); + return 0; } +int init_wakeup_fd(void) { + int wfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + if (wfd < 0) { + printf("eventfd failed: errno=%d %s\n", errno, strerror(errno)); + return -1; + } + + conn_list[wfd].fd = wfd; + conn_list[wfd].r_action.recv_callback = handle_wakeup_fd_cb; + set_event(wfd, EPOLLIN, 1); + + return wfd; +} + +// EPOLLOUT +void sync_wakeup() { + if (wakeup_fd < 0) return; + // set_event(wakeup_fd, EPOLLOUT, 0); + + uint64_t one = 1; + ssize_t n = write(wakeup_fd, &one, sizeof(one)); +} + + + +// 定时器 +int handle_timer_fd_cb(int fd){ + uint64_t v; + while (1) { + ssize_t n = read(fd, &v, sizeof(v)); + if (n == sizeof(v)) { + continue; + } + if (n < 0 && errno == EAGAIN) break; + break; + } +} + +int init_timer_fd(void){ + int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); + + struct itimerspec its = { + .it_interval = {1, 0}, // 每 1 秒 + .it_value = {1, 0}, // 1 秒后首次触发 + }; + timerfd_settime(tfd, 0, &its, NULL); + + conn_list[tfd].fd = tfd; + conn_list[tfd].r_action.recv_callback = handle_timer_fd_cb; + set_event(tfd, EPOLLIN, 1); + + return tfd; +} + int r_init_server(unsigned short port) { int sockfd = socket(AF_INET, SOCK_STREAM, 0); @@ -365,11 +400,19 @@ int reactor_start(unsigned short port, msg_handler handler) { epfd = epoll_create(1); - if(init_wakeup_fd() < 0){ + wakeup_fd = init_wakeup_fd(); + if(wakeup_fd < 0){ close(epfd); return -1; } + // timer_fd = init_timer_fd(); + // if(timer_fd < 0){ + // close(epfd); + // close(wakeup_fd); + // return -1; + // } + int i = 0; for (i = 0;i < MAX_PORTS;i ++) { @@ -386,23 +429,15 @@ int reactor_start(unsigned short port, msg_handler handler) { gettimeofday(&begin, NULL); while (1) { // mainloop - struct epoll_event events[1024] = {0}; int nready = epoll_wait(epfd, events, 1024, -1); + // cleanup_finished_iouring_tasks(); int i = 0; for (i = 0;i < nready;i ++) { int connfd = events[i].data.fd; -#if 0 - if (events[i].events & EPOLLIN) { - conn_list[connfd].r_action.recv_callback(connfd); - } else if (events[i].events & EPOLLOUT) { - conn_list[connfd].send_callback(connfd); - } - -#else if (events[i].events & EPOLLIN) { // printf("connlist:%p, r_action:%p, recv_callaback:%p\n", &conn_list[connfd], &conn_list[connfd].r_action, conn_list[connfd].r_action.recv_callback); conn_list[connfd].r_action.recv_callback(connfd); @@ -411,7 +446,6 @@ int reactor_start(unsigned short port, msg_handler handler) { if (events[i].events & EPOLLOUT) { conn_list[connfd].send_callback(connfd); } -#endif } } diff --git a/test-redis/test.c b/test-redis/test.c index 555bb17..1bbf95f 100644 --- a/test-redis/test.c +++ b/test-redis/test.c @@ -91,12 +91,35 @@ void special_char_test(redisContext *c){ } void save(redisContext *c){ - must_ok((redisReply*)redisCommand(c, "SAVE"), "SET binary"); + must_ok((redisReply*)redisCommand(c, "SAVE"), "SAVE"); printf("[OK] SAVE\n"); } -void pipline_set_test(redisContext *c, int start, int countN, const char *op){ +void printmem(redisContext *c){ + redisReply *r = (redisReply*)redisCommand(c, "MEMPRINT"); + if (r == NULL) { + // 连接错误或命令发送失败 + printf("redisCommand failed: %s\n", c->errstr); + return; + } + + if (r->type == REDIS_REPLY_INTEGER) { + if (r->integer == 1) { + printf("MEMPRINT returned 1\n"); + } else { + printf("MEMPRINT returned %lld\n", r->integer); + } + } else if (r->type == REDIS_REPLY_ERROR) { + printf("Redis error: %s\n", r->str); + } else { + printf("Unexpected reply type: %d\n", r->type); + } + + printf("[OK] PRINT\n"); +} + +void pipline_set_test(redisContext *c, int start, int countN, const char *op, char* k, char *v){ /* ---------- 3) Pipeline 批处理测试 ---------- */ const int N = countN; @@ -104,8 +127,8 @@ void pipline_set_test(redisContext *c, int start, int countN, const char *op){ int end = start + N; for (int i = start; i < end; i++) { char kk[64], vv[64]; - int kn = snprintf(kk, sizeof(kk), "p:%d", i); - int vn = snprintf(vv, sizeof(vv), "v:%d", i); + int kn = snprintf(kk, sizeof(kk), "%s:%d", k, i); + int vn = snprintf(vv, sizeof(vv), "%s:%d", v, i); if (redisAppendCommand( c, "%s %b %b", op, kk, (size_t)kn, @@ -127,14 +150,14 @@ void pipline_set_test(redisContext *c, int start, int countN, const char *op){ PRINT("[OK] SET pipeline batch %d\n", N); } -void pipline_get_test(redisContext *c, int start, int countN, const char *op){ +void pipline_get_test(redisContext *c, int start, int countN, const char *op, char* k, char *v){ const int N = countN; /* pipeline GET + 校验 */ int end = start + N; for (int i = start; i < end; i++) { char kk[64]; - int kn = snprintf(kk, sizeof(kk), "p:%d", i); + int kn = snprintf(kk, sizeof(kk), "%s:%d", k, i); if (redisAppendCommand( c, "%s %b", op, kk, (size_t)kn) != REDIS_OK) { @@ -147,7 +170,7 @@ void pipline_get_test(redisContext *c, int start, int countN, const char *op){ redisReply *r = NULL; if (redisGetReply(c, (void**)&r) != REDIS_OK || !r) die(c, "redisGetReply GET failed"); char expect[64]; - int en = snprintf(expect, sizeof(expect), "v:%d", i); + int en = snprintf(expect, sizeof(expect), "%s:%d", v, i); must_bulk_eq(r, expect, (size_t)en, "pipeline GET reply"); if(i%10000 == 0) PRINT("RECV: %d\n", i); @@ -156,14 +179,14 @@ void pipline_get_test(redisContext *c, int start, int countN, const char *op){ PRINT("[OK] GET pipeline batch %d\n", N); } -void pipline_del_test(redisContext *c, int start, int countN, const char *op){ +void pipline_del_test(redisContext *c, int start, int countN, const char *op, char* k){ const int N = countN; /* cleanup:pipeline DEL */ int end = start + N; for (int i = start; i < end; i++) { char kk[64]; - int kn = snprintf(kk, sizeof(kk), "p:%d", i); + int kn = snprintf(kk, sizeof(kk), "%s:%d", k, i); if (redisAppendCommand( c, "%s %b", op, kk, (size_t)kn) != REDIS_OK) { @@ -190,15 +213,15 @@ long long test_nopersist_noreplica(redisContext *c, int rounds, long long batch_ long long total_ops = batch_size*rounds; for(int i = 0;i < total_ops ; i += batch_size){ - pipline_set_test(c, i, batch_size, "RSET"); + pipline_set_test(c, i, batch_size, "RSET", "p", "v"); } for(int i = 0;i < total_ops ; i += batch_size){ - pipline_get_test(c, i, batch_size, "RGET"); + pipline_get_test(c, i, batch_size, "RGET", "p", "v"); } for(int i = 0;i < total_ops ; i += batch_size){ - pipline_del_test(c, i, batch_size, "RDEL"); + pipline_del_test(c, i, batch_size, "RDEL", "p"); } gettimeofday(&tv_end, NULL); @@ -209,6 +232,35 @@ long long test_nopersist_noreplica(redisContext *c, int rounds, long long batch_ return qps; } +long long test_memory(redisContext *c, int rounds, long long batch_size, int mod){ + struct timeval tv_begin, tv_end; + gettimeofday(&tv_begin, NULL); + + long long total_ops = batch_size*rounds; + + if(mod == 1){ + for(int i = 0;i < total_ops ; i += batch_size){ + pipline_set_test(c, i, batch_size, "RSET","p1","v"); + pipline_set_test(c, i, batch_size, "RSET","p2","v"); + pipline_del_test(c, i, batch_size, "RDEL","p1"); + } + }else if(mod == 2){ + for(int i = 0;i < total_ops ; i += batch_size){ + pipline_set_test(c, i, batch_size, "RSET","p3","v"); + pipline_del_test(c, i, batch_size, "RDEL","p2"); + pipline_del_test(c, i, batch_size, "RDEL","p3"); + } + } + + + gettimeofday(&tv_end, NULL); + int time_used = TIME_SUB_MS(tv_end, tv_begin); + long long qps = total_ops *6*1000/time_used; + printf("BATCH (N=%lld) --> time_used=%d ms, qps=%lld\n", total_ops *3, time_used, qps); + + return qps; +} + int main(int argc, char **argv) { if(argc < 4) { printf("invalid input\n"); @@ -227,6 +279,8 @@ int main(int argc, char **argv) { if(mode == 0){ save(c); }else if(mode == 1){ + printmem(c); + }else if(mode == 2){ basic_command_test(c); }else if(mode == 3){ int rounds = 10; @@ -239,29 +293,47 @@ int main(int argc, char **argv) { } printf("average qps:%lld\n", total_qps/testrounds); }else if(mode == 4){ + int rounds = 10; + long long batch_size = 100000; + int testrounds = 5; + long long total_qps = 0; - }else if(mode == 5){ + for(int i = 0;i < testrounds; ++ i){ + total_qps += test_nopersist_noreplica(c, rounds, batch_size); + } + printf("average qps:%lld\n", total_qps/testrounds); + + }else if(mode == 51){ + int rounds = 30; + long long batch_size = 100000; + test_memory(c, rounds, batch_size, 1); + + + }else if(mode == 52){ + int rounds = 30; + long long batch_size = 100000; + test_memory(c, rounds, batch_size, 2); }else if(mode == 10){ - pipline_set_test(c, 0, 1000, "SET"); + pipline_set_test(c, 0, 1000, "SET","p","v"); }else if(mode == 11){ - pipline_get_test(c, 0, 1000, "GET"); + pipline_get_test(c, 0, 1000, "GET","p", "v"); }else if(mode == 12){ - pipline_del_test(c, 0, 1000, "DEL"); + pipline_del_test(c, 0, 1000, "DEL","p"); }else if(mode == 20){ - pipline_set_test(c, 0, 1000, "RSET"); + pipline_set_test(c, 0, 1000, "RSET","p","v"); }else if(mode == 21){ - pipline_get_test(c, 0, 1000, "RGET"); + pipline_get_test(c, 0, 1000, "RGET","p", "v"); }else if(mode == 22){ - pipline_del_test(c, 0, 1000, "RDEL"); + pipline_del_test(c, 0, 1000, "RDEL","p"); }else if(mode == 30){ - pipline_set_test(c, 0, 1000, "HSET"); + pipline_set_test(c, 0, 1000, "HSET","p","v"); }else if(mode == 31){ - pipline_get_test(c, 0, 1000, "HGET"); + pipline_get_test(c, 0, 1000, "HGET","p", "v"); }else if(mode == 32){ - pipline_del_test(c, 0, 1000, "HDEL"); + pipline_del_test(c, 0, 1000, "HDEL","p"); } redisFree(c);