Files
ldb/ebpf/old.c/replica.c

337 lines
8.3 KiB
C
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/resource.h>
#include <bpf/libbpf.h>
#include "replica.skel.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>
#include "replica.h"
/* Debug logging helper: a plain alias for printf, so messages go to stdout. */
#define DEBUGLOG printf
/* Link state of the connection to the peer. */
typedef enum {
/* No usable connection: initial state, and set again on SSYNC, close or I/O error. */
OFFLINE = 0,
/* Set by try_connect() once connect() to the peer has succeeded. */
ONLINE = 1,
} replica_state_e;
/* One queued command payload waiting to be sent; nodes form a singly linked list. */
struct cmd_node {
/* Number of bytes in cmd that still have to be sent (shrinks after a partial send). */
__u32 len;
/* Heap-allocated copy of the payload bytes; it is NOT NUL-terminated. */
uint8_t *cmd;
/* Next node in the queue, or NULL for the last node. */
struct cmd_node *next;
};
/* FIFO of cmd_node entries: pushed at the tail, sent and released from the head. */
struct pending_queue {
/* Oldest queued node (next one to send), or NULL when the queue is empty. */
struct cmd_node *head;
/* Most recently queued node, or NULL when the queue is empty. */
struct cmd_node *tail;
/* Number of nodes currently linked in the queue. */
int count;
};
/* ================= Global state ================= */
/* Current link state; starts OFFLINE and becomes ONLINE after a successful connect. */
static replica_state_e state = OFFLINE;
/* Peer TCP socket descriptor, or -1 while no socket is open. */
static int sockfd = -1;
/* epoll instance descriptor; -1 until main() creates it. */
static int epollfd = -1;
/* Peer address text and port, filled in from EVENT_SSYNC events. */
static char peer_ip[MAX_IP_LEN];
static int peer_port;
/* Queue of command payloads waiting to be sent to the peer; starts empty. */
static struct pending_queue pending = {
.head = NULL,
.tail = NULL,
.count = 0,
};
/* ================= pending queue operations ================= */
/*
 * Release every node still linked in the global pending queue, together with
 * each node's payload buffer, and reset the queue to its empty state.
 *
 * Declared with an explicit (void) parameter list, like the other helpers in
 * this file, so that the compiler rejects calls that pass arguments.
 */
static void pending_free(void)
{
    struct cmd_node *node = pending.head;

    while (node != NULL) {
        /* Remember the successor before the node itself is released. */
        struct cmd_node *following = node->next;

        free(node->cmd);
        free(node);
        node = following;
    }

    pending.head = NULL;
    pending.tail = NULL;
    pending.count = 0;
}
/*
 * Append a private copy of a command payload to the tail of the global
 * pending queue.
 *
 * len: number of bytes to copy from cmd.
 * cmd: source bytes; the queue stores its own heap copy, so the caller's
 *      buffer does not have to outlive the call.
 *
 * An empty or NULL payload is ignored (there is nothing to send, and
 * malloc(0) is implementation-defined). If memory cannot be allocated the
 * payload is dropped, the queue is left untouched, and a diagnostic is
 * written to stderr so the loss is visible rather than silent.
 */
static void pending_push(__u32 len, const uint8_t *cmd)
{
    if (len == 0 || cmd == NULL)
        return;

    struct cmd_node *node = malloc(sizeof(*node));
    uint8_t *copy = malloc(len);

    if (node == NULL || copy == NULL) {
        fprintf(stderr, "pending_push: out of memory, dropping %u bytes\n",
                (unsigned int)len);
        free(copy);
        free(node);
        return;
    }

    memcpy(copy, cmd, len);
    node->cmd = copy;
    node->len = len;
    node->next = NULL;

    /* Link at the tail; an empty queue gets the node as both head and tail. */
    if (pending.tail == NULL)
        pending.head = node;
    else
        pending.tail->next = node;
    pending.tail = node;
    pending.count++;
}
static long long int sendn = 0;
static void pending_send_all(void)
{
struct cmd_node *cur = pending.head;
int need_out = 0;
int sent_count = 0;
const int MAX_BATCH = 100; // 批量发送上限,避免阻塞过久
while (cur && sent_count < MAX_BATCH) {
// 使用 MSG_MORE 合并多个小包
int flags = (cur->next && sent_count < MAX_BATCH - 1) ? MSG_MORE : 0;
int rt = send(sockfd, cur->cmd, cur->len, flags);
if (rt == (int)cur->len) {
sendn += rt;
printf("%s\n", cur->cmd);
struct cmd_node *tmp = cur;
cur = cur->next;
free(tmp->cmd);
free(tmp);
pending.count--;
pending.head = cur;
sent_count++;
} else if (rt > 0) {
sendn += rt;
memmove(cur->cmd, cur->cmd + rt, cur->len - rt);
cur->len -= rt;
need_out = 1;
break;
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
need_out = 1;
break;
} else {
perror("send failed");
state = OFFLINE;
break;
}
}
}
DEBUGLOG("sendn :%lld\n", sendn);
pending.head = cur;
if (!cur) pending.tail = NULL;
if (sockfd >= 0 && state == ONLINE) {
struct epoll_event ev = {0};
ev.data.fd = sockfd;
ev.events = EPOLLIN;
if (need_out || pending.head) {
ev.events |= EPOLLOUT;
}
epoll_ctl(epollfd, EPOLL_CTL_MOD, sockfd, &ev);
}
}
/* ================= Network logic ================= */
/*
 * (Re)establish the TCP connection to peer_ip:peer_port.
 *
 * Any previously open socket is closed first. Up to CONNECT_ATTEMPTS blocking
 * connect() calls are made, sleeping one second after each failure, so this
 * function can block the caller for several seconds. On success the socket is
 * switched to non-blocking mode, registered with epoll (with EPOLLOUT already
 * requested if commands are queued), published in sockfd, and the state
 * becomes ONLINE. On any failure sockfd stays -1 and the state is unchanged.
 */
static void try_connect(void)
{
    enum { CONNECT_ATTEMPTS = 10 };

    /* 0 is a valid descriptor, so test with >= 0 like the rest of the file. */
    if (sockfd >= 0) {
        /* Best effort: the descriptor may not be registered any more. */
        epoll_ctl(epollfd, EPOLL_CTL_DEL, sockfd, NULL);
        close(sockfd);
        sockfd = -1;
    }

    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(peer_port);
    if (inet_pton(AF_INET, peer_ip, &addr.sin_addr) != 1) {
        fprintf(stderr, "invalid peer address: %s\n", peer_ip);
        return;
    }

    for (int attempt = 1; attempt <= CONNECT_ATTEMPTS; ++attempt) {
        int fd = socket(AF_INET, SOCK_STREAM, 0);
        if (fd < 0) {
            perror("socket");
            return;
        }

        DEBUGLOG("connect try %d... %s:%d\n", attempt, peer_ip, peer_port);
        if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) != 0) {
            perror("connect");
            close(fd);
            sleep(1);
            continue;
        }
        DEBUGLOG("connect success: %s:%d\n", peer_ip, peer_port);

        int flags = fcntl(fd, F_GETFL, 0);
        if (flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
            perror("fcntl");
            close(fd);
            return;
        }

        /* Register once, already asking for EPOLLOUT when data is waiting. */
        struct epoll_event ev = {0};
        ev.data.fd = fd;
        ev.events = EPOLLIN;
        if (pending.head)
            ev.events |= EPOLLOUT;
        if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
            /* Without registration no event would ever arrive: do not go ONLINE. */
            perror("epoll_ctl");
            close(fd);
            return;
        }

        sockfd = fd;
        state = ONLINE;
        return;
    }

    DEBUGLOG("connect failed after %d retries\n", (int)CONNECT_ATTEMPTS);
}
/*
 * Drain the peer socket after it was reported readable.
 *
 * Incoming bytes are read into a scratch buffer and discarded. Reading stops
 * quietly once the socket has no more data (EAGAIN/EWOULDBLOCK). If the peer
 * closed the connection or recv() fails for any other reason, the state is set
 * to OFFLINE and the socket is removed from epoll, closed and reset to -1.
 */
static void handle_socket_readable(void)
{
    char scratch[65536];
    int got;

    /* Keep reading for as long as data keeps arriving. */
    do {
        got = recv(sockfd, scratch, sizeof(scratch), MSG_DONTWAIT);
    } while (got > 0);

    if (got < 0) {
        /* Nothing left to read right now: the connection is still fine. */
        if (errno == EAGAIN || errno == EWOULDBLOCK)
            return;
        perror("recv");
    }

    /* Either an orderly shutdown (got == 0) or a hard error: tear down. */
    state = OFFLINE;
    epoll_ctl(epollfd, EPOLL_CTL_DEL, sockfd, NULL);
    close(sockfd);
    sockfd = -1;

    if (got == 0)
        DEBUGLOG("connection closed\n");
}
/* Called when the peer socket is reported writable: flushes the pending queue via pending_send_all(). */
static void handle_socket_writable(void)
{
pending_send_all();
}
/* ================= ring buffer callback ================= */
/*
 * Ring-buffer callback: handles one replica_event produced by the BPF side.
 *
 * ctx:  unused.
 * data: pointer to the event record, interpreted as struct replica_event.
 * size: record size reported by the ring buffer.
 *       NOTE(review): size is not validated against the event layout here;
 *       confirm that the producer always submits complete records.
 *
 * EVENT_SSYNC         - stores the peer address/port and marks the link OFFLINE.
 * EVENT_COMPLETED_CMD - queues the payload and, while ONLINE, asks epoll for
 *                       EPOLLOUT so that the queue gets flushed.
 * EVENT_SREADY        - connects to the stored peer if currently OFFLINE.
 * Any other type is ignored.
 *
 * Always returns 0.
 */
static int handle_event(void *ctx, void *data, size_t size)
{
    struct replica_event *evt = data;

    (void)ctx;
    (void)size;

    switch (evt->type) {
    case EVENT_SSYNC:
        /*
         * strncpy() does not NUL-terminate when the source fills the whole
         * destination, so copy one byte less and terminate explicitly.
         */
        strncpy(peer_ip, evt->sync.ip, sizeof(peer_ip) - 1);
        peer_ip[sizeof(peer_ip) - 1] = '\0';
        peer_port = evt->sync.port;
        DEBUGLOG("SSYNC [%s:%d]\n", peer_ip, peer_port);
        state = OFFLINE;
        break;

    case EVENT_COMPLETED_CMD:
        /*
         * The payload may be half a command or several commands glued
         * together; for a forwarder it is just a byte stream, so it is
         * pushed as-is.
         */
        if (evt->complete.len > 0)
            pending_push(evt->complete.len, evt->complete.cmd);

        if (state == ONLINE && sockfd >= 0 && pending.head) {
            struct epoll_event ev = {0};
            ev.events = EPOLLIN | EPOLLOUT;
            ev.data.fd = sockfd;
            if (epoll_ctl(epollfd, EPOLL_CTL_MOD, sockfd, &ev) < 0)
                perror("epoll_ctl");
        }
        break;

    case EVENT_SREADY:
        DEBUGLOG("SREADY\n");
        if (state == OFFLINE)
            try_connect();
        break;

    default:
        /* Unknown event types are ignored. */
        break;
    }

    return 0;
}
/*
 * Program entry point: loads and attaches the BPF skeleton, creates the ring
 * buffer consumer and the epoll instance, then runs the event loop.
 *
 * Each loop iteration polls the ring buffer (events are dispatched to
 * handle_event) and, while the link is ONLINE, services the peer socket
 * without blocking.
 *
 * Returns 1 if the skeleton cannot be opened; otherwise the negated error
 * code of the step that failed. The loop only ends when ring-buffer polling
 * fails with an error other than EINTR.
 */
int main(int argc, char **argv)
{
    struct replica_bpf *skel;
    struct ring_buffer *rb = NULL;
    int err;

    (void)argc;
    (void)argv;

    /* Raise the memlock rlimit so that BPF objects can be loaded. */
    struct rlimit r = {RLIM_INFINITY, RLIM_INFINITY};
    if (setrlimit(RLIMIT_MEMLOCK, &r) != 0)
        perror("setrlimit"); /* not fatal: loading may still succeed */

    skel = replica_bpf__open();
    if (!skel) {
        fprintf(stderr, "Failed to open BPF skeleton\n");
        return 1;
    }

    err = replica_bpf__load(skel);
    if (err) {
        fprintf(stderr, "Failed to load BPF skeleton\n");
        goto cleanup;
    }

    err = replica_bpf__attach(skel);
    if (err) {
        fprintf(stderr, "Failed to attach BPF skeleton\n");
        goto cleanup;
    }

    printf("Successfully started! Monitoring TCP port 8888 (Kernel Side)...\n");

    rb = ring_buffer__new(bpf_map__fd(skel->maps.rb), handle_event, NULL, NULL);
    if (!rb) {
        fprintf(stderr, "Failed to create ring buffer\n");
        err = -1; /* err is still 0 here; make the exit status report failure */
        goto cleanup;
    }

    epollfd = epoll_create1(0);
    if (epollfd < 0) {
        perror("epoll_create1");
        err = -1;
        goto cleanup;
    }

    while (1) {
        struct epoll_event events[10];

        /*
         * Polling is still required for throughput; a 10 ms delay is
         * acceptable for ring-buffer consumption, and a longer timeout is
         * used while OFFLINE.
         */
        int poll_timeout = (state == ONLINE) ? 10 : 100;
        int polled = ring_buffer__poll(rb, poll_timeout);
        if (polled < 0 && polled != -EINTR) {
            /* A persistent error would otherwise make this loop spin forever. */
            fprintf(stderr, "ring_buffer__poll failed: %d\n", polled);
            err = polled;
            break;
        }

        if (state == OFFLINE)
            continue;

        int nfds = epoll_wait(epollfd, events,
                              (int)(sizeof(events) / sizeof(events[0])), 0);
        for (int i = 0; i < nfds; i++) {
            if (events[i].data.fd != sockfd)
                continue;

            unsigned int what = events[i].events;

            /* Route error/hang-up through the read path so recv() reports it. */
            if (what & (EPOLLIN | EPOLLERR | EPOLLHUP))
                handle_socket_readable();

            /* The read handler may just have closed the socket. */
            if ((what & EPOLLOUT) && sockfd >= 0 && state == ONLINE)
                handle_socket_writable();
        }
    }

cleanup:
    if (rb)
        ring_buffer__free(rb);
    pending_free();
    if (sockfd >= 0)
        close(sockfd);
    if (epollfd >= 0)
        close(epollfd);
    replica_bpf__destroy(skel);
    return -err;
}