Files
ldb/ebpf/c/replica.c

534 lines
14 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause)
/* Copyright (c) 2020 Facebook */
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/resource.h>
#include <bpf/libbpf.h>
#include "replica.skel.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <pthread.h>
#include "replica_shm.h"
#include "replica.h"
/* Debug logging: forwards its printf-style arguments to stderr. */
#define DEBUGLOG(...) fprintf(stderr, __VA_ARGS__)
/* ============================================================ */
/* Value stored in, and compared against, the shared-memory header's `magic` field. */
#define REPLICA_SHM_MAGIC 0x52504C43u /* 'RPLC' */
/* Value stored in the header's `version` field when the header is initialised. */
#define REPLICA_SHM_VER 1
/* Round x up to the next multiple of 8 (arithmetic wraps modulo 2^64). */
static inline uint64_t align8_u64(uint64_t x)
{
    const uint64_t low_bits = 7;

    return (x + low_bits) & ~low_bits;
}
/*
 * Open (and optionally create) the POSIX shared-memory object `name`, map
 * `total_size` bytes of it read/write, and fill in `s`.
 *
 * s          - handle to initialise; zeroed first, so it must not own a mapping.
 * name       - shm object name passed to shm_open().
 * total_size - size of the mapping (header + data area); must be at least
 *              sizeof(replica_shm_hdr_t) + 4096.
 * create     - non-zero: create the object if needed and resize it to
 *              total_size; zero: attach to an existing object.
 *
 * The header is (re)initialised when `create` is set or when the magic value
 * does not match.
 *
 * Returns 0 on success or a negative errno value. -EINVAL is returned for bad
 * arguments and, in attach mode, when the existing object is smaller than
 * total_size.
 */
int replica_shm_open(replica_shm_t *s, const char *name, size_t total_size, int create)
{
    if (!s || !name || total_size < (sizeof(replica_shm_hdr_t) + 4096)) return -EINVAL;
    memset(s, 0, sizeof(*s));

    int flags = O_RDWR;
    if (create) flags |= O_CREAT;

    int fd = shm_open(name, flags, 0666);
    if (fd < 0) return -errno;

    if (create) {
        if (ftruncate(fd, (off_t)total_size) != 0) {
            int e = -errno;
            close(fd);
            return e;
        }
    } else {
        /*
         * mmap() succeeds even when the object is shorter than the requested
         * length; touching pages past its end then raises SIGBUS. Reject an
         * undersized (e.g. not yet ftruncate'd) object up front instead.
         */
        struct stat st;
        if (fstat(fd, &st) != 0) {
            int e = -errno;
            close(fd);
            return e;
        }
        if (st.st_size < 0 || (uint64_t)st.st_size < (uint64_t)total_size) {
            close(fd);
            return -EINVAL;
        }
    }

    void *p = mmap(NULL, total_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    if (p == MAP_FAILED) {
        int e = -errno;
        close(fd);
        return e;
    }

    s->fd = fd;
    s->map_size = total_size;
    s->hdr = (replica_shm_hdr_t *)p;
    /* The data area starts immediately after the header. */
    s->data = (uint8_t *)p + sizeof(replica_shm_hdr_t);

    /* Initialise the header. */
    if (create || s->hdr->magic != REPLICA_SHM_MAGIC) {
        memset(s->hdr, 0, sizeof(*s->hdr));
        s->hdr->magic = REPLICA_SHM_MAGIC;
        s->hdr->version = REPLICA_SHM_VER;
        s->hdr->capacity = total_size - sizeof(replica_shm_hdr_t);
        s->hdr->write_off = 0;
        s->hdr->last_seq = 0;
    }
    return 0;
}
/*
 * Copy the record header located `off` bytes into the data area to *out_hdr.
 *
 * Returns 0 on success, or -EINVAL when an argument or the mapping is missing
 * or when the header would extend past the capacity stored in the shm header.
 */
int replica_shm_peek(replica_shm_t *s, uint32_t off, replica_rec_hdr_t *out_hdr)
{
    if (!s || !out_hdr) {
        return -EINVAL;
    }
    if (!s->hdr || !s->data) {
        return -EINVAL;
    }

    /* Computed in 64 bits so that a large offset cannot wrap. */
    const uint64_t hdr_end = (uint64_t)off + sizeof(replica_rec_hdr_t);
    if (hdr_end > s->hdr->capacity) {
        return -EINVAL;
    }

    memcpy(out_hdr, s->data + off, sizeof(*out_hdr));
    return 0;
}
/*
 * Release the mapping and descriptor held by `s`, then zero the handle.
 * A NULL handle is ignored. A descriptor value of 0 is not closed.
 */
void replica_shm_close(replica_shm_t *s)
{
    if (s) {
        if (s->hdr && s->map_size) {
            munmap(s->hdr, s->map_size);
        }
        if (s->fd > 0) {
            close(s->fd);
        }
        memset(s, 0, sizeof(*s));
    }
}
/* ================================================================================*/
/* ================= 全局状态 ================= */
#define DEBUGLOG(...) fprintf(stderr, __VA_ARGS__)
/* Shared-memory mapping: opened in main(), read by the reader thread. */
static replica_shm_t g_shm;
/* Socket connected to the peer; -1 while no connection is open. */
static int g_sockfd = -1;
/* Peer address and port, copied from the most recent EVENT_SSYNC event. */
static char peer_ip[MAX_IP_LEN];
static int peer_port = 0;
/* Sequence number from the last EVENT_SSYNC; the reader skips records below it. */
static uint64_t SYNC_SEQ = 0;
/* Sequence number the reader expects next; incremented per queued record. */
static uint64_t local_seq = 0;
/* Reader's current byte offset into the shared-memory data area. */
static uint32_t read_off = 0;
/* Worker thread handles; the code treats a zero value as "no thread". */
static pthread_t reader_thread;
static pthread_t sender_thread;
/* Set to 1 to make both worker loops exit; cleared before workers are started. */
static volatile int should_stop = 0;
/* ================= ================= */
/* One queued outbound buffer in the singly linked send queue. */
struct send_node {
/* Heap buffer owned by the node; freed together with the node. */
uint8_t *data;
/* Total number of bytes in `data`. */
uint32_t len;
/* Number of bytes already handed to send(); the node is done when sent == len. */
uint32_t sent;
/* Next node in the queue, or NULL for the last one. */
struct send_node *next;
};
/*
 * FIFO of buffers waiting to be sent: sendq_push() appends at `tail`,
 * the sender thread consumes from `head`.
 */
static struct {
/* Oldest node (next to send), or NULL when the queue is empty. */
struct send_node *head;
/* Newest node, or NULL when the queue is empty. */
struct send_node *tail;
/* Number of nodes currently queued. */
int count;
/* Protects head, tail and count. */
pthread_mutex_t lock;
/* Signalled by sendq_push(); nothing in the code shown here waits on it. */
pthread_cond_t not_empty;
} sendq = {
.lock = PTHREAD_MUTEX_INITIALIZER,
.not_empty = PTHREAD_COND_INITIALIZER
};
static void sendq_free_all(void)
{
pthread_mutex_lock(&sendq.lock);
struct send_node *c = sendq.head;
while (c) {
struct send_node *n = c->next;
free(c->data);
free(c);
c = n;
}
sendq.head = sendq.tail = NULL;
sendq.count = 0;
pthread_mutex_unlock(&sendq.lock);
}
/*
 * Append `data` (`len` bytes) to the tail of the send queue and signal
 * sendq.not_empty. The queue takes ownership of `data`; if the node cannot
 * be allocated the buffer is freed and the record is dropped.
 */
static void sendq_push(uint8_t *data, uint32_t len)
{
    struct send_node *node = (struct send_node *)malloc(sizeof(*node));

    if (!node) {
        free(data);
        return;
    }

    node->next = NULL;
    node->sent = 0;
    node->len = len;
    node->data = data;

    pthread_mutex_lock(&sendq.lock);
    if (sendq.tail) {
        sendq.tail->next = node;
    } else {
        sendq.head = node;
    }
    sendq.tail = node;
    sendq.count++;
    pthread_cond_signal(&sendq.not_empty);
    pthread_mutex_unlock(&sendq.lock);
}
/*
 * Remove and free the node at the head of the send queue; no-op when empty.
 * This function does not take sendq.lock itself; the sender thread calls it
 * with the lock already held.
 */
static void sendq_pop(void)
{
    struct send_node *front = sendq.head;

    if (front == NULL) {
        return;
    }

    sendq.head = front->next;
    if (sendq.head == NULL) {
        sendq.tail = NULL;
    }
    sendq.count--;

    free(front->data);
    free(front);
}
/* ================= Reader 线程:读共享内存 ================= */
/*
 * Offset of the record that follows one starting at `off` with `payload_len`
 * payload bytes. Records are padded to 8 bytes; when the padded record would
 * end past `capacity` the next offset is 0 (start of the data area).
 */
static uint32_t reader_next_off(uint32_t off, uint64_t payload_len, uint64_t capacity)
{
    uint64_t step = align8_u64((uint64_t)sizeof(replica_rec_hdr_t) + payload_len);

    if ((uint64_t)off + step > capacity) {
        return 0;
    }
    return off + (uint32_t)step;
}

/*
 * Reader thread: walks the records in the shared-memory data area starting at
 * read_off, copies the payload of each record whose sequence number equals
 * local_seq into a heap buffer and queues it with sendq_push().
 *
 * The loop runs until should_stop is set, or until it hits an offset or
 * record it cannot interpret (in which case it logs and exits).
 *
 * arg is unused; the return value is always NULL.
 */
static void* reader_thread_func(void *arg)
{
    (void)arg;
    DEBUGLOG("Reader thread started\n");

    while (!should_stop) {
        replica_rec_hdr_t h;
        uint64_t last = __atomic_load_n(&g_shm.hdr->last_seq, __ATOMIC_ACQUIRE);

        if (local_seq > last) {
            /* No new data: sleep briefly instead of spinning. */
            usleep(500);
            continue;
        }

        uint64_t capacity = g_shm.hdr->capacity;

        if (read_off + sizeof(replica_rec_hdr_t) >= capacity) {
            DEBUGLOG("Reader: read_off overflow, reset\n");
            break;
        }
        if (replica_shm_peek(&g_shm, read_off, &h) != 0) {
            DEBUGLOG("Reader: peek failed at %u\n", read_off);
            break;
        }

        /* A zero length marks the point where the record stream wraps. */
        if (h.len == 0) {
            if (read_off == 0) {
                /*
                 * Already at the start and still nothing there: rewinding
                 * again makes no progress, so back off instead of spinning
                 * and flooding the log.
                 */
                usleep(500);
                continue;
            }
            DEBUGLOG("Reader: wrap at offset %u\n", read_off);
            read_off = 0;
            continue;
        }

        /* Skip records older than SYNC_SEQ. */
        if (h.seq < SYNC_SEQ) {
            read_off = reader_next_off(read_off, (uint64_t)h.len, capacity);
            continue;
        }

        /* Sequence check: only the record we expect next is forwarded. */
        if (h.seq != local_seq) {
            /* Nothing was consumed; wait before re-reading the same slot. */
            usleep(500);
            continue;
        }

        /*
         * The length comes from shared memory and must not be trusted:
         * refuse a payload that would extend past the data area rather than
         * reading out of bounds.
         */
        if ((uint64_t)read_off + sizeof(replica_rec_hdr_t) + (uint64_t)h.len > capacity) {
            DEBUGLOG("Reader: bad record length %llu at offset %u\n",
                     (unsigned long long)h.len, read_off);
            break;
        }

        /* Copy the payload out; the queue takes ownership of the buffer. */
        uint8_t *buf = (uint8_t *)malloc(h.len);
        if (!buf) {
            DEBUGLOG("Reader: malloc failed\n");
            usleep(1000);
            continue;
        }
        memcpy(buf, g_shm.data + read_off + sizeof(replica_rec_hdr_t), h.len);
        sendq_push(buf, h.len);

        read_off = reader_next_off(read_off, (uint64_t)h.len, capacity);
        local_seq++;
    }

    DEBUGLOG("Reader thread stopped\n");
    return NULL;
}
/* ================= Sender 线程:发送数据 ================= */
/*
 * Sender thread: drains the send queue onto the non-blocking socket
 * g_sockfd, using epoll to wait for readability/writability.
 *
 * The loop ends when should_stop is set, when an epoll call fails, or when
 * the connection breaks; in the latter case the socket is closed here and
 * g_sockfd is set to -1. Bytes received from the peer are read and discarded.
 *
 * arg is unused; the return value is always NULL.
 */
static void* sender_thread_func(void *arg)
{
    (void)arg;
    DEBUGLOG("Sender thread started\n");

    int epfd = epoll_create1(0);
    if (epfd < 0) {
        perror("epoll_create1");
        return NULL;
    }

    /*
     * Start with EPOLLIN only. EPOLLOUT is level-triggered and a connected
     * socket is almost always writable, so keeping it armed while the queue
     * is empty makes epoll_wait() return immediately on every call.
     */
    struct epoll_event ev;
    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN;
    ev.data.fd = g_sockfd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, g_sockfd, &ev) != 0) {
        perror("epoll_ctl ADD");
        close(epfd);
        return NULL;
    }

    int out_armed = 0;   /* whether EPOLLOUT is currently registered */

    while (!should_stop && g_sockfd >= 0) {
        pthread_mutex_lock(&sendq.lock);
        int have_data = (sendq.head != NULL);
        pthread_mutex_unlock(&sendq.lock);

        /* Ask for writability only while there is something to write. */
        if (have_data != out_armed) {
            memset(&ev, 0, sizeof(ev));
            ev.events = have_data ? (EPOLLIN | EPOLLOUT) : EPOLLIN;
            ev.data.fd = g_sockfd;
            if (epoll_ctl(epfd, EPOLL_CTL_MOD, g_sockfd, &ev) != 0) {
                perror("epoll_ctl MOD");
                break;
            }
            out_armed = have_data;
        }

        struct epoll_event events[4];
        /*
         * While idle, wake up every millisecond to notice newly queued data;
         * with data pending, wait up to 100 ms for the socket to drain.
         */
        int nfds = epoll_wait(epfd, events, 4, have_data ? 100 : 1);
        if (nfds < 0) {
            if (errno == EINTR) continue;
            perror("epoll_wait");
            break;
        }

        for (int i = 0; i < nfds; i++) {
            if (events[i].data.fd != g_sockfd)
                continue;

            if (events[i].events & (EPOLLERR | EPOLLHUP)) {
                DEBUGLOG("Sender: EPOLLERR/EPOLLHUP\n");
                close(g_sockfd);
                g_sockfd = -1;
                break;
            }

            if (events[i].events & EPOLLIN) {
                /* Incoming bytes are discarded; only EOF and errors matter. */
                char buf[4096];
                ssize_t got = recv(g_sockfd, buf, sizeof(buf), 0);
                int recv_errno = errno;

                if (got == 0) {
                    /* Orderly shutdown by the peer; EPOLLIN would fire forever. */
                    DEBUGLOG("Sender: peer closed connection\n");
                    close(g_sockfd);
                    g_sockfd = -1;
                    break;
                }
                if (got < 0 && recv_errno != EAGAIN && recv_errno != EWOULDBLOCK &&
                    recv_errno != EINTR) {
                    DEBUGLOG("Sender: recv error errno=%d\n", recv_errno);
                    close(g_sockfd);
                    g_sockfd = -1;
                    break;
                }
            }

            if (events[i].events & EPOLLOUT) {
                pthread_mutex_lock(&sendq.lock);
                while (sendq.head) {
                    struct send_node *n = sendq.head;

                    /*
                     * Do not hold the queue lock across send(). The head node
                     * stays valid meanwhile: only this thread pops nodes.
                     */
                    pthread_mutex_unlock(&sendq.lock);
                    ssize_t nbytes = send(g_sockfd, n->data + n->sent,
                                          (size_t)(n->len - n->sent), MSG_NOSIGNAL);
                    /* Capture errno before any other call can overwrite it. */
                    int send_errno = errno;
                    pthread_mutex_lock(&sendq.lock);

                    if (nbytes > 0) {
                        n->sent += (uint32_t)nbytes;
                        if (n->sent == n->len) {
                            sendq_pop();
                            continue;
                        }
                        /* Partial send: wait for the next EPOLLOUT. */
                        break;
                    }
                    if (nbytes < 0) {
                        if (send_errno == EINTR) {
                            continue;   /* interrupted before sending: retry */
                        }
                        if (send_errno == EAGAIN || send_errno == EWOULDBLOCK) {
                            break;
                        }
                        DEBUGLOG("Sender: send error errno=%d\n", send_errno);
                        pthread_mutex_unlock(&sendq.lock);
                        close(g_sockfd);
                        g_sockfd = -1;
                        goto out;
                    }
                    /* nbytes == 0 */
                    DEBUGLOG("Sender: send returned 0\n");
                    pthread_mutex_unlock(&sendq.lock);
                    close(g_sockfd);
                    g_sockfd = -1;
                    goto out;
                }
                pthread_mutex_unlock(&sendq.lock);
            }
        }
    }
out:
    close(epfd);
    DEBUGLOG("Sender thread stopped\n");
    return NULL;
}
/* ================= 网络逻辑 ================= */
/*
 * Open a TCP connection to peer_ip:peer_port and store the socket in
 * g_sockfd, closing any previous socket first. The connect() itself is
 * blocking; the socket is switched to non-blocking mode afterwards because
 * the sender thread drives it through epoll.
 *
 * Returns 0 on success, -1 when no valid peer is configured or any step
 * fails (g_sockfd is then left at -1 unless the arguments were rejected
 * before the old socket was touched).
 */
static int connect_peer(void)
{
    /* htons() takes 16 bits; a larger value would silently wrap to another port. */
    if (peer_port <= 0 || peer_port > 65535 || peer_ip[0] == '\0')
        return -1;

    if (g_sockfd >= 0) {
        close(g_sockfd);
        g_sockfd = -1;
    }

    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        perror("socket");
        return -1;
    }

    struct sockaddr_in a;
    memset(&a, 0, sizeof(a));
    a.sin_family = AF_INET;
    a.sin_port = htons((uint16_t)peer_port);
    if (inet_pton(AF_INET, peer_ip, &a.sin_addr) != 1) {
        DEBUGLOG("inet_pton failed for ip=%s\n", peer_ip);
        close(fd);
        return -1;
    }

    if (connect(fd, (struct sockaddr *)&a, sizeof(a)) != 0) {
        /* A retry could go here; for now a failed connect is simply reported. */
        close(fd);
        return -1;
    }

    /*
     * The sender loop relies on send()/recv() never blocking, so a socket
     * that cannot be made non-blocking is not usable.
     */
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != 0) {
        perror("fcntl O_NONBLOCK");
        close(fd);
        return -1;
    }

    g_sockfd = fd;
    DEBUGLOG("connect ok %s:%d\n", peer_ip, peer_port);
    return 0;
}
/* ================= perf buffer 回调 ================= */
static void handle_event(void *ctx, int cpu, void *data, __u32 size)
{
(void)ctx; (void)cpu;
if (size < sizeof(struct replica_event)) return;
struct replica_event *e = (struct replica_event*)data;
if (e->type == EVENT_SSYNC) {
memset(peer_ip, 0, sizeof(peer_ip));
memcpy(peer_ip, e->sync.ip, e->sync.ip_len);
peer_port = e->sync.port;
SYNC_SEQ = e->sync.seq;
local_seq = SYNC_SEQ;
read_off = 0;
DEBUGLOG("SSYNC: peer=%s:%d SYNC_SEQ=%llu\n",
peer_ip, peer_port, (unsigned long long)SYNC_SEQ);
// 停止旧线程
should_stop = 1;
if (reader_thread) {
pthread_join(reader_thread, NULL);
reader_thread = 0;
}
if (sender_thread) {
pthread_join(sender_thread, NULL);
sender_thread = 0;
}
if (g_sockfd >= 0) {
close(g_sockfd);
g_sockfd = -1;
}
sendq_free_all();
return;
}
if (e->type == EVENT_SREADY) {
DEBUGLOG("SREADY\n");
if (connect_peer() != 0) {
DEBUGLOG("connect_peer failed\n");
return;
}
// 启动双线程
should_stop = 0;
if (pthread_create(&reader_thread, NULL, reader_thread_func, NULL) != 0) {
perror("pthread_create reader");
return;
}
if (pthread_create(&sender_thread, NULL, sender_thread_func, NULL) != 0) {
perror("pthread_create sender");
pthread_cancel(reader_thread);
return;
}
DEBUGLOG("Reader and Sender threads started\n");
return;
}
}
/* ================= main ================= */
/*
 * Entry point: attach to the shared-memory ring, load and attach the BPF
 * skeleton, then poll the perf buffer and dispatch events to handle_event()
 * until polling fails.
 *
 * Returns 0 on a clean exit, 1 when the shared memory or skeleton cannot be
 * opened, otherwise the positive error code of the step that failed.
 */
int main(int argc, char **argv)
{
    (void)argc;
    (void)argv;

    struct replica_bpf *skel = NULL;
    struct perf_buffer *pb = NULL;
    int err = 0;

    int rc = replica_shm_open(&g_shm, REPLICA_SHM_NAME, REPLICA_SHM_SIZE, 0);
    if (rc != 0) {
        fprintf(stderr, "replica_shm_open failed rc=%d (did you create it in kvstore?)\n", rc);
        return 1;
    }

    /* Open BPF application */
    skel = replica_bpf__open();
    if (!skel) {
        fprintf(stderr, "Failed to open BPF skeleton\n");
        replica_shm_close(&g_shm);
        return 1;
    }

    /* Load & verify BPF programs */
    err = replica_bpf__load(skel);
    if (err) {
        fprintf(stderr, "Failed to load and verify BPF skeleton\n");
        goto cleanup;
    }

    /* Attach tracepoint handler */
    err = replica_bpf__attach(skel);
    if (err) {
        fprintf(stderr, "Failed to attach BPF skeleton\n");
        goto cleanup;
    }
    printf("Successfully started! \n");

    pb = perf_buffer__new(bpf_map__fd(skel->maps.events), 8,
                          handle_event, NULL, NULL, NULL);
    if (!pb) {
        /* Record the failure so the process does not exit with status 0. */
        err = -errno;
        if (err == 0) {
            err = -1;
        }
        fprintf(stderr, "Failed to create perf buffer: %d\n", err);
        goto cleanup;
    }

    /* Dispatch events until polling reports a real error. */
    for (;;) {
        int polled = perf_buffer__poll(pb, 1000);
        if (polled < 0 && polled != -EINTR) {
            fprintf(stderr, "perf_buffer__poll failed: %d\n", polled);
            err = polled;
            break;
        }
    }

cleanup:
    perf_buffer__free(pb);   /* accepts NULL */
    should_stop = 1;
    if (reader_thread) pthread_join(reader_thread, NULL);
    if (sender_thread) pthread_join(sender_thread, NULL);
    if (g_sockfd >= 0) close(g_sockfd);
    replica_shm_close(&g_shm);
    sendq_free_all();
    replica_bpf__destroy(skel);
    return -err;
}