// SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause) /* Copyright (c) 2020 Facebook */ #include #include #include #include #include "replica.skel.h" #include #include #include #include #include "replica.h" struct cmd_node { uint8_t *cmd; size_t len; struct cmd_node *next; }; struct pending_queue { struct cmd_node *head; struct cmd_node *tail; int count; }; static void queue_init(struct pending_queue *q) { q->head = q->tail = NULL; q->count = 0; } static void queue_push(struct pending_queue *q, const uint8_t *cmd, size_t len) { struct cmd_node *node = malloc(sizeof(*node)); if (!node) return; node->cmd = malloc(len); if (!node->cmd) { free(node); return; } memcpy(node->cmd, cmd, len); node->len = len; node->next = NULL; if (q->tail) q->tail->next = node; else q->head = node; q->tail = node; q->count++; } static void queue_send_and_clear(struct pending_queue *q, int sock) { struct cmd_node *node = q->head; int sent_count = 0; while (node) { if (send(sock, node->cmd, node->len, 0) > 0) { sent_count++; } struct cmd_node *tmp = node; node = node->next; free(tmp->cmd); free(tmp); } if (sent_count > 0) { printf("[QUEUE] Sent %d commands to slave\n", sent_count); } queue_init(q); } static void queue_free(struct pending_queue *q) { struct cmd_node *node = q->head; while (node) { struct cmd_node *tmp = node; node = node->next; free(tmp->cmd); free(tmp); } queue_init(q); } static int send_file(int sock, const char *path) { FILE *fp = fopen(path, "rb"); if (!fp) { printf("[ERROR] Failed to open file: %s\n", path); return -1; } char buf[4096]; size_t n, total = 0; while ((n = fread(buf, 1, sizeof(buf), fp)) > 0) { if (send(sock, buf, n, 0) < 0) { fclose(fp); printf("[ERROR] Failed to send file: %s (sent %zu bytes)\n", path, total); return -1; } total += n; } fclose(fp); printf("[FILE] Sent %s (%zu bytes)\n", path, total); return 0; } // 全局状态(单 Slave 简化) static enum state current_state = NOSLAVE; static char slave_ip[16] = {0}; static int slave_port = 0; static char array_file[128] = {0}; static char rbtree_file[128] = {0}; static char hash_file[128] = {0}; static struct pending_queue pending; static int slave_sock = -1; // 连接 Slave 的 socket // 连接 Slave static int connect_slave() { int sock = socket(AF_INET, SOCK_STREAM, 0); if (sock < 0) return -1; struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(slave_port); inet_pton(AF_INET, slave_ip, &addr.sin_addr); if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) { close(sock); return -1; } return sock; } static void handle_event(void *ctx, int cpu, void *data, __u32 size) { struct event *ev = (struct event *)data; switch (ev->type) { case EVENT_CREATE_SNAPSHOT_ASYNC: printf("[EVENT] Type: CREATE_SNAPSHOT_ASYNC\n"); printf("[EVENT] Slave IP: %s, Port: %u\n", ev->data.sync.ip, ev->data.sync.port); if (current_state == NOSLAVE) { current_state = START; strncpy(slave_ip, ev->data.sync.ip, sizeof(slave_ip)); slave_port = ev->data.sync.port; queue_init(&pending); slave_sock = connect_slave(); // 连接 Slave if (slave_sock < 0) { printf("Failed to connect to Slave %s:%d\n", slave_ip, slave_port); current_state = NOSLAVE; } } break; case EVENT_COMPLETED_CMD: printf("[EVENT] Type: COMPLETED_CMD\n"); printf("[EVENT] Command length: %llu bytes\n", ev->data.cmd.len); if (current_state != NOSLAVE) { queue_push(&pending, ev->data.cmd.cmd, ev->data.cmd.len); } break; case EVENT_CREATE_SNAPSHOT_OK: printf("[EVENT] Type: CREATE_SNAPSHOT_OK\n"); printf("[EVENT] Array file: %s\n", ev->data.ok.array_file); printf("[EVENT] RBTree file: %s\n", ev->data.ok.rbtree_file); printf("[EVENT] Hash file: %s\n", ev->data.ok.hash_file); if (current_state == START) { current_state = DONE; strncpy(array_file, ev->data.ok.array_file, sizeof(array_file)); strncpy(rbtree_file, ev->data.ok.rbtree_file, sizeof(rbtree_file)); strncpy(hash_file, ev->data.ok.hash_file, sizeof(hash_file)); } break; } } static void lost_event(void *ctx, int cpu, __u64 cnt) { printf("Lost %llu events\n", cnt); } static int libbpf_print_fn(enum libbpf_print_level level, const char *format, va_list args) { return vfprintf(stderr, format, args); } int main(int argc, char **argv) { struct replica_bpf *skel; int err; /* Set up libbpf errors and debug info callback */ libbpf_set_print(libbpf_print_fn); /* Open BPF application */ skel = replica_bpf__open(); if (!skel) { fprintf(stderr, "Failed to open BPF skeleton\n"); return 1; } /* ensure BPF program only handles write() syscalls from our process */ skel->bss->my_pid = getpid(); /* Load & verify BPF programs */ err = replica_bpf__load(skel); if (err) { fprintf(stderr, "Failed to load and verify BPF skeleton\n"); goto cleanup; } /* Attach tracepoint handler */ err = replica_bpf__attach(skel); if (err) { fprintf(stderr, "Failed to attach BPF skeleton\n"); goto cleanup; } printf("Successfully started! Please run `sudo cat /sys/kernel/debug/tracing/trace_pipe` " "to see output of the BPF programs.\n"); struct perf_buffer *pb = perf_buffer__new(bpf_map__fd(skel->maps.channel), 8, handle_event, lost_event, NULL, NULL); if(!pb){ goto cleanup; } #if 0 while(1){ perf_buffer__poll(pb, 1000); } #else while (1) { perf_buffer__poll(pb, 1000); // 处理事件 // 循环中检查状态并发送 if (current_state == DONE && slave_sock >= 0) { // 发送快照文件 if (send_file(slave_sock, array_file) == 0 && send_file(slave_sock, rbtree_file) == 0 && send_file(slave_sock, hash_file) == 0) { current_state = ONLINE; printf("Snapshot sent, state to ONLINE\n"); } else { printf("Failed to send snapshot\n"); current_state = NOSLAVE; close(slave_sock); slave_sock = -1; } } if (current_state == ONLINE && slave_sock >= 0) { // 发送 pending queue_send_and_clear(&pending, slave_sock); } } #endif perf_buffer__free(pb); cleanup: queue_free(&pending); if (slave_sock >= 0) close(slave_sock); replica_bpf__destroy(skel); return -err; }