add network

This commit is contained in:
King
2024-05-18 14:10:08 +00:00
parent f4c0b74285
commit 5a7fa95d2c
6 changed files with 738 additions and 0 deletions

189
proactor.c Executable file
View File

@@ -0,0 +1,189 @@
#include <stdio.h>
#include <liburing.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
#define EVENT_ACCEPT 0
#define EVENT_READ 1
#define EVENT_WRITE 2
extern int kvs_protocol(char *msg, int length, char *response);
struct conn_info {
int fd;
int event;
};
int p_init_server(unsigned short port) {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in serveraddr;
memset(&serveraddr, 0, sizeof(struct sockaddr_in));
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(port);
if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) {
perror("bind");
return -1;
}
listen(sockfd, 10);
return sockfd;
}
#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024
int set_event_recv(struct io_uring *ring, int sockfd,
void *buf, size_t len, int flags) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
struct conn_info accept_info = {
.fd = sockfd,
.event = EVENT_READ,
};
io_uring_prep_recv(sqe, sockfd, buf, len, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}
int set_event_send(struct io_uring *ring, int sockfd,
void *buf, size_t len, int flags) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
struct conn_info accept_info = {
.fd = sockfd,
.event = EVENT_WRITE,
};
io_uring_prep_send(sqe, sockfd, buf, len, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}
int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr,
socklen_t *addrlen, int flags) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
struct conn_info accept_info = {
.fd = sockfd,
.event = EVENT_ACCEPT,
};
io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)addr, addrlen, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}
typedef int (*msg_handler)(char *msg, int length, char *response);
static msg_handler kvs_handler;
int proactor_start(unsigned short port, msg_handler handler) {
int sockfd = p_init_server(port);
kvs_handler = handler;
struct io_uring_params params;
memset(&params, 0, sizeof(params));
struct io_uring ring;
io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params);
#if 0
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
accept(sockfd, (struct sockaddr*)&clientaddr, &len);
#else
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);
#endif
char buffer[BUFFER_LENGTH] = {0};
char response[BUFFER_LENGTH] = {0};
while (1) {
io_uring_submit(&ring);
struct io_uring_cqe *cqe;
io_uring_wait_cqe(&ring, &cqe);
struct io_uring_cqe *cqes[128];
int nready = io_uring_peek_batch_cqe(&ring, cqes, 128); // epoll_wait
int i = 0;
for (i = 0;i < nready;i ++) {
struct io_uring_cqe *entries = cqes[i];
struct conn_info result;
memcpy(&result, &entries->user_data, sizeof(struct conn_info));
if (result.event == EVENT_ACCEPT) {
set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);
//printf("set_event_accept\n"); //
int connfd = entries->res;
set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0);
} else if (result.event == EVENT_READ) { //
int ret = entries->res;
if (ret == 0) {
close(result.fd);
} else if (ret > 0) {
//int kvs_protocol(char *msg, int length, char *response);
ret = kvs_handler(buffer, ret, response);
set_event_send(&ring, result.fd, response, ret, 0);
}
} else if (result.event == EVENT_WRITE) {
//
int ret = entries->res;
//printf("set_event_send ret: %d, %s\n", ret, buffer);
set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0);
}
}
io_uring_cq_advance(&ring, nready);
}
}