diff --git a/README.md b/README.md index 7c3e1c5..b9a9cb5 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,16 @@ make ``` +``` +docker run -it --rm \ + -v "$(pwd)":/workdir \ + -w /workdir \ + --pid=host \ + --privileged \ + ghcr.io/eunomia-bpf/bpftime:latest \ + /bin/bash +``` + ## 测试 ### 测试1:性能测试 测试条件: @@ -354,14 +364,169 @@ ALL TESTS PASSED. ## 项目收获 -reactor网络模型,用户态网络缓冲区的写法。\ -特殊字符串支持的引擎层数据结构设计,支持\0作为键值存储。\ -实现RESP协议的服务端协议解析。\ -使用fork的Copy On Write机制,实现的异步快照创建,不会受到原字符串的影响。\ -基于BinLog上OffSet的主从同步设计。\ -基于bpf的实时数据同步设计。\ -基于共享缓冲区+额外进程的实时数据同步设计。\ -基于bpf的内存泄露探测功能,实现热插拔。\ -实现支持分配可变长度内存块的内存池。\ -实现专门uring线程实现异步的增量、全量落盘操作。\ -使用配置文件规定端口、保存文件路径等,使用mmap加载到内存,使用libxml解析。\ \ No newline at end of file +#### reactor网络模型,用户态网络缓冲区的写法。 + +#### 特殊字符串支持的引擎层数据结构设计,支持\0作为键值存储。 + +1. 长度前缀 + 内容的 binary-safe 字符串表示,支持 \0 作为普通字符。 + +#### 实现RESP协议的服务端协议解析。 + +1. 解析流程: + 1. 内核 拷贝到 用户态缓冲区 + 2. 用户态缓冲区 就地解析 bulk-string 为执行引擎可以看得懂的结构体 + 3. 执行引擎 拷贝 结构体的内容,插入到底层存储结构中 + 4. 循环解析直到 用户态缓冲区为空 +2. 实现了 pipeline 支持:每次从 buffer 读到一个完整命令就交给应用层处理,应用层返回已消费字节数。如果 buffer 里有半包,连接层保留下次继续解析。 + +#### 快照异步创建。 + +1. 使用fork的Copy On Write机制,实现的异步快照创建,不会受到后续更新请求的影响 + +#### 实现SPSC/SPMC,专门uring线程实现异步的增量、全量落盘操作。 + +1. 熟悉了SPSC无锁队列的实现方案。 + 1. 无锁、cache friendly +2. 流程: + 1. 生产者组装task,丢给SPSC。 + 2. 消费者从中取出然后执行,置入destroy_queue,触发eventfd。 + 3. 生产者释放destroy_queue。 +3. 解决的问题: + 1. iouring 由于 cqe 接收不及时导致的 task 丢失无法释放。 + 1. 通过背压解决,设置inflight上限。 + 2. 背压后,生产者速度 > 消费者速度,ringbuffer 满导致只能阻塞主线程或降低速度。 + 1. 通过修改SPSC为SPMC,构建落盘线程组实现,当task_queue满,触发扩容线程组。 + 2. 每个落盘线程私有一个SPSC,减少竞争。 + 3. 简易工作负载,生产者线程随机选取两个uring线程选取任务少的push。 + 3. 采用读写段采用轮询方案导致乒乓现象,性能下降。 + 1. MESI协议定义了缓存行的四种状态:Modified表示独占且已修改,Exclusive表示独占且未修改,Shared表示多核共享只读,Invalid表示缓存行无效。 + - **关键点:** 但从S状态读没有什么开销,从I状态读则需要向其他CPU申请。 + - **关键点:** 从E/M状态写也没有什么开销,从S状态写则需要广播invalidate并且等待ACK(50-100时钟周期)。 + - 原子操作由于内存序的顺序性和可见性语义,也有额外开销(刷新invalidate queue、阻止指令重排) + 2. 短临界区:分层退避。自旋-让出CPU时间片-较长时间休眠。允许生产者在期间无争用的写入一批数据,然后统一读。 + 3. 更通用的情况:事件驱动。用futex替代轮询。Fast Userspace Mutex。 + 1. **消费者:** 调用`syscall(SYS_futex, &futex_word, FUTEX_WAIT, old_val, &timeout, NULL, 0)` , 如果futex_word仍等于old_val,线程进入内核等待队列,真正睡眠,不占用CPU。 + 2. **生产者:** 调用`syscall(SYS_futex, &futex_word, FUTEX_WAKE, 1, NULL, NULL, 0)` 唤醒一个等待线程。 +4. 优化:生产者速度 > 消费者速度,则写端不需要关注读指针,或者极少关注读指针(缓冲区大小的1/2)次写入才考虑一次。 + +#### 基于BinLog上OffSet的主从同步(已有数据+实时数据)设计。 + +1. 初始化阶段: + 1. master 持续将收到的更新请求+seq_id 落盘到本地 binlog 中。 + 2. slave 向 master 发起连接并且发送本地binlog中最大的seq_id 为 slave_seq_id。 +2. 执行阶段: + 1. master启动一个独立同步线程与 slave 构建连接。同步线程有两个阶段: + 1. slave_seq_id < local_min_seq_id:master通过fork创建内存快照发送,且发送同时刻的local_max_seq_id。 + 2. slave_seq_id < local_max_seq_id:通过自实现同步协议批量发送binlog的seq,并且回包new_slave_seq_id。 + 3. slave_seq_id == local_max_seq_id:线程休眠。 + 2. master收到新的请求的时候,会通过条件变量唤醒同步线程。 + +#### 基于ebpf的实时数据同步设计。 + +**基准性能**:Kvstore QPS ~90w(无持久化/同步)。 + +ebpf对程序的影响要考虑如下方面: + +1. eBPF 探针的触发频率(上下文切换) + +2. 数据拷贝方式 + + + +**用户态探针 (Uprobe) 的开销**, + +1. **逐条命令探测**。QPS大概在 25w左右。**原因**:Uprobe 基于断点指令(int3)实现,用户态 → 异常 → 内核 → eBPF → 返回用户态,高频触发会导致 CPU 流水线停顿严重。 +2. **批处理探测**。QPS大概在85w左右。大幅减少了上下文切换次数,平摊了中断开销。 + +**内核态探针 (Tracepoint/Kprobe) 的开销** + +1. **Tracepoint (sys_exit_recvfrom)等** + 1. 纯探测:QPS ~85w。 + 2. 带数据拷贝 (read_user):QPS 降至 ~70w。**原因**:**bpf_probe_read_user** 是一个非常重的 helper,开销远大于一次 memcpy。 +2. **Kprobe (tcp_rcv_established)等** + 1. 纯探测:QPS ~86w。 + 2. 带数据拷贝 (read_kernel):QPS ~83w。**原因**:比**bpf_probe_read_user**轻得多。 + 3. 问题:此时数据位于 TCP 协议栈底层,可能存在分片(Fragment)、乱序或非线性存储(Paged SKB),需要处理复杂的数据重组逻辑。 + +工作流程: + +1. **内核态捕获**: + 1. eBPF 程序挂载于内核网络路径( TC 或 TCP 层),拦截流量。 + 2. 使用 bpf_probe_read_kernel 或 bpf_skb_load_bytes 高效提取数据载荷。 + 3. 通过 bpf_ringbuf_submit 将数据写入环形缓冲区。 +2. **用户态转发**: + 1. 独立进程消费 RingBuffer。 + 2. 将数据暂存入本地队列,平滑突发流量,防止 RingBuffer 溢出导致的数据丢失。 +3. **状态机控制**: + 1. **SYNC 阶段**:探测到 __ssync,识别 Slave 连接信息(IP/Port),标记状态为“同步中”。 + 2. **READY 阶段**:探测到 __sready,标志全量同步完成。 + 3. **实时转发**:Agent 启动独立线程,消耗 Pending Queue,将增量数据通过标准 TCP 发送给 Slave。 + +##### TC 与 XDP + +网络栈 + +``` +-> [ 网卡 (NIC) ] +-> [ XDP (eXpress Data Path) ] <--- xdp 处理原始帧 no skb +-> [ sk_buff 分配 ] +-> [ TC (Traffic Control) Ingress ] | tc 可操作 on skb +-> [ Netfilter (PREROUTING) ] +-> [ IP 协议栈 ] | ip_rcv -> ip_local_deliver +-> [ TCP 协议栈 ] | tcp_v4_rcv -> tcp_rcv_established -> tcp_data_queue +producer skb -> sk->sk_receive_queue +____________________________________________________________________ +consumer sk->sk_receive_queue +-> [ Socket Layer ] | tcp_recvmsg 拷贝到内存 +-> [ Syscall Interface ] | sys_exit_recvfrom +-> [ 用户态应用 (Kvstore) ] +``` + +XDP 是网卡驱动层的探点,操作系统刚刚收到数据包(DMA读入ringbuffer,CRC校验),还没有分配sk_buffer。数据形态是**裸的以太网帧** 。 + +TC 是在 sk_buff 分配之后,IP 协议栈处理之前的探测点。数据形态是__sk_buffer。 + +TCP协议栈是分界点, + +#### 内存泄露探测功能,实现热插拔。 + +方案1 基于bpf + +1. 通过预定义宏__FILE__等封装一个内存分配宏定义,向真正的分配函数传递代码位置等信息。 +2. 通过bpf探测内存分配的地址、大小、文件、代码位置、函数等信息,并且记录。 +3. 通过bpf探测内存释放的指针信息,然后释放。 +4. 打印最终剩余的内存地址。 +5. 缺点:bpf 探测 malloc 等对性能的影响非常的大。 + +方案2 基于全局变量启用的代码内置探测工具,在网络层接收启用/关闭探测工具的请求。 + +1. 分配时打开一个文件,关闭时删除。 +2. 分配时使用kv保存,删除时删除k。 + +都对性能和内存有很大的影响,不建议长时间启用。 + +#### 实现支持分配可变长度内存块的内存池。 + +1. 熟悉了glibc 的 ptmalloc的底层操作。默认阈值约 128KB,且会根据分配行为动态调整。 + + 1. '<= 默认阈值 通过 brk/sbrk 扩展连续堆,堆里维护了 chunk 结构。 + 2. '> 默认阈值 的块用 mmap 独立申请,释放时 munmap。 + 3. 每个 chunk 头部存大小信息(通常 8~16 字节),用户拿到的是 chunk + 头部后的地址。 + 4. 空闲 chunk 按大小分到多个 bin(tcache、fastbin、small bin、large bin 等),fastbin 和 tcache 为了速度不合并相邻空闲块。 + 5. 缺点: + 1. 分配路径有较多分支判断和链表遍历,不是严格 O(1)。 + 2. 小块故意不合并(fastbin/tcache)会导致外部碎片。 + 3. 长期运行内存利用率下降。 + +2. 内存池实现: + + 1. 内存池有多个桶,桶中存储固定大小的块。分配/释放均为 O(1) + 2. 以 huge page 为单位向系统申请内存并切分为固定块。 + 3. 当页内块全部释放时整页归还系统,显著降低长期碎片。 + 4. 通过地址对齐 O(1) 反推出页头元信息(所属桶、空闲计数)。 + 5. malloc通常向上对齐,桶对应的存储大小可以根据不同系统设定,减少内部碎片。 + 6. 使用 bitmap + freelist 防 double free 且无额外查找开销。 + 7. 每线程独立内存池,相对malloc更少的锁竞争。 + 8. 大块分配自动退化为 malloc 处理。 + + 相比 ptmalloc,该设计消除了外部碎片,降低了系统调用次数,并在多线程场景下显著提升分配性能与内存利用率。 \ No newline at end of file diff --git a/config/config.xml b/config/config.xml index f13c407..81bdbec 100644 --- a/config/config.xml +++ b/config/config.xml @@ -6,7 +6,7 @@ master - enable + disable 192.168.10.129 8888 @@ -18,7 +18,7 @@ - incremental + none data kvs_oplog.db diff --git a/ebpf/c/replica b/ebpf/c/replica new file mode 100755 index 0000000..ebf43be Binary files /dev/null and b/ebpf/c/replica differ diff --git a/ebpf/c/replica.c b/ebpf/c/replica.c index 37eb545..9d358e7 100644 --- a/ebpf/c/replica.c +++ b/ebpf/c/replica.c @@ -189,6 +189,7 @@ static void* reader_thread_func(void *arg) uint64_t last = __atomic_load_n(&g_shm.hdr->last_seq, __ATOMIC_ACQUIRE); if (local_seq > last) { // 没有新数据,短暂休眠避免空转 + usleep(500); continue; } if (read_off+ sizeof(replica_rec_hdr_t) >= g_shm.hdr->capacity) { diff --git a/ebpf/leak_detect/complete.bpf b/ebpf/leak_detect/complete.bpf new file mode 100644 index 0000000..5a3483e --- /dev/null +++ b/ebpf/leak_detect/complete.bpf @@ -0,0 +1,39 @@ +#!/usr/bin/env bpftrace + +BEGIN +{ + printf("开始统计 kvstore 进程的 __completed_cmd 调用次数...\n"); + printf("每 5 秒打印一次统计,Ctrl-C 退出\n\n"); + + // 统计变量 + @enter = 0; + @exit = 0; +} + +interval:s:5 +{ + time("%H:%M:%S"); + printf(" __completed_enter_cmd 调用次数: %10d\n", @enter); + printf(" __completed_exit_cmd 调用次数: %10d\n", @exit); + + // 可选:如果想每轮清零统计,取消下面注释 + // clear(@enter); + // clear(@exit); +} + +uprobe:/home/lian/share/9.1-kvstore/kvstore:__completed_cmd +{ + @exit++; +} + +uretprobe:/home/lian/share/9.1-kvstore/kvstore:__completed_cmd +{ + @enter++; +} + + + +END +{ + printf("\n最终统计:\n"); +} \ No newline at end of file diff --git a/ebpf/leak_detect/tcp_probe.bpf b/ebpf/leak_detect/tcp_probe.bpf new file mode 100644 index 0000000..ddec305 --- /dev/null +++ b/ebpf/leak_detect/tcp_probe.bpf @@ -0,0 +1,39 @@ +#!/usr/bin/env bpftrace + +BEGIN +{ + printf("开始统计 kvstore 进程的 tcp_rcv_established 调用次数...\n"); + printf("每 5 秒打印一次统计,Ctrl-C 退出\n\n"); + + // 统计变量 + @enter = 0; + @exit = 0; +} + +interval:s:5 +{ + time("%H:%M:%S"); + printf(" tcp_rcv_established 调用次数: %10d\n", @enter); + printf(" tcp_rcv_established ret 调用次数: %10d\n", @exit); + + // 可选:如果想每轮清零统计,取消下面注释 + // clear(@enter); + // clear(@exit); +} + +kprobe:tcp_rcv_established +{ + @enter++; +} + +kretprobe:tcp_rcv_established +{ + @exit++; +} + +END +{ + printf("\n最终统计:\n"); + printf("tcp_rcv_established : %d 次\n", @enter); + printf("tcp_rcv_established ret: %d 次\n", @exit); +} \ No newline at end of file diff --git a/ebpf/leak_detect/tracepoint.bpf b/ebpf/leak_detect/tracepoint.bpf new file mode 100644 index 0000000..d637526 --- /dev/null +++ b/ebpf/leak_detect/tracepoint.bpf @@ -0,0 +1,41 @@ +#!/usr/bin/env bpftrace + +BEGIN +{ + printf("开始统计 kvstore 进程的 recvfrom 调用次数...\n"); + printf("每 5 秒打印一次统计,Ctrl-C 退出\n\n"); + + // 统计变量 + @enter = 0; + @exit = 0; +} + +interval:s:5 +{ + time("%H:%M:%S"); + printf(" sys_enter_recvfrom 调用次数: %10d\n", @enter); + printf(" sys_exit_recvfrom 调用次数: %10d\n", @exit); + + // 可选:如果想每轮清零统计,取消下面注释 + // clear(@enter); + // clear(@exit); +} + +tracepoint:syscalls:sys_enter_recvfrom +/comm == "kvstore"/ +{ + @enter++; +} + +tracepoint:syscalls:sys_exit_recvfrom +/comm == "kvstore"/ +{ + @exit++; +} + +END +{ + printf("\n最终统计:\n"); + printf("sys_enter_recvfrom: %d 次\n", @enter); + printf("sys_exit_recvfrom : %d 次\n", @exit); +} \ No newline at end of file diff --git a/ebpf/old.c/Makefile b/ebpf/old.c/Makefile index 3a81dbb..fe9814b 100644 --- a/ebpf/old.c/Makefile +++ b/ebpf/old.c/Makefile @@ -1,12 +1,12 @@ # SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause) OUTPUT := .output CLANG ?= clang -LIBBPF_SRC := $(abspath ../../libbpf/src) -BPFTOOL_SRC := $(abspath ../../bpftool/src) +LIBBPF_SRC := $(abspath ../../libbpf-bootstrap/libbpf/src) +BPFTOOL_SRC := $(abspath ../../libbpf-bootstrap/bpftool/src) LIBBPF_OBJ := $(abspath $(OUTPUT)/libbpf.a) BPFTOOL_OUTPUT ?= $(abspath $(OUTPUT)/bpftool) BPFTOOL ?= $(BPFTOOL_OUTPUT)/bootstrap/bpftool -LIBBLAZESYM_SRC := $(abspath ../../blazesym/) +LIBBLAZESYM_SRC := $(abspath ../../libbpf-bootstrap/blazesym/) LIBBLAZESYM_INC := $(abspath $(LIBBLAZESYM_SRC)/capi/include) LIBBLAZESYM_OBJ := $(abspath $(OUTPUT)/libblazesym_c.a) ARCH ?= $(shell uname -m | sed 's/x86_64/x86/' \ @@ -16,11 +16,11 @@ ARCH ?= $(shell uname -m | sed 's/x86_64/x86/' \ | sed 's/mips.*/mips/' \ | sed 's/riscv64/riscv/' \ | sed 's/loongarch64/loongarch/') -VMLINUX := ../../vmlinux.h/include/$(ARCH)/vmlinux.h +VMLINUX := ../../libbpf-bootstrap/vmlinux.h/include/$(ARCH)/vmlinux.h # Use our own libbpf API headers and Linux UAPI headers distributed with # libbpf to avoid dependency on system-wide headers, which could be missing or # outdated -INCLUDES := -I$(OUTPUT) -I../../libbpf/include/uapi -I$(dir $(VMLINUX)) -I$(LIBBLAZESYM_INC) +INCLUDES := -I$(OUTPUT) -I../../libbpf-bootstrap/libbpf/include/uapi -I$(dir $(VMLINUX)) -I$(LIBBLAZESYM_INC) CFLAGS := -g -Wall ALL_LDFLAGS := $(LDFLAGS) $(EXTRA_LDFLAGS) diff --git a/ebpf/old.c/replica b/ebpf/old.c/replica new file mode 100755 index 0000000..ed31c84 Binary files /dev/null and b/ebpf/old.c/replica differ diff --git a/ebpf/old.c/replica.bpf.c b/ebpf/old.c/replica.bpf.c index 14c6778..1cd6588 100644 --- a/ebpf/old.c/replica.bpf.c +++ b/ebpf/old.c/replica.bpf.c @@ -1,80 +1,133 @@ -// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause -/* Copyright (c) 2020 Facebook */ #include "vmlinux.h" #include #include #include +#include #include "replica.h" char LICENSE[] SEC("license") = "Dual BSD/GPL"; +#define FLAG_SSYNC_HAPPENED 0 +#define TARGET_PORT 8888 + +/* ================= BPF Maps ================= */ + struct { - __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); - __uint(key_size, sizeof(int)); - __uint(value_size, sizeof(int)); -} events SEC(".maps"); + __uint(type, BPF_MAP_TYPE_ARRAY); + __type(key, __u32); + __type(value, __u32); + __uint(max_entries, 1); +} flags SEC(".maps"); -/* __completed_cmd(const uint8_t *cmd, size_t len, unsigned long long seq); */ -SEC("uprobe//home/lian/share/9.1-kvstore/kvstore:__completed_cmd") -int BPF_KPROBE(handle_completed_cmd, - const __u8 *cmd, size_t len, __u64 seq) +struct { + __uint(type, BPF_MAP_TYPE_RINGBUF); + __uint(max_entries, 1 << 26); // 64MB +} rb SEC(".maps"); + +/* ================= Helper Functions ================= */ + +// 无需 process filter,改用 socket port filter + +/* ================= Kernel Hooks (TCP Layer) ================= */ + +/* + * 使用 kprobe 挂载 tcp_rcv_established + * 此时 skb 包含完整的 TCP 包(Header + Payload),数据在内核态。 + */ +SEC("kprobe/tcp_rcv_established") +int BPF_KPROBE(trace_tcp_rcv, struct sock *sk, struct sk_buff *skb) { - struct replica_event evt = {}; - __u32 copy_len; + // 1. 检查 SSYNC 标志是否已开启 (只在全量同步后开始抓包) + __u32 flag_key = FLAG_SSYNC_HAPPENED; + __u32 *flag_val = bpf_map_lookup_elem(&flags, &flag_key); + if (!flag_val || *flag_val == 0) + return 0; - evt.type = EVENT_COMPLETED_CMD; - evt.complete.seq = seq; + // 2. 过滤端口 8888 + // sk->sk_num 存储的是 Host Byte Order 的本地端口 + __u16 lport = BPF_CORE_READ(sk, __sk_common.skc_num); + if (lport != TARGET_PORT) + return 0; - copy_len = len; - if (copy_len > MAX_CMD_LEN) - copy_len = MAX_CMD_LEN; + // 3. 计算数据长度 + // 在 tcp_rcv_established 中,skb->len 是 (TCP Header + Data) 的长度 + // skb->data 指向 TCP Header 的起始位置 + unsigned int skb_len = BPF_CORE_READ(skb, len); + + // 读取 TCP Header 长度 (doff 字段,单位是 4字节) + // 需要读取 skb->data 指向的内存的前几个字节来获取 doff + unsigned char *skb_data = BPF_CORE_READ(skb, data); + + // 读取 TCP Header 的第 12 个字节 (包含 Data Offset) + // Offset 12: Data Offset (4 bits) | Reserved (3 bits) | NS (1 bit) + unsigned char doff_byte; + if (bpf_probe_read_kernel(&doff_byte, 1, skb_data + 12) < 0) + return 0; + + unsigned int tcp_hdr_len = (doff_byte >> 4) * 4; + + // 计算 Payload 长度 + if (skb_len <= tcp_hdr_len) + return 0; // 只有 ACK 没有数据,或者是控制包 - evt.complete.len = copy_len; + unsigned int payload_len = skb_len - tcp_hdr_len; - if (cmd) - bpf_probe_read_user(evt.complete.cmd, copy_len, cmd); + // 4. 准备 RingBuffer 数据 + struct replica_event *e = bpf_ringbuf_reserve(&rb, sizeof(*e), 0); + if (!e) + return 0; - bpf_perf_event_output(ctx, &events, - BPF_F_CURRENT_CPU, - &evt, sizeof(evt)); + e->type = EVENT_COMPLETED_CMD; + + // 截断超长数据 + if (payload_len > MAX_CMD_LEN) + e->complete.len = MAX_CMD_LEN; + else + e->complete.len = payload_len; + + // 5. 核心修改:使用 bpf_probe_read_kernel 读取数据 + // 数据起始位置 = skb->data + tcp_hdr_len + if (bpf_probe_read_kernel(&e->complete.cmd[0], e->complete.len, skb_data + tcp_hdr_len) < 0) { + bpf_ringbuf_discard(e, 0); + return 0; + } + + bpf_ringbuf_submit(e, 0); return 0; } -/* __ssync(const uint8_t *ip, uint32_t ip_len, int port, unsigned long long seq); */ +/* ================= Uprobe Hooks================= */ + SEC("uprobe//home/lian/share/9.1-kvstore/kvstore:__ssync") int BPF_KPROBE(handle_ssync, - const __u8 *ip, __u32 ip_len, int port, __u64 seq) + const __u8 *ip, __u32 ip_len, int port, __u64 seq_unused) { - struct replica_event evt = {}; + __u32 key = FLAG_SSYNC_HAPPENED; + __u32 val = 1; + bpf_map_update_elem(&flags, &key, &val, BPF_ANY); - evt.type = EVENT_SSYNC; - evt.sync.seq = seq; - evt.sync.port = port; + struct replica_event *e = bpf_ringbuf_reserve(&rb, sizeof(*e), 0); + if (!e) return 0; + + e->type = EVENT_SSYNC; + e->sync.port = port; __u32 copy_len = ip_len; - if (copy_len > sizeof(evt.sync.ip)) - copy_len = sizeof(evt.sync.ip); + if (copy_len > sizeof(e->sync.ip)) copy_len = sizeof(e->sync.ip); + if (ip) bpf_probe_read_user(e->sync.ip, copy_len, ip); - if (ip) - bpf_probe_read_user(evt.sync.ip, copy_len, ip); - - bpf_perf_event_output(ctx, &events, - BPF_F_CURRENT_CPU, - &evt, sizeof(evt)); + bpf_ringbuf_submit(e, 0); return 0; } -/* __sready(void); */ SEC("uprobe//home/lian/share/9.1-kvstore/kvstore:__sready") int BPF_KPROBE(handle_sready) { - struct replica_event evt = {}; + struct replica_event *e = bpf_ringbuf_reserve(&rb, sizeof(*e), 0); + if (!e) return 0; - evt.type = EVENT_SREADY; - - bpf_perf_event_output(ctx, &events, - BPF_F_CURRENT_CPU, - &evt, sizeof(evt)); + e->type = EVENT_SREADY; + bpf_ringbuf_submit(e, 0); return 0; } \ No newline at end of file diff --git a/ebpf/old.c/replica.c b/ebpf/old.c/replica.c index 544c85f..f64c83c 100644 --- a/ebpf/old.c/replica.c +++ b/ebpf/old.c/replica.c @@ -1,5 +1,3 @@ -// SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause) -/* Copyright (c) 2020 Facebook */ #include #include #include @@ -12,6 +10,7 @@ #include #include #include +#include #include "replica.h" @@ -20,10 +19,9 @@ typedef enum { OFFLINE = 0, ONLINE = 1, -}replica_state_e ; +} replica_state_e; struct cmd_node { - __u64 seq; __u32 len; uint8_t *cmd; struct cmd_node *next; @@ -32,7 +30,7 @@ struct cmd_node { struct pending_queue { struct cmd_node *head; struct cmd_node *tail; - int count; + int count; }; /* ================= 全局状态 ================= */ @@ -43,7 +41,6 @@ static int epollfd = -1; static char peer_ip[MAX_IP_LEN]; static int peer_port; -static __u64 peer_seq; static struct pending_queue pending = { .head = NULL, @@ -66,7 +63,7 @@ static void pending_free() q->count = 0; } -static void pending_push(__u64 seq, __u32 len, const uint8_t *cmd) +static void pending_push(__u32 len, const uint8_t *cmd) { struct cmd_node *node = malloc(sizeof(*node)); if (!node) @@ -79,7 +76,6 @@ static void pending_push(__u64 seq, __u32 len, const uint8_t *cmd) } memcpy(node->cmd, cmd, len); - node->seq = seq; node->len = len; node->next = NULL; @@ -93,72 +89,66 @@ static void pending_push(__u64 seq, __u32 len, const uint8_t *cmd) pending.count++; } -static void pending_gc(__u64 min_seq) -{ - struct cmd_node *cur = pending.head; - - int n = pending.count; - while (cur && cur->seq < min_seq) { - struct cmd_node *tmp = cur; - cur = cur->next; - - free(tmp->cmd); - free(tmp); - pending.count--; - } - - DEBUGLOG("gc:%d\n", n-pending.count); - - pending.head = cur; - if (!cur) - pending.tail = NULL; -} - +static long long int sendn = 0; static void pending_send_all(void) { struct cmd_node *cur = pending.head; - while (cur) { - int rt = send(sockfd, cur->cmd, cur->len, 0); + int need_out = 0; + int sent_count = 0; + const int MAX_BATCH = 100; // 批量发送上限,避免阻塞过久 - if(rt == (int)cur->len){ + while (cur && sent_count < MAX_BATCH) { + // 使用 MSG_MORE 合并多个小包 + int flags = (cur->next && sent_count < MAX_BATCH - 1) ? MSG_MORE : 0; + int rt = send(sockfd, cur->cmd, cur->len, flags); + + if (rt == (int)cur->len) { + sendn += rt; + printf("%s\n", cur->cmd); struct cmd_node *tmp = cur; cur = cur->next; - free(tmp->cmd); free(tmp); pending.count--; - }else{ - DEBUGLOG("error\n"); - // 失败:不移动 cur,直接 break - if (rt < 0) { + pending.head = cur; + sent_count++; + } else if (rt > 0) { + sendn += rt; + memmove(cur->cmd, cur->cmd + rt, cur->len - rt); + cur->len -= rt; + need_out = 1; + break; + } else { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + need_out = 1; + break; + } else { perror("send failed"); - if (errno == ECONNRESET || errno == EPIPE) { - state = OFFLINE; - if (sockfd >= 0) { - close(sockfd); - sockfd = -1; - DEBUGLOG("connect closed\n"); - } - } else if (rt == 0) { - fprintf(stderr, "send returned 0 (peer closed?)\n"); - } else { - fprintf(stderr, "partial send: %d/%u\n", rt, cur->len); - } - + state = OFFLINE; break; } } } + DEBUGLOG("sendn :%lld\n", sendn); pending.head = cur; - if(!cur) - pending.tail = NULL; + if (!cur) pending.tail = NULL; + + if (sockfd >= 0 && state == ONLINE) { + struct epoll_event ev = {0}; + ev.data.fd = sockfd; + ev.events = EPOLLIN; + if (need_out || pending.head) { + ev.events |= EPOLLOUT; + } + epoll_ctl(epollfd, EPOLL_CTL_MOD, sockfd, &ev); + } } /* ================= 网络逻辑 ================= */ static void try_connect(void) { - if(sockfd > 0){ + if (sockfd > 0) { close(sockfd); sockfd = -1; } @@ -170,14 +160,14 @@ static void try_connect(void) addr.sin_port = htons(peer_port); inet_pton(AF_INET, peer_ip, &addr.sin_addr); - for(i = 0;i < 10; ++ i){ + for (i = 0; i < 10; ++i) { sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd < 0) { perror("socket"); return; } - DEBUGLOG("connect try %d...\n", i + 1); + DEBUGLOG("connect try %d... %s:%d\n", i + 1, peer_ip, peer_port); if (connect(sockfd, (struct sockaddr *)&addr, sizeof(addr)) == 0) { DEBUGLOG("connect success: %s:%d\n", peer_ip, peer_port); @@ -190,7 +180,10 @@ static void try_connect(void) epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &ev); state = ONLINE; - pending_send_all(); + if (pending.head) { + ev.events = EPOLLIN | EPOLLOUT; + epoll_ctl(epollfd, EPOLL_CTL_MOD, sockfd, &ev); + } return; } @@ -234,17 +227,10 @@ static void handle_socket_readable(void) static void handle_socket_writable(void) { pending_send_all(); - if (!pending.head) { - struct epoll_event ev; - ev.events = EPOLLIN; // 只监听读 - ev.data.fd = sockfd; - epoll_ctl(epollfd, EPOLL_CTL_MOD, sockfd, &ev); - } } - -/* ================= perf buffer 回调 ================= */ -static void handle_event(void *ctx, int cpu, void *data, __u32 size) +/* ================= ring buffer 回调 ================= */ +static int handle_event(void *ctx, void *data, size_t size) { struct replica_event *evt = data; switch (evt->type) { @@ -252,20 +238,18 @@ static void handle_event(void *ctx, int cpu, void *data, __u32 size) case EVENT_SSYNC: strncpy(peer_ip, evt->sync.ip, sizeof(peer_ip)); peer_port = evt->sync.port; - peer_seq = evt->sync.seq; - DEBUGLOG("SSYNC [seq:%lld], [%s:%d]\n", peer_seq, peer_ip, peer_port); - + DEBUGLOG("SSYNC [%s:%d]\n", peer_ip, peer_port); state = OFFLINE; - pending_gc(peer_seq); break; case EVENT_COMPLETED_CMD: - // DEBUGLOG("CMD [seq:%lld], cmd:\n[\n%s]\n", evt->complete.seq, evt->complete.cmd); - pending_push(evt->complete.seq, - evt->complete.len, - evt->complete.cmd); + // 这里收到的可能是半个命令,或者是多个命令的粘包 + // 但对于转发器来说,只是字节流,直接 push 即可 + if (evt->complete.len > 0) { + pending_push(evt->complete.len, evt->complete.cmd); + } - if (state == ONLINE && sockfd >= 0) { + if (state == ONLINE && sockfd >= 0 && pending.head) { struct epoll_event ev; ev.events = EPOLLIN | EPOLLOUT; ev.data.fd = sockfd; @@ -274,82 +258,80 @@ static void handle_event(void *ctx, int cpu, void *data, __u32 size) break; case EVENT_SREADY: - DEBUGLOG("SREADY \n"); + DEBUGLOG("SREADY\n"); if (state == OFFLINE) try_connect(); break; } + return 0; } -/* ================= main ================= */ int main(int argc, char **argv) { - struct replica_bpf *skel; - struct perf_buffer *pb = NULL; - int err; + struct replica_bpf *skel; + struct ring_buffer *rb = NULL; + int err; - /* Open BPF application */ - skel = replica_bpf__open(); - if (!skel) { - fprintf(stderr, "Failed to open BPF skeleton\n"); - return 1; - } + // 提高 rlimit 以允许加载 BPF + struct rlimit r = {RLIM_INFINITY, RLIM_INFINITY}; + setrlimit(RLIMIT_MEMLOCK, &r); - /* Load & verify BPF programs */ - err = replica_bpf__load(skel); - if (err) { - fprintf(stderr, "Failed to load and verify BPF skeleton\n"); - goto cleanup; - } + skel = replica_bpf__open(); + if (!skel) { + fprintf(stderr, "Failed to open BPF skeleton\n"); + return 1; + } - /* Attach tracepoint handler */ - err = replica_bpf__attach(skel); - if (err) { - fprintf(stderr, "Failed to attach BPF skeleton\n"); - goto cleanup; - } - - printf("Successfully started! \n"); - - - pb = perf_buffer__new(bpf_map__fd(skel->maps.events), 8, - handle_event, NULL, NULL, NULL); - - if(!pb){ - goto cleanup; - } - - epollfd = epoll_create1(0); - if (epollfd < 0) { - fprintf(stderr, "epoll_create1 failed\n"); + err = replica_bpf__load(skel); + if (err) { + fprintf(stderr, "Failed to load BPF skeleton\n"); goto cleanup; } - while (1) { - struct epoll_event events[10]; + err = replica_bpf__attach(skel); + if (err) { + fprintf(stderr, "Failed to attach BPF skeleton\n"); + goto cleanup; + } - perf_buffer__poll(pb, 1000); // 处理事件 + printf("Successfully started! Monitoring TCP port 8888 (Kernel Side)...\n"); + + rb = ring_buffer__new(bpf_map__fd(skel->maps.rb), handle_event, NULL, NULL); + if (!rb) { + fprintf(stderr, "Failed to create ring buffer\n"); + goto cleanup; + } + + epollfd = epoll_create1(0); + // ... (主循环保持不变) ... + + // 主循环建议: + while (1) { + struct epoll_event events[10]; - if(OFFLINE) continue; + // 既然追求性能,Polling 依然是必要的 + // 10ms 的延迟对于 RingBuffer 消费是可以接受的 + int poll_timeout = (state == ONLINE) ? 10 : 100; + + ring_buffer__poll(rb, poll_timeout); + + if (state == OFFLINE) continue; int nfds = epoll_wait(epollfd, events, 10, 0); for (int i = 0; i < nfds; i++) { if (events[i].data.fd == sockfd) { - if (events[i].events & EPOLLIN) { - handle_socket_readable(); // 快速消费接收数据 - } - if (events[i].events & EPOLLOUT) { - handle_socket_writable(); // 发送数据 - } + if (events[i].events & EPOLLIN) handle_socket_readable(); + if (events[i].events & EPOLLOUT) handle_socket_writable(); } } } - perf_buffer__free(pb); - cleanup: - pending_free(); - if (sockfd >= 0) close(sockfd); - replica_bpf__destroy(skel); - return -err; -} + // ... (清理代码保持不变) ... + if (rb) ring_buffer__free(rb); + pending_free(); + if (sockfd >= 0) close(sockfd); + if (epollfd >= 0) close(epollfd); + replica_bpf__destroy(skel); + return -err; +} \ No newline at end of file diff --git a/ebpf/old.c/replica.h b/ebpf/old.c/replica.h index 130cd0c..c564d3b 100644 --- a/ebpf/old.c/replica.h +++ b/ebpf/old.c/replica.h @@ -1,24 +1,21 @@ #ifndef __REPLICA_H__ #define __REPLICA_H__ - -#define MAX_CMD_LEN 256 +#define MAX_CMD_LEN 4096 #define MAX_IP_LEN 64 enum event_type { - EVENT_COMPLETED_CMD, - EVENT_SSYNC, - EVENT_SREADY, + EVENT_COMPLETED_CMD = 1, + EVENT_SSYNC = 2, + EVENT_SREADY = 3, }; struct complete_cmd_evt { - __u64 seq; __u32 len; __u8 cmd[MAX_CMD_LEN]; }; struct sync_evt { - __u64 seq; char ip[MAX_IP_LEN]; __s32 port; }; @@ -33,5 +30,4 @@ struct replica_event { }; }; - #endif \ No newline at end of file diff --git a/kvstore.c b/kvstore.c index 91bb092..9823b01 100644 --- a/kvstore.c +++ b/kvstore.c @@ -20,6 +20,8 @@ #include #include +#define TIME_COLLECT 0 + extern int slave_bootstrap(const char *listen_ip, int listen_port, const char *master_ip, int master_port); extern mp_pool_t global_mempool; @@ -38,20 +40,15 @@ void __completed_cmd(const uint8_t *cmd, size_t len, unsigned long long seq){ } -// __attribute__((noinline)) -// void __replica_notify(uint64_t seq, uint32_t off, uint32_t len) -// { -// // 空函数即可,目的是让 uprobe 拿到参数 -// asm volatile("" ::: "memory"); -// } - #include #define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000) #define TIME_SUB_US(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000000 + (tv1.tv_usec - tv2.tv_usec)) int kvs_protocol(struct conn* conn){ - // struct timeval func_start; - // gettimeofday(&func_start, NULL); - // long total_oplog_us = 0; +#if TIME_COLLECT == 1 + struct timeval func_start; + gettimeofday(&func_start, NULL); + long total_oplog_us = 0; +#endif if (!conn) return -1; char *request = conn->rbuffer; @@ -98,9 +95,10 @@ int kvs_protocol(struct conn* conn){ * 一般也已经把 out_value 设置成了 RESP error,这样客户端能收到错误响应。 * - 如果 dr < 0 但 val.type 没被正确设置,兜底回一个通用错误。 */ - // struct timeval oplog_start, oplog_end; - // gettimeofday(&oplog_start, NULL); - +#if TIME_COLLECT == 1 + struct timeval oplog_start, oplog_end; + gettimeofday(&oplog_start, NULL); +#endif // if(dr < 0){ // if (val.type != RESP_T_SIMPLE_STR && // val.type != RESP_T_ERROR && @@ -158,9 +156,27 @@ int kvs_protocol(struct conn* conn){ if(global_cfg.persistence == PERSIST_INCREMENTAL){ kvs_oplog_append(p, len, global_oplog_fd); } - // gettimeofday(&oplog_end, NULL); - // total_oplog_us += (oplog_end.tv_sec - oplog_start.tv_sec) * 1000000 + - // (oplog_end.tv_usec - oplog_start.tv_usec); + + // __completed_cmd(p, len, global_seq); + // global_seq ++; + if (global_cfg.replica_mode == REPLICA_ENABLE) { + uint32_t off = 0; + int ar = replica_shm_append(&g_rep_shm, global_seq, p, (uint32_t)len, &off); + if (ar == 0) { + // __replica_notify(global_seq, off, (uint32_t)len); + global_seq++; + + } else { + // shm 满或异常:你可以选择降级(比如直接跳过复制,或阻塞/丢弃) + // 为了不影响主路径,这里先打印并跳过 + fprintf(stderr, "replica_shm_append failed %d\n", ar); + } + } +#if TIME_COLLECT == 1 + gettimeofday(&oplog_end, NULL); + total_oplog_us += (oplog_end.tv_sec - oplog_start.tv_sec) * 1000000 + + (oplog_end.tv_usec - oplog_start.tv_usec); +#endif /* 构建响应 */ int cap = KVS_MAX_RESPONSE - out_len; @@ -175,16 +191,19 @@ int kvs_protocol(struct conn* conn){ return consumed; } + __completed_cmd(request, consumed, 0); + out_len += resp_len; consumed += len; } - - // struct timeval func_end; - // gettimeofday(&func_end, NULL); - // long func_us = (func_end.tv_sec - func_start.tv_sec) * 1000000 + - // (func_end.tv_usec - func_start.tv_usec); - // fprintf(stderr, "kvs_protocol: total %ld us, oplog %ld us\n", func_us, total_oplog_us); +#if TIME_COLLECT == 1 + struct timeval func_end; + gettimeofday(&func_end, NULL); + long func_us = (func_end.tv_sec - func_start.tv_sec) * 1000000 + + (func_end.tv_usec - func_start.tv_usec); + fprintf(stderr, "kvs_protocol: total %ld us, oplog %ld us\n", func_us, total_oplog_us); +#endif *response_length = out_len; return consumed; diff --git a/memory/mempool.h b/memory/mempool.h index 38782d0..6e1562c 100644 --- a/memory/mempool.h +++ b/memory/mempool.h @@ -7,8 +7,8 @@ #include #include -// #define MEMPOOL_PAGE_SIZE 4096 -#define MEMPOOL_PAGE_SIZE (256 * 1024) +#define MEMPOOL_PAGE_SIZE 4096 +// #define MEMPOOL_PAGE_SIZE (256 * 1024) #define MEMPOOL_BLOCK_MAX_SIZE 512 #define MEMPOOL_ALIGNMENT 8 #define MEMPOOL_NUM_CLASSES (MEMPOOL_BLOCK_MAX_SIZE / MEMPOOL_ALIGNMENT) diff --git a/reactor.c b/reactor.c index f04090a..661cbab 100644 --- a/reactor.c +++ b/reactor.c @@ -243,8 +243,6 @@ int send_cb(int fd) { struct conn *c = &conn_list[fd]; int sent_total = 0; - pthread_mutex_lock(&c->g_sync_lock); - while (c->wlength > 0) { ssize_t n = send(fd, c->wbuffer, (size_t)c->wlength, MSG_NOSIGNAL); if (n > 0) { @@ -268,14 +266,12 @@ int send_cb(int fd) { if (n < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { /* 暂时发不出去,等下一次可写事件 */ - pthread_mutex_unlock(&c->g_sync_lock); set_event(fd, EPOLLOUT, 0); return sent_total; } /* 对端断开 / 其他错误 */ int e = errno; - pthread_mutex_unlock(&c->g_sync_lock); printf("send fd=%d errno=%d %s\n", fd, e, strerror(e)); epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); @@ -285,9 +281,8 @@ int send_cb(int fd) { break; } - pthread_mutex_unlock(&c->g_sync_lock); - if (c->wlength > 0) { + if (c->wlength > 0) { /* 还有没发完,继续监听可写 */ set_event(fd, EPOLLOUT, 0); } else { @@ -295,6 +290,8 @@ int send_cb(int fd) { set_event(fd, EPOLLIN, 0); } + // printf("send_total :%d; remain: %d\n", sent_total, c->wlength); + return sent_total; } diff --git a/server.h b/server.h index 1025736..40d5f10 100644 --- a/server.h +++ b/server.h @@ -35,8 +35,6 @@ struct conn { int is_stop; - pthread_mutex_t g_sync_lock; - int status; #if 1 // websocket char *payload;