From 2bdb48d63d86826ba9fa687279b0825215c521e2 Mon Sep 17 00:00:00 2001 From: 1iaan <139833683+1iaan@users.noreply.github.com> Date: Thu, 29 Jan 2026 10:47:24 +0000 Subject: [PATCH] =?UTF-8?q?=E5=B7=B2=E6=9C=89=E6=95=B0=E6=8D=AE=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E5=8A=9F=E8=83=BD=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitmodules | 4 + README.md | 3 + config/config.xml | 2 +- diskuring/diskuring.c | 2 - dump/kvs_dump.h | 5 +- dump/kvs_snapshot.c | 129 ++++++++++++++------- ebpf/c/.gitignore | 18 +++ ebpf/c/CMakeLists.txt | 133 ++++++++++++++++++++++ ebpf/c/Makefile | 139 +++++++++++++++++++++++ ebpf/c/replica.bpf.c | 103 +++++++++++++++++ ebpf/c/replica.c | 257 ++++++++++++++++++++++++++++++++++++++++++ ebpf/c/replica.h | 36 ++++++ ebpf/c/xmake.lua | 126 +++++++++++++++++++++ init.sh | 7 ++ kvs_array_bin.c | 6 +- kvs_hash | Bin 16616 -> 0 bytes kvs_hash_bin.c | 4 +- kvs_protocol_resp.c | 9 +- kvs_protocol_resp.h | 3 +- kvs_rbtree_bin.c | 14 +-- kvs_slave.c | 237 +++++++++++++++++++++++++++----------- kvstore.c | 7 +- kvstore.h | 10 +- libbpf-bootstrap | 1 + reactor.c | 14 +-- server.h | 2 +- test/test_client.h | 2 +- 27 files changed, 1134 insertions(+), 139 deletions(-) create mode 100644 ebpf/c/.gitignore create mode 100644 ebpf/c/CMakeLists.txt create mode 100644 ebpf/c/Makefile create mode 100644 ebpf/c/replica.bpf.c create mode 100644 ebpf/c/replica.c create mode 100644 ebpf/c/replica.h create mode 100644 ebpf/c/xmake.lua create mode 100755 init.sh delete mode 100755 kvs_hash create mode 160000 libbpf-bootstrap diff --git a/.gitmodules b/.gitmodules index 19d7edb..8f10d30 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,7 @@ [submodule "NtyCo"] path = NtyCo url = https://github.com/wangbojing/NtyCo.git +[submodule "libbpf-bootstrap"] + path = libbpf-bootstrap + url = https://github.com/libbpf/libbpf-bootstrap + ignore = dirty diff --git a/README.md b/README.md index 6638702..2cdc6d7 100644 --- a/README.md +++ b/README.md @@ -26,8 +26,11 @@ sudo apt install -y bpftrace git clone git@gitlab.0voice.com:lianyiheng/9.1-kvstore.git cd 9.1-kvstore/ +git submodule update --init --recursive +./init.sh make + ``` ## 测试 diff --git a/config/config.xml b/config/config.xml index b29030d..f1e2e71 100644 --- a/config/config.xml +++ b/config/config.xml @@ -1,7 +1,7 @@ - 127.0.0.1 + 192.168.10.129 8888 master diff --git a/diskuring/diskuring.c b/diskuring/diskuring.c index a630d34..a0666ce 100644 --- a/diskuring/diskuring.c +++ b/diskuring/diskuring.c @@ -3,8 +3,6 @@ #include #include -iouring_ctx_t global_uring_ctx; - void task_init(task_t *t) { pthread_mutex_init(&t->m, NULL); diff --git a/dump/kvs_dump.h b/dump/kvs_dump.h index 3d9c5cc..95b9a51 100644 --- a/dump/kvs_dump.h +++ b/dump/kvs_dump.h @@ -2,15 +2,16 @@ #define __KVS_DUMP_H__ #include "kvstore.h" +#include "diskuring/diskuring.h" extern char global_oplog_file[256]; extern char global_array_file[256]; extern char global_rbtree_file[256]; extern char global_hash_file[256]; -int kvs_create_snapshot(const char* array_file, const char* rbtree_file, const char* hash_file); +int kvs_create_snapshot(iouring_ctx_t *uring, const char* array_file, const char* rbtree_file, const char* hash_file); int kvs_create_snapshot_async(const char *ip, int port); -void __create_snapshot_ok(const char* array_file, const char* rbtree_file, const char* hash_file); +void __complete_snapshot(const char *ip, int port, const char *array_file, const char *rbtree_file, const char *hash_file); extern int global_oplog_fd; diff --git a/dump/kvs_snapshot.c b/dump/kvs_snapshot.c index 3c7d605..ba11b85 100644 --- a/dump/kvs_snapshot.c +++ b/dump/kvs_snapshot.c @@ -1,16 +1,20 @@ #include "kvstore.h" +#include "diskuring/diskuring.h" #include +#include +#include +#include char global_oplog_file[256] = "kvs_oplog.default.db"; char global_array_file[256] = "kvs_array.default.db"; char global_rbtree_file[256] = "kvs_rbtree.default.db"; char global_hash_file[256] = "kvs_hash.default.db"; -int kvs_create_snapshot(const char* array_file, const char* rbtree_file, const char* hash_file){ +int kvs_create_snapshot(iouring_ctx_t *uring, const char* array_file, const char* rbtree_file, const char* hash_file){ int ret = 0; int rc = 0; #if ENABLE_ARRAY - rc = kvs_array_save(&global_array, array_file); + rc = kvs_array_save(uring, &global_array, array_file); if(rc < 0){ printf("kvs_engine_array save error\n"); ret = -1; @@ -18,7 +22,7 @@ int kvs_create_snapshot(const char* array_file, const char* rbtree_file, const c #endif #if ENABLE_RBTREE - rc = kvs_rbtree_save(&global_rbtree, rbtree_file); + rc = kvs_rbtree_save(uring, &global_rbtree, rbtree_file); if(rc < 0){ printf("kvs_engine_rbtree save error\n"); ret = -1; @@ -26,7 +30,7 @@ int kvs_create_snapshot(const char* array_file, const char* rbtree_file, const c #endif #if ENABLE_HASH - rc = kvs_hash_save(&global_hash, hash_file); + rc = kvs_hash_save(uring, &global_hash, hash_file); if(rc < 0){ printf("kvs_engine_hash save error\n"); ret = -1; @@ -35,44 +39,93 @@ int kvs_create_snapshot(const char* array_file, const char* rbtree_file, const c return ret; } -void __create_snapshot_ok(const char* array_file, const char* rbtree_file, const char* hash_file){ +void __complete_snapshot(const char *ip, int port, const char *array_file, const char *rbtree_file, const char *hash_file){ } -int kvs_create_snapshot_async(const char *ip, int port){ - int pipefd[2]; // 用于子进程通知主进程 - if (pipe(pipefd) == -1) { perror("pipe"); return -1; } - pid_t pid = fork(); - if (pid == -1) { perror("fork"); return -1; } +static int send_file_to_ipport(const char *ip, int port, const char *filename) { + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd < 0) { perror("socket"); return -1; } - if (pid == 0) { // 子进程 - close(pipefd[0]); // 关闭读端 - // 指定临时文件路径,避免覆盖 global_xxx_file - char tmp_array[128]; // 可写缓冲区 - char tmp_rbtree[128]; - char tmp_hash[128]; + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = inet_addr(ip); - snprintf(tmp_array, sizeof(tmp_array), "snapshot_array_%s.tmp", ip); - snprintf(tmp_rbtree, sizeof(tmp_rbtree), "snapshot_rbtree_%s.tmp", ip); - snprintf(tmp_hash, sizeof(tmp_hash), "snapshot_hash_%s.tmp", ip); - - int ret = kvs_create_snapshot(tmp_array, tmp_rbtree, tmp_hash); - if (ret == 0) { - // 成功:rename 到最终路径,或直接通知 - write(pipefd[1], "OK", 2); // 通知主进程 - } else { - write(pipefd[1], "ERR", 3); - } - close(pipefd[1]); - _exit(0); // 子进程退出 - - // hook point - __create_snapshot_ok(tmp_array, tmp_rbtree, tmp_hash); - } else { // 主进程 - close(pipefd[1]); // 关闭写端 - // 立即返回,继续处理其他请求(不阻塞) - // 可以记录 pid,在别处 waitpid(pid, NULL, WNOHANG) 检查完成 - // 或用信号:signal(SIGCHLD, handler); 在 handler 中 read(pipefd[0]) 检查 "OK" - return 0; // SYNC 响应成功,主进程继续 + if (connect(sockfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) { + perror("connect"); close(sockfd); return -1; } + + FILE *fp = fopen(filename, "rb"); + if (!fp) { perror("fopen"); close(sockfd); return -1; } + + char buf[4096]; + size_t n; + + while ((n = fread(buf, 1, sizeof(buf), fp)) > 0) { + ssize_t written = 0; + while (written < n) { + ssize_t ret = write(sockfd, buf + written, n - written); + if (ret < 0) { + perror("write"); + fclose(fp); + close(sockfd); + return -1; + } + written += ret; + } + } + + if (ferror(fp)) { + perror("fread"); + fclose(fp); + close(sockfd); + return -1; + } + + fclose(fp); + shutdown(sockfd, SHUT_WR); + close(sockfd); + return 0; +} + +int kvs_create_snapshot_async(const char *ip, int port){ + pid_t pid = fork(); + if (pid == -1) { perror("fork"); return -1; } + + if (pid == 0) { + char tmp_array[128]; + char tmp_rbtree[128]; + char tmp_hash[128]; + + snprintf(tmp_array, sizeof(tmp_array), "data/snapshot_array_%s.tmp.db", ip); + snprintf(tmp_rbtree, sizeof(tmp_rbtree), "data/snapshot_rbtree_%s.tmp.db", ip); + snprintf(tmp_hash, sizeof(tmp_hash), "data/snapshot_hash_%s.tmp.db", ip); + + iouring_ctx_t uring; + iouring_init(&uring, 256); + + int ret = kvs_create_snapshot(&uring, tmp_array, tmp_rbtree, tmp_hash); + if (ret != 0) { + fprintf(stderr, "snapshot creation failed\n"); + _exit(1); + } + + // 发送快照到目标 IP:port + if (send_file_to_ipport(ip, port, tmp_array) != 0 || + send_file_to_ipport(ip, port, tmp_rbtree) != 0 || + send_file_to_ipport(ip, port, tmp_hash) != 0) { + fprintf(stderr, "snapshot send failed\n"); + _exit(1); + } + + // hook 通知 eBPF + __complete_snapshot(ip, port, tmp_array, tmp_rbtree, tmp_hash); + + iouring_shutdown(&uring); + _exit(0); + } else { + + return 0; + } } \ No newline at end of file diff --git a/ebpf/c/.gitignore b/ebpf/c/.gitignore new file mode 100644 index 0000000..9edf6d7 --- /dev/null +++ b/ebpf/c/.gitignore @@ -0,0 +1,18 @@ +/.output +/bootstrap +/bootstrap_legacy +/minimal +/minimal_legacy +/minimal_ns +/uprobe +/kprobe +/fentry +/profile +/usdt +/sockfilter +/tc +/ksyscall +/task_iter +/lsm +/cmake-build-debug/ +/cmake-build-release/ diff --git a/ebpf/c/CMakeLists.txt b/ebpf/c/CMakeLists.txt new file mode 100644 index 0000000..c5b0c92 --- /dev/null +++ b/ebpf/c/CMakeLists.txt @@ -0,0 +1,133 @@ +# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause + +cmake_minimum_required(VERSION 3.16) +project(examples C) + +# Tell cmake where to find BpfObject module +list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/../../tools/cmake) + +# Build vendored libbpf +include(ExternalProject) +ExternalProject_Add(libbpf + PREFIX libbpf + SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../../libbpf/src + CONFIGURE_COMMAND "" + BUILD_COMMAND make + CC=${CMAKE_C_COMPILER} + BUILD_STATIC_ONLY=1 + OBJDIR=${CMAKE_CURRENT_BINARY_DIR}/libbpf/libbpf + DESTDIR=${CMAKE_CURRENT_BINARY_DIR}/libbpf + INCLUDEDIR= + LIBDIR= + UAPIDIR= + install install_uapi_headers + BUILD_IN_SOURCE TRUE + INSTALL_COMMAND "" + STEP_TARGETS build + BUILD_BYPRODUCTS ${CMAKE_CURRENT_BINARY_DIR}/libbpf/libbpf.a +) + +ExternalProject_Add(bpftool + PREFIX bpftool + SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../../bpftool/src + CONFIGURE_COMMAND "" + BUILD_COMMAND make bootstrap + OUTPUT=${CMAKE_CURRENT_BINARY_DIR}/bpftool/ + BUILD_IN_SOURCE TRUE + INSTALL_COMMAND "" + STEP_TARGETS build +) + +find_program(CARGO_EXISTS cargo) +if(CARGO_EXISTS) + if(CMAKE_CROSSCOMPILING) + # Determine target triple + if(CMAKE_SYSTEM_NAME MATCHES "Linux") + if(CMAKE_SYSTEM_PROCESSOR MATCHES "x86_64") + set(CARGO_TARGET "x86_64-unknown-linux-gnu") + elseif(CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64") + set(CARGO_TARGET "aarch64-unknown-linux-gnu") + else() + message(FATAL_ERROR "Unsupported processor for Linux: ${CMAKE_SYSTEM_PROCESSOR}") + endif() + + if(CMAKE_CXX_COMPILER) + set(RUST_LINKER ${CMAKE_CXX_COMPILER}) + else() + set(RUST_LINKER ${CMAKE_C_COMPILER}) + endif() + else() + message((FATAL_ERROR "Unsupported platform: ${CMAKE_SYSTEM_NAME}")) + endif() + + ExternalProject_Add(blazesym + PREFIX blazesym + SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../../blazesym + CONFIGURE_COMMAND "" + BUILD_COMMAND ${CMAKE_COMMAND} -E env + RUSTFLAGS=-C\ linker=${RUST_LINKER} + cargo build --package=blazesym-c --release --target=${CARGO_TARGET} + BUILD_IN_SOURCE TRUE + INSTALL_COMMAND "" + STEP_TARGETS build + ) + else() # Host + ExternalProject_Add(blazesym + PREFIX blazesym + SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../../blazesym + CONFIGURE_COMMAND "" + BUILD_COMMAND + cargo build --package=blazesym-c --release + BUILD_IN_SOURCE TRUE + INSTALL_COMMAND "" + STEP_TARGETS build + ) + endif() +endif() + +# Set BpfObject input parameters -- note this is usually not necessary unless +# you're in a highly vendored environment (like libbpf-bootstrap) +if(${CMAKE_SYSTEM_PROCESSOR} MATCHES "x86_64") + set(ARCH "x86") +elseif(${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm") + set(ARCH "arm") +elseif(${CMAKE_SYSTEM_PROCESSOR} MATCHES "aarch64") + set(ARCH "arm64") +elseif(${CMAKE_SYSTEM_PROCESSOR} MATCHES "ppc64le") + set(ARCH "powerpc") +elseif(${CMAKE_SYSTEM_PROCESSOR} MATCHES "mips") + set(ARCH "mips") +elseif(${CMAKE_SYSTEM_PROCESSOR} MATCHES "riscv64") + set(ARCH "riscv") +elseif(${CMAKE_SYSTEM_PROCESSOR} MATCHES "loongarch64") + set(ARCH "loongarch") +endif() + +set(BPFOBJECT_BPFTOOL_EXE ${CMAKE_CURRENT_BINARY_DIR}/bpftool/bootstrap/bpftool) +set(BPFOBJECT_VMLINUX_H ${CMAKE_CURRENT_SOURCE_DIR}/../../vmlinux.h/include/${ARCH}/vmlinux.h) +set(LIBBPF_INCLUDE_DIRS ${CMAKE_CURRENT_BINARY_DIR}/libbpf) +set(LIBBPF_LIBRARIES ${CMAKE_CURRENT_BINARY_DIR}/libbpf/libbpf.a) +find_package(BpfObject REQUIRED) + +# Create an executable for each application +file(GLOB apps *.bpf.c) +if(NOT CARGO_EXISTS) + list(REMOVE_ITEM apps ${CMAKE_CURRENT_SOURCE_DIR}/profile.bpf.c) +endif() +foreach(app ${apps}) + get_filename_component(app_stem ${app} NAME_WE) + + # Build object skeleton and depend skeleton on libbpf build + bpf_object(${app_stem} ${app_stem}.bpf.c) + add_dependencies(${app_stem}_skel libbpf bpftool) + + add_executable(${app_stem} ${app_stem}.c) + target_link_libraries(${app_stem} ${app_stem}_skel) + if(${app_stem} STREQUAL profile) + target_include_directories(${app_stem} PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR}/../../blazesym/capi/include) + target_link_libraries(${app_stem} + ${CMAKE_CURRENT_SOURCE_DIR}/../../blazesym/target/${CARGO_TARGET}/release/libblazesym_c.a -lpthread -lrt -ldl) + add_dependencies(${app_stem} blazesym) + endif() +endforeach() diff --git a/ebpf/c/Makefile b/ebpf/c/Makefile new file mode 100644 index 0000000..3a81dbb --- /dev/null +++ b/ebpf/c/Makefile @@ -0,0 +1,139 @@ +# SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause) +OUTPUT := .output +CLANG ?= clang +LIBBPF_SRC := $(abspath ../../libbpf/src) +BPFTOOL_SRC := $(abspath ../../bpftool/src) +LIBBPF_OBJ := $(abspath $(OUTPUT)/libbpf.a) +BPFTOOL_OUTPUT ?= $(abspath $(OUTPUT)/bpftool) +BPFTOOL ?= $(BPFTOOL_OUTPUT)/bootstrap/bpftool +LIBBLAZESYM_SRC := $(abspath ../../blazesym/) +LIBBLAZESYM_INC := $(abspath $(LIBBLAZESYM_SRC)/capi/include) +LIBBLAZESYM_OBJ := $(abspath $(OUTPUT)/libblazesym_c.a) +ARCH ?= $(shell uname -m | sed 's/x86_64/x86/' \ + | sed 's/arm.*/arm/' \ + | sed 's/aarch64/arm64/' \ + | sed 's/ppc64le/powerpc/' \ + | sed 's/mips.*/mips/' \ + | sed 's/riscv64/riscv/' \ + | sed 's/loongarch64/loongarch/') +VMLINUX := ../../vmlinux.h/include/$(ARCH)/vmlinux.h +# Use our own libbpf API headers and Linux UAPI headers distributed with +# libbpf to avoid dependency on system-wide headers, which could be missing or +# outdated +INCLUDES := -I$(OUTPUT) -I../../libbpf/include/uapi -I$(dir $(VMLINUX)) -I$(LIBBLAZESYM_INC) +CFLAGS := -g -Wall +ALL_LDFLAGS := $(LDFLAGS) $(EXTRA_LDFLAGS) + +# APPS = minimal minimal_legacy minimal_ns bootstrap bootstrap_legacy uprobe kprobe fentry \ + usdt sockfilter tc ksyscall task_iter lsm +APPS = replica + +CARGO ?= $(shell which cargo) +ifeq ($(strip $(CARGO)),) +BZS_APPS := +else +BZS_APPS := profile +APPS += $(BZS_APPS) +# Required by libblazesym +ALL_LDFLAGS += -lrt -ldl -lpthread -lm +endif + +# Get Clang's default includes on this system. We'll explicitly add these dirs +# to the includes list when compiling with `-target bpf` because otherwise some +# architecture-specific dirs will be "missing" on some architectures/distros - +# headers such as asm/types.h, asm/byteorder.h, asm/socket.h, asm/sockios.h, +# sys/cdefs.h etc. might be missing. +# +# Use '-idirafter': Don't interfere with include mechanics except where the +# build would have failed anyways. +CLANG_BPF_SYS_INCLUDES ?= $(shell $(CLANG) -v -E - &1 \ + | sed -n '/<...> search starts here:/,/End of search list./{ s| \(/.*\)|-idirafter \1|p }') + +ifeq ($(V),1) + Q = + msg = +else + Q = @ + msg = @printf ' %-8s %s%s\n' \ + "$(1)" \ + "$(patsubst $(abspath $(OUTPUT))/%,%,$(2))" \ + "$(if $(3), $(3))"; + MAKEFLAGS += --no-print-directory +endif + +define allow-override + $(if $(or $(findstring environment,$(origin $(1))),\ + $(findstring command line,$(origin $(1)))),,\ + $(eval $(1) = $(2))) +endef + +$(call allow-override,CC,$(CROSS_COMPILE)cc) +$(call allow-override,LD,$(CROSS_COMPILE)ld) + +.PHONY: all +all: $(APPS) + +.PHONY: clean +clean: + $(call msg,CLEAN) + $(Q)rm -rf $(OUTPUT) $(APPS) + +$(OUTPUT) $(OUTPUT)/libbpf $(BPFTOOL_OUTPUT): + $(call msg,MKDIR,$@) + $(Q)mkdir -p $@ + +# Build libbpf +$(LIBBPF_OBJ): $(wildcard $(LIBBPF_SRC)/*.[ch] $(LIBBPF_SRC)/Makefile) | $(OUTPUT)/libbpf + $(call msg,LIB,$@) + $(Q)$(MAKE) -C $(LIBBPF_SRC) BUILD_STATIC_ONLY=1 \ + OBJDIR=$(dir $@)/libbpf DESTDIR=$(dir $@) \ + INCLUDEDIR= LIBDIR= UAPIDIR= \ + install + +# Build bpftool +$(BPFTOOL): | $(BPFTOOL_OUTPUT) + $(call msg,BPFTOOL,$@) + $(Q)$(MAKE) ARCH= CROSS_COMPILE= OUTPUT=$(BPFTOOL_OUTPUT)/ -C $(BPFTOOL_SRC) bootstrap + + +$(LIBBLAZESYM_SRC)/target/release/libblazesym_c.a:: + $(Q)cd $(LIBBLAZESYM_SRC) && $(CARGO) build --package=blazesym-c --release + +$(LIBBLAZESYM_OBJ): $(LIBBLAZESYM_SRC)/target/release/libblazesym_c.a | $(OUTPUT) + $(call msg,LIB, $@) + $(Q)cp $(LIBBLAZESYM_SRC)/target/release/libblazesym_c.a $@ + +# Build BPF code +$(OUTPUT)/%.bpf.o: %.bpf.c $(LIBBPF_OBJ) $(wildcard %.h) $(VMLINUX) | $(OUTPUT) $(BPFTOOL) + $(call msg,BPF,$@) + $(Q)$(CLANG) -g -O2 -target bpf -D__TARGET_ARCH_$(ARCH) \ + $(INCLUDES) $(CLANG_BPF_SYS_INCLUDES) \ + -c $(filter %.c,$^) -o $(patsubst %.bpf.o,%.tmp.bpf.o,$@) + $(Q)$(BPFTOOL) gen object $@ $(patsubst %.bpf.o,%.tmp.bpf.o,$@) + +# Generate BPF skeletons +$(OUTPUT)/%.skel.h: $(OUTPUT)/%.bpf.o | $(OUTPUT) $(BPFTOOL) + $(call msg,GEN-SKEL,$@) + $(Q)$(BPFTOOL) gen skeleton $< > $@ + +# Build user-space code +$(patsubst %,$(OUTPUT)/%.o,$(APPS)): %.o: %.skel.h + +$(OUTPUT)/%.o: %.c $(wildcard %.h) | $(OUTPUT) + $(call msg,CC,$@) + $(Q)$(CC) $(CFLAGS) $(INCLUDES) -c $(filter %.c,$^) -o $@ + +$(patsubst %,$(OUTPUT)/%.o,$(BZS_APPS)): $(LIBBLAZESYM_OBJ) + +$(BZS_APPS): $(LIBBLAZESYM_OBJ) + +# Build application binary +$(APPS): %: $(OUTPUT)/%.o $(LIBBPF_OBJ) | $(OUTPUT) + $(call msg,BINARY,$@) + $(Q)$(CC) $(CFLAGS) $^ $(ALL_LDFLAGS) -lelf -lz -o $@ + +# delete failed targets +.DELETE_ON_ERROR: + +# keep intermediate (.skel.h, .bpf.o, etc) targets +.SECONDARY: diff --git a/ebpf/c/replica.bpf.c b/ebpf/c/replica.bpf.c new file mode 100644 index 0000000..b9e43b0 --- /dev/null +++ b/ebpf/c/replica.bpf.c @@ -0,0 +1,103 @@ +// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause +/* Copyright (c) 2020 Facebook */ +#include +#include +#include +#include + +#include "replica.h" + +char LICENSE[] SEC("license") = "Dual BSD/GPL"; + +int my_pid = 0; + +struct { + __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); + __uint(key_size, sizeof(int)); + __uint(value_size, sizeof(int)); +} channel SEC(".maps"); + +SEC("uprobe/kvs_create_snapshot_async") +int uprobe_create_snapshot_async(struct pt_regs *ctx) +{ + struct event ev; + __builtin_memset(&ev, 0, sizeof(ev)); + + const char *ip; + __u32 port; + + ev.type = EVENT_CREATE_SNAPSHOT_ASYNC; + + ip = (const char *)PT_REGS_PARM1(ctx); + port = (__u32)PT_REGS_PARM2(ctx); + + bpf_probe_read_user_str(ev.data.sync.ip, + sizeof(ev.data.sync.ip), + ip); + ev.data.sync.port = port; + + bpf_perf_event_output(ctx, &channel, + BPF_F_CURRENT_CPU, + &ev, sizeof(ev)); + return 0; +} + +SEC("uprobe/__compeleted_cmd") +int uprobe_completed_cmd(struct pt_regs *ctx) +{ + struct event ev; + __builtin_memset(&ev, 0, sizeof(ev)); + + const __u8 *cmd; + __u32 len; + + ev.type = EVENT_COMPLETED_CMD; + + cmd = (const __u8 *)PT_REGS_PARM1(ctx); + len = (__u32)PT_REGS_PARM2(ctx); + + if (len > sizeof(ev.data.cmd.cmd)) + len = sizeof(ev.data.cmd.cmd); + + ev.data.cmd.len = len; + + bpf_probe_read_user(ev.data.cmd.cmd, len, cmd); + + bpf_perf_event_output(ctx, &channel, + BPF_F_CURRENT_CPU, + &ev, sizeof(ev)); + return 0; +} + + +SEC("uprobe/__create_snapshot_ok") +int uprobe_create_snapshot_ok(struct pt_regs *ctx) +{ + struct event ev; + __builtin_memset(&ev, 0, sizeof(ev)); + + const char *array_file; + const char *rbtree_file; + const char *hash_file; + + ev.type = EVENT_CREATE_SNAPSHOT_OK; + + array_file = (const char *)PT_REGS_PARM1(ctx); + rbtree_file = (const char *)PT_REGS_PARM2(ctx); + hash_file = (const char *)PT_REGS_PARM3(ctx); + + bpf_probe_read_user_str(ev.data.ok.array_file, + sizeof(ev.data.ok.array_file), + array_file); + bpf_probe_read_user_str(ev.data.ok.rbtree_file, + sizeof(ev.data.ok.rbtree_file), + rbtree_file); + bpf_probe_read_user_str(ev.data.ok.hash_file, + sizeof(ev.data.ok.hash_file), + hash_file); + + bpf_perf_event_output(ctx, &channel, + BPF_F_CURRENT_CPU, + &ev, sizeof(ev)); + return 0; +} \ No newline at end of file diff --git a/ebpf/c/replica.c b/ebpf/c/replica.c new file mode 100644 index 0000000..c373ab2 --- /dev/null +++ b/ebpf/c/replica.c @@ -0,0 +1,257 @@ +// SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause) +/* Copyright (c) 2020 Facebook */ +#include +#include +#include +#include +#include "replica.skel.h" +#include +#include +#include +#include + +#include "replica.h" + + +struct cmd_node { + uint8_t *cmd; + size_t len; + struct cmd_node *next; +}; + +struct pending_queue { + struct cmd_node *head; + struct cmd_node *tail; + int count; +}; + +static void queue_init(struct pending_queue *q) { + q->head = q->tail = NULL; + q->count = 0; +} + +static void queue_push(struct pending_queue *q, const uint8_t *cmd, size_t len) { + struct cmd_node *node = malloc(sizeof(*node)); + if (!node) return; + node->cmd = malloc(len); + if (!node->cmd) { free(node); return; } + memcpy(node->cmd, cmd, len); + node->len = len; + node->next = NULL; + if (q->tail) q->tail->next = node; + else q->head = node; + q->tail = node; + q->count++; +} + +static void queue_send_and_clear(struct pending_queue *q, int sock) { + struct cmd_node *node = q->head; + int sent_count = 0; + while (node) { + if (send(sock, node->cmd, node->len, 0) > 0) { + sent_count++; + } + struct cmd_node *tmp = node; + node = node->next; + free(tmp->cmd); + free(tmp); + } + if (sent_count > 0) { + printf("[QUEUE] Sent %d commands to slave\n", sent_count); + } + queue_init(q); +} + +static void queue_free(struct pending_queue *q) { + struct cmd_node *node = q->head; + while (node) { + struct cmd_node *tmp = node; + node = node->next; + free(tmp->cmd); + free(tmp); + } + queue_init(q); +} + +static int send_file(int sock, const char *path) { + FILE *fp = fopen(path, "rb"); + if (!fp) { + printf("[ERROR] Failed to open file: %s\n", path); + return -1; + } + + char buf[4096]; + size_t n, total = 0; + while ((n = fread(buf, 1, sizeof(buf), fp)) > 0) { + if (send(sock, buf, n, 0) < 0) { + fclose(fp); + printf("[ERROR] Failed to send file: %s (sent %zu bytes)\n", path, total); + return -1; + } + total += n; + } + fclose(fp); + printf("[FILE] Sent %s (%zu bytes)\n", path, total); + return 0; +} + +// 全局状态(单 Slave 简化) +static enum state current_state = NOSLAVE; +static char slave_ip[16] = {0}; +static int slave_port = 0; +static char array_file[128] = {0}; +static char rbtree_file[128] = {0}; +static char hash_file[128] = {0}; +static struct pending_queue pending; +static int slave_sock = -1; // 连接 Slave 的 socket + +// 连接 Slave +static int connect_slave() { + int sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) return -1; + struct sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_port = htons(slave_port); + inet_pton(AF_INET, slave_ip, &addr.sin_addr); + if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) { + close(sock); + return -1; + } + return sock; +} + +static void handle_event(void *ctx, int cpu, void *data, __u32 size) { + struct event *ev = (struct event *)data; + + switch (ev->type) { + case EVENT_CREATE_SNAPSHOT_ASYNC: + printf("[EVENT] Type: CREATE_SNAPSHOT_ASYNC\n"); + printf("[EVENT] Slave IP: %s, Port: %u\n", ev->data.sync.ip, ev->data.sync.port); + + if (current_state == NOSLAVE) { + current_state = START; + strncpy(slave_ip, ev->data.sync.ip, sizeof(slave_ip)); + slave_port = ev->data.sync.port; + queue_init(&pending); + slave_sock = connect_slave(); // 连接 Slave + if (slave_sock < 0) { + printf("Failed to connect to Slave %s:%d\n", slave_ip, slave_port); + current_state = NOSLAVE; + } + } + break; + case EVENT_COMPLETED_CMD: + printf("[EVENT] Type: COMPLETED_CMD\n"); + printf("[EVENT] Command length: %llu bytes\n", ev->data.cmd.len); + + if (current_state != NOSLAVE) { + queue_push(&pending, ev->data.cmd.cmd, ev->data.cmd.len); + } + break; + case EVENT_CREATE_SNAPSHOT_OK: + printf("[EVENT] Type: CREATE_SNAPSHOT_OK\n"); + printf("[EVENT] Array file: %s\n", ev->data.ok.array_file); + printf("[EVENT] RBTree file: %s\n", ev->data.ok.rbtree_file); + printf("[EVENT] Hash file: %s\n", ev->data.ok.hash_file); + + if (current_state == START) { + current_state = DONE; + strncpy(array_file, ev->data.ok.array_file, sizeof(array_file)); + strncpy(rbtree_file, ev->data.ok.rbtree_file, sizeof(rbtree_file)); + strncpy(hash_file, ev->data.ok.hash_file, sizeof(hash_file)); + } + break; + } +} + + +static void lost_event(void *ctx, int cpu, __u64 cnt) { + printf("Lost %llu events\n", cnt); +} + +static int libbpf_print_fn(enum libbpf_print_level level, const char *format, va_list args) +{ + return vfprintf(stderr, format, args); +} + +int main(int argc, char **argv) +{ + struct replica_bpf *skel; + int err; + + /* Set up libbpf errors and debug info callback */ + libbpf_set_print(libbpf_print_fn); + + /* Open BPF application */ + skel = replica_bpf__open(); + if (!skel) { + fprintf(stderr, "Failed to open BPF skeleton\n"); + return 1; + } + + /* ensure BPF program only handles write() syscalls from our process */ + skel->bss->my_pid = getpid(); + + /* Load & verify BPF programs */ + err = replica_bpf__load(skel); + if (err) { + fprintf(stderr, "Failed to load and verify BPF skeleton\n"); + goto cleanup; + } + + /* Attach tracepoint handler */ + err = replica_bpf__attach(skel); + if (err) { + fprintf(stderr, "Failed to attach BPF skeleton\n"); + goto cleanup; + } + + printf("Successfully started! Please run `sudo cat /sys/kernel/debug/tracing/trace_pipe` " + "to see output of the BPF programs.\n"); + + + struct perf_buffer *pb = perf_buffer__new(bpf_map__fd(skel->maps.channel), 8, handle_event, lost_event, NULL, NULL); + if(!pb){ + goto cleanup; + } + +#if 0 + while(1){ + perf_buffer__poll(pb, 1000); + } +#else + while (1) { + perf_buffer__poll(pb, 1000); // 处理事件 + + // 循环中检查状态并发送 + if (current_state == DONE && slave_sock >= 0) { + // 发送快照文件 + if (send_file(slave_sock, array_file) == 0 && + send_file(slave_sock, rbtree_file) == 0 && + send_file(slave_sock, hash_file) == 0) { + current_state = ONLINE; + printf("Snapshot sent, state to ONLINE\n"); + } else { + printf("Failed to send snapshot\n"); + current_state = NOSLAVE; + close(slave_sock); + slave_sock = -1; + } + } + + if (current_state == ONLINE && slave_sock >= 0) { + // 发送 pending + queue_send_and_clear(&pending, slave_sock); + } + } +#endif + + perf_buffer__free(pb); + +cleanup: + queue_free(&pending); + if (slave_sock >= 0) close(slave_sock); + replica_bpf__destroy(skel); + return -err; +} diff --git a/ebpf/c/replica.h b/ebpf/c/replica.h new file mode 100644 index 0000000..9cf3c1c --- /dev/null +++ b/ebpf/c/replica.h @@ -0,0 +1,36 @@ +#ifndef __REPLICA_H__ +#define __REPLICA_H__ + +enum event_type { + EVENT_CREATE_SNAPSHOT_ASYNC, + EVENT_CREATE_SNAPSHOT_OK, + EVENT_CREATE_SNAPSHOT_READY, + EVENT_COMPLETED_CMD +}; + +struct event { + enum event_type type; + union { + struct { + char ip[16]; + __u32 port; + } sync; + struct { + __u8 cmd[256]; + __u64 len; + } cmd; + struct { + char array_file[128]; + char rbtree_file[128]; + char hash_file[128]; + } ok; + } data; +}; + +enum state { + NOSLAVE, + PREPARING, + ONLINE +}; + +#endif \ No newline at end of file diff --git a/ebpf/c/xmake.lua b/ebpf/c/xmake.lua new file mode 100644 index 0000000..c4a5f3d --- /dev/null +++ b/ebpf/c/xmake.lua @@ -0,0 +1,126 @@ +add_rules("mode.release", "mode.debug") +add_rules("platform.linux.bpf") +set_license("GPL-2.0") + +if xmake.version():satisfies(">=2.5.7 <=2.5.9") then + on_load(function (target) + raise("xmake(%s) has a bug preventing BPF source code compilation. Please run `xmake update -f 2.5.6` to revert to v2.5.6 version or upgrade to xmake v2.6.1 that fixed the issue.", xmake.version()) + end) +end + +option("system-libbpf", {showmenu = true, default = false, description = "Use system-installed libbpf"}) +option("require-bpftool", {showmenu = true, default = false, description = "Require bpftool package"}) + +add_requires("elfutils", "zlib") +if is_plat("android") then + add_requires("ndk >=22.x <26", "argp-standalone") + set_toolchains("@ndk", {sdkver = "23"}) +else + add_requires("llvm >=10.x") + set_toolchains("@llvm") + add_requires("linux-headers") +end + +-- fix error: libbpf: map 'my_pid_map': unsupported map linkage static. for bpftool >= 7.2.0 +-- we cannot add `"-fvisibility=hidden"` when compiling *.bpf.c +set_symbols("none") + +if is_arch("arm64", "arm64-v8a") then + add_includedirs("../../vmlinux.h/include/arm64") +elseif is_arch("arm.*") then + add_includedirs("../../vmlinux.h/include/arm") +elseif is_arch("riscv32", "riscv64") then + add_includedirs("../../vmlinux.h/include/riscv") +elseif is_arch("loongarch") then + add_includedirs("../../vmlinux.h/include/loongarch") +elseif is_arch("ppc", "powerpc") then + add_includedirs("../../vmlinux.h/include/powerpc") +elseif is_arch("x86_64", "i386") then + add_includedirs("../../vmlinux.h/include/x86") +else + add_includedirs("../../vmlinux.h/include") +end + +-- we can run `xmake f --require-bpftool=y` to pull bpftool from xmake-repo repository +if has_config("require-bpftool") then + add_requires("linux-tools", {configs = {bpftool = true}}) + add_packages("linux-tools") +else + before_build(function (target) + os.addenv("PATH", path.join(os.scriptdir(), "..", "..", "tools")) + end) +end + +-- we use the vendored libbpf sources for libbpf-bootstrap. +-- for some projects you may want to use the system-installed libbpf, so you can run `xmake f --system-libbpf=y` +if has_config("system-libbpf") then + add_requires("libbpf", {system = true}) +else + target("libbpf") + set_kind("static") + set_basename("bpf") + add_files("../../libbpf/src/*.c") + add_includedirs("../../libbpf/include") + add_includedirs("../../libbpf/include/uapi", {public = true}) + add_includedirs("$(buildir)", {interface = true}) + add_configfiles("../../libbpf/src/(*.h)", {prefixdir = "bpf"}) + add_packages("elfutils", "zlib") + if is_plat("android") then + add_defines("__user=", "__force=", "__poll_t=uint32_t") + end +end + +target("minimal") + set_kind("binary") + add_files("minimal.c", "minimal.bpf.c") + add_packages("linux-headers") + if not has_config("system-libbpf") then + add_deps("libbpf") + end + +target("minimal_legacy") + set_kind("binary") + add_files("minimal_legacy.c", "minimal_legacy.bpf.c") + add_packages("linux-headers") + if not has_config("system-libbpf") then + add_deps("libbpf") + end + +target("bootstrap") + set_kind("binary") + add_files("bootstrap.c", "bootstrap.bpf.c") + add_packages("linux-headers") + if not has_config("system-libbpf") then + add_deps("libbpf") + end + if is_plat("android") then + add_packages("argp-standalone") + end + +target("fentry") + set_kind("binary") + add_files("fentry.c", "fentry.bpf.c") + add_packages("linux-headers") + if not has_config("system-libbpf") then + add_deps("libbpf") + end + +target("uprobe") + set_kind("binary") + add_files("uprobe.c", "uprobe.bpf.c") + add_packages("linux-headers") + if not has_config("system-libbpf") then + add_deps("libbpf") + end + +target("kprobe") + set_kind("binary") + add_files("kprobe.c", "kprobe.bpf.c") + add_packages("linux-headers") + if not has_config("system-libbpf") then + add_deps("libbpf") + end + if is_plat("android") then + -- TODO we need fix vmlinux.h to support android + set_default(false) + end diff --git a/init.sh b/init.sh new file mode 100755 index 0000000..a0732c0 --- /dev/null +++ b/init.sh @@ -0,0 +1,7 @@ +rm -rf libbpf-bootstrap/examples/c + +cp -R ebpf/c libbpf-bootstrap/examples + +cd libbpf-bootstrap/examples/c + +make \ No newline at end of file diff --git a/kvs_array_bin.c b/kvs_array_bin.c index 2ea2269..4a1046d 100644 --- a/kvs_array_bin.c +++ b/kvs_array_bin.c @@ -191,8 +191,8 @@ int kvs_array_exist_bin(kvs_array_t *inst, const void *key, uint32_t key_len) { } // return: 0 success, <0 error -int kvs_array_save(kvs_array_t *inst, const char* filename){ - if(!inst || !filename) return -1; +int kvs_array_save(iouring_ctx_t *uring, kvs_array_t *inst, const char* filename){ + if(!uring || !inst || !filename) return -1; int fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, 0644); if(fd < 0) return -2; @@ -234,7 +234,7 @@ int kvs_array_save(kvs_array_t *inst, const char* filename){ size_t total = 0; for (int i = 0; i < count; i++) total += lens[i]; - task_t *t = submit_write(&global_uring_ctx, fd, bufs, lens, count, current_off); + task_t *t = submit_write(uring, fd, bufs, lens, count, current_off); if (!t) { close(fd); return -4; } int res = task_wait(t); diff --git a/kvs_hash b/kvs_hash deleted file mode 100755 index 425a61f11d70259b07301d03a7675d695331bcbc..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 16616 zcmeHOeQ;dWb-ydcSQud~5Wtv#HxBj~Vyz{Ajh)nG{jv5sk&R=dWJu#aR=ZE~hW$`> z-(s18pdz>NMmTZEq$LdLByAZyDZ@}gQh}5y3Q@)Uf@X zd(U}#`&flclbQC9y_%Xmp?z!jQ``*28-@P;1vArP>P?8$eA1W$^E;5`53s!98 z3`j(6P#5C=5_PpY59Rl0o{>ikfz+h2hHBcx{6yO(i7 zXUHKWp%PDu30)i5A3tp)5RyuFz&k{ZASU&A;VTe zrM`~=M{)UWlXUZ5u5Ye>=;!zg~HPgu)cNu+HfHkUaegq zze2cZOzqg+rC?^5cpOI6Fks4Jk7VyI{8NF-$;l2 zCJgCNB6~7Ks7(20T%=>-@h8X`9?|FQXEQ|SO6NmxaSb8Ovs!$rrxw2eeh8O8Ed-;M z{H17r_tV@~2C|sAcCb!4LVBQ4Zr;T7$0&PClFHJE(f{woQe!omH85 zI-N@>%fg^YSOq7ZcdSf2l?5l!AGdl^*?2m2o2@c-reHg&FQ3XfJp@i<`V?Vk+^knS zI(BZ^YOM~hS?gw3hu0}}Y3J?URKcXX;~u`5Z$>ce;qxO$3r>0X5(^THdiV>BiTeL~1nLo}N1z^odIahbs7K)c zBm!@=T>b0V;B-@LXx_i9QY!YR!%jnGG&cDCrl+(&m35y6yRs~e|1C=*iuCuBWb&0t zrP6a$muZ1D`TT5|7J8H4n=R8qYx2?AGA;Nf|7x~O3$Dp~X3MnDn!L*`SC;KY?L93^ zu!*4hD$9O~<-521@4oVjzVfra@=0I$gs=SVs&f0!c9x#|<5=mH*x;F&J)P0^;r7$9 zBb(M@eOI~k%TPVr6K+{@2u>q_M>ahUPVC6MM~Q4KJC`D~w^AOVJ-;&EvSfhto?;69 zlGa~$7m;hqGnbO9GD-7!%ro{0_KF`lhY&7b3~>V4@J+#5~H!9=tL|q+VXAHa$-0TEe}S|1^kjj!^P)^qUEpt z@{Tj5(S1+3wG2+Tp{B7|sW|$?BG{Z5gqN`r4C;y(9H)vVgBV$;1!^M`OG#mHI6(Q2 z8|4`&&!{3R{{#q8e)BCTAK4Q?dFBL}B9tN&TYG^NO5ckO?HVJUSm{~Pc$m1v7@fZTON^$Fzy9ACnt`KL z-bQ3l#Y$bLU}8);3qeMeGauv>jTIWvlh2?DFg1EYw;#sdLY1ZHG@Gy4npOzmJ#by4 zvC@m=!U?#bV@h|xsoRVKH&grn43tk%MGQDcEi(bB5w##Rsn3b}#S|mRC`JfIvl&-o zr9I8>?6MGoFoBshjwVl-mXFh1rhc2sz`(fM?DKT9&w_{XhE6=|@5J&{o32R=v@r{C#Ujn1K78FhPwcIvP++bg|@@4NKMasYh2vO>E@H(ru> zyW@u#FQ`%9GpNfZzkq+N)j0Su(Gu zT$L@ZN;|9kz^JTtRa#w@g{<;TqjHt2vdC4LT4vn+oKbliYqwa5oP3@I1#(g%uO(-+ z&WLXQ#4Z1&+wdQnN6j2`EgWSFr_HP%icV|zcf9bG2EN+`{SkxCkw5aN(YX7(xU+BS zYAHn;XAW=6FF*7|_e~jMOGyyT&yBS9rSOz9gWVyr9nm#668(rv;B*U%M-6 zFzcxvfqDe$5%_-)0s39&qp57K>a^pD{dT@xZHwpg_|1#UtJSV}{uU{%Q6Enwa#C8W zcBc}qY@M*%Lmvt)D=g4v+f^o)v_oP%l(!v{CKrhEYF19#Y1^r(812W;Vzn%&z?F?3 z9zYCRa3z0UsXPXH2J|J+v!G{*KU1mP54!91N~IZ(;ctPi1TFufQt1HwY;Q^x7;yU(Pr7{4j76!L3T=bEa3l205sOuMRyl%}kONmYR zPf-SV`OS`s#CLF&f$t**C#vgiTpQ8e9^yq82LG&K>xJhv?1M+fmxsW<6Y?nhbLEYH z5FiD@)28Te5T9;PSKe@wi=PJjr;tBfC4az`H^Z)jxX$z1Z#d`5SAu^N;(c!w{)?`> z1M=q}U+u*=UhDdo2LC7MC;F||^{+$2wf}cRJ_h+v75*+4{}A|F(T@=?{=h8$8ORfm z|7R8cWsN6DJ=G&nk3c;F^$64>P>(=80`&;gBk+5R0DrtF-_y|&loEZjN2%Em_XEBQ zxqS1LCUXUs<-0v*EBOX(G0WxqJzDQm68W2NS8`N-gg=VI8!erJoWc7)oyJ%$->~U7 zU69B(d5c(2uhBVAgkkD1e+P%HlTOE&(!Pe0d`ow^DXKB9SoriJl9Kp6tV>GCw}Mjt z)yx+?yi(NdLtHfrFUxg#g4>6e-y+vYY?E~P6wC3dUYDgEZ2tE~SFt6SEID{T4w)d zi!^@Uqh7E0wWyy0klf9I46(u`ptba=9~)Fq?cwiCSzU#gKj7o9VZO{G(cg(oKrL3m zh~dcm>eBqnl*|hwRqA$Rs9yQ*RrrUs{-sLhneZP(rk4MYf=_nN=lQMo=HSiMUYbt< z?DAK902yDZW&b2}0_t+*A3y)3v6r~t*NdrFwEiM@9tyvV%*F8YGMeo$TE_M@NskNH9Dli= z?*>1JxV1$z-2L~jut)YoJRg^{0)`}P~7omS`OEgew{Eh{+M zlB3Tj&?$+CoNLgB8T@A?Tq#aH@YxEN^=_vh#Cx2MNGr=u#+|s*;oyTARff!|iz=2m zk5R>PP4?oThFh@d9W#X=HJHN|e0rk_>pzTPB?~#kBAcXx8arA*Yr3PQ#hD`6;LN{=zhuJM2ns8Q{imRvBSODVz@7#>$CIDK@Y3Dn8JCf zRFYYnx9n`Cd7;(GK^0CO%%TcIoxG9UYUc~7T(+uUL6*1EaRP9zFYTx>wN-_Y4fp0a zQ?L^%?AZM%(nNuFPLGqYy`QJX{v@nP$=EP+#2_S(s^Xaxj2SrkLWMCoGnjLHkN8~( zt;gw>g4>Bs{&SDA9%r%El{`^e4$vA{@N=E-BV9_e?lmQ))+6J^`_K8&?-Jf5&kf?sl*=Goq`gwW(4*%o8(;h?cvJVluf$!JkBW0!ivL8r+ zM@vq@%f3b^t-HL5&AF`4AV+%y!OOltsO+!Af6){AS>S2UA!XTj2yNq<{QiprN06bI z3SRa>LWfyV>i4(*FBrdt^<-Zq^q2=vG4tcU0UpIf?7PoTevZn1lg(&fB~SEzTwZ*L zaY6@3$&=*qEc8Lhy!fxPK&a@62`LMG#D{-`1w#7-$W(AbANS#9og;LUY>0h$G5drM zFY8pHvY!`y|NMEH@zVb)#F|Y1k&8n7_#Z$(@e}-aLq=2RAyah!`|&@9#EXx-->3?m zCZQ+E_z~8t$SlOg+dg;yBargc != 1) { *out_value = resp_error("ERR wrong number of arguments for 'save'"); return 0; } - int r = kvs_create_snapshot(global_array_file, global_rbtree_file, global_hash_file); + int r = kvs_create_snapshot(&global_uring_ctx, global_array_file, global_rbtree_file, global_hash_file); if(r == 0) ksv_clear_log(global_oplog_fd); if (r < 0) { *out_value = resp_error("ERR save failed"); return 0; } *out_value = resp_simple("OK"); return 0; } - case KVS_CMD_PSYNC: + case KVS_CMD_SSYNC: kvs_create_snapshot_async(cmd->argv[1].ptr, atoi(cmd->argv[2].ptr)); *out_value = resp_simple("OK"); return 0; + case KVS_CMD_SREADY: + *out_value = resp_simple("OK"); + return 0; default: break; } diff --git a/kvs_protocol_resp.h b/kvs_protocol_resp.h index 27a2141..b670737 100644 --- a/kvs_protocol_resp.h +++ b/kvs_protocol_resp.h @@ -37,7 +37,8 @@ typedef enum { KVS_CMD_HEXIST, KVS_CMD_SAVE, - KVS_CMD_PSYNC, + KVS_CMD_SSYNC, + KVS_CMD_SREADY, KVS_CMD_COUNT, }kvs_cmd_t; diff --git a/kvs_rbtree_bin.c b/kvs_rbtree_bin.c index b5934b7..9ee7a06 100644 --- a/kvs_rbtree_bin.c +++ b/kvs_rbtree_bin.c @@ -464,13 +464,13 @@ int kvs_rbtree_exist(kvs_rbtree_t *inst, const void *key, uint32_t key_len) { return 0; } -static int kvs_rbtree_save_node(int fd, off_t *current_off, kvs_rbtree_t *inst, rbtree_node *node) { +static int kvs_rbtree_save_node(iouring_ctx_t *uring, int fd, off_t *current_off, kvs_rbtree_t *inst, rbtree_node *node) { if (!current_off || !inst || !node) return -1; if (node == inst->nil) return 0; int rc = 0; - rc = kvs_rbtree_save_node(fd, current_off, inst, node->left); + rc = kvs_rbtree_save_node(uring, fd, current_off, inst, node->left); if (rc < 0) return rc; uint32_t klen = htonl(node->key_len); @@ -503,7 +503,7 @@ static int kvs_rbtree_save_node(int fd, off_t *current_off, kvs_rbtree_t *inst, size_t total = 0; for (int i = 0; i < count; i++) total += lens[i]; - task_t *t = submit_write(&global_uring_ctx, fd, bufs, lens, count, *current_off); + task_t *t = submit_write(uring, fd, bufs, lens, count, *current_off); if (!t) { return -4; } @@ -516,22 +516,22 @@ static int kvs_rbtree_save_node(int fd, off_t *current_off, kvs_rbtree_t *inst, *current_off += (off_t) total; - rc = kvs_rbtree_save_node(fd, current_off, inst, node->right); + rc = kvs_rbtree_save_node(uring, fd, current_off, inst, node->right); if (rc < 0) return rc; return 0; } // 0 success, <0 error -int kvs_rbtree_save(kvs_rbtree_t *inst, const char* filename){ - if (!inst || !filename) return -1; +int kvs_rbtree_save(iouring_ctx_t *uring, kvs_rbtree_t *inst, const char* filename){ + if (!uring || !inst || !filename) return -1; int fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, 0644); if(fd < 0) return -2; off_t current_off = 0; - int rc = kvs_rbtree_save_node(fd, ¤t_off, inst, inst->root); + int rc = kvs_rbtree_save_node(uring, fd, ¤t_off, inst, inst->root); close(fd); return rc; diff --git a/kvs_slave.c b/kvs_slave.c index 8cfa9d7..d0bdc9e 100644 --- a/kvs_slave.c +++ b/kvs_slave.c @@ -3,40 +3,56 @@ #include #include #include +#include +#include +#include +#include +#include -/* 创建并监听用于接收快照的 TCP 监听 socket,成功返回 listen fd,失败返回 -1 */ -static int create_listen_socket(const char *ip, int port){ +static int recv_exact(int sockfd, char *buf, size_t len) { + size_t received = 0; + while (received < len) { + ssize_t ret = recv(sockfd, buf + received, len - received, 0); + if (ret <= 0) { + if (ret == 0) { + fprintf(stderr, "connection closed by peer\n"); + } else { + perror("recv"); + } + return -1; + } + received += ret; + } + return 0; } -/* 主动连接 master 指定地址,用于控制面通信(SSYNC / SREADY),返回连接 fd 或 -1 */ -static int connect_master(const char *master_ip, int master_port){ - -} - -/* 通过控制连接向 master 发送 SSYNC 请求,声明本 slave 的快照接收地址 */ -static int send_ssync(int ctrl_fd, const char *listen_ip, int listen_port){ - -} - -/* 接收并校验 master 对 SSYNC 的确认响应(如 +OK),成功返回 0 */ -static int recv_ssync_ok(int ctrl_fd){ - -} - -/* 在快照监听 socket 上阻塞等待 master 的快照发送连接,返回已建立连接的 fd */ -static int accept_snapshot_conn(int listen_fd){ - -} - -/* 从快照连接中接收完整快照数据并构建内存状态,确保快照已完全应用 */ -static int recv_and_apply_snapshot(int snapshot_fd){ - -} - -/* 通过控制连接向 master 发送 SREADY 通知,表示快照已应用,slave 即将进入服务态 */ -int send_sready(int ctrl_fd){ +static int receive_snapshot_file(int sockfd, const char *filename) { + FILE *fp = fopen(filename, "wb"); + if (!fp) { + perror("fopen"); + return -1; + } + char buf[4096]; + ssize_t n; + + while ((n = recv(sockfd, buf, sizeof(buf), 0)) > 0) { + if (fwrite(buf, 1, n, fp) != n) { + perror("fwrite"); + fclose(fp); + return -1; + } + } + + if (n < 0) { + perror("recv"); + fclose(fp); + return -1; + } + + fclose(fp); + return 0; } @@ -47,64 +63,155 @@ int slave_bootstrap( int master_port ) { int listen_fd = -1; - int ctrl_fd = -1; - int snap_fd = -1; + int master_fd = -1; + int snapshot_fds[3] = {-1, -1, -1}; + int ret = -1; - /* 1. 监听 snapshot port */ - listen_fd = create_listen_socket(listen_ip, listen_port); + // 1. 创建监听socket + listen_fd = socket(AF_INET, SOCK_STREAM, 0); if (listen_fd < 0) { - goto fail; + perror("socket"); + goto cleanup; } - /* 2. 连接 master, 发送 SSYNC */ - ctrl_fd = connect_master(master_ip, master_port); - if (ctrl_fd < 0) { - goto fail; + // 设置SO_REUSEADDR + int opt = 1; + if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) { + perror("setsockopt"); + goto cleanup; } - if (send_ssync(ctrl_fd, listen_ip, listen_port) < 0) { - goto fail; + struct sockaddr_in listen_addr; + memset(&listen_addr, 0, sizeof(listen_addr)); + listen_addr.sin_family = AF_INET; + listen_addr.sin_port = htons(listen_port); + listen_addr.sin_addr.s_addr = inet_addr(listen_ip); + + if (bind(listen_fd, (struct sockaddr*)&listen_addr, sizeof(listen_addr)) < 0) { + perror("bind"); + goto cleanup; } - if (recv_ssync_ok(ctrl_fd) < 0) { - goto fail; + if (listen(listen_fd, 5) < 0) { + perror("listen"); + goto cleanup; } - close(ctrl_fd); - ctrl_fd = -1; + printf("Slave listening on %s:%d\n", listen_ip, listen_port); - /* 3. accept snapshot 连接 */ - snap_fd = accept_snapshot_conn(listen_fd); - if (snap_fd < 0) { - goto fail; + + // 2. 连接master并发送SSYNC命令 + master_fd = socket(AF_INET, SOCK_STREAM, 0); + if (master_fd < 0) { + perror("socket"); + goto cleanup; } - /* 4. 接收 snapshot */ - if (recv_and_apply_snapshot(snap_fd) < 0) { - goto fail; + struct sockaddr_in master_addr; + memset(&master_addr, 0, sizeof(master_addr)); + master_addr.sin_family = AF_INET; + master_addr.sin_port = htons(master_port); + master_addr.sin_addr.s_addr = inet_addr(master_ip); + + if (connect(master_fd, (struct sockaddr*)&master_addr, sizeof(master_addr)) < 0) { + perror("connect to master"); + goto cleanup; } - close(snap_fd); - snap_fd = -1; + printf("Connected to master %s:%d\n", master_ip, master_port); + + // 构造RESP协议消息: SSYNC listen_ip listen_port + char ssync_cmd[256]; + int cmd_len = snprintf(ssync_cmd, sizeof(ssync_cmd), + "*3\r\n$5\r\nSSYNC\r\n$%zu\r\n%s\r\n$%d\r\n%d\r\n", + strlen(listen_ip), listen_ip, + (int)snprintf(NULL, 0, "%d", listen_port), listen_port); + + if (send(master_fd, ssync_cmd, cmd_len, 0) < 0) { + perror("send SSYNC"); + goto cleanup; + } + + printf("Sent SSYNC command to master\n"); + + // 3. 接收master的+OK\r\n回包 + char resp[32]; + if (recv_exact(master_fd, resp, 5) < 0) { + fprintf(stderr, "Failed to receive OK from master\n"); + goto cleanup; + } + + if (memcmp(resp, "+OK\r\n", 5) != 0) { + fprintf(stderr, "Unexpected response from master: %.5s\n", resp); + goto cleanup; + } + + printf("Received OK from master\n"); + // 4. Accept来自master的三个连接,接收三个snapshot文件 + const char *snapshot_files[3] = { + "data/kvs_array.db", + "data/kvs_rbtree.db", + "data/kvs_hash.db" + }; + + for (int i = 0; i < 3; i++) { + struct sockaddr_in client_addr; + socklen_t addr_len = sizeof(client_addr); + + snapshot_fds[i] = accept(listen_fd, (struct sockaddr*)&client_addr, &addr_len); + if (snapshot_fds[i] < 0) { + perror("accept"); + goto cleanup; + } + + printf("Accepted connection %d from %s:%d\n", + i + 1, inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port)); + + if (receive_snapshot_file(snapshot_fds[i], snapshot_files[i]) < 0) { + fprintf(stderr, "Failed to receive snapshot file %d\n", i + 1); + goto cleanup; + } + + printf("Received snapshot file: %s\n", snapshot_files[i]); + close(snapshot_fds[i]); + snapshot_fds[i] = -1; + } + + // 5. 关闭监听和snapshot连接,发送SREADY close(listen_fd); listen_fd = -1; - /* 5. 通知 master 快照传输完毕 */ - ctrl_fd = connect_master(master_ip, master_port); - if (ctrl_fd >= 0) { - send_sready(ctrl_fd); - close(ctrl_fd); - ctrl_fd = -1; + const char *sready_cmd = "*1\r\n$6\r\nSREADY\r\n"; + if (send(master_fd, sready_cmd, strlen(sready_cmd), 0) < 0) { + perror("send SREADY"); + goto cleanup; } - /* 6. bootstrap complete */ - return 0; + printf("Sent SREADY to master\n"); + + // 6. 接收回包+OK\r\n + if (recv_exact(master_fd, resp, 5) < 0) { + fprintf(stderr, "Failed to receive final OK from master\n"); + goto cleanup; + } -fail: - if (snap_fd >= 0) close(snap_fd); - if (ctrl_fd >= 0) close(ctrl_fd); + if (memcmp(resp, "+OK\r\n", 5) != 0) { + fprintf(stderr, "Unexpected final response from master: %.5s\n", resp); + goto cleanup; + } + + printf("Received final OK from master, bootstrap complete\n"); + + ret = 0; + +cleanup: + if (master_fd >= 0) close(master_fd); if (listen_fd >= 0) close(listen_fd); - return -1; + for (int i = 0; i < 3; i++) { + if (snapshot_fds[i] >= 0) close(snapshot_fds[i]); + } + + return ret; } diff --git a/kvstore.c b/kvstore.c index e40ec21..a426294 100644 --- a/kvstore.c +++ b/kvstore.c @@ -26,11 +26,13 @@ extern mp_pool_t global_mempool; #endif AppConfig global_cfg; +iouring_ctx_t global_uring_ctx; extern int global_oplog_fd; -extern iouring_ctx_t global_uring_ctx; +void __completed_cmd(const uint8_t *cmd, size_t len){ +} int kvs_protocol(struct conn* conn){ if (!conn) return -1; @@ -67,6 +69,9 @@ int kvs_protocol(struct conn* conn){ int dr = resp_dispatch(&cmd, &val); + __completed_cmd(p, len); + + /* * 语义建议: * - resp_dispatch() 即使返回 -1(比如 unknown command / wrong argc), diff --git a/kvstore.h b/kvstore.h index 8f1e736..704c457 100644 --- a/kvstore.h +++ b/kvstore.h @@ -4,7 +4,7 @@ #ifndef __KV_STORE_H__ #define __KV_STORE_H__ - +#include "diskuring/diskuring.h" #include #include #include @@ -82,7 +82,7 @@ int kvs_array_mod_bin(kvs_array_t *inst, int kvs_array_exist_bin(kvs_array_t *inst, const void *key, uint32_t key_len); -int kvs_array_save(kvs_array_t *inst, const char* filename); +int kvs_array_save(iouring_ctx_t *uring, kvs_array_t *inst, const char* filename); int kvs_array_load(kvs_array_t *inst, const char* filename); #else @@ -146,7 +146,7 @@ int kvs_rbtree_del(rbtree *inst, const void *key, uint32_t key_len); int kvs_rbtree_mod(kvs_rbtree_t *inst, const void *key, uint32_t key_len, const void *value, uint32_t value_len); int kvs_rbtree_exist(kvs_rbtree_t *inst, const void *key, uint32_t key_len); -int kvs_rbtree_save(kvs_rbtree_t *inst, const char* filename); +int kvs_rbtree_save(iouring_ctx_t *uring, kvs_rbtree_t *inst, const char* filename); int kvs_rbtree_load(kvs_rbtree_t *inst, const char* filename); #else #define ENABLE_KEY_CHAR 1 @@ -220,7 +220,7 @@ int kvs_hash_del_bin(kvs_hash_t *h, const void *key, uint32_t key_len); int kvs_hash_exist_bin(kvs_hash_t *h, const void *key, uint32_t key_len); int kvs_hash_count(kvs_hash_t *h); -int kvs_hash_save(kvs_hash_t *inst, const char* filename); +int kvs_hash_save(iouring_ctx_t *uring, kvs_hash_t *inst, const char* filename); int kvs_hash_load(kvs_hash_t *inst, const char* filename); #else @@ -279,7 +279,7 @@ extern kvs_rbtree_t global_rbtree; extern kvs_hash_t global_hash; #endif -void __compeleted_cmd(const uint8_t *cmd, size_t len); +void __completed_cmd(const uint8_t *cmd, size_t len); #endif diff --git a/libbpf-bootstrap b/libbpf-bootstrap new file mode 160000 index 0000000..b2b9960 --- /dev/null +++ b/libbpf-bootstrap @@ -0,0 +1 @@ +Subproject commit b2b9960d32eb0e99af8e90ceae4b46b96093acb1 diff --git a/reactor.c b/reactor.c index a9a3cac..1c4e490 100644 --- a/reactor.c +++ b/reactor.c @@ -104,7 +104,7 @@ int event_register(int fd, int event) { memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH*2); conn_list[fd].wlength = 0; - conn_list[fd].is_from_master = 0; + conn_list[fd].is_stop = 0; set_event(fd, event, 1); } @@ -147,24 +147,24 @@ int recv_cb(int fd) { // printf("avail: %d\n", avail); if (avail <= 0) { // 缓冲满了还没解析出来:协议异常或包过大 - close(fd); epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); + close(fd); return 0; } int count = recv(fd, c->rbuffer + c->rlength, avail, 0); if (count == 0) { // disconnect //printf("client disconnect: %d\n", fd); - close(fd); epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); // unfinished + close(fd); return 0; } else if (count < 0) { // printf("count: %d, errno: %d, %s\n", count, errno, strerror(errno)); - close(fd); epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); + close(fd); return 0; } @@ -191,8 +191,8 @@ int recv_cb(int fd) { #elif ENABLE_KVSTORE int consumed = kvs_request(c); if(consumed < 0){ - close(fd); epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); + close(fd); return 0; } @@ -274,8 +274,8 @@ int send_cb(int fd) { pthread_mutex_unlock(&c->g_sync_lock); printf("send fd=%d errno=%d %s\n", fd, e, strerror(e)); - close(fd); epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); + close(fd); return 0; } @@ -372,7 +372,7 @@ int reactor_start(unsigned short port, msg_handler handler) { conn_list[sockfd].fd = sockfd; conn_list[sockfd].r_action.recv_callback = accept_cb; - conn_list[sockfd].is_from_master = 0; + conn_list[sockfd].is_stop = 0; set_event(sockfd, EPOLLIN, 1); } diff --git a/server.h b/server.h index 25f5b47..1025736 100644 --- a/server.h +++ b/server.h @@ -33,7 +33,7 @@ struct conn { RCALLBACK accept_callback; } r_action; - int is_from_master; + int is_stop; pthread_mutex_t g_sync_lock; diff --git a/test/test_client.h b/test/test_client.h index 2e4a311..7325862 100644 --- a/test/test_client.h +++ b/test/test_client.h @@ -52,7 +52,7 @@ enum { KVS_CMD_HMOD, KVS_CMD_HEXIST, - KVS_CMD_PSYNC, + KVS_CMD_SSYNC, KVS_CMD_SAVE, KVS_CMD_COUNT,