diff --git a/.gitignore b/.gitignore index e744c4d..758b615 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ *.a /ebpf/libbpf-bootstrap +/doc kvstore testcase diff --git a/Makefile b/Makefile index 35297ea..a2aa845 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ CC = gcc CFLAGS = -g -DJEMALLOC_NO_DEMANGLE -NET_SRCS = ntyco.c proactor.c reactor.c kvstore.c +NET_SRCS = ntyco.c proactor.c reactor.c kvstore.c network/chainbuffer.c KV_SRCS = kvs_array_bin.c kvs_rbtree_bin.c kvs_hash_bin.c kvs_rw_tools.c kvs_protocol_resp.c kvs_slave.c replica_shm.c MEM_SRCS = ./memory/mempool.c ./memory/alloc_dispatch.c COMMON_SRCS = ./common/config.c ./diskuring/diskuring.c diff --git a/Makefile.default b/Makefile.default deleted file mode 100644 index 2b24b49..0000000 --- a/Makefile.default +++ /dev/null @@ -1,29 +0,0 @@ - - -CC = gcc -FLAGS = -I ./NtyCo/core/ -L ./NtyCo/ -lntyco -luring -TARGET = kvstore - -SRCS = kvstore.c ntyco.c proactor.c kvs_array.c kvs_rbtree.c -# INC = -I ./NtyCo/core/ -# LIBS = -L ./NtyCo/ -lntyco -luring -# FLAGS = -I ./NtyCo/core/ -L ./NtyCo/ -lntyco -luring - -OBJS = $(SRCS:.c=.o) -TESTCASE = testcase -SUBDIR = ./NtyCo/ - -all: $(SUBDIR) $(TARGET) # $(TESTCASE) - -$(SUBDIR): ECHO - make -C $@ - -ECHO: - @echo $(SUBDIR) - -$(TARGET): $(OBJS) - $(CC) -o $@ $^ $(FLAGS) - -clean: - rm -rf kvstore *.o - diff --git a/README.md b/README.md index b9a9cb5..d9f3c3d 100644 --- a/README.md +++ b/README.md @@ -23,26 +23,17 @@ sudo apt install libxml2 libxml2-dev sudo apt install -y libhiredis-dev # bpftrace sudo apt install -y bpftrace libelf libelf-dev clang +# jemalloc +sudo apt install libjemalloc-dev git clone git@gitlab.0voice.com:lianyiheng/9.1-kvstore.git cd 9.1-kvstore/ git submodule update --init --recursive -./init.sh make ``` -``` -docker run -it --rm \ - -v "$(pwd)":/workdir \ - -w /workdir \ - --pid=host \ - --privileged \ - ghcr.io/eunomia-bpf/bpftime:latest \ - /bin/bash -``` - ## 测试 ### 测试1:性能测试 测试条件: @@ -57,57 +48,6 @@ docker run -it --rm \ #### 内存分配: malloc ```bash lian@ubuntu:~/share/9.1-kvstore$ ./test-redis/testcase 192.168.10.129 8888 3 -Connected to 192.168.10.129:8888 -BATCH (N=3000000) --> time_used=3294 ms, qps=910746 -BATCH (N=3000000) --> time_used=3501 ms, qps=856898 -BATCH (N=3000000) --> time_used=3457 ms, qps=867804 -BATCH (N=3000000) --> time_used=3351 ms, qps=895255 -BATCH (N=3000000) --> time_used=3320 ms, qps=903614 -BATCH (N=3000000) --> time_used=3551 ms, qps=844832 -BATCH (N=3000000) --> time_used=3354 ms, qps=894454 -BATCH (N=3000000) --> time_used=3475 ms, qps=863309 -BATCH (N=3000000) --> time_used=3404 ms, qps=881316 -BATCH (N=3000000) --> time_used=3460 ms, qps=867052 -BATCH (N=3000000) --> time_used=3392 ms, qps=884433 -BATCH (N=3000000) --> time_used=3427 ms, qps=875401 -BATCH (N=3000000) --> time_used=3441 ms, qps=871839 -BATCH (N=3000000) --> time_used=3471 ms, qps=864304 -BATCH (N=3000000) --> time_used=3354 ms, qps=894454 -BATCH (N=3000000) --> time_used=3447 ms, qps=870322 -BATCH (N=3000000) --> time_used=3364 ms, qps=891795 -BATCH (N=3000000) --> time_used=3200 ms, qps=937500 -BATCH (N=3000000) --> time_used=3159 ms, qps=949667 -BATCH (N=3000000) --> time_used=3482 ms, qps=861573 -BATCH (N=3000000) --> time_used=3474 ms, qps=863557 -BATCH (N=3000000) --> time_used=3591 ms, qps=835421 -BATCH (N=3000000) --> time_used=3466 ms, qps=865551 -BATCH (N=3000000) --> time_used=3425 ms, qps=875912 -BATCH (N=3000000) --> time_used=3346 ms, qps=896592 -BATCH (N=3000000) --> time_used=3532 ms, qps=849377 -BATCH (N=3000000) --> time_used=3471 ms, qps=864304 -BATCH (N=3000000) --> time_used=3616 ms, qps=829646 -BATCH (N=3000000) --> time_used=3403 ms, qps=881575 -BATCH (N=3000000) --> time_used=3426 ms, qps=875656 -BATCH (N=3000000) --> time_used=3493 ms, qps=858860 -BATCH (N=3000000) --> time_used=3411 ms, qps=879507 -BATCH (N=3000000) --> time_used=3422 ms, qps=876680 -BATCH (N=3000000) --> time_used=3556 ms, qps=843644 -BATCH (N=3000000) --> time_used=3285 ms, qps=913242 -BATCH (N=3000000) --> time_used=3486 ms, qps=860585 -BATCH (N=3000000) --> time_used=3427 ms, qps=875401 -BATCH (N=3000000) --> time_used=3563 ms, qps=841987 -BATCH (N=3000000) --> time_used=3304 ms, qps=907990 -BATCH (N=3000000) --> time_used=3582 ms, qps=837520 -BATCH (N=3000000) --> time_used=3468 ms, qps=865051 -BATCH (N=3000000) --> time_used=3360 ms, qps=892857 -BATCH (N=3000000) --> time_used=3426 ms, qps=875656 -BATCH (N=3000000) --> time_used=3186 ms, qps=941619 -BATCH (N=3000000) --> time_used=3251 ms, qps=922792 -BATCH (N=3000000) --> time_used=3400 ms, qps=882352 -BATCH (N=3000000) --> time_used=3446 ms, qps=870574 -BATCH (N=3000000) --> time_used=3302 ms, qps=908540 -BATCH (N=3000000) --> time_used=3072 ms, qps=976562 -BATCH (N=3000000) --> time_used=3458 ms, qps=867553 average qps:880462 ALL TESTS PASSED. ``` @@ -115,57 +55,6 @@ ALL TESTS PASSED. #### 内存分配: 自实现内存池 ```bash lian@ubuntu:~/share/9.1-kvstore$ ./test-redis/testcase 192.168.10.129 8888 3 -Connected to 192.168.10.129:8888 -BATCH (N=3000000) --> time_used=3241 ms, qps=925640 -BATCH (N=3000000) --> time_used=3047 ms, qps=984574 -BATCH (N=3000000) --> time_used=3085 ms, qps=972447 -BATCH (N=3000000) --> time_used=3119 ms, qps=961846 -BATCH (N=3000000) --> time_used=3104 ms, qps=966494 -BATCH (N=3000000) --> time_used=3163 ms, qps=948466 -BATCH (N=3000000) --> time_used=3033 ms, qps=989119 -BATCH (N=3000000) --> time_used=3170 ms, qps=946372 -BATCH (N=3000000) --> time_used=3299 ms, qps=909366 -BATCH (N=3000000) --> time_used=3272 ms, qps=916870 -BATCH (N=3000000) --> time_used=3294 ms, qps=910746 -BATCH (N=3000000) --> time_used=3182 ms, qps=942803 -BATCH (N=3000000) --> time_used=3190 ms, qps=940438 -BATCH (N=3000000) --> time_used=3493 ms, qps=858860 -BATCH (N=3000000) --> time_used=3111 ms, qps=964320 -BATCH (N=3000000) --> time_used=3220 ms, qps=931677 -BATCH (N=3000000) --> time_used=3067 ms, qps=978154 -BATCH (N=3000000) --> time_used=3345 ms, qps=896860 -BATCH (N=3000000) --> time_used=3381 ms, qps=887311 -BATCH (N=3000000) --> time_used=3416 ms, qps=878220 -BATCH (N=3000000) --> time_used=3192 ms, qps=939849 -BATCH (N=3000000) --> time_used=3085 ms, qps=972447 -BATCH (N=3000000) --> time_used=3150 ms, qps=952380 -BATCH (N=3000000) --> time_used=3296 ms, qps=910194 -BATCH (N=3000000) --> time_used=3001 ms, qps=999666 -BATCH (N=3000000) --> time_used=3143 ms, qps=954502 -BATCH (N=3000000) --> time_used=3111 ms, qps=964320 -BATCH (N=3000000) --> time_used=3123 ms, qps=960614 -BATCH (N=3000000) --> time_used=3257 ms, qps=921093 -BATCH (N=3000000) --> time_used=3037 ms, qps=987816 -BATCH (N=3000000) --> time_used=3135 ms, qps=956937 -BATCH (N=3000000) --> time_used=3124 ms, qps=960307 -BATCH (N=3000000) --> time_used=3276 ms, qps=915750 -BATCH (N=3000000) --> time_used=3058 ms, qps=981033 -BATCH (N=3000000) --> time_used=3024 ms, qps=992063 -BATCH (N=3000000) --> time_used=3224 ms, qps=930521 -BATCH (N=3000000) --> time_used=3235 ms, qps=927357 -BATCH (N=3000000) --> time_used=3334 ms, qps=899820 -BATCH (N=3000000) --> time_used=3427 ms, qps=875401 -BATCH (N=3000000) --> time_used=3218 ms, qps=932256 -BATCH (N=3000000) --> time_used=3191 ms, qps=940144 -BATCH (N=3000000) --> time_used=3179 ms, qps=943692 -BATCH (N=3000000) --> time_used=3104 ms, qps=966494 -BATCH (N=3000000) --> time_used=3202 ms, qps=936914 -BATCH (N=3000000) --> time_used=3184 ms, qps=942211 -BATCH (N=3000000) --> time_used=3000 ms, qps=1000000 -BATCH (N=3000000) --> time_used=3280 ms, qps=914634 -BATCH (N=3000000) --> time_used=3141 ms, qps=955109 -BATCH (N=3000000) --> time_used=3198 ms, qps=938086 -BATCH (N=3000000) --> time_used=3126 ms, qps=959692 average qps:942837 ALL TESTS PASSED. ``` @@ -173,57 +62,6 @@ ALL TESTS PASSED. #### 内存分配:jemalloc ```shell lian@ubuntu:~/share/9.1-kvstore$ ./test-redis/testcase 192.168.10.129 8888 3 -Connected to 192.168.10.129:8888 -BATCH (N=3000000) --> time_used=3511 ms, qps=854457 -BATCH (N=3000000) --> time_used=3280 ms, qps=914634 -BATCH (N=3000000) --> time_used=3603 ms, qps=832639 -BATCH (N=3000000) --> time_used=3418 ms, qps=877706 -BATCH (N=3000000) --> time_used=3353 ms, qps=894721 -BATCH (N=3000000) --> time_used=3435 ms, qps=873362 -BATCH (N=3000000) --> time_used=3250 ms, qps=923076 -BATCH (N=3000000) --> time_used=3550 ms, qps=845070 -BATCH (N=3000000) --> time_used=3536 ms, qps=848416 -BATCH (N=3000000) --> time_used=3273 ms, qps=916590 -BATCH (N=3000000) --> time_used=3224 ms, qps=930521 -BATCH (N=3000000) --> time_used=3161 ms, qps=949066 -BATCH (N=3000000) --> time_used=3143 ms, qps=954502 -BATCH (N=3000000) --> time_used=3342 ms, qps=897666 -BATCH (N=3000000) --> time_used=3410 ms, qps=879765 -BATCH (N=3000000) --> time_used=3522 ms, qps=851788 -BATCH (N=3000000) --> time_used=3035 ms, qps=988467 -BATCH (N=3000000) --> time_used=3352 ms, qps=894988 -BATCH (N=3000000) --> time_used=3226 ms, qps=929944 -BATCH (N=3000000) --> time_used=3406 ms, qps=880798 -BATCH (N=3000000) --> time_used=3336 ms, qps=899280 -BATCH (N=3000000) --> time_used=3307 ms, qps=907166 -BATCH (N=3000000) --> time_used=3171 ms, qps=946073 -BATCH (N=3000000) --> time_used=3252 ms, qps=922509 -BATCH (N=3000000) --> time_used=3296 ms, qps=910194 -BATCH (N=3000000) --> time_used=3301 ms, qps=908815 -BATCH (N=3000000) --> time_used=3403 ms, qps=881575 -BATCH (N=3000000) --> time_used=3234 ms, qps=927643 -BATCH (N=3000000) --> time_used=3348 ms, qps=896057 -BATCH (N=3000000) --> time_used=3517 ms, qps=852999 -BATCH (N=3000000) --> time_used=3354 ms, qps=894454 -BATCH (N=3000000) --> time_used=3529 ms, qps=850099 -BATCH (N=3000000) --> time_used=3473 ms, qps=863806 -BATCH (N=3000000) --> time_used=3521 ms, qps=852030 -BATCH (N=3000000) --> time_used=3370 ms, qps=890207 -BATCH (N=3000000) --> time_used=3267 ms, qps=918273 -BATCH (N=3000000) --> time_used=3352 ms, qps=894988 -BATCH (N=3000000) --> time_used=3433 ms, qps=873871 -BATCH (N=3000000) --> time_used=3374 ms, qps=889152 -BATCH (N=3000000) --> time_used=3360 ms, qps=892857 -BATCH (N=3000000) --> time_used=3463 ms, qps=866300 -BATCH (N=3000000) --> time_used=3499 ms, qps=857387 -BATCH (N=3000000) --> time_used=3294 ms, qps=910746 -BATCH (N=3000000) --> time_used=3311 ms, qps=906070 -BATCH (N=3000000) --> time_used=3443 ms, qps=871333 -BATCH (N=3000000) --> time_used=3381 ms, qps=887311 -BATCH (N=3000000) --> time_used=3422 ms, qps=876680 -BATCH (N=3000000) --> time_used=3421 ms, qps=876936 -BATCH (N=3000000) --> time_used=3322 ms, qps=903070 -BATCH (N=3000000) --> time_used=3494 ms, qps=858614 average qps:892493 ALL TESTS PASSED. ``` @@ -240,17 +78,6 @@ ALL TESTS PASSED. ```shell lian@ubuntu:~/share/9.1-kvstore$ ./test-redis/testcase 192.168.10.129 8888 4 -Connected to 192.168.10.129:8888 -BATCH (N=3000000) --> time_used=3500 ms, qps=857142 -BATCH (N=3000000) --> time_used=3322 ms, qps=903070 -BATCH (N=3000000) --> time_used=3424 ms, qps=876168 -BATCH (N=3000000) --> time_used=3483 ms, qps=861326 -BATCH (N=3000000) --> time_used=3421 ms, qps=876936 -BATCH (N=3000000) --> time_used=3519 ms, qps=852514 -BATCH (N=3000000) --> time_used=3597 ms, qps=834028 -BATCH (N=3000000) --> time_used=3504 ms, qps=856164 -BATCH (N=3000000) --> time_used=3281 ms, qps=914355 -BATCH (N=3000000) --> time_used=3446 ms, qps=870574 average qps:870227 ALL TESTS PASSED. ``` diff --git a/config/config.xml b/config/config.xml index 81bdbec..4df1944 100644 --- a/config/config.xml +++ b/config/config.xml @@ -1,14 +1,14 @@ - 192.168.10.129 + 192.168.220.134 8888 master disable - 192.168.10.129 + 192.168.220.134 8888 @@ -18,7 +18,7 @@ - none + none data kvs_oplog.db diff --git a/init.sh b/init.sh deleted file mode 100755 index a0732c0..0000000 --- a/init.sh +++ /dev/null @@ -1,7 +0,0 @@ -rm -rf libbpf-bootstrap/examples/c - -cp -R ebpf/c libbpf-bootstrap/examples - -cd libbpf-bootstrap/examples/c - -make \ No newline at end of file diff --git a/kvstore.c b/kvstore.c index 9823b01..868bdac 100644 --- a/kvstore.c +++ b/kvstore.c @@ -14,11 +14,12 @@ #include #include #include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include +#include #define TIME_COLLECT 0 @@ -43,23 +44,25 @@ void __completed_cmd(const uint8_t *cmd, size_t len, unsigned long long seq){ #include #define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000) #define TIME_SUB_US(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000000 + (tv1.tv_usec - tv2.tv_usec)) -int kvs_protocol(struct conn* conn){ +int kvs_protocol(struct conn* conn){ #if TIME_COLLECT == 1 struct timeval func_start; gettimeofday(&func_start, NULL); long total_oplog_us = 0; #endif - if (!conn) return -1; - char *request = conn->rbuffer; - int request_length = conn->rlength; - char *response = conn->wbuffer; - int *response_length = &conn->wlength; - - if (!request || request_length <= 0 || !response || !response_length) return -1; - - int consumed = 0; - int out_len = 0; + if (!conn) return -1; + + size_t request_size = 0; + const uint8_t *request = chain_buffer_linearize(&conn->rbuf, &request_size); + if (!request || request_size == 0) return 0; + if (request_size > (size_t)INT_MAX) return -1; + + int request_length = (int)request_size; + uint8_t response[KVS_MAX_RESPONSE]; + + int consumed = 0; + int out_len = 0; while(consumed < request_length ){ const uint8_t *p = request+consumed; @@ -68,12 +71,11 @@ int kvs_protocol(struct conn* conn){ resp_cmd_t cmd; memset(&cmd, 0, sizeof(cmd)); - int len = resp_parse_one_cmd(p, remain, &cmd); - if(len < 0){ - /* 协议错误:直接返回,已构建的响应仍可写回 */ - *response_length = out_len; - return -1; - } + int len = resp_parse_one_cmd(p, remain, &cmd); + if(len < 0){ + /* 协议错误:直接返回 */ + return -1; + } else if(len == 0){ // 半包 break; @@ -179,17 +181,15 @@ int kvs_protocol(struct conn* conn){ #endif /* 构建响应 */ - int cap = KVS_MAX_RESPONSE - out_len; - if (cap <= 0) { - *response_length = out_len; - return consumed; - } - - int resp_len = resp_build_value(&val, response + out_len, (size_t)cap); - if (resp_len < 0) { - *response_length = out_len; - return consumed; - } + int cap = KVS_MAX_RESPONSE - out_len; + if (cap <= 0) { + return consumed; + } + + int resp_len = resp_build_value(&val, response + out_len, (size_t)cap); + if (resp_len < 0) { + return consumed; + } __completed_cmd(request, consumed, 0); @@ -205,9 +205,14 @@ int kvs_protocol(struct conn* conn){ fprintf(stderr, "kvs_protocol: total %ld us, oplog %ld us\n", func_us, total_oplog_us); #endif - *response_length = out_len; - return consumed; -} + if (out_len > 0) { + if (chain_buffer_append(&conn->wbuf, response, (size_t)out_len) < 0) { + return -1; + } + } + + return consumed; +} diff --git a/network/chainbuffer.c b/network/chainbuffer.c new file mode 100644 index 0000000..079dfa9 --- /dev/null +++ b/network/chainbuffer.c @@ -0,0 +1,227 @@ +#include "network/chainbuffer.h" + +#include +#include +#include +#include +#include + +#define CHAINBUFFER_DEFAULT_CHUNK 4096 +#define CHAINBUFFER_MAX_IOV 16 + +struct chain_buffer_node { + struct chain_buffer_node *next; + size_t start; + size_t end; + size_t cap; + uint8_t data[]; +}; + +static chain_buffer_node_t *alloc_node(size_t cap) { + chain_buffer_node_t *node = (chain_buffer_node_t *)malloc(sizeof(*node) + cap); + if (!node) { + return NULL; + } + node->next = NULL; + node->start = 0; + node->end = 0; + node->cap = cap; + return node; +} + +void chain_buffer_init(chain_buffer_t *buf, size_t chunk_size) { + if (!buf) { + return; + } + memset(buf, 0, sizeof(*buf)); + buf->chunk_size = chunk_size ? chunk_size : CHAINBUFFER_DEFAULT_CHUNK; +} + +void chain_buffer_reset(chain_buffer_t *buf) { + if (!buf) { + return; + } + + chain_buffer_node_t *node = buf->head; + while (node) { + chain_buffer_node_t *next = node->next; + free(node); + node = next; + } + + free(buf->linear_cache); + buf->linear_cache = NULL; + buf->linear_cap = 0; + buf->head = NULL; + buf->tail = NULL; + buf->total_len = 0; +} + +size_t chain_buffer_len(const chain_buffer_t *buf) { + return buf ? buf->total_len : 0; +} + +int chain_buffer_append(chain_buffer_t *buf, const void *data, size_t len) { + const uint8_t *src = (const uint8_t *)data; + if (!buf || (!src && len > 0)) { + errno = EINVAL; + return -1; + } + if (len == 0) { + return 0; + } + if (buf->total_len > (size_t)-1 - len) { + errno = EOVERFLOW; + return -1; + } + + size_t remain = len; + while (remain > 0) { + chain_buffer_node_t *tail = buf->tail; + size_t writable = 0; + + if (tail && tail->end < tail->cap) { + writable = tail->cap - tail->end; + } + + if (writable == 0) { + size_t cap = remain > buf->chunk_size ? remain : buf->chunk_size; + chain_buffer_node_t *node = alloc_node(cap); + if (!node) { + errno = ENOMEM; + return -1; + } + + if (buf->tail) { + buf->tail->next = node; + buf->tail = node; + } else { + buf->head = node; + buf->tail = node; + } + tail = node; + writable = tail->cap; + } + + size_t n = remain < writable ? remain : writable; + memcpy(tail->data + tail->end, src, n); + tail->end += n; + src += n; + remain -= n; + buf->total_len += n; + } + + return 0; +} + +size_t chain_buffer_drain(chain_buffer_t *buf, size_t len) { + if (!buf || len == 0 || buf->total_len == 0) { + return 0; + } + + size_t remain = len; + size_t drained = 0; + + while (remain > 0 && buf->head) { + chain_buffer_node_t *node = buf->head; + size_t avail = node->end - node->start; + + if (remain < avail) { + node->start += remain; + buf->total_len -= remain; + drained += remain; + break; + } + + remain -= avail; + drained += avail; + buf->total_len -= avail; + buf->head = node->next; + if (!buf->head) { + buf->tail = NULL; + } + free(node); + } + + return drained; +} + +const uint8_t *chain_buffer_linearize(chain_buffer_t *buf, size_t *out_len) { + if (!buf) { + return NULL; + } + + if (out_len) { + *out_len = buf->total_len; + } + + if (buf->total_len == 0) { + return NULL; + } + + if (buf->head == buf->tail && buf->head) { + return buf->head->data + buf->head->start; + } + + if (buf->linear_cap < buf->total_len) { + uint8_t *new_cache = (uint8_t *)realloc(buf->linear_cache, buf->total_len); + if (!new_cache) { + return NULL; + } + buf->linear_cache = new_cache; + buf->linear_cap = buf->total_len; + } + + size_t offset = 0; + for (chain_buffer_node_t *node = buf->head; node; node = node->next) { + size_t avail = node->end - node->start; + if (avail == 0) { + continue; + } + memcpy(buf->linear_cache + offset, node->data + node->start, avail); + offset += avail; + } + + return buf->linear_cache; +} + +ssize_t chain_buffer_send_fd(chain_buffer_t *buf, int fd, int flags) { + if (!buf) { + errno = EINVAL; + return -1; + } + if (buf->total_len == 0 || !buf->head) { + return 0; + } + + struct iovec iov[CHAINBUFFER_MAX_IOV]; + size_t iovcnt = 0; + + for (chain_buffer_node_t *node = buf->head; + node && iovcnt < CHAINBUFFER_MAX_IOV; + node = node->next) { + size_t avail = node->end - node->start; + if (avail == 0) { + continue; + } + iov[iovcnt].iov_base = (void *)(node->data + node->start); + iov[iovcnt].iov_len = avail; + iovcnt++; + } + + if (iovcnt == 0) { + return 0; + } + + struct msghdr msg; + memset(&msg, 0, sizeof(msg)); + msg.msg_iov = iov; + msg.msg_iovlen = iovcnt; + + ssize_t n = sendmsg(fd, &msg, flags); + if (n > 0) { + chain_buffer_drain(buf, (size_t)n); + } + + return n; +} diff --git a/network/chainbuffer.h b/network/chainbuffer.h new file mode 100644 index 0000000..78e3c3f --- /dev/null +++ b/network/chainbuffer.h @@ -0,0 +1,29 @@ +#ifndef __CHAINBUFFER_H__ +#define __CHAINBUFFER_H__ + +#include +#include +#include + +typedef struct chain_buffer_node chain_buffer_node_t; + +typedef struct chain_buffer_s { + chain_buffer_node_t *head; + chain_buffer_node_t *tail; + size_t total_len; + size_t chunk_size; + uint8_t *linear_cache; + size_t linear_cap; +} chain_buffer_t; + +void chain_buffer_init(chain_buffer_t *buf, size_t chunk_size); +void chain_buffer_reset(chain_buffer_t *buf); + +size_t chain_buffer_len(const chain_buffer_t *buf); +int chain_buffer_append(chain_buffer_t *buf, const void *data, size_t len); +size_t chain_buffer_drain(chain_buffer_t *buf, size_t len); + +const uint8_t *chain_buffer_linearize(chain_buffer_t *buf, size_t *out_len); +ssize_t chain_buffer_send_fd(chain_buffer_t *buf, int fd, int flags); + +#endif diff --git a/reactor.c b/reactor.c index 661cbab..e6732f2 100644 --- a/reactor.c +++ b/reactor.c @@ -1,458 +1,615 @@ - - -#define _GNU_SOURCE -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "server.h" - - -#define CONNECTION_SIZE 1024 // 1024 * 1024 - -#define MAX_PORTS 20 - -#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000) - - - -#if ENABLE_KVSTORE - -// typedef int (*msg_handler)(char *msg, int length, char *response); -// typedef int (*msg_handler)(char *request, int request_length, char *response, int *response_length); -typedef int (*msg_handler)(struct conn* conn); - -static msg_handler kvs_handler; - -extern void cleanup_finished_iouring_tasks(); - - -// 0 need more, -1 error, =1 suc -int kvs_request(struct conn *c) { - // int consumed_out = kvs_handler(c->rbuffer, c->rlength, c->wbuffer, &c->wlength); - int consumed_out = kvs_handler(c); - return consumed_out; -} - -int kvs_response(struct conn *c) { - - - -} - - -#endif - - - -int accept_cb(int fd); -int recv_cb(int fd); -int send_cb(int fd); - - - -int epfd = 0; -struct timeval begin; - -int wakeup_fd = -1; -int timer_fd = -1; - -struct conn conn_list[CONNECTION_SIZE] = {0}; -// fd - - -// 1 add, 0 mod -int set_event(int fd, int event, int flag) { - - if (flag) { // non-zero add - - struct epoll_event ev; - ev.events = event; - ev.data.fd = fd; - epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev); - - } else { // zero mod - - struct epoll_event ev; - ev.events = event; - ev.data.fd = fd; - epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev); - - } - - -} - - -int event_register(int fd, int event) { - - if (fd < 0) return -1; - - conn_list[fd].fd = fd; - conn_list[fd].r_action.recv_callback = recv_cb; - conn_list[fd].send_callback = send_cb; - - memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH); - conn_list[fd].rlength = 0; - - memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH*2); - conn_list[fd].wlength = 0; - - conn_list[fd].is_stop = 0; - - set_event(fd, event, 1); -} - - -// listenfd(sockfd) --> EPOLLIN --> accept_cb -int accept_cb(int fd) { - - struct sockaddr_in clientaddr; - socklen_t len = sizeof(clientaddr); - - int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len); - if (clientfd < 0) { - printf("accept errno: %d --> %s\n", errno, strerror(errno)); - return -1; - } - - event_register(clientfd, EPOLLIN); // | EPOLLET - - if ((clientfd % 1000) == 0) { - - struct timeval current; - gettimeofday(¤t, NULL); - - int time_used = TIME_SUB_MS(current, begin); - memcpy(&begin, ¤t, sizeof(struct timeval)); - - - //printf("accept finshed: %d, time_used: %d\n", clientfd, time_used); - - } - - return 0; -} - - -int recv_cb(int fd) { - struct conn *c = &conn_list[fd]; - int avail = BUFFER_LENGTH - c->rlength; - // printf("avail: %d\n", avail); - if (avail <= 0) { - // 缓冲满了还没解析出来:协议异常或包过大 - epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); - close(fd); - return 0; - } - - int count = recv(fd, c->rbuffer + c->rlength, avail, 0); - if (count == 0) { // disconnect - //printf("client disconnect: %d\n", fd); - - epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); // unfinished - close(fd); - - return 0; - } else if (count < 0) { // - - printf("count: %d, errno: %d, %s\n", count, errno, strerror(errno)); - epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); - close(fd); - - return 0; - } - - - c->rlength += count; - //printf("RECV: %s\n", conn_list[fd].rbuffer); - -#if 0 // echo - - conn_list[fd].wlength = conn_list[fd].rlength; - memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, conn_list[fd].wlength); - - printf("[%d]RECV: %s\n", conn_list[fd].rlength, conn_list[fd].rbuffer); - -#elif ENABLE_HTTP - - http_request(&conn_list[fd]); - -#elif ENABLE_WEBSOCKET - - ws_request(&conn_list[fd]); - -#elif ENABLE_KVSTORE - int consumed = kvs_request(c); - if(consumed < 0){ - epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); - close(fd); - return 0; - } - - // 清理 buffer - if (consumed > 0 && consumed < c->rlength) { - // 有剩余未处理数据,搬移到 buffer 头部 - int left = c->rlength - consumed; - if (left > 0) memmove(c->rbuffer, c->rbuffer + consumed, left); - c->rlength = left; - if (c->wlength > 0) set_event(fd, EPOLLOUT, 0); - return count; - - }else{ - c->rlength = 0; - if(c->wlength > 0) set_event(fd, EPOLLOUT, 0); - return count; - } - -#endif - - - set_event(fd, EPOLLOUT, 0); - - return count; -} - - -int send_cb(int fd) { - -#if ENABLE_HTTP - - http_response(&conn_list[fd]); - -#elif ENABLE_WEBSOCKET - - ws_response(&conn_list[fd]); - -#elif ENABLE_KVSTORE - - kvs_response(&conn_list[fd]); - -#endif - - struct conn *c = &conn_list[fd]; - int sent_total = 0; - - while (c->wlength > 0) { - ssize_t n = send(fd, c->wbuffer, (size_t)c->wlength, MSG_NOSIGNAL); - if (n > 0) { - sent_total += (int)n; - - if (n == c->wlength) { - /* 全部发完 */ - c->wlength = 0; - break; - } - - /* 只发了一部分:把剩余数据搬到 buffer 头部 */ - int left = c->wlength - (int)n; - memmove(c->wbuffer, c->wbuffer + n, (size_t)left); - c->wlength = left; - - /* 不要在这里死循环占用 CPU,交给下一次 EPOLLOUT */ - break; - } - - if (n < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - /* 暂时发不出去,等下一次可写事件 */ - set_event(fd, EPOLLOUT, 0); - return sent_total; - } - - /* 对端断开 / 其他错误 */ - int e = errno; - - printf("send fd=%d errno=%d %s\n", fd, e, strerror(e)); - epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); - close(fd); - return 0; - } - - break; - } - - if (c->wlength > 0) { - /* 还有没发完,继续监听可写 */ - set_event(fd, EPOLLOUT, 0); - } else { - /* 发完了,回到读 */ - set_event(fd, EPOLLIN, 0); - } - - // printf("send_total :%d; remain: %d\n", sent_total, c->wlength); - - return sent_total; -} - -// wakup fd - -int handle_wakeup_fd_cb(int fd) { - uint64_t v; - while (1) { - ssize_t n = read(wakeup_fd, &v, sizeof(v)); - if (n == sizeof(v)) continue; - if (n < 0 && errno == EAGAIN) break; // 已经读空 - break; - } - cleanup_finished_iouring_tasks(); - - return 0; -} - -int init_wakeup_fd(void) { - int wfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); - if (wfd < 0) { - printf("eventfd failed: errno=%d %s\n", errno, strerror(errno)); - return -1; - } - - conn_list[wfd].fd = wfd; - conn_list[wfd].r_action.recv_callback = handle_wakeup_fd_cb; - set_event(wfd, EPOLLIN, 1); - - return wfd; -} - -// EPOLLOUT -void sync_wakeup() { - if (wakeup_fd < 0) return; - // set_event(wakeup_fd, EPOLLOUT, 0); - - uint64_t one = 1; - ssize_t n = write(wakeup_fd, &one, sizeof(one)); -} - -// #include "diskuring/diskuring.h" -// extern iouring_ctx_t global_uring_ctx; -// extern void iouring_tick(iouring_ctx_t *ctx); - -// 定时器 -int handle_timer_fd_cb(int fd){ - uint64_t v; - while (1) { - ssize_t n = read(fd, &v, sizeof(v)); - if (n == sizeof(v)) { - continue; - } - if (n < 0 && errno == EAGAIN) break; - break; - } - // iouring_tick(&global_uring_ctx); -} - -int init_timer_fd(void){ - int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); - - struct itimerspec its = { - .it_interval = {0, 100 * 1000 * 1000}, // 100ms = 100,000,000 纳秒 - .it_value = {0, 100 * 1000 * 1000}, // 首次 100ms 后触发 - }; - timerfd_settime(tfd, 0, &its, NULL); - - conn_list[tfd].fd = tfd; - conn_list[tfd].r_action.recv_callback = handle_timer_fd_cb; - set_event(tfd, EPOLLIN, 1); - - return tfd; -} - -int r_init_server(unsigned short port) { - - int sockfd = socket(AF_INET, SOCK_STREAM, 0); - - int opt = 1; - if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) { - perror("setsockopt"); - close(sockfd); - } - - struct sockaddr_in servaddr; - servaddr.sin_family = AF_INET; - servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0 - servaddr.sin_port = htons(port); // 0-1023, - - if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) { - printf("bind failed: %s\n", strerror(errno)); - } - - listen(sockfd, 10); - //printf("listen finshed: %d\n", sockfd); // 3 - - return sockfd; - -} - -int reactor_start(unsigned short port, msg_handler handler) { - - //unsigned short port = 2000; - kvs_handler = handler; - - epfd = epoll_create(1); - - wakeup_fd = init_wakeup_fd(); - if(wakeup_fd < 0){ - close(epfd); - return -1; - } - - timer_fd = init_timer_fd(); - if(timer_fd < 0){ - close(epfd); - close(wakeup_fd); - return -1; - } - - int i = 0; - - for (i = 0;i < MAX_PORTS;i ++) { - - int sockfd = r_init_server(port + i); - - conn_list[sockfd].fd = sockfd; - conn_list[sockfd].r_action.recv_callback = accept_cb; - conn_list[sockfd].is_stop = 0; - - set_event(sockfd, EPOLLIN, 1); - } - - gettimeofday(&begin, NULL); - - while (1) { // mainloop - struct epoll_event events[1024] = {0}; - int nready = epoll_wait(epfd, events, 1024, -1); - // cleanup_finished_iouring_tasks(); - - int i = 0; - for (i = 0;i < nready;i ++) { - - int connfd = events[i].data.fd; - - if (events[i].events & EPOLLIN) { - // printf("connlist:%p, r_action:%p, recv_callaback:%p\n", &conn_list[connfd], &conn_list[connfd].r_action, conn_list[connfd].r_action.recv_callback); - conn_list[connfd].r_action.recv_callback(connfd); - } - - if (events[i].events & EPOLLOUT) { - conn_list[connfd].send_callback(connfd); - } - } - - } - - if (wakeup_fd >= 0) close(wakeup_fd); - if (epfd >= 0) close(epfd); - return 0; -} - - +#define _GNU_SOURCE + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "diskuring/diskuring.h" +#include "server.h" + +#define CONNECTION_SIZE 65536 +#define MAX_PORTS 20 +#define RECV_BATCH_BYTES 4096 +#define CHAIN_BUFFER_CHUNK 4096 +#define MAX_CONN_READ_BYTES (32u * 1024u * 1024u) +#define MAX_CONN_WRITE_BYTES (32u * 1024u * 1024u) + +#if ENABLE_KVSTORE +typedef int (*msg_handler)(struct conn *conn); +static msg_handler kvs_handler; + +extern iouring_ctx_t global_uring_ctx; + +int kvs_request(struct conn *c) { + return kvs_handler ? kvs_handler(c) : -1; +} + +int kvs_response(struct conn *c) { + (void)c; + return 0; +} +#endif + +int accept_cb(int fd); +int recv_cb(int fd); +int send_cb(int fd); + +static int epfd = -1; +static int wakeup_fd = -1; +static int timer_fd = -1; +static struct timeval begin; +static struct conn conn_list[CONNECTION_SIZE]; + +static int conn_fd_valid(int fd) { + return fd >= 0 && fd < CONNECTION_SIZE; +} + +static int set_nonblocking(int fd) { + int flags = fcntl(fd, F_GETFL, 0); + if (flags < 0) { + return -1; + } + if ((flags & O_NONBLOCK) != 0) { + return 0; + } + return fcntl(fd, F_SETFL, flags | O_NONBLOCK); +} + +static void conn_clear_slot(int fd) { + struct conn *c; + + if (!conn_fd_valid(fd)) { + return; + } + + c = &conn_list[fd]; + chain_buffer_reset(&c->rbuf); + chain_buffer_reset(&c->wbuf); + memset(c, 0, sizeof(*c)); + c->fd = -1; +} + +static void close_conn(int fd) { + if (!conn_fd_valid(fd)) { + return; + } + epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); + close(fd); + conn_clear_slot(fd); +} + +static int set_event(int fd, int event, int is_add) { + struct epoll_event ev; + int op = is_add ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; + + memset(&ev, 0, sizeof(ev)); + ev.events = (uint32_t)event; + ev.data.fd = fd; + + if (epoll_ctl(epfd, op, fd, &ev) < 0) { + return -1; + } + return 0; +} + +static int update_conn_events(int fd) { + struct conn *c; + int events = EPOLLIN; + + if (!conn_fd_valid(fd)) { + return -1; + } + + c = &conn_list[fd]; + if (chain_buffer_len(&c->wbuf) > 0) { + events |= EPOLLOUT; + } + + return set_event(fd, events, 0); +} + +int event_register(int fd, int event) { + struct conn *c; + + if (!conn_fd_valid(fd)) { + if (fd >= 0) { + close(fd); + } + return -1; + } + + if (set_nonblocking(fd) < 0) { + close(fd); + return -1; + } + + conn_clear_slot(fd); + + c = &conn_list[fd]; + c->fd = fd; + c->r_action.recv_callback = recv_cb; + c->send_callback = send_cb; + c->is_stop = 0; + chain_buffer_init(&c->rbuf, CHAIN_BUFFER_CHUNK); + chain_buffer_init(&c->wbuf, CHAIN_BUFFER_CHUNK); + + if (set_event(fd, event, 1) < 0) { + close_conn(fd); + return -1; + } + return 0; +} + +int accept_cb(int fd) { + while (1) { + struct sockaddr_in clientaddr; + socklen_t len = sizeof(clientaddr); + int clientfd = accept4(fd, (struct sockaddr *)&clientaddr, &len, SOCK_NONBLOCK | SOCK_CLOEXEC); + + if (clientfd < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return 0; + } + if (errno == EINTR) { + continue; + } + printf("accept errno: %d --> %s\n", errno, strerror(errno)); + return -1; + } + + if (!conn_fd_valid(clientfd)) { + printf("drop client fd=%d, out of conn_list range\n", clientfd); + close(clientfd); + continue; + } + + if (event_register(clientfd, EPOLLIN) < 0) { + continue; + } + + if ((clientfd % 1000) == 0) { + struct timeval current; + int time_used; + + gettimeofday(¤t, NULL); + time_used = (int)((current.tv_sec - begin.tv_sec) * 1000 + + (current.tv_usec - begin.tv_usec) / 1000); + begin = current; + (void)time_used; + } + } +} + +int recv_cb(int fd) { + struct conn *c; + int total = 0; + + if (!conn_fd_valid(fd)) { + return -1; + } + + c = &conn_list[fd]; + while (1) { + uint8_t tmp[RECV_BATCH_BYTES]; + ssize_t n = recv(fd, tmp, sizeof(tmp), 0); + + if (n > 0) { + size_t cur_len = chain_buffer_len(&c->rbuf); + if (cur_len > MAX_CONN_READ_BYTES - (size_t)n) { + printf("fd=%d read buffer overflow, close connection\n", fd); + close_conn(fd); + return 0; + } + if (chain_buffer_append(&c->rbuf, tmp, (size_t)n) < 0) { + printf("fd=%d append read buffer failed: %s\n", fd, strerror(errno)); + close_conn(fd); + return 0; + } + total += (int)n; + continue; + } + + if (n == 0) { + close_conn(fd); + return 0; + } + + if (errno == EINTR) { + continue; + } + if (errno == EAGAIN || errno == EWOULDBLOCK) { + break; + } + + printf("recv fd=%d errno=%d, %s\n", fd, errno, strerror(errno)); + close_conn(fd); + return 0; + } + + if (total <= 0) { + return 0; + } + +#if ENABLE_HTTP + http_request(c); +#elif ENABLE_WEBSOCKET + ws_request(c); +#elif ENABLE_KVSTORE + { + int consumed = kvs_request(c); + size_t readable = chain_buffer_len(&c->rbuf); + + if (consumed < 0) { + close_conn(fd); + return 0; + } + + if ((size_t)consumed > readable) { + printf("fd=%d invalid consumed=%d readable=%zu\n", fd, consumed, readable); + close_conn(fd); + return 0; + } + + if (consumed > 0) { + chain_buffer_drain(&c->rbuf, (size_t)consumed); + } + + if (chain_buffer_len(&c->wbuf) > MAX_CONN_WRITE_BYTES) { + printf("fd=%d write buffer overflow, close connection\n", fd); + close_conn(fd); + return 0; + } + } +#endif + + if (update_conn_events(fd) < 0) { + close_conn(fd); + return 0; + } + + return total; +} + +int send_cb(int fd) { + struct conn *c; + int sent_total = 0; + + if (!conn_fd_valid(fd)) { + return -1; + } + c = &conn_list[fd]; + +#if ENABLE_HTTP + http_response(c); +#elif ENABLE_WEBSOCKET + ws_response(c); +#elif ENABLE_KVSTORE + kvs_response(c); +#endif + + while (chain_buffer_len(&c->wbuf) > 0) { + ssize_t n = chain_buffer_send_fd(&c->wbuf, fd, MSG_NOSIGNAL); + if (n > 0) { + sent_total += (int)n; + continue; + } + + if (n == 0) { + break; + } + + if (errno == EINTR) { + continue; + } + if (errno == EAGAIN || errno == EWOULDBLOCK) { + break; + } + + printf("send fd=%d errno=%d %s\n", fd, errno, strerror(errno)); + close_conn(fd); + return 0; + } + + if (update_conn_events(fd) < 0) { + close_conn(fd); + return 0; + } + + return sent_total; +} + +int handle_wakeup_fd_cb(int fd) { + uint64_t v; + + while (1) { + ssize_t n = read(fd, &v, sizeof(v)); + if (n == (ssize_t)sizeof(v)) { + continue; + } + if (n < 0 && errno == EINTR) { + continue; + } + if (n < 0 && errno == EAGAIN) { + break; + } + break; + } + + cleanup_finished_iouring_tasks(&global_uring_ctx); + return 0; +} + +int init_wakeup_fd(void) { + int wfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + struct conn *c; + + if (wfd < 0) { + printf("eventfd failed: errno=%d %s\n", errno, strerror(errno)); + return -1; + } + if (!conn_fd_valid(wfd)) { + close(wfd); + return -1; + } + + conn_clear_slot(wfd); + c = &conn_list[wfd]; + c->fd = wfd; + c->r_action.recv_callback = handle_wakeup_fd_cb; + + if (set_event(wfd, EPOLLIN, 1) < 0) { + close_conn(wfd); + return -1; + } + return wfd; +} + +void sync_wakeup() { + uint64_t one = 1; + ssize_t n; + + if (wakeup_fd < 0) { + return; + } + + while (1) { + n = write(wakeup_fd, &one, sizeof(one)); + if (n == (ssize_t)sizeof(one)) { + return; + } + if (n < 0 && errno == EINTR) { + continue; + } + if (n < 0 && errno == EAGAIN) { + return; + } + return; + } +} + +int handle_timer_fd_cb(int fd) { + uint64_t v; + + while (1) { + ssize_t n = read(fd, &v, sizeof(v)); + if (n == (ssize_t)sizeof(v)) { + continue; + } + if (n < 0 && errno == EINTR) { + continue; + } + if (n < 0 && errno == EAGAIN) { + break; + } + break; + } + return 0; +} + +int init_timer_fd(void) { + int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); + struct itimerspec its; + struct conn *c; + + if (tfd < 0) { + printf("timerfd_create failed: errno=%d %s\n", errno, strerror(errno)); + return -1; + } + if (!conn_fd_valid(tfd)) { + close(tfd); + return -1; + } + + memset(&its, 0, sizeof(its)); + its.it_interval.tv_nsec = 100 * 1000 * 1000; + its.it_value.tv_nsec = 100 * 1000 * 1000; + + if (timerfd_settime(tfd, 0, &its, NULL) < 0) { + close(tfd); + return -1; + } + + conn_clear_slot(tfd); + c = &conn_list[tfd]; + c->fd = tfd; + c->r_action.recv_callback = handle_timer_fd_cb; + + if (set_event(tfd, EPOLLIN, 1) < 0) { + close_conn(tfd); + return -1; + } + return tfd; +} + +int r_init_server(unsigned short port) { + int sockfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); + int opt = 1; + struct sockaddr_in servaddr; + + if (sockfd < 0) { + return -1; + } + + if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) { + close(sockfd); + return -1; + } + + memset(&servaddr, 0, sizeof(servaddr)); + servaddr.sin_family = AF_INET; + servaddr.sin_addr.s_addr = htonl(INADDR_ANY); + servaddr.sin_port = htons(port); + + if (bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) { + printf("bind failed on port %u: %s\n", port, strerror(errno)); + close(sockfd); + return -1; + } + + if (listen(sockfd, 128) < 0) { + close(sockfd); + return -1; + } + + return sockfd; +} + +int reactor_start(unsigned short port, msg_handler handler) { + int listen_fds[MAX_PORTS]; + int listen_count = 0; + int i; + + if (!handler) { + return -1; + } + + for (i = 0; i < CONNECTION_SIZE; i++) { + conn_list[i].fd = -1; + } + + kvs_handler = handler; + epfd = epoll_create1(EPOLL_CLOEXEC); + if (epfd < 0) { + return -1; + } + + wakeup_fd = init_wakeup_fd(); + if (wakeup_fd < 0) { + close(epfd); + epfd = -1; + return -1; + } + + timer_fd = init_timer_fd(); + if (timer_fd < 0) { + close_conn(wakeup_fd); + close(epfd); + wakeup_fd = -1; + epfd = -1; + return -1; + } + + for (i = 0; i < MAX_PORTS; i++) { + int sockfd = r_init_server((unsigned short)(port + i)); + struct conn *c; + if (sockfd < 0) { + continue; + } + if (!conn_fd_valid(sockfd)) { + close(sockfd); + continue; + } + + conn_clear_slot(sockfd); + c = &conn_list[sockfd]; + c->fd = sockfd; + c->r_action.recv_callback = accept_cb; + c->is_stop = 0; + + if (set_event(sockfd, EPOLLIN, 1) < 0) { + close_conn(sockfd); + continue; + } + + listen_fds[listen_count++] = sockfd; + } + + if (listen_count == 0) { + close_conn(timer_fd); + close_conn(wakeup_fd); + close(epfd); + timer_fd = -1; + wakeup_fd = -1; + epfd = -1; + return -1; + } + + gettimeofday(&begin, NULL); + + while (1) { + struct epoll_event events[1024]; + int nready = epoll_wait(epfd, events, 1024, -1); + + if (nready < 0) { + if (errno == EINTR) { + continue; + } + break; + } + + for (i = 0; i < nready; i++) { + int connfd = events[i].data.fd; + uint32_t ev = events[i].events; + + if (!conn_fd_valid(connfd)) { + continue; + } + + if ((ev & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) && + conn_list[connfd].r_action.recv_callback == recv_cb) { + close_conn(connfd); + continue; + } + + if ((ev & EPOLLIN) && conn_list[connfd].r_action.recv_callback) { + conn_list[connfd].r_action.recv_callback(connfd); + } + + if (!conn_fd_valid(connfd) || conn_list[connfd].fd < 0) { + continue; + } + + if ((ev & EPOLLOUT) && conn_list[connfd].send_callback) { + conn_list[connfd].send_callback(connfd); + } + } + } + + for (i = 0; i < listen_count; i++) { + close_conn(listen_fds[i]); + } + if (timer_fd >= 0) { + close_conn(timer_fd); + } + if (wakeup_fd >= 0) { + close_conn(wakeup_fd); + } + if (epfd >= 0) { + close(epfd); + } + + timer_fd = -1; + wakeup_fd = -1; + epfd = -1; + return 0; +} diff --git a/server.h b/server.h index 40d5f10..372d2ed 100644 --- a/server.h +++ b/server.h @@ -2,12 +2,13 @@ -#ifndef __SERVER_H__ -#define __SERVER_H__ - -#include - -#define BUFFER_LENGTH 4096 +#ifndef __SERVER_H__ +#define __SERVER_H__ + +#include +#include "network/chainbuffer.h" + +#define BUFFER_LENGTH 4096 #define ENABLE_HTTP 0 #define ENABLE_WEBSOCKET 0 @@ -17,16 +18,13 @@ typedef int (*RCALLBACK)(int fd); -struct conn { - int fd; - - char rbuffer[BUFFER_LENGTH]; - int rlength; - - char wbuffer[BUFFER_LENGTH*2]; - int wlength; - - RCALLBACK send_callback; +struct conn { + int fd; + + chain_buffer_t rbuf; + chain_buffer_t wbuf; + + RCALLBACK send_callback; union { RCALLBACK recv_callback;