Files
ldb/reactor.c
2026-01-20 11:51:38 +00:00

427 lines
9.0 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#include <errno.h>
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <poll.h>
#include <sys/epoll.h>
#include <errno.h>
#include <sys/time.h>
#include <sys/eventfd.h>
#include "server.h"
#define CONNECTION_SIZE 1024 // 1024 * 1024
#define MAX_PORTS 20
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
#if ENABLE_KVSTORE
// typedef int (*msg_handler)(char *msg, int length, char *response);
// typedef int (*msg_handler)(char *request, int request_length, char *response, int *response_length);
typedef int (*msg_handler)(struct conn* conn);
extern int try_connect_master(char *ip, int port);
static msg_handler kvs_handler;
// 0 need more, -1 error, =1 suc
int kvs_request(struct conn *c) {
// int consumed_out = kvs_handler(c->rbuffer, c->rlength, c->wbuffer, &c->wlength);
int consumed_out = kvs_handler(c);
return consumed_out;
}
int kvs_response(struct conn *c) {
}
#endif
int accept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);
int epfd = 0;
struct timeval begin;
int wakeup_fd = -1;
struct conn conn_list[CONNECTION_SIZE] = {0};
// fd
// 1 add, 0 mod
int set_event(int fd, int event, int flag) {
if (flag) { // non-zero add
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
} else { // zero mod
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}
}
int event_register(int fd, int event) {
if (fd < 0) return -1;
conn_list[fd].fd = fd;
conn_list[fd].r_action.recv_callback = recv_cb;
conn_list[fd].send_callback = send_cb;
memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH);
conn_list[fd].rlength = 0;
memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH);
conn_list[fd].wlength = 0;
conn_list[fd].is_from_master = 0;
set_event(fd, event, 1);
}
// listenfd(sockfd) --> EPOLLIN --> accept_cb
int accept_cb(int fd) {
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
if (clientfd < 0) {
printf("accept errno: %d --> %s\n", errno, strerror(errno));
return -1;
}
event_register(clientfd, EPOLLIN); // | EPOLLET
if ((clientfd % 1000) == 0) {
struct timeval current;
gettimeofday(&current, NULL);
int time_used = TIME_SUB_MS(current, begin);
memcpy(&begin, &current, sizeof(struct timeval));
//printf("accept finshed: %d, time_used: %d\n", clientfd, time_used);
}
return 0;
}
int recv_cb(int fd) {
struct conn *c = &conn_list[fd];
int avail = BUFFER_LENGTH - c->rlength;
// printf("avail: %d\n", avail);
if (avail <= 0) {
// 缓冲满了还没解析出来:协议异常或包过大
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
return 0;
}
int count = recv(fd, c->rbuffer + c->rlength, avail, 0);
if (count == 0) { // disconnect
//printf("client disconnect: %d\n", fd);
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); // unfinished
return 0;
} else if (count < 0) { //
printf("count: %d, errno: %d, %s\n", count, errno, strerror(errno));
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
return 0;
}
c->rlength += count;
//printf("RECV: %s\n", conn_list[fd].rbuffer);
#if 0 // echo
conn_list[fd].wlength = conn_list[fd].rlength;
memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, conn_list[fd].wlength);
printf("[%d]RECV: %s\n", conn_list[fd].rlength, conn_list[fd].rbuffer);
#elif ENABLE_HTTP
http_request(&conn_list[fd]);
#elif ENABLE_WEBSOCKET
ws_request(&conn_list[fd]);
#elif ENABLE_KVSTORE
int consumed = kvs_request(c);
if(consumed < 0){
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
return 0;
}
// 清理 buffer
if (consumed > 0 && consumed < c->rlength) {
// 有剩余未处理数据,搬移到 buffer 头部
int left = c->rlength - consumed;
if (left > 0) memmove(c->rbuffer, c->rbuffer + consumed, left);
c->rlength = left;
if (c->wlength > 0) set_event(fd, EPOLLOUT, 0);
return count;
}else{
c->rlength = 0;
if(c->wlength > 0) set_event(fd, EPOLLOUT, 0);
return count;
}
#endif
set_event(fd, EPOLLOUT, 0);
return count;
}
int send_cb(int fd) {
#if ENABLE_HTTP
http_response(&conn_list[fd]);
#elif ENABLE_WEBSOCKET
ws_response(&conn_list[fd]);
#elif ENABLE_KVSTORE
kvs_response(&conn_list[fd]);
#endif
struct conn *c = &conn_list[fd];
int sent_total = 0;
pthread_mutex_lock(&c->g_sync_lock);
while (c->wlength > 0) {
ssize_t n = send(fd, c->wbuffer, (size_t)c->wlength, MSG_NOSIGNAL);
if (n > 0) {
sent_total += (int)n;
if (n == c->wlength) {
/* 全部发完 */
c->wlength = 0;
break;
}
/* 只发了一部分:把剩余数据搬到 buffer 头部 */
int left = c->wlength - (int)n;
memmove(c->wbuffer, c->wbuffer + n, (size_t)left);
c->wlength = left;
/* 不要在这里死循环占用 CPU交给下一次 EPOLLOUT */
break;
}
if (n < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
/* 暂时发不出去,等下一次可写事件 */
pthread_mutex_unlock(&c->g_sync_lock);
set_event(fd, EPOLLOUT, 0);
return sent_total;
}
/* 对端断开 / 其他错误 */
int e = errno;
pthread_mutex_unlock(&c->g_sync_lock);
printf("send fd=%d errno=%d %s\n", fd, e, strerror(e));
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
return 0;
}
break;
}
pthread_mutex_unlock(&c->g_sync_lock);
if (c->wlength > 0) {
/* 还有没发完,继续监听可写 */
set_event(fd, EPOLLOUT, 0);
} else {
/* 发完了,回到读 */
set_event(fd, EPOLLIN, 0);
}
return sent_total;
}
// wakup fd
int handle_wakeup_fd_cb(int fd);
int init_wakeup_fd(void) {
wakeup_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (wakeup_fd < 0) {
printf("eventfd failed: errno=%d %s\n", errno, strerror(errno));
return -1;
}
conn_list[wakeup_fd].fd = wakeup_fd;
conn_list[wakeup_fd].r_action.recv_callback = handle_wakeup_fd_cb;
set_event(wakeup_fd, EPOLLIN, 1);
return 0;
}
// EPOLLOUT
void sync_wakeup(int fd) {
if (wakeup_fd < 0) return;
set_event(fd, EPOLLOUT, 0);
uint64_t one = 1;
ssize_t n = write(wakeup_fd, &one, sizeof(one));
}
int handle_wakeup_fd_cb(int fd) {
uint64_t v;
while (1) {
ssize_t n = read(wakeup_fd, &v, sizeof(v));
if (n == sizeof(v)) continue;
if (n < 0 && errno == EAGAIN) break; // 已经读空
break;
}
return 0;
}
int r_init_server(unsigned short port) {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in servaddr;
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0
servaddr.sin_port = htons(port); // 0-1023,
if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) {
printf("bind failed: %s\n", strerror(errno));
}
listen(sockfd, 10);
//printf("listen finshed: %d\n", sockfd); // 3
return sockfd;
}
int reactor_start(unsigned short port, msg_handler handler, char *m_ip, int m_port) {
//unsigned short port = 2000;
kvs_handler = handler;
epfd = epoll_create(1);
if(init_wakeup_fd() < 0){
close(epfd);
return -1;
}
// slave
if(m_ip != NULL){
int masterfd = try_connect_master(m_ip, m_port);
event_register(masterfd, EPOLLIN);
conn_list[masterfd].is_from_master = 1;
}
int i = 0;
for (i = 0;i < MAX_PORTS;i ++) {
int sockfd = r_init_server(port + i);
conn_list[sockfd].fd = sockfd;
conn_list[sockfd].r_action.recv_callback = accept_cb;
conn_list[sockfd].is_from_master = 0;
set_event(sockfd, EPOLLIN, 1);
}
gettimeofday(&begin, NULL);
while (1) { // mainloop
struct epoll_event events[1024] = {0};
int nready = epoll_wait(epfd, events, 1024, -1);
int i = 0;
for (i = 0;i < nready;i ++) {
int connfd = events[i].data.fd;
#if 0
if (events[i].events & EPOLLIN) {
conn_list[connfd].r_action.recv_callback(connfd);
} else if (events[i].events & EPOLLOUT) {
conn_list[connfd].send_callback(connfd);
}
#else
if (events[i].events & EPOLLIN) {
conn_list[connfd].r_action.recv_callback(connfd);
}
if (events[i].events & EPOLLOUT) {
conn_list[connfd].send_callback(connfd);
}
#endif
}
}
if (wakeup_fd >= 0) close(wakeup_fd);
if (epfd >= 0) close(epfd);
return 0;
}