From 5a7fa95d2c7fe60faa52abc9ef861e1bf3cb74b1 Mon Sep 17 00:00:00 2001 From: King <1989wangbojing@163.com> Date: Sat, 18 May 2024 14:10:08 +0000 Subject: [PATCH] add network --- kvstore.c | 43 ++++++++ kvstore.h | 37 +++++++ ntyco.c | 91 +++++++++++++++ proactor.c | 189 ++++++++++++++++++++++++++++++++ reactor.c | 316 +++++++++++++++++++++++++++++++++++++++++++++++++++++ server.h | 62 +++++++++++ 6 files changed, 738 insertions(+) create mode 100644 kvstore.c create mode 100644 kvstore.h create mode 100755 ntyco.c create mode 100755 proactor.c create mode 100644 reactor.c create mode 100644 server.h diff --git a/kvstore.c b/kvstore.c new file mode 100644 index 0000000..fca8bc5 --- /dev/null +++ b/kvstore.c @@ -0,0 +1,43 @@ + + + +#include +#include +#include + +#include "kvstore.h" + +/* + * msg: request message + * length: length of request message + * response: need to send + * @return : length of response + */ + +int kvs_protocol(char *msg, int length, char *response) { + + printf("recv %d : %s\n", length, msg); + + memcpy(response, msg, length); + + return strlen(response); +} + + + +int main(int argc, char *argv[]) { + + if (argc != 2) return -1; + + int port = atoi(argv[1]); + +#if (NETWORK_SELECT == NETWORK_REACTOR) + reactor_start(port, kvs_protocol); // +#elif (NETWORK_SELECT == NETWORK_PROACTOR) + ntyco_start(port, kvs_protocol); +#elif (NETWORK_SELECT == NETWORK_NTYCO) + proactor_start(port, kvs_protocol); +#endif +} + + diff --git a/kvstore.h b/kvstore.h new file mode 100644 index 0000000..4cb3bbf --- /dev/null +++ b/kvstore.h @@ -0,0 +1,37 @@ + + + +#ifndef __KV_STORE_H__ +#define __KV_STORE_H__ + + +#define NETWORK_REACTOR 0 +#define NETWORK_PROACTOR 1 +#define NETWORK_NTYCO 2 + +#define NETWORK_SELECT NETWORK_REACTOR + + +typedef int (*msg_handler)(char *msg, int length, char *response); + + +extern int reactor_start(unsigned short port, msg_handler handler); +extern int proactor_start(unsigned short port, msg_handler handler); +extern int ntyco_start(unsigned short port, msg_handler handler); + + + +const char *command[] = { + "SET", "GET", "DEL", "MOD", "EXIST" +}; + +const char *response[] = { + +}; + + + +#endif + + + diff --git a/ntyco.c b/ntyco.c new file mode 100755 index 0000000..7ce57f8 --- /dev/null +++ b/ntyco.c @@ -0,0 +1,91 @@ + + + + +#include "nty_coroutine.h" + +#include + + +typedef int (*msg_handler)(char *msg, int length, char *response); +static msg_handler kvs_handler; + + +void server_reader(void *arg) { + int fd = *(int *)arg; + int ret = 0; + + + while (1) { + + char buf[1024] = {0}; + ret = recv(fd, buf, 1024, 0); + if (ret > 0) { + + char response[1024] = {0}; + int slength = kvs_handler(buf, ret, response); + + ret = send(fd, response, slength, 0); + if (ret == -1) { + close(fd); + break; + } + } else if (ret == 0) { + close(fd); + break; + } + + } +} + + + +void server(void *arg) { + + unsigned short port = *(unsigned short *)arg; + + int fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0) return ; + + struct sockaddr_in local, remote; + local.sin_family = AF_INET; + local.sin_port = htons(port); + local.sin_addr.s_addr = INADDR_ANY; + bind(fd, (struct sockaddr*)&local, sizeof(struct sockaddr_in)); + + listen(fd, 20); + printf("listen port : %d\n", port); + + + while (1) { + socklen_t len = sizeof(struct sockaddr_in); + int cli_fd = accept(fd, (struct sockaddr*)&remote, &len); + + + nty_coroutine *read_co; + nty_coroutine_create(&read_co, server_reader, &cli_fd); + + } + +} + + + + + +int ntyco_start(unsigned short port, msg_handler handler) { + + //int port = atoi(argv[1]); + kvs_handler = handler; + + + nty_coroutine *co = NULL; + nty_coroutine_create(&co, server, &port); + + nty_schedule_run(); + +} + + + + diff --git a/proactor.c b/proactor.c new file mode 100755 index 0000000..2e4ff78 --- /dev/null +++ b/proactor.c @@ -0,0 +1,189 @@ + + + +#include +#include +#include +#include +#include + + +#define EVENT_ACCEPT 0 +#define EVENT_READ 1 +#define EVENT_WRITE 2 + +extern int kvs_protocol(char *msg, int length, char *response); + + + +struct conn_info { + int fd; + int event; +}; + + +int p_init_server(unsigned short port) { + + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + struct sockaddr_in serveraddr; + memset(&serveraddr, 0, sizeof(struct sockaddr_in)); + serveraddr.sin_family = AF_INET; + serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); + serveraddr.sin_port = htons(port); + + if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) { + perror("bind"); + return -1; + } + + listen(sockfd, 10); + + return sockfd; +} + + + +#define ENTRIES_LENGTH 1024 +#define BUFFER_LENGTH 1024 + +int set_event_recv(struct io_uring *ring, int sockfd, + void *buf, size_t len, int flags) { + + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); + + struct conn_info accept_info = { + .fd = sockfd, + .event = EVENT_READ, + }; + + io_uring_prep_recv(sqe, sockfd, buf, len, flags); + memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info)); + +} + + +int set_event_send(struct io_uring *ring, int sockfd, + void *buf, size_t len, int flags) { + + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); + + struct conn_info accept_info = { + .fd = sockfd, + .event = EVENT_WRITE, + }; + + io_uring_prep_send(sqe, sockfd, buf, len, flags); + memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info)); + +} + + + +int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr, + socklen_t *addrlen, int flags) { + + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); + + struct conn_info accept_info = { + .fd = sockfd, + .event = EVENT_ACCEPT, + }; + + io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)addr, addrlen, flags); + memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info)); + +} + + +typedef int (*msg_handler)(char *msg, int length, char *response); +static msg_handler kvs_handler; + + + + +int proactor_start(unsigned short port, msg_handler handler) { + + int sockfd = p_init_server(port); + kvs_handler = handler; + + struct io_uring_params params; + memset(¶ms, 0, sizeof(params)); + + struct io_uring ring; + io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms); + + +#if 0 + struct sockaddr_in clientaddr; + socklen_t len = sizeof(clientaddr); + accept(sockfd, (struct sockaddr*)&clientaddr, &len); +#else + + struct sockaddr_in clientaddr; + socklen_t len = sizeof(clientaddr); + set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0); + +#endif + + char buffer[BUFFER_LENGTH] = {0}; + char response[BUFFER_LENGTH] = {0}; + + + while (1) { + + io_uring_submit(&ring); + + + struct io_uring_cqe *cqe; + io_uring_wait_cqe(&ring, &cqe); + + struct io_uring_cqe *cqes[128]; + int nready = io_uring_peek_batch_cqe(&ring, cqes, 128); // epoll_wait + + int i = 0; + for (i = 0;i < nready;i ++) { + + struct io_uring_cqe *entries = cqes[i]; + struct conn_info result; + memcpy(&result, &entries->user_data, sizeof(struct conn_info)); + + if (result.event == EVENT_ACCEPT) { + + set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0); + //printf("set_event_accept\n"); // + + int connfd = entries->res; + + set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0); + + + } else if (result.event == EVENT_READ) { // + + int ret = entries->res; + + if (ret == 0) { + close(result.fd); + } else if (ret > 0) { + + //int kvs_protocol(char *msg, int length, char *response); + ret = kvs_handler(buffer, ret, response); + + set_event_send(&ring, result.fd, response, ret, 0); + } + } else if (result.event == EVENT_WRITE) { // + + int ret = entries->res; + //printf("set_event_send ret: %d, %s\n", ret, buffer); + + set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0); + + } + + } + + io_uring_cq_advance(&ring, nready); + } + +} + + diff --git a/reactor.c b/reactor.c new file mode 100644 index 0000000..616eb0e --- /dev/null +++ b/reactor.c @@ -0,0 +1,316 @@ + + + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +#include "server.h" + + +#define CONNECTION_SIZE 1024 // 1024 * 1024 + +#define MAX_PORTS 20 + +#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000) + + + +#if ENABLE_KVSTORE + +typedef int (*msg_handler)(char *msg, int length, char *response); + +static msg_handler kvs_handler; + +int kvs_request(struct conn *c) { + + printf("recv %d : %s\n", c->rlength, c->rbuffer); + + c->wlength = kvs_handler(c->rbuffer, c->rlength, c->wbuffer); + +} + +int kvs_response(struct conn *c) { + + + +} + + +#endif + + + +int accept_cb(int fd); +int recv_cb(int fd); +int send_cb(int fd); + + + +int epfd = 0; +struct timeval begin; + + + +struct conn conn_list[CONNECTION_SIZE] = {0}; +// fd + + +int set_event(int fd, int event, int flag) { + + if (flag) { // non-zero add + + struct epoll_event ev; + ev.events = event; + ev.data.fd = fd; + epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev); + + } else { // zero mod + + struct epoll_event ev; + ev.events = event; + ev.data.fd = fd; + epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev); + + } + + +} + + +int event_register(int fd, int event) { + + if (fd < 0) return -1; + + conn_list[fd].fd = fd; + conn_list[fd].r_action.recv_callback = recv_cb; + conn_list[fd].send_callback = send_cb; + + memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH); + conn_list[fd].rlength = 0; + + memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH); + conn_list[fd].wlength = 0; + + set_event(fd, event, 1); +} + + +// listenfd(sockfd) --> EPOLLIN --> accept_cb +int accept_cb(int fd) { + + struct sockaddr_in clientaddr; + socklen_t len = sizeof(clientaddr); + + int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len); + printf("accept finshed: %d, fd: %d\n", clientfd, fd); + if (clientfd < 0) { + printf("accept errno: %d --> %s\n", errno, strerror(errno)); + return -1; + } + + event_register(clientfd, EPOLLIN); // | EPOLLET + + if ((clientfd % 1000) == 0) { + + struct timeval current; + gettimeofday(¤t, NULL); + + int time_used = TIME_SUB_MS(current, begin); + memcpy(&begin, ¤t, sizeof(struct timeval)); + + + printf("accept finshed: %d, time_used: %d\n", clientfd, time_used); + + } + + return 0; +} + + +int recv_cb(int fd) { + + memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH ); + int count = recv(fd, conn_list[fd].rbuffer, BUFFER_LENGTH, 0); + if (count == 0) { // disconnect + printf("client disconnect: %d\n", fd); + close(fd); + + epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); // unfinished + + return 0; + } else if (count < 0) { // + + printf("count: %d, errno: %d, %s\n", count, errno, strerror(errno)); + close(fd); + epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); + + return 0; + } + + + conn_list[fd].rlength = count; + //printf("RECV: %s\n", conn_list[fd].rbuffer); + +#if 0 // echo + + conn_list[fd].wlength = conn_list[fd].rlength; + memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, conn_list[fd].wlength); + + printf("[%d]RECV: %s\n", conn_list[fd].rlength, conn_list[fd].rbuffer); + +#elif ENABLE_HTTP + + http_request(&conn_list[fd]); + +#elif ENABLE_WEBSOCKET + + ws_request(&conn_list[fd]); + +#elif ENABLE_KVSTORE + + kvs_request(&conn_list[fd]); + +#endif + + + set_event(fd, EPOLLOUT, 0); + + return count; +} + + +int send_cb(int fd) { + +#if ENABLE_HTTP + + http_response(&conn_list[fd]); + +#elif ENABLE_WEBSOCKET + + ws_response(&conn_list[fd]); + +#elif ENABLE_KVSTORE + + kvs_response(&conn_list[fd]); + +#endif + + int count = 0; + +#if 0 + if (conn_list[fd].status == 1) { + //printf("SEND: %s\n", conn_list[fd].wbuffer); + count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0); + set_event(fd, EPOLLOUT, 0); + } else if (conn_list[fd].status == 2) { + set_event(fd, EPOLLOUT, 0); + } else if (conn_list[fd].status == 0) { + + if (conn_list[fd].wlength != 0) { + count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0); + } + + set_event(fd, EPOLLIN, 0); + } +#else + + if (conn_list[fd].wlength != 0) { + count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0); + } + + set_event(fd, EPOLLIN, 0); + +#endif + //set_event(fd, EPOLLOUT, 0); + + return count; +} + + + +int r_init_server(unsigned short port) { + + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + + struct sockaddr_in servaddr; + servaddr.sin_family = AF_INET; + servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0 + servaddr.sin_port = htons(port); // 0-1023, + + if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) { + printf("bind failed: %s\n", strerror(errno)); + } + + listen(sockfd, 10); + //printf("listen finshed: %d\n", sockfd); // 3 + + return sockfd; + +} + +int reactor_start(unsigned short port, msg_handler handler) { + + //unsigned short port = 2000; + kvs_handler = handler; + printf("reactor_entry: %d\n", port); + + epfd = epoll_create(1); + + int i = 0; + + for (i = 0;i < MAX_PORTS;i ++) { + + int sockfd = r_init_server(port + i); + + conn_list[sockfd].fd = sockfd; + conn_list[sockfd].r_action.recv_callback = accept_cb; + + set_event(sockfd, EPOLLIN, 1); + } + + gettimeofday(&begin, NULL); + + while (1) { // mainloop + + struct epoll_event events[1024] = {0}; + int nready = epoll_wait(epfd, events, 1024, -1); + + int i = 0; + for (i = 0;i < nready;i ++) { + + int connfd = events[i].data.fd; + +#if 0 + if (events[i].events & EPOLLIN) { + conn_list[connfd].r_action.recv_callback(connfd); + } else if (events[i].events & EPOLLOUT) { + conn_list[connfd].send_callback(connfd); + } + +#else + if (events[i].events & EPOLLIN) { + conn_list[connfd].r_action.recv_callback(connfd); + } + + if (events[i].events & EPOLLOUT) { + conn_list[connfd].send_callback(connfd); + } +#endif + } + + } + + +} + + diff --git a/server.h b/server.h new file mode 100644 index 0000000..271a05a --- /dev/null +++ b/server.h @@ -0,0 +1,62 @@ + + + + +#ifndef __SERVER_H__ +#define __SERVER_H__ + +#define BUFFER_LENGTH 1024 + +#define ENABLE_HTTP 0 +#define ENABLE_WEBSOCKET 0 +#define ENABLE_KVSTORE 1 + + +typedef int (*RCALLBACK)(int fd); + + +struct conn { + int fd; + + char rbuffer[BUFFER_LENGTH]; + int rlength; + + char wbuffer[BUFFER_LENGTH]; + int wlength; + + RCALLBACK send_callback; + + union { + RCALLBACK recv_callback; + RCALLBACK accept_callback; + } r_action; + + int status; +#if 1 // websocket + char *payload; + char mask[4]; +#endif +}; + +#if ENABLE_HTTP +int http_request(struct conn *c); +int http_response(struct conn *c); +#endif + +#if ENABLE_WEBSOCKET +int ws_request(struct conn *c); +int ws_response(struct conn *c); +#endif + +#if ENABLE_KVSTORE +int kvs_request(struct conn *c); +int kvs_response(struct conn *c); + +#endif + + + + +#endif + +