// SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause) /* Copyright (c) 2020 Facebook */ #include #include #include #include #include #include #include "replica.skel.h" #include #include #include #include #include #include #include #include #include #include "replica_shm.h" #include "replica.h" #define DEBUGLOG(...) fprintf(stderr, __VA_ARGS__) /* ============================================================ */ #define REPLICA_SHM_MAGIC 0x52504C43u /* 'RPLC' */ #define REPLICA_SHM_VER 1 static inline uint64_t align8_u64(uint64_t x) { return (x + 7u) & ~7ull; } int replica_shm_open(replica_shm_t *s, const char *name, size_t total_size, int create) { if (!s || !name || total_size < (sizeof(replica_shm_hdr_t) + 4096)) return -EINVAL; memset(s, 0, sizeof(*s)); int flags = O_RDWR; if (create) flags |= O_CREAT; int fd = shm_open(name, flags, 0666); if (fd < 0) return -errno; if (create) { if (ftruncate(fd, (off_t)total_size) != 0) { int e = -errno; close(fd); return e; } } void *p = mmap(NULL, total_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (p == MAP_FAILED) { int e = -errno; close(fd); return e; } s->fd = fd; s->map_size = total_size; s->hdr = (replica_shm_hdr_t *)p; s->data = (uint8_t *)p + sizeof(replica_shm_hdr_t); // 初始化头 if (create || s->hdr->magic != REPLICA_SHM_MAGIC) { memset(s->hdr, 0, sizeof(*s->hdr)); s->hdr->magic = REPLICA_SHM_MAGIC; s->hdr->version = REPLICA_SHM_VER; s->hdr->capacity = total_size - sizeof(replica_shm_hdr_t); s->hdr->write_off = 0; s->hdr->last_seq = 0; } return 0; } int replica_shm_peek(replica_shm_t *s, uint32_t off, replica_rec_hdr_t *out_hdr) { if (!s || !s->hdr || !s->data || !out_hdr) return -EINVAL; if ((uint64_t)off + sizeof(replica_rec_hdr_t) > s->hdr->capacity) return -EINVAL; memcpy(out_hdr, s->data + off, sizeof(*out_hdr)); return 0; } void replica_shm_close(replica_shm_t *s) { if (!s) return; if (s->hdr && s->map_size) munmap(s->hdr, s->map_size); if (s->fd > 0) close(s->fd); memset(s, 0, sizeof(*s)); } /* ================================================================================*/ /* ================= 全局状态 ================= */ #define DEBUGLOG(...) fprintf(stderr, __VA_ARGS__) static replica_shm_t g_shm; static int g_sockfd = -1; static char peer_ip[MAX_IP_LEN]; static int peer_port = 0; static uint64_t SYNC_SEQ = 0; static uint64_t local_seq = 0; static uint32_t read_off = 0; static pthread_t reader_thread; static pthread_t sender_thread; static volatile int should_stop = 0; /* ================= ================= */ struct send_node { uint8_t *data; uint32_t len; uint32_t sent; struct send_node *next; }; static struct { struct send_node *head; struct send_node *tail; int count; pthread_mutex_t lock; pthread_cond_t not_empty; } sendq = { .lock = PTHREAD_MUTEX_INITIALIZER, .not_empty = PTHREAD_COND_INITIALIZER }; static void sendq_free_all(void) { pthread_mutex_lock(&sendq.lock); struct send_node *c = sendq.head; while (c) { struct send_node *n = c->next; free(c->data); free(c); c = n; } sendq.head = sendq.tail = NULL; sendq.count = 0; pthread_mutex_unlock(&sendq.lock); } static void sendq_push(uint8_t *data, uint32_t len) { struct send_node *n = (struct send_node *)malloc(sizeof(*n)); if (!n) { free(data); return; } n->data = data; n->len = len; n->sent = 0; n->next = NULL; pthread_mutex_lock(&sendq.lock); if (!sendq.tail) { sendq.head = sendq.tail = n; } else { sendq.tail->next = n; sendq.tail = n; } sendq.count++; pthread_cond_signal(&sendq.not_empty); pthread_mutex_unlock(&sendq.lock); } static void sendq_pop(void) { if (!sendq.head) return; struct send_node *n = sendq.head; sendq.head = n->next; if (!sendq.head) sendq.tail = NULL; free(n->data); free(n); sendq.count--; } /* ================= Reader 线程:读共享内存 ================= */ static void* reader_thread_func(void *arg) { (void)arg; DEBUGLOG("Reader thread started\n"); while (!should_stop) { replica_rec_hdr_t h; uint64_t last = __atomic_load_n(&g_shm.hdr->last_seq, __ATOMIC_ACQUIRE); if (local_seq > last) { // 没有新数据,短暂休眠避免空转 usleep(500); continue; } if (read_off+ sizeof(replica_rec_hdr_t) >= g_shm.hdr->capacity) { DEBUGLOG("Reader: read_off overflow, reset\n"); // read_off = 0; break; // continue; } if (replica_shm_peek(&g_shm, read_off, &h) != 0) { DEBUGLOG("Reader: peek failed at %u\n", read_off); break; // continue; } // 检测 wrap if (h.len == 0) { DEBUGLOG("Reader: wrap at offset %u\n", read_off); read_off = 0; continue; } // 跳过 SYNC_SEQ 之前的 if (h.seq < SYNC_SEQ) { uint64_t step = align8_u64((uint64_t)sizeof(replica_rec_hdr_t) + (uint64_t)h.len); if (read_off + step > g_shm.hdr->capacity) { read_off = 0; } else { read_off += (uint32_t)step; } continue; } // 序列号检查 if (h.seq != local_seq) { // DEBUGLOG("Reader: seq mismatch! h.seq=%lu, local_seq=%lu, off=%u\n", h.seq, local_seq, read_off); continue; } // 读取数据 uint8_t *buf = (uint8_t *)malloc(h.len); if (!buf) { DEBUGLOG("Reader: malloc failed\n"); usleep(1000); continue; } memcpy(buf, g_shm.data + read_off + sizeof(replica_rec_hdr_t), h.len); sendq_push(buf, h.len); uint64_t step = align8_u64((uint64_t)sizeof(replica_rec_hdr_t) + (uint64_t)h.len); if (read_off + step > g_shm.hdr->capacity) { read_off = 0; } else { read_off += (uint32_t)step; } local_seq++; } DEBUGLOG("Reader thread stopped\n"); return NULL; } /* ================= Sender 线程:发送数据 ================= */ static void* sender_thread_func(void *arg) { (void)arg; DEBUGLOG("Sender thread started\n"); int epfd = epoll_create1(0); if (epfd < 0) { perror("epoll_create1"); return NULL; } struct epoll_event ev; memset(&ev, 0, sizeof(ev)); ev.events = EPOLLIN | EPOLLOUT; ev.data.fd = g_sockfd; if (epoll_ctl(epfd, EPOLL_CTL_ADD, g_sockfd, &ev) != 0) { perror("epoll_ctl ADD"); close(epfd); return NULL; } while (!should_stop && g_sockfd >= 0) { struct epoll_event events[4]; int nfds = epoll_wait(epfd, events, 4, 100); // 100ms timeout if (nfds < 0) { if (errno == EINTR) continue; perror("epoll_wait"); break; } for (int i = 0; i < nfds; i++) { if (events[i].data.fd != g_sockfd) continue; if (events[i].events & (EPOLLERR | EPOLLHUP)) { DEBUGLOG("Sender: EPOLLERR/EPOLLHUP\n"); close(g_sockfd); g_sockfd = -1; break; } if (events[i].events & EPOLLIN) { char buf[4096]; recv(g_sockfd, buf, sizeof(buf), 0); } if (events[i].events & EPOLLOUT) { pthread_mutex_lock(&sendq.lock); while (sendq.head) { struct send_node *n = sendq.head; pthread_mutex_unlock(&sendq.lock); int nbytes = send(g_sockfd, n->data + n->sent, (int)(n->len - n->sent), MSG_NOSIGNAL); pthread_mutex_lock(&sendq.lock); if (nbytes > 0) { n->sent += (uint32_t)nbytes; if (n->sent == n->len) { sendq_pop(); continue; } // partial send break; } if (nbytes < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { break; } DEBUGLOG("Sender: send error errno=%d\n", errno); pthread_mutex_unlock(&sendq.lock); close(g_sockfd); g_sockfd = -1; goto out; } // nbytes == 0 DEBUGLOG("Sender: send returned 0\n"); pthread_mutex_unlock(&sendq.lock); close(g_sockfd); g_sockfd = -1; goto out; } pthread_mutex_unlock(&sendq.lock); } } } out: close(epfd); DEBUGLOG("Sender thread stopped\n"); return NULL; } /* ================= 网络逻辑 ================= */ static int connect_peer(void) { if (peer_port <= 0 || peer_ip[0] == '\0') return -1; if (g_sockfd >= 0) { close(g_sockfd); g_sockfd = -1; } int fd = socket(AF_INET, SOCK_STREAM, 0); if (fd < 0) { perror("socket"); return -1; } struct sockaddr_in a; memset(&a, 0, sizeof(a)); a.sin_family = AF_INET; a.sin_port = htons(peer_port); if (inet_pton(AF_INET, peer_ip, &a.sin_addr) != 1) { DEBUGLOG("inet_pton failed for ip=%s\n", peer_ip); close(fd); return -1; } if (connect(fd, (struct sockaddr *)&a, sizeof(a)) != 0) { // 这里可以重试;按你的要求先简单返回失败 // perror("connect"); close(fd); return -1; } // non-blocking(配合 epoll) int flags = fcntl(fd, F_GETFL, 0); if (flags >= 0) fcntl(fd, F_SETFL, flags | O_NONBLOCK); g_sockfd = fd; DEBUGLOG("connect ok %s:%d\n", peer_ip, peer_port); return 0; } /* ================= perf buffer 回调 ================= */ static void handle_event(void *ctx, int cpu, void *data, __u32 size) { (void)ctx; (void)cpu; if (size < sizeof(struct replica_event)) return; struct replica_event *e = (struct replica_event*)data; if (e->type == EVENT_SSYNC) { memset(peer_ip, 0, sizeof(peer_ip)); memcpy(peer_ip, e->sync.ip, e->sync.ip_len); peer_port = e->sync.port; SYNC_SEQ = e->sync.seq; local_seq = SYNC_SEQ; read_off = 0; DEBUGLOG("SSYNC: peer=%s:%d SYNC_SEQ=%llu\n", peer_ip, peer_port, (unsigned long long)SYNC_SEQ); // 停止旧线程 should_stop = 1; if (reader_thread) { pthread_join(reader_thread, NULL); reader_thread = 0; } if (sender_thread) { pthread_join(sender_thread, NULL); sender_thread = 0; } if (g_sockfd >= 0) { close(g_sockfd); g_sockfd = -1; } sendq_free_all(); return; } if (e->type == EVENT_SREADY) { DEBUGLOG("SREADY\n"); if (connect_peer() != 0) { DEBUGLOG("connect_peer failed\n"); return; } // 启动双线程 should_stop = 0; if (pthread_create(&reader_thread, NULL, reader_thread_func, NULL) != 0) { perror("pthread_create reader"); return; } if (pthread_create(&sender_thread, NULL, sender_thread_func, NULL) != 0) { perror("pthread_create sender"); pthread_cancel(reader_thread); return; } DEBUGLOG("Reader and Sender threads started\n"); return; } } /* ================= main ================= */ int main(int argc, char **argv) { int rc = replica_shm_open(&g_shm, REPLICA_SHM_NAME, REPLICA_SHM_SIZE, 0); if (rc != 0) { fprintf(stderr, "replica_shm_open failed rc=%d (did you create it in kvstore?)\n", rc); return 1; } struct replica_bpf *skel; struct perf_buffer *pb = NULL; int err; /* Open BPF application */ skel = replica_bpf__open(); if (!skel) { fprintf(stderr, "Failed to open BPF skeleton\n"); return 1; } /* Load & verify BPF programs */ err = replica_bpf__load(skel); if (err) { fprintf(stderr, "Failed to load and verify BPF skeleton\n"); goto cleanup; } /* Attach tracepoint handler */ err = replica_bpf__attach(skel); if (err) { fprintf(stderr, "Failed to attach BPF skeleton\n"); goto cleanup; } printf("Successfully started! \n"); pb = perf_buffer__new(bpf_map__fd(skel->maps.events), 8, handle_event, NULL, NULL, NULL); if(!pb){ goto cleanup; } while (1) { perf_buffer__poll(pb, 1000); // 处理事件 } perf_buffer__free(pb); cleanup: should_stop = 1; if (reader_thread) pthread_join(reader_thread, NULL); if (sender_thread) pthread_join(sender_thread, NULL); if (g_sockfd >= 0) close(g_sockfd); replica_shm_close(&g_shm); sendq_free_all(); replica_bpf__destroy(skel); return -err; }