diff --git a/Makefile b/Makefile index 02c5d99..35297ea 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ CC = gcc CFLAGS = -g -DJEMALLOC_NO_DEMANGLE NET_SRCS = ntyco.c proactor.c reactor.c kvstore.c -KV_SRCS = kvs_array_bin.c kvs_rbtree_bin.c kvs_hash_bin.c kvs_rw_tools.c kvs_protocol_resp.c kvs_slave.c +KV_SRCS = kvs_array_bin.c kvs_rbtree_bin.c kvs_hash_bin.c kvs_rw_tools.c kvs_protocol_resp.c kvs_slave.c replica_shm.c MEM_SRCS = ./memory/mempool.c ./memory/alloc_dispatch.c COMMON_SRCS = ./common/config.c ./diskuring/diskuring.c DUMP_SRCS = ./dump/kvs_snapshot.c ./dump/kvs_oplog.c @@ -12,8 +12,8 @@ SRCS = $(NET_SRCS) $(KV_SRCS) $(MEM_SRCS) $(COMMON_SRCS) $(DUMP_SRCS) INC = -I./NtyCo/core/ -I/usr/include/libxml2 -I./ LIBDIRS = -L./NtyCo/ -# LIBS = -lntyco -lpthread -luring -ldl -lxml2 -ljemalloc -LIBS = -lntyco -lpthread -luring -ldl -lxml2 +# LIBS = -lntyco -lpthread -luring -ldl -lxml2 -lrt -ljemalloc +LIBS = -lntyco -lpthread -luring -ldl -lxml2 -lrt TARGET = kvstore SUBDIR = ./NtyCo/ diff --git a/README.md b/README.md index bd72d58..d4054dd 100644 --- a/README.md +++ b/README.md @@ -253,13 +253,13 @@ VIRT 58504 RES 4604 插入 20w 删除 10w,重复 10 次,共计插入 200w 删除 100w。 -BATCH (N=3000000) --> time_used=3320 ms, qps=1807228 +BATCH (N=9000000) --> time_used=12897 ms, qps=1395673 -VIRT 208M -RES 155M +VIRT 489M +RES 430M 插入 10w 删除 20w,重复 10 次,共计插入 100w 删除 200w。 -BATCH (N=3000000) --> time_used=3097 ms, qps=1937358 +BATCH (N=9000000) --> time_used=10033 ms, qps=1794079 VIRT 208M RES 155M @@ -268,6 +268,27 @@ RES 155M ![alt text](image12.png) ![alt text](image13.png) +#### jemalloc +```shell +VIRT 69376 +RES 5408 + +插入 20w 删除 10w,重复 30 次,共计插入 600w 删除 300w。 +BATCH (N=9000000) --> time_used=9436 ms, qps=1907587 + +VIRT 356M +RES 294M + +插入 10w 删除 20w,重复 30 次,共计插入 300w 删除 600w。 +BATCH (N=9000000) --> time_used=9353 ms, qps=1924516 + +VIRT 356M +RES 119M +``` +![alt text](image11.png) +![alt text](image22.png) +![alt text](image23.png) + #### mypool ```shell VIRT 58504 @@ 
-289,22 +310,34 @@ RES 71492 ![alt text](image32.png) ![alt text](image33.png) -#### jemalloc +### 测试4:主从同步 +测试条件: +1. 不启用持久化。 +2. 启用主从同步。 +3. pipeline: + 1. RSET 100w 条, p:i v:i -> +OK + 2. RGET 100w 条, p:i -> +v:i + 3. RDEL 100w 条。 p:i -> +OK +4. 本机发送请求。 ```shell -VIRT 69376 -RES 5408 - -插入 20w 删除 10w,重复 30 次,共计插入 600w 删除 300w。 -BATCH (N=9000000) --> time_used=9436 ms, qps=1907587 - -VIRT 356M -RES 294M - -插入 10w 删除 20w,重复 30 次,共计插入 300w 删除 600w。 -BATCH (N=9000000) --> time_used=9353 ms, qps=1924516 - -VIRT 356M -RES 119M +lian@ubuntu:~/share/9.1-kvstore$ ./test-redis/testcase 192.168.10.129 8888 4 +Connected to 192.168.10.129:8888 +BATCH (N=3000000) --> time_used=3702 ms, qps=810372 +BATCH (N=3000000) --> time_used=3804 ms, qps=788643 +BATCH (N=3000000) --> time_used=4076 ms, qps=736015 +BATCH (N=3000000) --> time_used=3840 ms, qps=781250 +BATCH (N=3000000) --> time_used=3824 ms, qps=784518 +average qps:780159 +ALL TESTS PASSED. +lian@ubuntu:~/share/9.1-kvstore$ ./test-redis/testcase 192.168.10.129 8888 4 +Connected to 192.168.10.129:8888 +BATCH (N=3000000) --> time_used=3958 ms, qps=757958 +BATCH (N=3000000) --> time_used=4043 ms, qps=742023 +BATCH (N=3000000) --> time_used=3729 ms, qps=804505 +BATCH (N=3000000) --> time_used=3989 ms, qps=752068 +BATCH (N=3000000) --> time_used=3603 ms, qps=832639 +average qps:777838 +ALL TESTS PASSED. 
``` ### 面试题 diff --git a/common/config.c index e960007..80a1f99 100644 --- a/common/config.c +++ b/common/config.c @@ -58,6 +58,7 @@ static void set_default_config(AppConfig *cfg) cfg->persistence = PERSIST_NONE; cfg->allocator = ALLOC_JEMALLOC; cfg->leak_mode = MEMLEAK_DETECT_OFF; + cfg->replica_mode = REPLICA_DISABLE; } /* ---- 字符串转枚举 ---- */ @@ -103,6 +104,14 @@ static void parse_leakage(const char *s, MemLeakDetectMode *out) else *out = MEMLEAK_DETECT_OFF; } +static void parse_replica(const char *s, ReplicaMode *out) +{ + if (!s || !out) return; + if (!strcasecmp(s, "enable")) *out = REPLICA_ENABLE; + else if (!strcasecmp(s, "disable")) *out = REPLICA_DISABLE; + else *out = REPLICA_DISABLE; +} + static int read_file_mmap(const char *filename, void **out_addr, size_t *out_len, int *out_fd) { if (!filename || !out_addr || !out_len || !out_fd) return -1; @@ -199,6 +208,16 @@ const char *leakage_to_string(MemLeakDetectMode a) } } +const char *replica_to_string(ReplicaMode a) +{ + switch (a) { + case REPLICA_ENABLE: return "enable"; + case REPLICA_DISABLE: return "disable"; + default: return "unknown"; + } +} + + /* ---- 主函数:从 XML 加载配置 ---- */ /* server 部分 */ @@ -236,6 +255,15 @@ void server_load(xmlNodePtr *root, AppConfig *out_cfg){ } } + xmlNodePtr replica_node = find_child(server, "replica"); + if (replica_node) { + xmlChar *txt = xmlNodeGetContent(replica_node); + if (txt) { + parse_replica((char *)txt, &out_cfg->replica_mode); + xmlFree(txt); + } + } + /* master (always read if present) */ xmlNodePtr master = find_child(server, "master"); if (master) { diff --git a/common/config.h b/common/config.h index 6a48409..8a7a069 100644 --- a/common/config.h +++ b/common/config.h @@ -20,6 +20,11 @@ typedef enum { PERSIST_NONE } PersistenceType; +typedef enum { + REPLICA_DISABLE, + REPLICA_ENABLE +}ReplicaMode; + // typedef enum { // ALLOC_JEMALLOC, // ALLOC_MALLOC, @@ -51,6 +56,7 @@ typedef struct { AllocatorType allocator; MemLeakDetectMode 
leak_mode; + ReplicaMode replica_mode; } AppConfig; /** @@ -64,5 +70,7 @@ const char *server_mode_to_string(ServerMode mode); const char *persistence_to_string(PersistenceType p); const char *allocator_to_string(AllocatorType a); const char *leakage_to_string(MemLeakDetectMode a); +const char *replica_to_string(ReplicaMode a); + #endif /* CONFIG_H */ diff --git a/config/config.xml b/config/config.xml index 877c906..df40a51 100644 --- a/config/config.xml +++ b/config/config.xml @@ -6,6 +6,7 @@ master + enable 192.168.10.129 8888 diff --git a/diskuring/diskuring.c b/diskuring/diskuring.c index 4071491..5914653 100644 --- a/diskuring/diskuring.c +++ b/diskuring/diskuring.c @@ -67,53 +67,30 @@ static void queue_push(iouring_ctx_t *ctx, task_t *t) pthread_mutex_unlock(&ctx->q_m); } -static void queue_push_front(iouring_ctx_t *ctx, task_t *list_head, task_t *list_tail) { - pthread_mutex_lock(&ctx->q_m); - list_tail->next = ctx->q_head; - ctx->q_head = list_head; - if (!ctx->q_tail) { - ctx->q_tail = list_tail; - } - pthread_cond_signal(&ctx->q_cv); - pthread_mutex_unlock(&ctx->q_m); } - -static task_t *queue_pop_all(iouring_ctx_t *ctx) +static task_t *queue_pop(iouring_ctx_t *ctx) { pthread_mutex_lock(&ctx->q_m); - task_t *list = ctx->q_head; - ctx->q_head = ctx->q_tail = NULL; + task_t *t = ctx->q_head; + if (t) { + ctx->q_head = t->next; + if (!ctx->q_head) { + ctx->q_tail = NULL; + } + t->next = NULL; + } pthread_mutex_unlock(&ctx->q_m); - return list; + return t; } -static task_t *queue_pop_n(iouring_ctx_t *ctx, int n) +static void queue_push_front(iouring_ctx_t *ctx, task_t *t) { - if (n <= 0) - return NULL; pthread_mutex_lock(&ctx->q_m); - task_t *head = ctx->q_head; - if (!head) { - pthread_mutex_unlock(&ctx->q_m); - return NULL; + t->next = ctx->q_head; + ctx->q_head = t; + if (!ctx->q_tail) { + ctx->q_tail = t; } - task_t *curr = head; - task_t *prev = NULL; - int count = 0; - - while (curr && count < n) { - prev = curr; - curr = curr->next; - count++; - } - - 
ctx->q_head = curr; - if (!curr) { - // 队列被取空 - ctx->q_tail = NULL; - } - prev->next = NULL; pthread_mutex_unlock(&ctx->q_m); - return head; } extern void sync_wakeup(); @@ -126,25 +103,26 @@ static void *worker_main(void *arg) { int cq_count = 0; - // ========== 1. 疯狂收割 CQE(必须优先做,释放 in_flight 额度)========== - // 使用 while 而不是 if,确保把 CQ 薅干净 + // ========== 1. 收割 CQE ========== + // 检查溢出 + if (*ctx->ring.sq.kflags & IORING_SQ_CQ_OVERFLOW) { + fprintf(stderr, "FATAL: CQ overflow detected! Backpressure broken!\n"); + abort(); + } + while (true) { struct io_uring_cqe *cqe; unsigned head; io_uring_for_each_cqe(&ctx->ring, head, cqe) { - task_t *done = (task_t *)(uintptr_t)cqe->user_data; - - // 先减计数(必须在处理前减,否则可能瞬间突破上限) + task_t *done = (task_t *)(uintptr_t)cqe->user_data; atomic_fetch_sub(&ctx->in_flight, 1); - task_finish(done, cqe->res); if (cqe->res < 0) { - fprintf(stderr, "write fail: fd=%d res=%d\n", done->fd, cqe->res); + fprintf(stderr, "write fail: fd=%d res=%d, offset=%ld\n", done->fd, cqe->res, done->off); } - // 加入销毁队列 pthread_mutex_lock(&g_destroy_queue.lock); done->next = g_destroy_queue.head; g_destroy_queue.head = done; @@ -159,99 +137,87 @@ static void *worker_main(void *arg) sync_wakeup(); } - // 如果这次没收满,说明 CQ 空了,退出收割循环 if (cq_count == 0) break; - cq_count = 0; // 重置继续薅(可能有新的完成了) + cq_count = 0; } - // 检查溢出(保险起见,虽然有了背压不该再溢出) - if (*ctx->ring.sq.kflags & IORING_SQ_CQ_OVERFLOW) { - fprintf(stderr, "FATAL: CQ overflow detected! Backpressure broken!\n"); - abort(); // 直接崩溃,说明逻辑有 bug - } - // ========== 2. 计算还能提交多少 ========== - int current_in_flight = atomic_load(&ctx->in_flight); - int available_slots = ctx->max_in_flight - current_in_flight; - - if (available_slots <= 0) { - // 满了!不能取新任务,必须等待 CQE(忙等或阻塞等) - - // 方案 B:阻塞等 CQE(推荐) - struct io_uring_cqe *cqe; - int ret = io_uring_wait_cqe(&ctx->ring, &cqe); - if (ret == 0 && !ctx->stop) { - // 收到一个 CQE,回循环开头处理 - continue; - } - continue; - } - // ========== 3. 
从任务队列取任务(只取 available_slots 个)========== - - task_t *task_list = queue_pop_n(ctx, available_slots); - if (!task_list) { - if (!ctx->stop && atomic_load(&ctx->in_flight) > 0) { - int ret = io_uring_submit_and_wait(&ctx->ring, 1); - continue; - } - // 没任务,等待条件变量 - pthread_mutex_lock(&ctx->q_m); - while (ctx->q_head == NULL && !ctx->stop) { - pthread_cond_wait(&ctx->q_cv, &ctx->q_m); - } - pthread_mutex_unlock(&ctx->q_m); - continue; - } - - // ========== 4. 准备 SQE(受限于 available_slots)========== + // ========== 2. 批量准备 SQE ========== int batch_count = 0; - task_t *curr = task_list; - task_t *prev = NULL; - task_t *submitted_head = task_list; // 记录这次实际要提交的部分 - task_t *remaining_head = NULL; // 装不下的部分 - - while (curr && batch_count < available_slots) { + while (true) { + int current_in_flight = atomic_load(&ctx->in_flight); + if (current_in_flight >= ctx->max_in_flight) { + break; // 满了,停止取任务 + } + task_t *t = queue_pop(ctx); + if (!t) break; struct io_uring_sqe *sqe = io_uring_get_sqe(&ctx->ring); - if (!sqe) { - // SQ 满了(这种情况在控制 inflight 后很少见,但保险起见) + queue_push_front(ctx, t); break; } - - io_uring_prep_writev(sqe, curr->fd, curr->iovs, curr->iovcnt, curr->off); - sqe->user_data = (uint64_t)(uintptr_t)curr; - + + io_uring_prep_writev(sqe, t->fd, t->iovs, t->iovcnt, t->off); + sqe->user_data = (uint64_t)(uintptr_t)t; batch_count++; - prev = curr; - curr = curr->next; } - // 断开链表:已准备的 和 未准备的 - if (prev) { - prev->next = NULL; // 已提交的部分结尾 - } - remaining_head = curr; // 剩下的部分(如果有) - - // ========== 5. 提交并增加计数 ========== + // ========== 3. 提交 ========== if (batch_count > 0) { int submitted = io_uring_submit(&ctx->ring); - if (submitted != batch_count) { - fprintf(stderr, "CRITICAL: prep %d but submit %d\n", batch_count, submitted); - // 这种情况很严重,说明 ring 损坏了,建议 abort - abort(); - } - - atomic_fetch_add(&ctx->in_flight, submitted); push_to_sqe += submitted; + atomic_fetch_add(&ctx->in_flight, submitted); + + continue; } - // ========== 6. 
把没提交的任务塞回队列头部(保持顺序)========== - if (remaining_head) { - task_t *tail = remaining_head; - while (tail->next) tail = tail->next; + + // ========== 4. 没事做就等待 ========== + if (batch_count == 0) { + int inflight = atomic_load(&ctx->in_flight); + if (inflight > 0) { + // 有任务在飞,等一个CQE + continue; + } else { + // 真没事了,等新任务 + pthread_mutex_lock(&ctx->q_m); + while (ctx->q_head == NULL && !ctx->stop) { + pthread_cond_wait(&ctx->q_cv, &ctx->q_m); + } + pthread_mutex_unlock(&ctx->q_m); + } + } + } - queue_push_front(ctx, remaining_head, tail); + printf("Shutdown: draining remaining CQEs...\n"); + int final_cq = 0; + struct io_uring_cqe *cqe; + unsigned head; + + while (atomic_load(&ctx->in_flight) > 0) { + io_uring_for_each_cqe(&ctx->ring, head, cqe) { + task_t *done = (task_t *)(uintptr_t)cqe->user_data; + atomic_fetch_sub(&ctx->in_flight, 1); + task_finish(done, cqe->res); + + pthread_mutex_lock(&g_destroy_queue.lock); + done->next = g_destroy_queue.head; + g_destroy_queue.head = done; + pthread_mutex_unlock(&g_destroy_queue.lock); + + get_from_cqe++; + final_cq++; + } + + if (final_cq > 0) { + io_uring_cq_advance(&ctx->ring, final_cq); + final_cq = 0; + } + + // 如果还有 inflight,等一下 + if (atomic_load(&ctx->in_flight) > 0) { + io_uring_submit_and_wait(&ctx->ring, 1); } } @@ -260,12 +226,6 @@ static void *worker_main(void *arg) return NULL; } -int iouring_register_fd(iouring_ctx_t *ctx, int fd) { - int fds[1] = {fd}; - int ret = io_uring_register_files(&ctx->ring, fds, 1); - return ret; -} - int iouring_init(iouring_ctx_t *ctx, unsigned entries) { memset(ctx, 0, sizeof(*ctx)); @@ -352,8 +312,17 @@ task_t* submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int return t; } +int uring_task_complete(iouring_ctx_t *ctx){ + pthread_mutex_lock(&ctx->q_m); + int notask = ctx->q_head == NULL; + pthread_mutex_unlock(&ctx->q_m); + int noflight = atomic_load(&ctx->in_flight); + // printf("%d\n", noflight); + return (noflight == 0) && notask; +} + // 主线程定期调用此函数清理 -void 
cleanup_finished_iouring_tasks() { +void cleanup_finished_iouring_tasks(iouring_ctx_t *ctx) { pthread_mutex_lock(&g_destroy_queue.lock); task_t *list = g_destroy_queue.head; g_destroy_queue.head = NULL; @@ -366,8 +335,6 @@ void cleanup_finished_iouring_tasks() { task_destroy(list); // 在主线程执行销毁 list = next; } - // printf("clean: %d\n\n", cnt); - // mp_print(); release_cnt += cnt; // printf("push:%lld, sqe:%lld, cqe:%lld, rls:%lld\n", push_to_queue, push_to_sqe, get_from_cqe, release_cnt); } \ No newline at end of file diff --git a/diskuring/diskuring.c.nothread b/diskuring/diskuring.c.nothread new file mode 100644 index 0000000..3fded18 --- /dev/null +++ b/diskuring/diskuring.c.nothread @@ -0,0 +1,199 @@ +#include "diskuring.h" +#include "memory/alloc_dispatch.h" +#include +#include + +void task_init(task_t *t) +{ + t->done = 0; + t->res = 0; + t->next = NULL; +} + +void task_finish(task_t *t, int res) +{ + t->res = res; + t->done = 1; +} + +void task_destroy(task_t *t) +{ + if (t->iovs) { + for (int i = 0; i < t->iovcnt; i++) { + if (t->iovs[i].iov_base) { + kvs_free(t->iovs[i].iov_base); + } + } + kvs_free(t->iovs); + } + + kvs_free(t); +} + +int iouring_init(iouring_ctx_t *ctx, unsigned entries) +{ + memset(ctx, 0, sizeof(*ctx)); + + struct io_uring_params params; + memset(¶ms, 0, sizeof(params)); + + // params.flags |= IORING_SETUP_CQSIZE; + // params.cq_entries = 256 * 1024; + // params.sq_entries = 128 * 1024; + int ret = io_uring_queue_init_params(entries, &ctx->ring, ¶ms); + if (ret < 0) { + fprintf(stderr, "io_uring_queue_init_params failed: %d (%s)\n", + ret, strerror(-ret)); + return ret; + } + + unsigned cq_size = *ctx->ring.cq.kring_entries; + printf("Kernel CQ size: %u\n", cq_size); + + if (ret != 0) + { + io_uring_queue_exit(&ctx->ring); + return -ret; + } + return 0; +} + +void iouring_shutdown(iouring_ctx_t *ctx) +{ + io_uring_queue_exit(&ctx->ring); +} + + +void harvest_cqes(iouring_ctx_t *ctx) +{ + struct io_uring_cqe *cqe; + unsigned head; + int 
cq_count = 0; + + // 使用 for_each_cqe 薅干净 CQ + io_uring_for_each_cqe(&ctx->ring, head, cqe) { + task_t *done = (task_t *)(uintptr_t)cqe->user_data; + task_finish(done, cqe->res); + + if (cqe->res < 0) { + fprintf(stderr, "write fail: fd=%d res=%d\n", done->fd, cqe->res); + } + // 直接 destroy(单线程,无需全局队列) + task_destroy(done); + + cq_count++; + } + + if (cq_count > 0) { + // printf("harvest cq:%d\n", cq_count); + io_uring_cq_advance(&ctx->ring, cq_count); + } + + // 检查 CQ overflow(保险) + if (*ctx->ring.sq.kflags & IORING_SQ_CQ_OVERFLOW) { + fprintf(stderr, "FATAL: CQ overflow detected!\n"); + abort(); + } +} + +task_t* submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int count, off_t off){ + if (!bufs || !lens || count <= 0) return NULL; + + task_t *t = (task_t *)kvs_malloc(sizeof(task_t)); + task_init(t); + t->op = TASK_WRITE; + t->fd = fd; + t->off = off; + + t->iovs = (struct iovec *)kvs_malloc(sizeof(struct iovec) * count); + if(!t->iovs) { + kvs_free(t); + return NULL; + } + + for(int i = 0;i < count; ++ i){ + size_t len = lens[i]; + void *buf = kvs_malloc(len); + if(!buf){ + for(int j = 0; j < i; ++j){ + if(t->iovs[j].iov_base) kvs_free(t->iovs[j].iov_base); + } + kvs_free(t->iovs); + kvs_free(t); + return NULL; + } + memcpy(buf, bufs[i], len); + t->iovs[i].iov_base = buf; + t->iovs[i].iov_len = len; + } + + t->iovcnt = count; + + harvest_cqes(ctx); + + if(!ctx->head){ + ctx->head = t; + ctx->tail = t; + }else{ + ctx->tail->next = t; + ctx->tail = t; + } + + int submitted = 0; + while(true){ + task_t *cur = ctx->head; + if(!cur){ + break; + } + ctx->head = cur->next; + if (!ctx->head) { + ctx->tail = NULL; + } + cur->next = NULL; + + struct io_uring_sqe *sqe = io_uring_get_sqe(&ctx->ring); + if (!sqe) { + break; + } + + io_uring_prep_writev(sqe, cur->fd, cur->iovs, cur->iovcnt, cur->off); + sqe->user_data = (uint64_t)(uintptr_t)cur; + + submitted++; + } + + if(submitted > 0){ + int ret = io_uring_submit(&ctx->ring); + } + + return t; +} + +void 
iouring_tick(iouring_ctx_t *ctx) { + + harvest_cqes(ctx); + + int submitted = 0; + while(ctx->head){ + struct io_uring_sqe *sqe = io_uring_get_sqe(&ctx->ring); + if (!sqe) { + break; + } + task_t *cur = ctx->head; + ctx->head = cur->next; + if (!ctx->head) { + ctx->tail = NULL; + } + cur->next = NULL; + + io_uring_prep_writev(sqe, cur->fd, cur->iovs, cur->iovcnt, cur->off); + sqe->user_data = (uint64_t)(uintptr_t)cur; + + submitted++; + } + + + if(submitted > 0){ + int ret = io_uring_submit(&ctx->ring); + } +} \ No newline at end of file diff --git a/diskuring/diskuring.h b/diskuring/diskuring.h index a91cbe9..4f985d2 100644 --- a/diskuring/diskuring.h +++ b/diskuring/diskuring.h @@ -18,7 +18,6 @@ typedef struct task { int fd; off_t off; - int refcount; int res; // cqe->res int done; // 0/1 @@ -49,7 +48,6 @@ typedef struct { pthread_mutex_t lock; } destroy_queue_t; -int iouring_register_fd(iouring_ctx_t *ctx, int fd); void task_init(task_t *t); void task_finish(task_t *t, int res); @@ -61,6 +59,7 @@ int iouring_init(iouring_ctx_t *ctx, unsigned entries); void iouring_shutdown(iouring_ctx_t *ctx); task_t* submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int count, off_t off); +int uring_task_complete(iouring_ctx_t *ctx); void cleanup_finished_iouring_tasks(); diff --git a/diskuring/diskuring.h.nothread b/diskuring/diskuring.h.nothread new file mode 100644 index 0000000..615e9b3 --- /dev/null +++ b/diskuring/diskuring.h.nothread @@ -0,0 +1,50 @@ +#ifndef __DISK_IOURING_H__ +#define __DISK_IOURING_H__ + +#include +#include +#include +#include +#include +#include +#include +#include + +#define BATCH_SIZE 256 +typedef enum { TASK_READ, TASK_WRITE } task_op_t; + +typedef struct task { + task_op_t op; + int fd; + off_t off; + + int res; // cqe->res + int done; // 0/1 + + struct iovec *iovs; // iovec 数组 + int iovcnt; // iovec 数量 + + struct task *next; +} task_t; + +typedef struct { + struct io_uring ring; + int pending_count; + + task_t *head; + task_t 
*tail; +} iouring_ctx_t; + +void task_init(task_t *t); +void task_finish(task_t *t, int res); +void task_destroy(task_t *t); + +int iouring_init(iouring_ctx_t *ctx, unsigned entries); +void iouring_shutdown(iouring_ctx_t *ctx); + +task_t* submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int count, off_t off); +void iouring_tick(iouring_ctx_t *ctx); + +extern iouring_ctx_t global_uring_ctx; + +#endif \ No newline at end of file diff --git a/ebpf/c/Makefile b/ebpf/c/Makefile index 3a81dbb..5580d5d 100644 --- a/ebpf/c/Makefile +++ b/ebpf/c/Makefile @@ -130,7 +130,7 @@ $(BZS_APPS): $(LIBBLAZESYM_OBJ) # Build application binary $(APPS): %: $(OUTPUT)/%.o $(LIBBPF_OBJ) | $(OUTPUT) $(call msg,BINARY,$@) - $(Q)$(CC) $(CFLAGS) $^ $(ALL_LDFLAGS) -lelf -lz -o $@ + $(Q)$(CC) $(CFLAGS) $^ $(ALL_LDFLAGS) -lelf -lz -lrt -lpthread -o $@ # delete failed targets .DELETE_ON_ERROR: diff --git a/ebpf/c/replica.bpf.c b/ebpf/c/replica.bpf.c index 14c6778..ae39ec0 100644 --- a/ebpf/c/replica.bpf.c +++ b/ebpf/c/replica.bpf.c @@ -15,66 +15,46 @@ struct { __uint(value_size, sizeof(int)); } events SEC(".maps"); -/* __completed_cmd(const uint8_t *cmd, size_t len, unsigned long long seq); */ -SEC("uprobe//home/lian/share/9.1-kvstore/kvstore:__completed_cmd") -int BPF_KPROBE(handle_completed_cmd, - const __u8 *cmd, size_t len, __u64 seq) -{ - struct replica_event evt = {}; - __u32 copy_len; +// 1) notify: __replica_notify(seq, off, len) +// SEC("uprobe//home/lian/share/9.1-kvstore/kvstore:__replica_notify") +// int BPF_KPROBE(handle_replica_notify, __u64 seq, __u32 off, __u32 len) +// { +// struct replica_event evt = {}; +// evt.type = EVENT_CMD_META; +// evt.meta.seq = seq; +// evt.meta.off = off; +// evt.meta.len = len; - evt.type = EVENT_COMPLETED_CMD; - evt.complete.seq = seq; +// bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, &evt, sizeof(evt)); +// return 0; +// } - copy_len = len; - if (copy_len > MAX_CMD_LEN) - copy_len = MAX_CMD_LEN; - - evt.complete.len = 
copy_len; - - if (cmd) - bpf_probe_read_user(evt.complete.cmd, copy_len, cmd); - - bpf_perf_event_output(ctx, &events, - BPF_F_CURRENT_CPU, - &evt, sizeof(evt)); - return 0; -} - -/* __ssync(const uint8_t *ip, uint32_t ip_len, int port, unsigned long long seq); */ +// 2) ssync: __ssync(ip, ip_len, port, seq) SEC("uprobe//home/lian/share/9.1-kvstore/kvstore:__ssync") -int BPF_KPROBE(handle_ssync, - const __u8 *ip, __u32 ip_len, int port, __u64 seq) +int BPF_KPROBE(handle_ssync, const __u8 *ip, __u32 ip_len, int port, __u64 seq) { struct replica_event evt = {}; - evt.type = EVENT_SSYNC; evt.sync.seq = seq; evt.sync.port = port; __u32 copy_len = ip_len; - if (copy_len > sizeof(evt.sync.ip)) - copy_len = sizeof(evt.sync.ip); + if (copy_len > MAX_IP_LEN) copy_len = MAX_IP_LEN; + evt.sync.ip_len = copy_len; if (ip) bpf_probe_read_user(evt.sync.ip, copy_len, ip); - bpf_perf_event_output(ctx, &events, - BPF_F_CURRENT_CPU, - &evt, sizeof(evt)); + bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, &evt, sizeof(evt)); return 0; } -/* __sready(void); */ +// 3) sready: __sready() SEC("uprobe//home/lian/share/9.1-kvstore/kvstore:__sready") int BPF_KPROBE(handle_sready) { struct replica_event evt = {}; - evt.type = EVENT_SREADY; - - bpf_perf_event_output(ctx, &events, - BPF_F_CURRENT_CPU, - &evt, sizeof(evt)); + bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, &evt, sizeof(evt)); return 0; } \ No newline at end of file diff --git a/ebpf/c/replica.c b/ebpf/c/replica.c index 87d454c..3c9f9e3 100644 --- a/ebpf/c/replica.c +++ b/ebpf/c/replica.c @@ -10,202 +10,474 @@ #include #include #include +#include #include +#include +#include +#include +#include +#include "replica_shm.h" #include "replica.h" -typedef enum { - OFFLINE = 0, - ONLINE = 1, -}replica_state_e ; +#define DEBUGLOG(...) 
fprintf(stderr, __VA_ARGS__) -struct cmd_node { - __u64 seq; - __u32 len; - uint8_t *cmd; - struct cmd_node *next; -}; +/* ============================================================ */ +#define REPLICA_SHM_MAGIC 0x52504C43u /* 'RPLC' */ +#define REPLICA_SHM_VER 1 + +static inline uint64_t align8_u64(uint64_t x) { return (x + 7u) & ~7ull; } + +int replica_shm_open(replica_shm_t *s, const char *name, size_t total_size, int create) +{ + if (!s || !name || total_size < (sizeof(replica_shm_hdr_t) + 4096)) return -EINVAL; + memset(s, 0, sizeof(*s)); + + int flags = O_RDWR; + if (create) flags |= O_CREAT; + + int fd = shm_open(name, flags, 0666); + if (fd < 0) return -errno; + + if (create) { + if (ftruncate(fd, (off_t)total_size) != 0) { + int e = -errno; close(fd); return e; + } + } + + void *p = mmap(NULL, total_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (p == MAP_FAILED) { + int e = -errno; close(fd); return e; + } + + s->fd = fd; + s->map_size = total_size; + s->hdr = (replica_shm_hdr_t *)p; + s->data = (uint8_t *)p + sizeof(replica_shm_hdr_t); + + // 初始化头 + if (create || s->hdr->magic != REPLICA_SHM_MAGIC) { + memset(s->hdr, 0, sizeof(*s->hdr)); + s->hdr->magic = REPLICA_SHM_MAGIC; + s->hdr->version = REPLICA_SHM_VER; + s->hdr->capacity = total_size - sizeof(replica_shm_hdr_t); + s->hdr->write_off = 0; + s->hdr->last_seq = 0; + } + + return 0; +} + +int replica_shm_peek(replica_shm_t *s, uint32_t off, replica_rec_hdr_t *out_hdr) +{ + if (!s || !s->hdr || !s->data || !out_hdr) return -EINVAL; + if ((uint64_t)off + sizeof(replica_rec_hdr_t) > s->hdr->capacity) return -EINVAL; + memcpy(out_hdr, s->data + off, sizeof(*out_hdr)); + return 0; +} + +void replica_shm_close(replica_shm_t *s) +{ + if (!s) return; + if (s->hdr && s->map_size) munmap(s->hdr, s->map_size); + if (s->fd > 0) close(s->fd); + memset(s, 0, sizeof(*s)); +} + +/* ================================================================================*/ -struct pending_queue { - struct cmd_node 
*head; - struct cmd_node *tail; - int count; -}; /* ================= 全局状态 ================= */ -static replica_state_e state = OFFLINE; -static int sockfd = -1; +#define DEBUGLOG(...) fprintf(stderr, __VA_ARGS__) + +static replica_shm_t g_shm; +static int g_sockfd = -1; static char peer_ip[MAX_IP_LEN]; -static int peer_port; -static __u64 peer_seq; +static int peer_port = 0; +static uint64_t SYNC_SEQ = 0; -static struct pending_queue pending = { - .head = NULL, - .tail = NULL, - .count = 0, +static uint64_t local_seq = 0; +static uint32_t read_off = 0; + +static pthread_t reader_thread; +static pthread_t sender_thread; +static volatile int should_stop = 0; + +/* ================= ================= */ +struct send_node { + uint8_t *data; + uint32_t len; + uint32_t sent; + struct send_node *next; }; -/* ================= pending 队列操作 ================= */ -static void pending_free() +static struct { + struct send_node *head; + struct send_node *tail; + int count; + pthread_mutex_t lock; + pthread_cond_t not_empty; +} sendq = { + .lock = PTHREAD_MUTEX_INITIALIZER, + .not_empty = PTHREAD_COND_INITIALIZER +}; + +static void sendq_free_all(void) { - struct pending_queue *q = &pending; - struct cmd_node *cur = q->head; - while (cur) { - struct cmd_node *tmp = cur; - cur = cur->next; - free(tmp->cmd); - free(tmp); + pthread_mutex_lock(&sendq.lock); + struct send_node *c = sendq.head; + while (c) { + struct send_node *n = c->next; + free(c->data); + free(c); + c = n; } - q->head = q->tail = NULL; - q->count = 0; + sendq.head = sendq.tail = NULL; + sendq.count = 0; + pthread_mutex_unlock(&sendq.lock); } -static void pending_push(__u64 seq, __u32 len, const uint8_t *cmd) -{ - struct cmd_node *node = malloc(sizeof(*node)); - if (!node) - return; - node->cmd = malloc(len); - if (!node->cmd) { - free(node); +static void sendq_push(uint8_t *data, uint32_t len) +{ + struct send_node *n = (struct send_node *)malloc(sizeof(*n)); + if (!n) { + free(data); return; } + n->data = data; 
+ n->len = len; + n->sent = 0; + n->next = NULL; - memcpy(node->cmd, cmd, len); - node->seq = seq; - node->len = len; - node->next = NULL; + pthread_mutex_lock(&sendq.lock); - if (!pending.tail) { - pending.head = pending.tail = node; + if (!sendq.tail) { + sendq.head = sendq.tail = n; } else { - pending.tail->next = node; - pending.tail = node; + sendq.tail->next = n; + sendq.tail = n; } - - pending.count++; + sendq.count++; + pthread_cond_signal(&sendq.not_empty); + pthread_mutex_unlock(&sendq.lock); } -static void pending_gc(__u64 min_seq) +static void sendq_pop(void) { - struct cmd_node *cur = pending.head; - - int n = pending.count; - while (cur && cur->seq < min_seq) { - struct cmd_node *tmp = cur; - cur = cur->next; - - free(tmp->cmd); - free(tmp); - pending.count--; - } - - printf("gc:%d\n", n-pending.count); - - pending.head = cur; - if (!cur) - pending.tail = NULL; + if (!sendq.head) return; + struct send_node *n = sendq.head; + sendq.head = n->next; + if (!sendq.head) sendq.tail = NULL; + free(n->data); + free(n); + sendq.count--; } -static void pending_send_one(struct cmd_node *node){ - int rt = send(sockfd, node->cmd, node->len, 0); - printf("send seq:%lld, rt=%d\n", node->seq, rt); -} - -static void pending_send_all(void) +/* ================= Reader 线程:读共享内存 ================= */ +static void* reader_thread_func(void *arg) { - struct cmd_node *cur = pending.head; - - while (cur) { - pending_send_one(cur); - cur = cur->next; + (void)arg; + + DEBUGLOG("Reader thread started\n"); + + while (!should_stop) { + replica_rec_hdr_t h; + + uint64_t last = __atomic_load_n(&g_shm.hdr->last_seq, __ATOMIC_ACQUIRE); + if (local_seq > last) { + // 没有新数据,短暂休眠避免空转 + continue; + } + if (read_off+ sizeof(replica_rec_hdr_t) >= g_shm.hdr->capacity) { + DEBUGLOG("Reader: read_off overflow, reset\n"); + // read_off = 0; + break; + // continue; + } + + if (replica_shm_peek(&g_shm, read_off, &h) != 0) { + DEBUGLOG("Reader: peek failed at %u\n", read_off); + break; + // 
continue; + } + + // 检测 wrap + if (h.len == 0) { + DEBUGLOG("Reader: wrap at offset %u\n", read_off); + read_off = 0; + continue; + } + + // 跳过 SYNC_SEQ 之前的 + if (h.seq < SYNC_SEQ) { + uint64_t step = align8_u64((uint64_t)sizeof(replica_rec_hdr_t) + (uint64_t)h.len); + if (read_off + step > g_shm.hdr->capacity) { + read_off = 0; + } else { + read_off += (uint32_t)step; + } + continue; + } + + // 序列号检查 + if (h.seq != local_seq) { + DEBUGLOG("Reader: seq mismatch! h.seq=%lu, local_seq=%lu, off=%u\n", + h.seq, local_seq, read_off); + continue; + } + + // 读取数据 + uint8_t *buf = (uint8_t *)malloc(h.len); + if (!buf) { + DEBUGLOG("Reader: malloc failed\n"); + usleep(1000); + continue; + } + + memcpy(buf, g_shm.data + read_off + sizeof(replica_rec_hdr_t), h.len); + sendq_push(buf, h.len); + + uint64_t step = align8_u64((uint64_t)sizeof(replica_rec_hdr_t) + (uint64_t)h.len); + if (read_off + step > g_shm.hdr->capacity) { + read_off = 0; + } else { + read_off += (uint32_t)step; + } + + local_seq++; } + + DEBUGLOG("Reader thread stopped\n"); + return NULL; +} + +/* ================= Sender 线程:发送数据 ================= */ +static void* sender_thread_func(void *arg) +{ + (void)arg; + + DEBUGLOG("Sender thread started\n"); + + int epfd = epoll_create1(0); + if (epfd < 0) { + perror("epoll_create1"); + return NULL; + } + + struct epoll_event ev; + memset(&ev, 0, sizeof(ev)); + ev.events = EPOLLIN | EPOLLOUT; + ev.data.fd = g_sockfd; + + if (epoll_ctl(epfd, EPOLL_CTL_ADD, g_sockfd, &ev) != 0) { + perror("epoll_ctl ADD"); + close(epfd); + return NULL; + } + + while (!should_stop && g_sockfd >= 0) { + struct epoll_event events[4]; + int nfds = epoll_wait(epfd, events, 4, 100); // 100ms timeout + + if (nfds < 0) { + if (errno == EINTR) continue; + perror("epoll_wait"); + break; + } + + for (int i = 0; i < nfds; i++) { + if (events[i].data.fd != g_sockfd) + continue; + + if (events[i].events & (EPOLLERR | EPOLLHUP)) { + DEBUGLOG("Sender: EPOLLERR/EPOLLHUP\n"); + close(g_sockfd); + 
g_sockfd = -1; + break; + } + + if (events[i].events & EPOLLIN) { + char buf[4096]; + recv(g_sockfd, buf, sizeof(buf), 0); + } + + if (events[i].events & EPOLLOUT) { + pthread_mutex_lock(&sendq.lock); + + while (sendq.head) { + struct send_node *n = sendq.head; + + pthread_mutex_unlock(&sendq.lock); + + int nbytes = send(g_sockfd, n->data + n->sent, + (int)(n->len - n->sent), MSG_NOSIGNAL); + + pthread_mutex_lock(&sendq.lock); + + if (nbytes > 0) { + n->sent += (uint32_t)nbytes; + if (n->sent == n->len) { + sendq_pop(); + continue; + } + // partial send + break; + } + + if (nbytes < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + break; + } + DEBUGLOG("Sender: send error errno=%d\n", errno); + pthread_mutex_unlock(&sendq.lock); + close(g_sockfd); + g_sockfd = -1; + goto out; + } + + // nbytes == 0 + DEBUGLOG("Sender: send returned 0\n"); + pthread_mutex_unlock(&sendq.lock); + close(g_sockfd); + g_sockfd = -1; + goto out; + } + + pthread_mutex_unlock(&sendq.lock); + } + } + } + +out: + close(epfd); + DEBUGLOG("Sender thread stopped\n"); + return NULL; } /* ================= 网络逻辑 ================= */ -static void try_connect(void) +static int connect_peer(void) { - if(sockfd > 0){ - close(sockfd); - sockfd = -1; + if (peer_port <= 0 || peer_ip[0] == '\0') + return -1; + + if (g_sockfd >= 0) { + close(g_sockfd); + g_sockfd = -1; } - struct sockaddr_in addr = {}; - int i = 0; - - addr.sin_family = AF_INET; - addr.sin_port = htons(peer_port); - inet_pton(AF_INET, peer_ip, &addr.sin_addr); - - for(i = 0;i < 10; ++ i){ - sockfd = socket(AF_INET, SOCK_STREAM, 0); - if (sockfd < 0) { - perror("socket"); - return; - } - - printf("connect try %d...\n", i + 1); - if (connect(sockfd, (struct sockaddr *)&addr, sizeof(addr)) == 0) { - printf("connect success: %s:%d\n", peer_ip, peer_port); - state = ONLINE; - pending_send_all(); - return; - } - - perror("connect"); - close(sockfd); - sockfd = -1; - - sleep(1); + int fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0) { + 
perror("socket"); + return -1; } - printf("connect failed after 10 retries\n"); + + struct sockaddr_in a; + memset(&a, 0, sizeof(a)); + a.sin_family = AF_INET; + a.sin_port = htons(peer_port); + if (inet_pton(AF_INET, peer_ip, &a.sin_addr) != 1) { + DEBUGLOG("inet_pton failed for ip=%s\n", peer_ip); + close(fd); + return -1; + } + + if (connect(fd, (struct sockaddr *)&a, sizeof(a)) != 0) { + // 这里可以重试;按你的要求先简单返回失败 + // perror("connect"); + close(fd); + return -1; + } + + // non-blocking(配合 epoll) + int flags = fcntl(fd, F_GETFL, 0); + if (flags >= 0) fcntl(fd, F_SETFL, flags | O_NONBLOCK); + + g_sockfd = fd; + DEBUGLOG("connect ok %s:%d\n", peer_ip, peer_port); + return 0; } /* ================= perf buffer 回调 ================= */ static void handle_event(void *ctx, int cpu, void *data, __u32 size) { - struct replica_event *evt = data; + (void)ctx; (void)cpu; + if (size < sizeof(struct replica_event)) return; - switch (evt->type) { + struct replica_event *e = (struct replica_event*)data; - case EVENT_SSYNC: - strncpy(peer_ip, evt->sync.ip, sizeof(peer_ip)); - peer_port = evt->sync.port; - peer_seq = evt->sync.seq; - printf("SSYNC [seq:%lld], [%s:%d]\n", peer_seq, peer_ip, peer_port); + if (e->type == EVENT_SSYNC) { + memset(peer_ip, 0, sizeof(peer_ip)); + memcpy(peer_ip, e->sync.ip, e->sync.ip_len); + peer_port = e->sync.port; + SYNC_SEQ = e->sync.seq; - state = OFFLINE; - pending_gc(peer_seq); - break; + local_seq = SYNC_SEQ; + read_off = 0; - case EVENT_COMPLETED_CMD: - // printf("CMD [seq:%lld], cmd:\n[\n%s]\n", evt->complete.seq, evt->complete.cmd); - pending_push(evt->complete.seq, - evt->complete.len, - evt->complete.cmd); + DEBUGLOG("SSYNC: peer=%s:%d SYNC_SEQ=%llu\n", + peer_ip, peer_port, (unsigned long long)SYNC_SEQ); - if (state == ONLINE && pending.tail) { - struct cmd_node *n = pending.tail; - pending_send_one(n); + // 停止旧线程 + should_stop = 1; + if (reader_thread) { + pthread_join(reader_thread, NULL); + reader_thread = 0; } - break; + if 
(sender_thread) { + pthread_join(sender_thread, NULL); + sender_thread = 0; + } + + if (g_sockfd >= 0) { + close(g_sockfd); + g_sockfd = -1; + } + sendq_free_all(); + return; + } - case EVENT_SREADY: - printf("SREADY \n"); - if (state == OFFLINE) - try_connect(); - break; + if (e->type == EVENT_SREADY) { + DEBUGLOG("SREADY\n"); + + if (connect_peer() != 0) { + DEBUGLOG("connect_peer failed\n"); + return; + } + + // 启动双线程 + should_stop = 0; + + if (pthread_create(&reader_thread, NULL, reader_thread_func, NULL) != 0) { + perror("pthread_create reader"); + return; + } + + if (pthread_create(&sender_thread, NULL, sender_thread_func, NULL) != 0) { + perror("pthread_create sender"); + pthread_cancel(reader_thread); + return; + } + + DEBUGLOG("Reader and Sender threads started\n"); + return; } } /* ================= main ================= */ int main(int argc, char **argv) { + + int rc = replica_shm_open(&g_shm, REPLICA_SHM_NAME, REPLICA_SHM_SIZE, 0); + if (rc != 0) { + fprintf(stderr, "replica_shm_open failed rc=%d (did you create it in kvstore?)\n", rc); + return 1; + } + struct replica_bpf *skel; struct perf_buffer *pb = NULL; int err; @@ -231,8 +503,7 @@ int main(int argc, char **argv) goto cleanup; } - printf("Successfully started! Please run `sudo cat /sys/kernel/debug/tracing/trace_pipe` " - "to see output of the BPF programs.\n"); + printf("Successfully started! 
\n"); pb = perf_buffer__new(bpf_map__fd(skel->maps.events), 8, @@ -244,13 +515,19 @@ int main(int argc, char **argv) while (1) { perf_buffer__poll(pb, 1000); // 处理事件 + + } perf_buffer__free(pb); cleanup: - pending_free(); - if (sockfd >= 0) close(sockfd); + should_stop = 1; + if (reader_thread) pthread_join(reader_thread, NULL); + if (sender_thread) pthread_join(sender_thread, NULL); + if (g_sockfd >= 0) close(g_sockfd); + replica_shm_close(&g_shm); + sendq_free_all(); replica_bpf__destroy(skel); return -err; } diff --git a/ebpf/c/replica.h b/ebpf/c/replica.h index 130cd0c..c14aabd 100644 --- a/ebpf/c/replica.h +++ b/ebpf/c/replica.h @@ -2,34 +2,25 @@ #define __REPLICA_H__ -#define MAX_CMD_LEN 256 -#define MAX_IP_LEN 64 -enum event_type { - EVENT_COMPLETED_CMD, - EVENT_SSYNC, - EVENT_SREADY, -}; +#define MAX_IP_LEN 64 -struct complete_cmd_evt { - __u64 seq; - __u32 len; - __u8 cmd[MAX_CMD_LEN]; -}; - -struct sync_evt { - __u64 seq; - char ip[MAX_IP_LEN]; - __s32 port; +enum { + EVENT_SSYNC = 1, + EVENT_SREADY = 2, }; struct replica_event { - __u32 type; - __u32 _pad; + uint32_t type; + uint32_t _pad; union { - struct complete_cmd_evt complete; - struct sync_evt sync; + struct { + uint64_t seq; // SYNC_SEQ:从这个 seq 开始增量 + int32_t port; + uint32_t ip_len; + char ip[MAX_IP_LEN]; + } sync; }; }; diff --git a/ebpf/c/replica_shm.h b/ebpf/c/replica_shm.h new file mode 100644 index 0000000..27080f1 --- /dev/null +++ b/ebpf/c/replica_shm.h @@ -0,0 +1,59 @@ +#ifndef __REPLICA_SHM_H__ +#define __REPLICA_SHM_H__ +#include +#include + +#ifndef REPLICA_SHM_NAME +#define REPLICA_SHM_NAME "/kvs_replica_shm" +#endif + +#ifndef REPLICA_SHM_SIZE +// 64MB,按需调 +#define REPLICA_SHM_SIZE (256u * 1024u * 1024u) +#endif + +// 每条记录头部(放在 shm 的 data 区里) +typedef struct __attribute__((packed)) { + uint64_t seq; // 单调递增 + uint32_t len; // payload bytes + uint32_t flags; // 预留:压缩、类型等 + uint32_t crc32; // 可选:0 表示不校验 + uint32_t reserved; // 对齐 + // uint8_t payload[len] 紧跟其后 +} replica_rec_hdr_t; + 
+// shm 顶部元数据 +typedef struct __attribute__((packed)) { + uint32_t magic; + uint32_t version; + uint64_t capacity; // data 区大小(字节) + uint64_t write_off; // producer 写指针(0..capacity-1) + uint64_t last_seq; // producer 最新 seq + uint8_t _pad[64]; // cacheline padding + // 后面紧跟 data[capacity] +} replica_shm_hdr_t; + +typedef struct { + int fd; + size_t map_size; + replica_shm_hdr_t *hdr; + uint8_t *data; +} replica_shm_t; + +// kvstore: 初始化(create/open + mmap) +int replica_shm_open(replica_shm_t *s, const char *name, size_t total_size, int create); + +// kvstore: append 一条记录,返回 off(相对 data 起始),用于 notify +// 单写者设计:无需锁。返回 0 成功,<0 失败(空间不足或参数错误) +int replica_shm_append(replica_shm_t *s, uint64_t seq, const void *buf, uint32_t len, uint32_t *out_off); + +// replicator: 读取记录头(不移动游标),你也可以直接 memcpy payload +// off 是 data 内偏移 +int replica_shm_peek(replica_shm_t *s, uint32_t off, replica_rec_hdr_t *out_hdr); + +// 关闭 +void replica_shm_close(replica_shm_t *s); + + +extern replica_shm_t g_rep_shm; +#endif \ No newline at end of file diff --git a/ebpf/old.c/.gitignore b/ebpf/old.c/.gitignore new file mode 100644 index 0000000..9edf6d7 --- /dev/null +++ b/ebpf/old.c/.gitignore @@ -0,0 +1,18 @@ +/.output +/bootstrap +/bootstrap_legacy +/minimal +/minimal_legacy +/minimal_ns +/uprobe +/kprobe +/fentry +/profile +/usdt +/sockfilter +/tc +/ksyscall +/task_iter +/lsm +/cmake-build-debug/ +/cmake-build-release/ diff --git a/ebpf/old.c/CMakeLists.txt b/ebpf/old.c/CMakeLists.txt new file mode 100644 index 0000000..c5b0c92 --- /dev/null +++ b/ebpf/old.c/CMakeLists.txt @@ -0,0 +1,133 @@ +# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause + +cmake_minimum_required(VERSION 3.16) +project(examples C) + +# Tell cmake where to find BpfObject module +list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/../../tools/cmake) + +# Build vendored libbpf +include(ExternalProject) +ExternalProject_Add(libbpf + PREFIX libbpf + SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../../libbpf/src + CONFIGURE_COMMAND 
"" + BUILD_COMMAND make + CC=${CMAKE_C_COMPILER} + BUILD_STATIC_ONLY=1 + OBJDIR=${CMAKE_CURRENT_BINARY_DIR}/libbpf/libbpf + DESTDIR=${CMAKE_CURRENT_BINARY_DIR}/libbpf + INCLUDEDIR= + LIBDIR= + UAPIDIR= + install install_uapi_headers + BUILD_IN_SOURCE TRUE + INSTALL_COMMAND "" + STEP_TARGETS build + BUILD_BYPRODUCTS ${CMAKE_CURRENT_BINARY_DIR}/libbpf/libbpf.a +) + +ExternalProject_Add(bpftool + PREFIX bpftool + SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../../bpftool/src + CONFIGURE_COMMAND "" + BUILD_COMMAND make bootstrap + OUTPUT=${CMAKE_CURRENT_BINARY_DIR}/bpftool/ + BUILD_IN_SOURCE TRUE + INSTALL_COMMAND "" + STEP_TARGETS build +) + +find_program(CARGO_EXISTS cargo) +if(CARGO_EXISTS) + if(CMAKE_CROSSCOMPILING) + # Determine target triple + if(CMAKE_SYSTEM_NAME MATCHES "Linux") + if(CMAKE_SYSTEM_PROCESSOR MATCHES "x86_64") + set(CARGO_TARGET "x86_64-unknown-linux-gnu") + elseif(CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64") + set(CARGO_TARGET "aarch64-unknown-linux-gnu") + else() + message(FATAL_ERROR "Unsupported processor for Linux: ${CMAKE_SYSTEM_PROCESSOR}") + endif() + + if(CMAKE_CXX_COMPILER) + set(RUST_LINKER ${CMAKE_CXX_COMPILER}) + else() + set(RUST_LINKER ${CMAKE_C_COMPILER}) + endif() + else() + message((FATAL_ERROR "Unsupported platform: ${CMAKE_SYSTEM_NAME}")) + endif() + + ExternalProject_Add(blazesym + PREFIX blazesym + SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../../blazesym + CONFIGURE_COMMAND "" + BUILD_COMMAND ${CMAKE_COMMAND} -E env + RUSTFLAGS=-C\ linker=${RUST_LINKER} + cargo build --package=blazesym-c --release --target=${CARGO_TARGET} + BUILD_IN_SOURCE TRUE + INSTALL_COMMAND "" + STEP_TARGETS build + ) + else() # Host + ExternalProject_Add(blazesym + PREFIX blazesym + SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../../blazesym + CONFIGURE_COMMAND "" + BUILD_COMMAND + cargo build --package=blazesym-c --release + BUILD_IN_SOURCE TRUE + INSTALL_COMMAND "" + STEP_TARGETS build + ) + endif() +endif() + +# Set BpfObject input parameters -- note this is usually 
not necessary unless +# you're in a highly vendored environment (like libbpf-bootstrap) +if(${CMAKE_SYSTEM_PROCESSOR} MATCHES "x86_64") + set(ARCH "x86") +elseif(${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm") + set(ARCH "arm") +elseif(${CMAKE_SYSTEM_PROCESSOR} MATCHES "aarch64") + set(ARCH "arm64") +elseif(${CMAKE_SYSTEM_PROCESSOR} MATCHES "ppc64le") + set(ARCH "powerpc") +elseif(${CMAKE_SYSTEM_PROCESSOR} MATCHES "mips") + set(ARCH "mips") +elseif(${CMAKE_SYSTEM_PROCESSOR} MATCHES "riscv64") + set(ARCH "riscv") +elseif(${CMAKE_SYSTEM_PROCESSOR} MATCHES "loongarch64") + set(ARCH "loongarch") +endif() + +set(BPFOBJECT_BPFTOOL_EXE ${CMAKE_CURRENT_BINARY_DIR}/bpftool/bootstrap/bpftool) +set(BPFOBJECT_VMLINUX_H ${CMAKE_CURRENT_SOURCE_DIR}/../../vmlinux.h/include/${ARCH}/vmlinux.h) +set(LIBBPF_INCLUDE_DIRS ${CMAKE_CURRENT_BINARY_DIR}/libbpf) +set(LIBBPF_LIBRARIES ${CMAKE_CURRENT_BINARY_DIR}/libbpf/libbpf.a) +find_package(BpfObject REQUIRED) + +# Create an executable for each application +file(GLOB apps *.bpf.c) +if(NOT CARGO_EXISTS) + list(REMOVE_ITEM apps ${CMAKE_CURRENT_SOURCE_DIR}/profile.bpf.c) +endif() +foreach(app ${apps}) + get_filename_component(app_stem ${app} NAME_WE) + + # Build object skeleton and depend skeleton on libbpf build + bpf_object(${app_stem} ${app_stem}.bpf.c) + add_dependencies(${app_stem}_skel libbpf bpftool) + + add_executable(${app_stem} ${app_stem}.c) + target_link_libraries(${app_stem} ${app_stem}_skel) + if(${app_stem} STREQUAL profile) + target_include_directories(${app_stem} PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR}/../../blazesym/capi/include) + target_link_libraries(${app_stem} + ${CMAKE_CURRENT_SOURCE_DIR}/../../blazesym/target/${CARGO_TARGET}/release/libblazesym_c.a -lpthread -lrt -ldl) + add_dependencies(${app_stem} blazesym) + endif() +endforeach() diff --git a/ebpf/old.c/Makefile b/ebpf/old.c/Makefile new file mode 100644 index 0000000..3a81dbb --- /dev/null +++ b/ebpf/old.c/Makefile @@ -0,0 +1,139 @@ +# SPDX-License-Identifier: (LGPL-2.1 OR 
BSD-2-Clause) +OUTPUT := .output +CLANG ?= clang +LIBBPF_SRC := $(abspath ../../libbpf/src) +BPFTOOL_SRC := $(abspath ../../bpftool/src) +LIBBPF_OBJ := $(abspath $(OUTPUT)/libbpf.a) +BPFTOOL_OUTPUT ?= $(abspath $(OUTPUT)/bpftool) +BPFTOOL ?= $(BPFTOOL_OUTPUT)/bootstrap/bpftool +LIBBLAZESYM_SRC := $(abspath ../../blazesym/) +LIBBLAZESYM_INC := $(abspath $(LIBBLAZESYM_SRC)/capi/include) +LIBBLAZESYM_OBJ := $(abspath $(OUTPUT)/libblazesym_c.a) +ARCH ?= $(shell uname -m | sed 's/x86_64/x86/' \ + | sed 's/arm.*/arm/' \ + | sed 's/aarch64/arm64/' \ + | sed 's/ppc64le/powerpc/' \ + | sed 's/mips.*/mips/' \ + | sed 's/riscv64/riscv/' \ + | sed 's/loongarch64/loongarch/') +VMLINUX := ../../vmlinux.h/include/$(ARCH)/vmlinux.h +# Use our own libbpf API headers and Linux UAPI headers distributed with +# libbpf to avoid dependency on system-wide headers, which could be missing or +# outdated +INCLUDES := -I$(OUTPUT) -I../../libbpf/include/uapi -I$(dir $(VMLINUX)) -I$(LIBBLAZESYM_INC) +CFLAGS := -g -Wall +ALL_LDFLAGS := $(LDFLAGS) $(EXTRA_LDFLAGS) + +# APPS = minimal minimal_legacy minimal_ns bootstrap bootstrap_legacy uprobe kprobe fentry \ + usdt sockfilter tc ksyscall task_iter lsm +APPS = replica + +CARGO ?= $(shell which cargo) +ifeq ($(strip $(CARGO)),) +BZS_APPS := +else +BZS_APPS := profile +APPS += $(BZS_APPS) +# Required by libblazesym +ALL_LDFLAGS += -lrt -ldl -lpthread -lm +endif + +# Get Clang's default includes on this system. We'll explicitly add these dirs +# to the includes list when compiling with `-target bpf` because otherwise some +# architecture-specific dirs will be "missing" on some architectures/distros - +# headers such as asm/types.h, asm/byteorder.h, asm/socket.h, asm/sockios.h, +# sys/cdefs.h etc. might be missing. +# +# Use '-idirafter': Don't interfere with include mechanics except where the +# build would have failed anyways. 
+CLANG_BPF_SYS_INCLUDES ?= $(shell $(CLANG) -v -E - &1 \ + | sed -n '/<...> search starts here:/,/End of search list./{ s| \(/.*\)|-idirafter \1|p }') + +ifeq ($(V),1) + Q = + msg = +else + Q = @ + msg = @printf ' %-8s %s%s\n' \ + "$(1)" \ + "$(patsubst $(abspath $(OUTPUT))/%,%,$(2))" \ + "$(if $(3), $(3))"; + MAKEFLAGS += --no-print-directory +endif + +define allow-override + $(if $(or $(findstring environment,$(origin $(1))),\ + $(findstring command line,$(origin $(1)))),,\ + $(eval $(1) = $(2))) +endef + +$(call allow-override,CC,$(CROSS_COMPILE)cc) +$(call allow-override,LD,$(CROSS_COMPILE)ld) + +.PHONY: all +all: $(APPS) + +.PHONY: clean +clean: + $(call msg,CLEAN) + $(Q)rm -rf $(OUTPUT) $(APPS) + +$(OUTPUT) $(OUTPUT)/libbpf $(BPFTOOL_OUTPUT): + $(call msg,MKDIR,$@) + $(Q)mkdir -p $@ + +# Build libbpf +$(LIBBPF_OBJ): $(wildcard $(LIBBPF_SRC)/*.[ch] $(LIBBPF_SRC)/Makefile) | $(OUTPUT)/libbpf + $(call msg,LIB,$@) + $(Q)$(MAKE) -C $(LIBBPF_SRC) BUILD_STATIC_ONLY=1 \ + OBJDIR=$(dir $@)/libbpf DESTDIR=$(dir $@) \ + INCLUDEDIR= LIBDIR= UAPIDIR= \ + install + +# Build bpftool +$(BPFTOOL): | $(BPFTOOL_OUTPUT) + $(call msg,BPFTOOL,$@) + $(Q)$(MAKE) ARCH= CROSS_COMPILE= OUTPUT=$(BPFTOOL_OUTPUT)/ -C $(BPFTOOL_SRC) bootstrap + + +$(LIBBLAZESYM_SRC)/target/release/libblazesym_c.a:: + $(Q)cd $(LIBBLAZESYM_SRC) && $(CARGO) build --package=blazesym-c --release + +$(LIBBLAZESYM_OBJ): $(LIBBLAZESYM_SRC)/target/release/libblazesym_c.a | $(OUTPUT) + $(call msg,LIB, $@) + $(Q)cp $(LIBBLAZESYM_SRC)/target/release/libblazesym_c.a $@ + +# Build BPF code +$(OUTPUT)/%.bpf.o: %.bpf.c $(LIBBPF_OBJ) $(wildcard %.h) $(VMLINUX) | $(OUTPUT) $(BPFTOOL) + $(call msg,BPF,$@) + $(Q)$(CLANG) -g -O2 -target bpf -D__TARGET_ARCH_$(ARCH) \ + $(INCLUDES) $(CLANG_BPF_SYS_INCLUDES) \ + -c $(filter %.c,$^) -o $(patsubst %.bpf.o,%.tmp.bpf.o,$@) + $(Q)$(BPFTOOL) gen object $@ $(patsubst %.bpf.o,%.tmp.bpf.o,$@) + +# Generate BPF skeletons +$(OUTPUT)/%.skel.h: $(OUTPUT)/%.bpf.o | $(OUTPUT) $(BPFTOOL) + 
$(call msg,GEN-SKEL,$@) + $(Q)$(BPFTOOL) gen skeleton $< > $@ + +# Build user-space code +$(patsubst %,$(OUTPUT)/%.o,$(APPS)): %.o: %.skel.h + +$(OUTPUT)/%.o: %.c $(wildcard %.h) | $(OUTPUT) + $(call msg,CC,$@) + $(Q)$(CC) $(CFLAGS) $(INCLUDES) -c $(filter %.c,$^) -o $@ + +$(patsubst %,$(OUTPUT)/%.o,$(BZS_APPS)): $(LIBBLAZESYM_OBJ) + +$(BZS_APPS): $(LIBBLAZESYM_OBJ) + +# Build application binary +$(APPS): %: $(OUTPUT)/%.o $(LIBBPF_OBJ) | $(OUTPUT) + $(call msg,BINARY,$@) + $(Q)$(CC) $(CFLAGS) $^ $(ALL_LDFLAGS) -lelf -lz -o $@ + +# delete failed targets +.DELETE_ON_ERROR: + +# keep intermediate (.skel.h, .bpf.o, etc) targets +.SECONDARY: diff --git a/ebpf/old.c/replica.bpf.c b/ebpf/old.c/replica.bpf.c new file mode 100644 index 0000000..14c6778 --- /dev/null +++ b/ebpf/old.c/replica.bpf.c @@ -0,0 +1,80 @@ +// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause +/* Copyright (c) 2020 Facebook */ +#include "vmlinux.h" +#include +#include +#include + +#include "replica.h" + +char LICENSE[] SEC("license") = "Dual BSD/GPL"; + +struct { + __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); + __uint(key_size, sizeof(int)); + __uint(value_size, sizeof(int)); +} events SEC(".maps"); + +/* __completed_cmd(const uint8_t *cmd, size_t len, unsigned long long seq); */ +SEC("uprobe//home/lian/share/9.1-kvstore/kvstore:__completed_cmd") +int BPF_KPROBE(handle_completed_cmd, + const __u8 *cmd, size_t len, __u64 seq) +{ + struct replica_event evt = {}; + __u32 copy_len; + + evt.type = EVENT_COMPLETED_CMD; + evt.complete.seq = seq; + + copy_len = len; + if (copy_len > MAX_CMD_LEN) + copy_len = MAX_CMD_LEN; + + evt.complete.len = copy_len; + + if (cmd) + bpf_probe_read_user(evt.complete.cmd, copy_len, cmd); + + bpf_perf_event_output(ctx, &events, + BPF_F_CURRENT_CPU, + &evt, sizeof(evt)); + return 0; +} + +/* __ssync(const uint8_t *ip, uint32_t ip_len, int port, unsigned long long seq); */ +SEC("uprobe//home/lian/share/9.1-kvstore/kvstore:__ssync") +int BPF_KPROBE(handle_ssync, + const __u8 *ip, 
__u32 ip_len, int port, __u64 seq) +{ + struct replica_event evt = {}; + + evt.type = EVENT_SSYNC; + evt.sync.seq = seq; + evt.sync.port = port; + + __u32 copy_len = ip_len; + if (copy_len > sizeof(evt.sync.ip)) + copy_len = sizeof(evt.sync.ip); + + if (ip) + bpf_probe_read_user(evt.sync.ip, copy_len, ip); + + bpf_perf_event_output(ctx, &events, + BPF_F_CURRENT_CPU, + &evt, sizeof(evt)); + return 0; +} + +/* __sready(void); */ +SEC("uprobe//home/lian/share/9.1-kvstore/kvstore:__sready") +int BPF_KPROBE(handle_sready) +{ + struct replica_event evt = {}; + + evt.type = EVENT_SREADY; + + bpf_perf_event_output(ctx, &events, + BPF_F_CURRENT_CPU, + &evt, sizeof(evt)); + return 0; +} \ No newline at end of file diff --git a/ebpf/old.c/replica.c b/ebpf/old.c/replica.c new file mode 100644 index 0000000..544c85f --- /dev/null +++ b/ebpf/old.c/replica.c @@ -0,0 +1,355 @@ +// SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause) +/* Copyright (c) 2020 Facebook */ +#include +#include +#include +#include +#include +#include +#include "replica.skel.h" +#include +#include +#include +#include +#include + +#include "replica.h" + +#define DEBUGLOG printf + +typedef enum { + OFFLINE = 0, + ONLINE = 1, +}replica_state_e ; + +struct cmd_node { + __u64 seq; + __u32 len; + uint8_t *cmd; + struct cmd_node *next; +}; + +struct pending_queue { + struct cmd_node *head; + struct cmd_node *tail; + int count; +}; + +/* ================= 全局状态 ================= */ + +static replica_state_e state = OFFLINE; +static int sockfd = -1; +static int epollfd = -1; + +static char peer_ip[MAX_IP_LEN]; +static int peer_port; +static __u64 peer_seq; + +static struct pending_queue pending = { + .head = NULL, + .tail = NULL, + .count = 0, +}; + +/* ================= pending 队列操作 ================= */ +static void pending_free() +{ + struct pending_queue *q = &pending; + struct cmd_node *cur = q->head; + while (cur) { + struct cmd_node *tmp = cur; + cur = cur->next; + free(tmp->cmd); + free(tmp); + } + q->head = 
q->tail = NULL; + q->count = 0; +} + +static void pending_push(__u64 seq, __u32 len, const uint8_t *cmd) +{ + struct cmd_node *node = malloc(sizeof(*node)); + if (!node) + return; + + node->cmd = malloc(len); + if (!node->cmd) { + free(node); + return; + } + + memcpy(node->cmd, cmd, len); + node->seq = seq; + node->len = len; + node->next = NULL; + + if (!pending.tail) { + pending.head = pending.tail = node; + } else { + pending.tail->next = node; + pending.tail = node; + } + + pending.count++; +} + +static void pending_gc(__u64 min_seq) +{ + struct cmd_node *cur = pending.head; + + int n = pending.count; + while (cur && cur->seq < min_seq) { + struct cmd_node *tmp = cur; + cur = cur->next; + + free(tmp->cmd); + free(tmp); + pending.count--; + } + + DEBUGLOG("gc:%d\n", n-pending.count); + + pending.head = cur; + if (!cur) + pending.tail = NULL; +} + +static void pending_send_all(void) +{ + struct cmd_node *cur = pending.head; + while (cur) { + int rt = send(sockfd, cur->cmd, cur->len, 0); + + if(rt == (int)cur->len){ + struct cmd_node *tmp = cur; + cur = cur->next; + + free(tmp->cmd); + free(tmp); + pending.count--; + }else{ + DEBUGLOG("error\n"); + // 失败:不移动 cur,直接 break + if (rt < 0) { + perror("send failed"); + if (errno == ECONNRESET || errno == EPIPE) { + state = OFFLINE; + if (sockfd >= 0) { + close(sockfd); + sockfd = -1; + DEBUGLOG("connect closed\n"); + } + } else if (rt == 0) { + fprintf(stderr, "send returned 0 (peer closed?)\n"); + } else { + fprintf(stderr, "partial send: %d/%u\n", rt, cur->len); + } + + break; + } + } + } + + pending.head = cur; + if(!cur) + pending.tail = NULL; +} + +/* ================= 网络逻辑 ================= */ +static void try_connect(void) +{ + if(sockfd > 0){ + close(sockfd); + sockfd = -1; + } + + struct sockaddr_in addr = {}; + int i = 0; + + addr.sin_family = AF_INET; + addr.sin_port = htons(peer_port); + inet_pton(AF_INET, peer_ip, &addr.sin_addr); + + for(i = 0;i < 10; ++ i){ + sockfd = socket(AF_INET, SOCK_STREAM, 0); + if 
(sockfd < 0) { + perror("socket"); + return; + } + + DEBUGLOG("connect try %d...\n", i + 1); + if (connect(sockfd, (struct sockaddr *)&addr, sizeof(addr)) == 0) { + DEBUGLOG("connect success: %s:%d\n", peer_ip, peer_port); + + int flags = fcntl(sockfd, F_GETFL, 0); + fcntl(sockfd, F_SETFL, flags | O_NONBLOCK); + + struct epoll_event ev; + ev.events = EPOLLIN; + ev.data.fd = sockfd; + epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &ev); + + state = ONLINE; + pending_send_all(); + return; + } + + perror("connect"); + close(sockfd); + sockfd = -1; + sleep(1); + } + + DEBUGLOG("connect failed after 10 retries\n"); +} + +static void handle_socket_readable(void) +{ + char buf[65536]; + while (1) { + int n = recv(sockfd, buf, sizeof(buf), MSG_DONTWAIT); + if (n > 0) { + continue; + } else if (n == 0) { + state = OFFLINE; + epoll_ctl(epollfd, EPOLL_CTL_DEL, sockfd, NULL); + close(sockfd); + sockfd = -1; + DEBUGLOG("connection closed\n"); + break; + } else { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + break; + } + perror("recv"); + state = OFFLINE; + epoll_ctl(epollfd, EPOLL_CTL_DEL, sockfd, NULL); + close(sockfd); + sockfd = -1; + break; + } + } +} + +static void handle_socket_writable(void) +{ + pending_send_all(); + if (!pending.head) { + struct epoll_event ev; + ev.events = EPOLLIN; // 只监听读 + ev.data.fd = sockfd; + epoll_ctl(epollfd, EPOLL_CTL_MOD, sockfd, &ev); + } +} + + +/* ================= perf buffer 回调 ================= */ +static void handle_event(void *ctx, int cpu, void *data, __u32 size) +{ + struct replica_event *evt = data; + switch (evt->type) { + + case EVENT_SSYNC: + strncpy(peer_ip, evt->sync.ip, sizeof(peer_ip)); + peer_port = evt->sync.port; + peer_seq = evt->sync.seq; + DEBUGLOG("SSYNC [seq:%lld], [%s:%d]\n", peer_seq, peer_ip, peer_port); + + state = OFFLINE; + pending_gc(peer_seq); + break; + + case EVENT_COMPLETED_CMD: + // DEBUGLOG("CMD [seq:%lld], cmd:\n[\n%s]\n", evt->complete.seq, evt->complete.cmd); + pending_push(evt->complete.seq, + 
evt->complete.len, + evt->complete.cmd); + + if (state == ONLINE && sockfd >= 0) { + struct epoll_event ev; + ev.events = EPOLLIN | EPOLLOUT; + ev.data.fd = sockfd; + epoll_ctl(epollfd, EPOLL_CTL_MOD, sockfd, &ev); + } + break; + + case EVENT_SREADY: + DEBUGLOG("SREADY \n"); + if (state == OFFLINE) + try_connect(); + break; + } +} + +/* ================= main ================= */ +int main(int argc, char **argv) +{ + struct replica_bpf *skel; + struct perf_buffer *pb = NULL; + int err; + + /* Open BPF application */ + skel = replica_bpf__open(); + if (!skel) { + fprintf(stderr, "Failed to open BPF skeleton\n"); + return 1; + } + + /* Load & verify BPF programs */ + err = replica_bpf__load(skel); + if (err) { + fprintf(stderr, "Failed to load and verify BPF skeleton\n"); + goto cleanup; + } + + /* Attach tracepoint handler */ + err = replica_bpf__attach(skel); + if (err) { + fprintf(stderr, "Failed to attach BPF skeleton\n"); + goto cleanup; + } + + printf("Successfully started! \n"); + + + pb = perf_buffer__new(bpf_map__fd(skel->maps.events), 8, + handle_event, NULL, NULL, NULL); + + if(!pb){ + goto cleanup; + } + + epollfd = epoll_create1(0); + if (epollfd < 0) { + fprintf(stderr, "epoll_create1 failed\n"); + goto cleanup; + } + + while (1) { + struct epoll_event events[10]; + + perf_buffer__poll(pb, 1000); // 处理事件 + + if(OFFLINE) continue; + + int nfds = epoll_wait(epollfd, events, 10, 0); + for (int i = 0; i < nfds; i++) { + if (events[i].data.fd == sockfd) { + if (events[i].events & EPOLLIN) { + handle_socket_readable(); // 快速消费接收数据 + } + if (events[i].events & EPOLLOUT) { + handle_socket_writable(); // 发送数据 + } + } + } + } + + perf_buffer__free(pb); + +cleanup: + pending_free(); + if (sockfd >= 0) close(sockfd); + replica_bpf__destroy(skel); + return -err; +} diff --git a/ebpf/old.c/replica.h b/ebpf/old.c/replica.h new file mode 100644 index 0000000..130cd0c --- /dev/null +++ b/ebpf/old.c/replica.h @@ -0,0 +1,37 @@ +#ifndef __REPLICA_H__ +#define __REPLICA_H__ 
+ + +#define MAX_CMD_LEN 256 +#define MAX_IP_LEN 64 + +enum event_type { + EVENT_COMPLETED_CMD, + EVENT_SSYNC, + EVENT_SREADY, +}; + +struct complete_cmd_evt { + __u64 seq; + __u32 len; + __u8 cmd[MAX_CMD_LEN]; +}; + +struct sync_evt { + __u64 seq; + char ip[MAX_IP_LEN]; + __s32 port; +}; + +struct replica_event { + __u32 type; + __u32 _pad; + + union { + struct complete_cmd_evt complete; + struct sync_evt sync; + }; +}; + + +#endif \ No newline at end of file diff --git a/ebpf/old.c/xmake.lua b/ebpf/old.c/xmake.lua new file mode 100644 index 0000000..c4a5f3d --- /dev/null +++ b/ebpf/old.c/xmake.lua @@ -0,0 +1,126 @@ +add_rules("mode.release", "mode.debug") +add_rules("platform.linux.bpf") +set_license("GPL-2.0") + +if xmake.version():satisfies(">=2.5.7 <=2.5.9") then + on_load(function (target) + raise("xmake(%s) has a bug preventing BPF source code compilation. Please run `xmake update -f 2.5.6` to revert to v2.5.6 version or upgrade to xmake v2.6.1 that fixed the issue.", xmake.version()) + end) +end + +option("system-libbpf", {showmenu = true, default = false, description = "Use system-installed libbpf"}) +option("require-bpftool", {showmenu = true, default = false, description = "Require bpftool package"}) + +add_requires("elfutils", "zlib") +if is_plat("android") then + add_requires("ndk >=22.x <26", "argp-standalone") + set_toolchains("@ndk", {sdkver = "23"}) +else + add_requires("llvm >=10.x") + set_toolchains("@llvm") + add_requires("linux-headers") +end + +-- fix error: libbpf: map 'my_pid_map': unsupported map linkage static. 
for bpftool >= 7.2.0 +-- we cannot add `"-fvisibility=hidden"` when compiling *.bpf.c +set_symbols("none") + +if is_arch("arm64", "arm64-v8a") then + add_includedirs("../../vmlinux.h/include/arm64") +elseif is_arch("arm.*") then + add_includedirs("../../vmlinux.h/include/arm") +elseif is_arch("riscv32", "riscv64") then + add_includedirs("../../vmlinux.h/include/riscv") +elseif is_arch("loongarch") then + add_includedirs("../../vmlinux.h/include/loongarch") +elseif is_arch("ppc", "powerpc") then + add_includedirs("../../vmlinux.h/include/powerpc") +elseif is_arch("x86_64", "i386") then + add_includedirs("../../vmlinux.h/include/x86") +else + add_includedirs("../../vmlinux.h/include") +end + +-- we can run `xmake f --require-bpftool=y` to pull bpftool from xmake-repo repository +if has_config("require-bpftool") then + add_requires("linux-tools", {configs = {bpftool = true}}) + add_packages("linux-tools") +else + before_build(function (target) + os.addenv("PATH", path.join(os.scriptdir(), "..", "..", "tools")) + end) +end + +-- we use the vendored libbpf sources for libbpf-bootstrap. 
+-- for some projects you may want to use the system-installed libbpf, so you can run `xmake f --system-libbpf=y` +if has_config("system-libbpf") then + add_requires("libbpf", {system = true}) +else + target("libbpf") + set_kind("static") + set_basename("bpf") + add_files("../../libbpf/src/*.c") + add_includedirs("../../libbpf/include") + add_includedirs("../../libbpf/include/uapi", {public = true}) + add_includedirs("$(buildir)", {interface = true}) + add_configfiles("../../libbpf/src/(*.h)", {prefixdir = "bpf"}) + add_packages("elfutils", "zlib") + if is_plat("android") then + add_defines("__user=", "__force=", "__poll_t=uint32_t") + end +end + +target("minimal") + set_kind("binary") + add_files("minimal.c", "minimal.bpf.c") + add_packages("linux-headers") + if not has_config("system-libbpf") then + add_deps("libbpf") + end + +target("minimal_legacy") + set_kind("binary") + add_files("minimal_legacy.c", "minimal_legacy.bpf.c") + add_packages("linux-headers") + if not has_config("system-libbpf") then + add_deps("libbpf") + end + +target("bootstrap") + set_kind("binary") + add_files("bootstrap.c", "bootstrap.bpf.c") + add_packages("linux-headers") + if not has_config("system-libbpf") then + add_deps("libbpf") + end + if is_plat("android") then + add_packages("argp-standalone") + end + +target("fentry") + set_kind("binary") + add_files("fentry.c", "fentry.bpf.c") + add_packages("linux-headers") + if not has_config("system-libbpf") then + add_deps("libbpf") + end + +target("uprobe") + set_kind("binary") + add_files("uprobe.c", "uprobe.bpf.c") + add_packages("linux-headers") + if not has_config("system-libbpf") then + add_deps("libbpf") + end + +target("kprobe") + set_kind("binary") + add_files("kprobe.c", "kprobe.bpf.c") + add_packages("linux-headers") + if not has_config("system-libbpf") then + add_deps("libbpf") + end + if is_plat("android") then + -- TODO we need fix vmlinux.h to support android + set_default(false) + end diff --git a/img/image12.png 
b/img/image12.png index 66a9a22..7ab95ff 100644 Binary files a/img/image12.png and b/img/image12.png differ diff --git a/img/image13.png b/img/image13.png index 66a9a22..023fbef 100644 Binary files a/img/image13.png and b/img/image13.png differ diff --git a/kvs_array_bin.c b/kvs_array_bin.c index 2fa0d27..af7f563 100644 --- a/kvs_array_bin.c +++ b/kvs_array_bin.c @@ -235,13 +235,22 @@ int kvs_array_save(iouring_ctx_t *uring, kvs_array_t *inst, const char* filename for (int i = 0; i < count; i++) total += lens[i]; task_t *t = submit_write(uring, fd, bufs, lens, count, current_off); + cleanup_finished_iouring_tasks(); - if (!t) { close(fd); return -4; } + if (!t) { + perror("task init failed"); + goto clean; + } current_off += (off_t) total; } +clean: + while (!uring_task_complete(uring)) { + usleep(1000); + cleanup_finished_iouring_tasks(); + } close(fd); return 0; } diff --git a/kvs_hash_bin.c b/kvs_hash_bin.c index 6283c21..b529d61 100755 --- a/kvs_hash_bin.c +++ b/kvs_hash_bin.c @@ -273,7 +273,9 @@ int kvs_hash_save(iouring_ctx_t *uring, kvs_hash_t *inst, const char* filename){ for(int i = 0;i < inst->max_slots; ++ i){ for (hashnode_t *n = inst->nodes[i]; n != NULL; n = n->next) { if (!n->key || n->key_len == 0) continue; - if (n->value_len > 0 && !n->value) { close(fd); return -3; } + if (n->value_len > 0 && !n->value) { + goto clean; + } uint32_t klen = htonl((uint32_t)n->key_len); @@ -308,14 +310,21 @@ int kvs_hash_save(iouring_ctx_t *uring, kvs_hash_t *inst, const char* filename){ task_t *t = submit_write(uring, fd, bufs, lens, count, current_off); - - if (!t) { close(fd); return -4; } - + if(!t) { + perror("task init failed"); + goto clean; + } + cleanup_finished_iouring_tasks(); current_off += (off_t) total; } } +clean: + while (!uring_task_complete(uring)) { + usleep(1000); + cleanup_finished_iouring_tasks(); + } close(fd); return 0; } diff --git a/kvs_rbtree_bin.c b/kvs_rbtree_bin.c index 64c88b0..27cf03d 100644 --- a/kvs_rbtree_bin.c +++ 
b/kvs_rbtree_bin.c @@ -469,7 +469,7 @@ static int kvs_rbtree_save_node(iouring_ctx_t *uring, int fd, off_t *current_off if (node == inst->nil) return 0; int rc = 0; - + rc = kvs_rbtree_save_node(uring, fd, current_off, inst, node->left); if (rc < 0) return rc; @@ -504,9 +504,12 @@ static int kvs_rbtree_save_node(iouring_ctx_t *uring, int fd, off_t *current_off for (int i = 0; i < count; i++) total += lens[i]; task_t *t = submit_write(uring, fd, bufs, lens, count, *current_off); - + cleanup_finished_iouring_tasks(); - if (!t) { return -4; } + if(!t) { + perror("task init failed"); + return -4; + } *current_off += (off_t) total; @@ -527,7 +530,11 @@ int kvs_rbtree_save(iouring_ctx_t *uring, kvs_rbtree_t *inst, const char* filena int rc = kvs_rbtree_save_node(uring, fd, ¤t_off, inst, inst->root); - close(fd); + while (!uring_task_complete(uring)) { + usleep(1000); + cleanup_finished_iouring_tasks(); + } + close(fd); return rc; } diff --git a/kvstore.c b/kvstore.c index ba26a19..c12c38b 100644 --- a/kvstore.c +++ b/kvstore.c @@ -8,6 +8,7 @@ #include "memory/alloc_dispatch.h" #include "common/config.h" #include "diskuring/diskuring.h" +#include "replica_shm.h" #include #include @@ -29,10 +30,21 @@ unsigned long long global_seq; extern int global_oplog_fd; +replica_shm_t g_rep_shm; + +__attribute__((noinline)) void __completed_cmd(const uint8_t *cmd, size_t len, unsigned long long seq){ + asm volatile("" ::: "memory"); } +// __attribute__((noinline)) +// void __replica_notify(uint64_t seq, uint32_t off, uint32_t len) +// { +// // 空函数即可,目的是让 uprobe 拿到参数 +// asm volatile("" ::: "memory"); +// } + int kvs_protocol(struct conn* conn){ if (!conn) return -1; char *request = conn->rbuffer; @@ -68,10 +80,6 @@ int kvs_protocol(struct conn* conn){ int dr = resp_dispatch(&cmd, &val); - __completed_cmd(p, len, global_seq); - global_seq ++; - - /* * 语义建议: * - resp_dispatch() 即使返回 -1(比如 unknown command / wrong argc), @@ -88,29 +96,45 @@ int kvs_protocol(struct conn* conn){ } } else { 
// persist into oplog - if(global_cfg.persistence == PERSIST_INCREMENTAL){ - - /* 执行成功:在这里保存到日志中(只记录更新类命令) */ - if (cmd.argc > 0 && cmd.argv[0].ptr) { - /* 更新类命令:SET/DEL/MOD/RSET/RDEL/RMOD/HSET/HDEL/HMOD/SAVE */ - const resp_slice_t *c0 = &cmd.argv[0]; - int is_update = 0; - if (c0->ptr && c0->len) { - if (ascii_casecmp(c0->ptr, c0->len, "SET") == 0 || - ascii_casecmp(c0->ptr, c0->len, "DEL") == 0 || - ascii_casecmp(c0->ptr, c0->len, "MOD") == 0 || - ascii_casecmp(c0->ptr, c0->len, "RSET") == 0 || - ascii_casecmp(c0->ptr, c0->len, "RDEL") == 0 || - ascii_casecmp(c0->ptr, c0->len, "RMOD") == 0 || - ascii_casecmp(c0->ptr, c0->len, "HSET") == 0 || - ascii_casecmp(c0->ptr, c0->len, "HDEL") == 0 || - ascii_casecmp(c0->ptr, c0->len, "HMOD") == 0) { - is_update = 1; - } + /* 执行成功:在这里保存到日志中(只记录更新类命令) */ + if (cmd.argc > 0 && cmd.argv[0].ptr) { + /* 更新类命令:SET/DEL/MOD/RSET/RDEL/RMOD/HSET/HDEL/HMOD/SAVE */ + const resp_slice_t *c0 = &cmd.argv[0]; + int is_update = 0; + if (c0->ptr && c0->len) { + if (ascii_casecmp(c0->ptr, c0->len, "SET") == 0 || + ascii_casecmp(c0->ptr, c0->len, "DEL") == 0 || + ascii_casecmp(c0->ptr, c0->len, "MOD") == 0 || + ascii_casecmp(c0->ptr, c0->len, "RSET") == 0 || + ascii_casecmp(c0->ptr, c0->len, "RDEL") == 0 || + ascii_casecmp(c0->ptr, c0->len, "RMOD") == 0 || + ascii_casecmp(c0->ptr, c0->len, "HSET") == 0 || + ascii_casecmp(c0->ptr, c0->len, "HDEL") == 0 || + ascii_casecmp(c0->ptr, c0->len, "HMOD") == 0) { + is_update = 1; } + } - if (is_update) { + if (is_update) { + if(global_cfg.persistence == PERSIST_INCREMENTAL){ kvs_oplog_append(p, len, global_oplog_fd); + } + + // __completed_cmd(p, len, global_seq); + // global_seq ++; + + if (global_cfg.replica_mode == REPLICA_ENABLE) { + uint32_t off = 0; + int ar = replica_shm_append(&g_rep_shm, global_seq, p, (uint32_t)len, &off); + if (ar == 0) { + // __replica_notify(global_seq, off, (uint32_t)len); + global_seq++; + + } else { + // shm 满或异常:你可以选择降级(比如直接跳过复制,或阻塞/丢弃) + // 为了不影响主路径,这里先打印并跳过 + 
fprintf(stderr, "replica_shm_append failed %d\n", ar); + } } } } @@ -245,7 +269,8 @@ int init_config(AppConfig *cfg){ printf("=============== Config ===============\n"); printf("IP : %s\n", cfg->ip); printf("Port : %d\n", cfg->port); - + + printf("Replica-Mode : %s\n", replica_to_string(cfg->replica_mode)); printf("Mode : %s\n", server_mode_to_string(cfg->mode)); printf("|—— Master IP : %s\n", cfg->master_ip); printf("|—— Master Port : %d\n", cfg->master_port); @@ -268,7 +293,24 @@ int init_config(AppConfig *cfg){ } void init_disk_uring(iouring_ctx_t *uring_ctx){ - iouring_init(uring_ctx, 2048); + // iouring_init(uring_ctx, 4096); + iouring_init(uring_ctx, (1024*8)); +} + +void dest_disk_uring(iouring_ctx_t *uring_ctx){ + iouring_shutdown(uring_ctx); +} + +int kvs_replica_init(void) +{ + if (global_cfg.replica_mode == REPLICA_ENABLE) { + int rc = replica_shm_open(&g_rep_shm, REPLICA_SHM_NAME, REPLICA_SHM_SIZE, /*create=*/ 1); + if (rc != 0) { + fprintf(stderr, "replica_shm_open failed rc=%d\n", rc); + return rc; + } + } + return 0; } @@ -279,6 +321,7 @@ int main(int argc, char *argv[]) { } global_seq = 0; + kvs_replica_init(); init_memory_pool(&global_cfg); init_data_file(&global_cfg); init_disk_uring(&global_uring_ctx); diff --git a/kvstore.h b/kvstore.h index b0f386c..2184b53 100644 --- a/kvstore.h +++ b/kvstore.h @@ -279,7 +279,6 @@ extern kvs_rbtree_t global_rbtree; extern kvs_hash_t global_hash; #endif -void __completed_cmd(const uint8_t *cmd, size_t len, unsigned long long seq); #endif diff --git a/reactor.c b/reactor.c index c38289c..f04090a 100644 --- a/reactor.c +++ b/reactor.c @@ -336,7 +336,9 @@ void sync_wakeup() { ssize_t n = write(wakeup_fd, &one, sizeof(one)); } - +// #include "diskuring/diskuring.h" +// extern iouring_ctx_t global_uring_ctx; +// extern void iouring_tick(iouring_ctx_t *ctx); // 定时器 int handle_timer_fd_cb(int fd){ @@ -349,14 +351,15 @@ int handle_timer_fd_cb(int fd){ if (n < 0 && errno == EAGAIN) break; break; } + // 
iouring_tick(&global_uring_ctx); } int init_timer_fd(void){ int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); struct itimerspec its = { - .it_interval = {1, 0}, // 每 1 秒 - .it_value = {1, 0}, // 1 秒后首次触发 + .it_interval = {0, 100 * 1000 * 1000}, // 100ms = 100,000,000 纳秒 + .it_value = {0, 100 * 1000 * 1000}, // 首次 100ms 后触发 }; timerfd_settime(tfd, 0, &its, NULL); @@ -406,12 +409,12 @@ int reactor_start(unsigned short port, msg_handler handler) { return -1; } - // timer_fd = init_timer_fd(); - // if(timer_fd < 0){ - // close(epfd); - // close(wakeup_fd); - // return -1; - // } + timer_fd = init_timer_fd(); + if(timer_fd < 0){ + close(epfd); + close(wakeup_fd); + return -1; + } int i = 0; diff --git a/replica_shm.c b/replica_shm.c new file mode 100644 index 0000000..5a35e6e --- /dev/null +++ b/replica_shm.c @@ -0,0 +1,110 @@ +#include "replica_shm.h" +#include +#include +#include +#include +#include +#include +#include + +#define REPLICA_SHM_MAGIC 0x52504C43u /* 'RPLC' */ +#define REPLICA_SHM_VER 1 + +static inline uint64_t align8_u64(uint64_t x) { return (x + 7u) & ~7ull; } + +int replica_shm_open(replica_shm_t *s, const char *name, size_t total_size, int create) +{ + if (!s || !name || total_size < (sizeof(replica_shm_hdr_t) + 4096)) return -EINVAL; + memset(s, 0, sizeof(*s)); + + int flags = O_RDWR; + if (create) flags |= O_CREAT; + + int fd = shm_open(name, flags, 0666); + if (fd < 0) return -errno; + + if (create) { + if (ftruncate(fd, (off_t)total_size) != 0) { + int e = -errno; close(fd); return e; + } + } + + void *p = mmap(NULL, total_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (p == MAP_FAILED) { + int e = -errno; close(fd); return e; + } + + s->fd = fd; + s->map_size = total_size; + s->hdr = (replica_shm_hdr_t *)p; + s->data = (uint8_t *)p + sizeof(replica_shm_hdr_t); + + // 初始化头 + if (create || s->hdr->magic != REPLICA_SHM_MAGIC) { + memset(s->hdr, 0, sizeof(*s->hdr)); + s->hdr->magic = REPLICA_SHM_MAGIC; + s->hdr->version = 
REPLICA_SHM_VER; + s->hdr->capacity = total_size - sizeof(replica_shm_hdr_t) - sizeof(replica_rec_hdr_t); + s->hdr->write_off = 0; + s->hdr->last_seq = 0; + } + + printf("capacity:%llu\n", (unsigned long long)s->hdr->capacity); + + return 0; +} + +int replica_shm_append(replica_shm_t *s, uint64_t seq, const void *buf, uint32_t len, uint32_t *out_off) +{ + if (!s || !s->hdr || !s->data || !buf || len == 0 || !out_off) return -EINVAL; + + uint64_t cap = s->hdr->capacity; + uint64_t off = __atomic_load_n(&s->hdr->write_off, __ATOMIC_RELAXED); + + uint64_t need = align8_u64((uint64_t)sizeof(replica_rec_hdr_t) + (uint64_t)len); + + // 简化:如果尾部放不下,则写一个“wrap marker”,回到 0 + // wrap marker: hdr.len=0, seq 保留,表示消费者遇到就跳到 0 + if (off + need > cap) { + replica_rec_hdr_t wrap = { .seq = seq, .len = 0, .flags = 0, .crc32 = 0, .reserved = 0 }; + memcpy(s->data + off, &wrap, sizeof(wrap)); + __atomic_store_n(&s->hdr->write_off, 0, __ATOMIC_RELEASE); + + off = 0; + if (need > cap) return -ENOSPC; // 单条记录太大 + } + + replica_rec_hdr_t h = {0}; + h.seq = seq; + h.len = len; + h.flags = 0; + h.crc32 = 0; + + // 写 header + payload + memcpy(s->data + off, &h, sizeof(h)); + memcpy(s->data + off + sizeof(h), buf, len); + + // 发布 write_off / last_seq(保证消费者看到 payload) + uint64_t new_off = off + need; + __atomic_store_n(&s->hdr->last_seq, seq, __ATOMIC_RELEASE); + __atomic_store_n(&s->hdr->write_off, new_off, __ATOMIC_RELEASE); + + *out_off = (uint32_t)off; + return 0; +} + +int replica_shm_peek(replica_shm_t *s, uint32_t off, replica_rec_hdr_t *out_hdr) +{ + if (!s || !s->hdr || !s->data || !out_hdr) return -EINVAL; + if ((uint64_t)off + sizeof(replica_rec_hdr_t) > s->hdr->capacity) return -EINVAL; + memcpy(out_hdr, s->data + off, sizeof(*out_hdr)); + return 0; +} + +void replica_shm_close(replica_shm_t *s) +{ + if (!s) return; + if (s->hdr && s->map_size) munmap(s->hdr, s->map_size); + if (s->fd > 0) close(s->fd); + memset(s, 0, sizeof(*s)); +} diff --git a/replica_shm.h b/replica_shm.h new file mode 100644 index
0000000..27080f1 --- /dev/null +++ b/replica_shm.h @@ -0,0 +1,59 @@ +#ifndef __REPLICA_SHM_H__ +#define __REPLICA_SHM_H__ +#include +#include + +#ifndef REPLICA_SHM_NAME +#define REPLICA_SHM_NAME "/kvs_replica_shm" +#endif + +#ifndef REPLICA_SHM_SIZE +// 64MB,按需调 +#define REPLICA_SHM_SIZE (256u * 1024u * 1024u) +#endif + +// 每条记录头部(放在 shm 的 data 区里) +typedef struct __attribute__((packed)) { + uint64_t seq; // 单调递增 + uint32_t len; // payload bytes + uint32_t flags; // 预留:压缩、类型等 + uint32_t crc32; // 可选:0 表示不校验 + uint32_t reserved; // 对齐 + // uint8_t payload[len] 紧跟其后 +} replica_rec_hdr_t; + +// shm 顶部元数据 +typedef struct __attribute__((packed)) { + uint32_t magic; + uint32_t version; + uint64_t capacity; // data 区大小(字节) + uint64_t write_off; // producer 写指针(0..capacity-1) + uint64_t last_seq; // producer 最新 seq + uint8_t _pad[64]; // cacheline padding + // 后面紧跟 data[capacity] +} replica_shm_hdr_t; + +typedef struct { + int fd; + size_t map_size; + replica_shm_hdr_t *hdr; + uint8_t *data; +} replica_shm_t; + +// kvstore: 初始化(create/open + mmap) +int replica_shm_open(replica_shm_t *s, const char *name, size_t total_size, int create); + +// kvstore: append 一条记录,返回 off(相对 data 起始),用于 notify +// 单写者设计:无需锁。返回 0 成功,<0 失败(空间不足或参数错误) +int replica_shm_append(replica_shm_t *s, uint64_t seq, const void *buf, uint32_t len, uint32_t *out_off); + +// replicator: 读取记录头(不移动游标),你也可以直接 memcpy payload +// off 是 data 内偏移 +int replica_shm_peek(replica_shm_t *s, uint32_t off, replica_rec_hdr_t *out_hdr); + +// 关闭 +void replica_shm_close(replica_shm_t *s); + + +extern replica_shm_t g_rep_shm; +#endif \ No newline at end of file