diff --git a/.gitignore b/.gitignore index 400a076..c2c677a 100644 --- a/.gitignore +++ b/.gitignore @@ -2,5 +2,8 @@ NtyCo/ .vscode/ *.db *.copy +*.o +kvstore +testcase proactor copy.c diff --git a/Makefile b/Makefile index baa35bf..039b5c3 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ CC = gcc FLAGS = -I ./NtyCo/core/ -L ./NtyCo/ -lntyco -lpthread -luring -ldl -ljemalloc # SRCS = kvstore.c ntyco.c proactor.c reactor.c kvs_array.c kvs_rbtree.c kvs_hash.c kvs_rw_tools.c -SRCS = kvstore.c ntyco.c proactor.c reactor.c kvs_array_bin.c kvs_rbtree_bin.c kvs_hash_bin.c kvs_rw_tools.c kvs_cmd_log.c ./mem_pool/mem_pool.c +SRCS = kvstore.c ntyco.c proactor.c reactor.c kvs_array_bin.c kvs_rbtree_bin.c kvs_hash_bin.c kvs_rw_tools.c kvs_cmd_log.c ./mem_pool/mem_pool.c kvs_slave.c TESTCASE_SRCS = testcase.c TARGET = kvstore SUBDIR = ./NtyCo/ diff --git a/kvs_rw_tools.h b/kvs_rw_tools.h index 8e9c70a..b9dbfcb 100644 --- a/kvs_rw_tools.h +++ b/kvs_rw_tools.h @@ -62,6 +62,7 @@ typedef enum { KVS_CMD_HMOD, KVS_CMD_HEXIST, + KVS_CMD_PSYNC, KVS_CMD_SAVE, KVS_CMD_COUNT, }kvs_cmd_t; diff --git a/kvs_slave.c b/kvs_slave.c new file mode 100644 index 0000000..a1cfece --- /dev/null +++ b/kvs_slave.c @@ -0,0 +1,70 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "server.h" + +static int kvs_write_u8(uint8_t **pp, uint8_t v) { + uint8_t *p = *pp; + *p = v; + *pp = p + 1; + return 0; +} + +static int kvs_write_u32(uint8_t **pp, uint32_t v) { + uint8_t *p = *pp; + uint32_t be = htonl(v); + memcpy(p, &be, 4); + *pp = p + 4; + return 0; +} + + +int try_connect_master(char *ip, int port){ + + struct sockaddr_in addr; + memset(&addr, 0, sizeof(struct sockaddr)); + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = inet_addr(ip); + + int rt = 1; + while(1){ + int fd = socket(AF_INET, SOCK_STREAM, 0); + if(fd < 0){ + continue; + } + + if(0 == connect(fd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in))){ + // [OP KVS_CMD_PSYNC=15][ARGC 1][ARGLEN 4][ARG offset] + + char buf[100]; + char *p = buf; + + kvs_write_u8((uint8_t**)&p, 15); + kvs_write_u8((uint8_t**)&p, 1); + + uint64_t len = sizeof(uint64_t); + kvs_write_u32((uint8_t**)&p, len); + + uint64_t offset = 0; + memcpy(p, (void*)&offset, len); + + p += len; + + send(fd, buf, p-buf, 0); + recv(fd, buf, 100, 0); + return fd; + } + + close(fd); + } + + return -1; +} \ No newline at end of file diff --git a/kvstore.c b/kvstore.c index 583f8ca..df1ddf0 100644 --- a/kvstore.c +++ b/kvstore.c @@ -8,6 +8,10 @@ #include #include #include +#include +#include +#include +#include #if ENABLE_ARRAY extern kvs_array_t global_array; @@ -25,6 +29,11 @@ extern kvs_hash_t global_hash; extern mp_pool_t global_mempool; #endif +#define KVS_SERVER_NOT_INIT 0 +#define KVS_SERVER_MASETER 1 +#define KVS_SERVER_SLAVE 2 +int kvs_server_role = KVS_SERVER_NOT_INIT; + int global_cmd_log_fd = -1; const char *command[] = { @@ -185,6 +194,7 @@ void kvs_free_request(kvs_req_t *req) { case KVS_CMD_HGET: case KVS_CMD_HDEL: case KVS_CMD_HEXIST: + case KVS_CMD_PSYNC: if (argc != 1 || !key) { rsp_out->status = KVS_STATUS_BADREQ; return -1; } break; case KVS_CMD_SAVE: @@ -311,6 +321,10 @@ void kvs_free_request(kvs_req_t *req) { if(ret == 0) rsp_out->status = KVS_STATUS_OK; else rsp_out->status = KVS_STATUS_ERROR; return 0; + case KVS_CMD_PSYNC: + rsp_out->op = req->op; + rsp_out->status = KVS_STATUS_OK; + return 0; default: rsp_out->status = KVS_STATUS_BADREQ; return -1; @@ -380,13 +394,30 @@ int kvs_save_to_file(){ return ret; } +int is_update_cmd(kvs_cmd_t op){ + if(op == KVS_CMD_SET || op == KVS_CMD_RSET || op == KVS_CMD_HSET + || op == KVS_CMD_MOD || op == KVS_CMD_RMOD || op == KVS_CMD_HMOD + || op == KVS_CMD_DEL || op == KVS_CMD_RDEL || op == KVS_CMD_HDEL){ + return 1; + } + return 0; +} /** * input : request request_length * output : response response_length * return : -1 error, =0 半包, 1 成功 */ -int kvs_protocol(char *request, int request_length, char *response, int *response_length){ + + +// int kvs_protocol(char *request, int request_length, char *response, int *response_length){ +int kvs_protocol(struct conn* conn){ + if (!conn) return -1; + char *request = conn->rbuffer; + int request_length = conn->rlength; + char *response = conn->wbuffer; + int *response_length = &conn->wlength; + if (!request || request_length <= 0 || !response || !response_length) return -1; int consumed = 0; int out_len = 0; @@ -430,14 +461,16 @@ int kvs_protocol(char *request, int request_length, char *response, int *respons }else{ // 执行成功,在这里保存到日志中。 if(rsp.status == KVS_STATUS_OK){ - if(req.op != KVS_CMD_GET && req.op != KVS_CMD_EXIST - && req.op != KVS_CMD_RGET && req.op != KVS_CMD_REXIST - && req.op != KVS_CMD_HGET && req.op != KVS_CMD_HEXIST ){ + if(is_update_cmd(req.op)){ kvs_save_cmd_to_logfile(p, len, global_cmd_log_fd); } } } + if(req.op == KVS_CMD_PSYNC){ + build_thread_to_sync(req.args->data, conn); + } + int resp_len = kvs_build_one_rsp(&rsp, (uint8_t *)response+out_len, KVS_MAX_RESPONSE-out_len); // 构建响应 <0 构建失败 kvs_free_request(&req); @@ -446,16 +479,147 @@ int kvs_protocol(char *request, int request_length, char *response, int *respons return -1; } - // printf("resp_len:%d\n", resp_len); - // printf("consumed:%d\n", len); out_len += resp_len; consumed += len; } + // slave 暂时不需要回报,或者回一个new_offset + if(conn->is_from_master){ + conn->wlength = 0; + return consumed; + } *response_length = out_len; return consumed; } +extern void sync_wakeup(int fd); +static int g_slavefd = -1; +static uint64_t g_offset = 0; + +static void *sync_thread_main(void *arg) { + struct conn *conn = (struct conn*) arg; + + int logfd = open(KVS_CMD_LOG_FILE, O_RDONLY); + if (logfd < 0) { + printf("open replaylog failed: %s\n", strerror(errno)); + return NULL; + } + + pthread_mutex_lock(&conn->g_sync_lock); + uint64_t off = g_offset; + pthread_mutex_unlock(&conn->g_sync_lock); + + while (1) { + + // 单槽位:等 reactor 发完再填 + pthread_mutex_lock(&conn->g_sync_lock); + int busy = (conn->wlength > 0); + pthread_mutex_unlock(&conn->g_sync_lock); + if (busy) { usleep(10 * 1000); continue; } + + size_t filled = 0; + int records = 0; + + // 试图攒一批 + while (filled < (size_t)KVS_MAX_RESPONSE && records < 128) { + + // 读 len 头 + uint32_t nlen = 0; + ssize_t r = pread(logfd, &nlen, sizeof(nlen), (off_t)off); + + if (r == 0) { + // EOF:文件当前没更多数据 + break; + } + if (r < 0) { + if (errno == EINTR) continue; + printf("pread len error: %s\n", strerror(errno)); + close(logfd); + return NULL; + } + if (r < (ssize_t)sizeof(nlen)) { + // 半截 len:writer 还没写完头 + break; + } + + uint32_t len = ntohl(nlen); + if(len <= 0) { + printf("sync error\n"); + } + + // 这一条放不进本批次,就先发已有的 + if (filled + len > (size_t)KVS_MAX_RESPONSE) { + break; + } + + // 读 payload(cmd) + ssize_t pr = pread(logfd, conn->wbuffer + filled, len, + (off_t)(off + sizeof(nlen))); + + if (pr == 0) { + // payload 还没写到 + break; + } + if (pr < 0) { + if (errno == EINTR) continue; + printf("pread payload error: %s\n", strerror(errno)); + close(logfd); + return NULL; + } + if (pr < (ssize_t)len) { + // 半截 payload:writer 还没写完这一条 + break; + } + + // 成功拿到一条完整记录:推进 + off += sizeof(nlen) + (uint64_t)len; + filled += (size_t)len; + records++; + + } + + if (filled > 0) { + // 提交给 reactor 发送 + pthread_mutex_lock(&conn->g_sync_lock); + conn->wlength = (int)filled; + g_offset = off; + pthread_mutex_unlock(&conn->g_sync_lock); + + // 唤醒 reactor 发 + sync_wakeup(conn->fd); // 或 g_slavefd + continue; + } + + // 没攒到任何完整记录:说明真到末尾/半条记录,等一会儿 + usleep(10*1000); + } + + close(logfd); + return NULL; +} + +void build_thread_to_sync(const uint8_t *offset, struct conn* conn){ + uint64_t off64 = 0; + memcpy(&off64, offset, 8); + + pthread_mutex_lock(&conn->g_sync_lock); + g_slavefd = conn->fd; + g_offset = (uint64_t)off64; + + conn->wlength = 0; + pthread_mutex_unlock(&conn->g_sync_lock); + + pthread_t tid; + + int rc = pthread_create(&tid, NULL, sync_thread_main, conn); + if (rc != 0) { + printf("pthread_create failed: %s\n", strerror(rc)); + return; + } + + pthread_detach(tid); +} + int init_kvengine(void) { #if ENABLE_ARRAY @@ -511,18 +675,39 @@ void dest_memory_pool(void){ #endif } +/** + * ./ksvtore [role] [port] + * ./kvstore master 8888 + * + * ./ksvtore [role] [port] [masterip] [masterport] + * ./kvstore slave 7000 192.168.10.129 8888 + */ int main(int argc, char *argv[]) { - if (argc != 2) return -1; + if (argc < 3) return -1; + + char *role = argv[1]; + int port = atoi(argv[2]); + + char *master_ip = NULL; + int master_port = -1; - int port = atoi(argv[1]); + if(strcmp(role, "master") == 0){ + kvs_server_role = KVS_SERVER_SLAVE; + }else if(strcmp(role, "slave") == 0){ + kvs_server_role = KVS_SERVER_MASETER; + if(argc < 5) return -1; + master_ip = argv[3]; + master_port = atoi(argv[4]); + + } init_memory_pool(); init_kvengine(); #if (NETWORK_SELECT == NETWORK_REACTOR) - reactor_start(port, kvs_protocol); // + reactor_start(port, kvs_protocol, master_ip, master_port); // #elif (NETWORK_SELECT == NETWORK_PROACTOR) proactor_start(port, kvs_protocol); #elif (NETWORK_SELECT == NETWORK_NTYCO) diff --git a/kvstore.h b/kvstore.h index 66f8ad0..cfaa28e 100644 --- a/kvstore.h +++ b/kvstore.h @@ -11,6 +11,7 @@ #include #include #include +#include "server.h" #define NETWORK_REACTOR 0 @@ -38,9 +39,10 @@ // typedef int (*msg_handler)(char *msg, int length, char *response); -typedef int (*msg_handler)(char *request, int request_length, char *response, int *response_length); +// typedef int (*msg_handler)(char *request, int request_length, char *response, int *response_length); +typedef int (*msg_handler)(struct conn* conn); -extern int reactor_start(unsigned short port, msg_handler handler); +extern int reactor_start(unsigned short port, msg_handler handler, const char *m_ip, int m_port); extern int proactor_start(unsigned short port, msg_handler handler); extern int ntyco_start(unsigned short port, msg_handler handler); @@ -50,6 +52,9 @@ extern int kvs_save_cmd_to_logfile(const uint8_t *cmd, size_t len, int logfd); extern int kvs_replay_log(const char *logfile, int logfd); extern int ksv_clear_log(int logfd); +extern int try_connect_master(char *ip, int port); +void build_thread_to_sync(const uint8_t *offset, struct conn* conn); + #if ENABLE_ARRAY #if BIN_SAFE diff --git a/reactor.c b/reactor.c index edfed77..c099693 100644 --- a/reactor.c +++ b/reactor.c @@ -12,7 +12,7 @@ #include #include #include - +#include #include "server.h" @@ -28,14 +28,17 @@ #if ENABLE_KVSTORE // typedef int (*msg_handler)(char *msg, int length, char *response); -typedef int (*msg_handler)(char *request, int request_length, char *response, int *response_length); +// typedef int (*msg_handler)(char *request, int request_length, char *response, int *response_length); +typedef int (*msg_handler)(struct conn* conn); +extern int try_connect_master(char *ip, int port); static msg_handler kvs_handler; // 0 need more, -1 error, =1 suc int kvs_request(struct conn *c) { - int consumed_out = kvs_handler(c->rbuffer, c->rlength, c->wbuffer, &c->wlength); + // int consumed_out = kvs_handler(c->rbuffer, c->rlength, c->wbuffer, &c->wlength); + int consumed_out = kvs_handler(c); return consumed_out; } @@ -59,12 +62,14 @@ int send_cb(int fd); int epfd = 0; struct timeval begin; +int wakeup_fd = -1; struct conn conn_list[CONNECTION_SIZE] = {0}; // fd +// 1 add, 0 mod int set_event(int fd, int event, int flag) { if (flag) { // non-zero add @@ -101,6 +106,8 @@ int event_register(int fd, int event) { memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH); conn_list[fd].wlength = 0; + conn_list[fd].is_from_master = 0; + set_event(fd, event, 1); } @@ -250,9 +257,17 @@ int send_cb(int fd) { } #else // printf("wlength: %d\n", conn_list[fd].wlength); + + pthread_mutex_lock(&conn_list[fd].g_sync_lock); if (conn_list[fd].wlength != 0) { + // for(int i = 0;i < conn_list[fd].wlength; ++i){ + // printf("%02x", conn_list[fd].wbuffer[i]); + // } + // printf("\n"); count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0); + conn_list[fd].wlength = 0; } + pthread_mutex_unlock(&conn_list[fd].g_sync_lock); set_event(fd, EPOLLIN, 0); @@ -262,7 +277,43 @@ int send_cb(int fd) { return count; } +// wakup fd +int handle_wakeup_fd_cb(int fd); + +int init_wakeup_fd(void) { + wakeup_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + if (wakeup_fd < 0) { + printf("eventfd failed: errno=%d %s\n", errno, strerror(errno)); + return -1; + } + + conn_list[wakeup_fd].fd = wakeup_fd; + conn_list[wakeup_fd].r_action.recv_callback = handle_wakeup_fd_cb; + set_event(wakeup_fd, EPOLLIN, 1); + + return 0; +} + +// EPOLLOUT +void sync_wakeup(int fd) { + if (wakeup_fd < 0) return; + set_event(fd, EPOLLOUT, 0); + + uint64_t one = 1; + ssize_t n = write(wakeup_fd, &one, sizeof(one)); +} + +int handle_wakeup_fd_cb(int fd) { + uint64_t v; + while (1) { + ssize_t n = read(wakeup_fd, &v, sizeof(v)); + if (n == sizeof(v)) continue; + if (n < 0 && errno == EAGAIN) break; // 已经读空 + break; + } + return 0; +} int r_init_server(unsigned short port) { @@ -284,13 +335,25 @@ int r_init_server(unsigned short port) { } -int reactor_start(unsigned short port, msg_handler handler) { +int reactor_start(unsigned short port, msg_handler handler, char *m_ip, int m_port) { //unsigned short port = 2000; kvs_handler = handler; epfd = epoll_create(1); + if(init_wakeup_fd() < 0){ + close(epfd); + return -1; + } + + // slave + if(m_ip != NULL){ + int masterfd = try_connect_master(m_ip, m_port); + event_register(masterfd, EPOLLIN); + conn_list[masterfd].is_from_master = 1; + } + int i = 0; for (i = 0;i < MAX_PORTS;i ++) { @@ -299,7 +362,8 @@ int reactor_start(unsigned short port, msg_handler handler) { conn_list[sockfd].fd = sockfd; conn_list[sockfd].r_action.recv_callback = accept_cb; - + conn_list[sockfd].is_from_master = 0; + set_event(sockfd, EPOLLIN, 1); } @@ -335,7 +399,9 @@ int reactor_start(unsigned short port, msg_handler handler) { } - + if (wakeup_fd >= 0) close(wakeup_fd); + if (epfd >= 0) close(epfd); + return 0; } diff --git a/server.h b/server.h index 78e9462..4228dd3 100644 --- a/server.h +++ b/server.h @@ -5,6 +5,8 @@ #ifndef __SERVER_H__ #define __SERVER_H__ +#include + #define BUFFER_LENGTH 4096 #define ENABLE_HTTP 0 @@ -31,6 +33,10 @@ struct conn { RCALLBACK accept_callback; } r_action; + int is_from_master; + + pthread_mutex_t g_sync_lock; + int status; #if 1 // websocket char *payload; diff --git a/test/testcase.c b/test/testcase.c index bb0246f..fa2ecc0 100644 --- a/test/testcase.c +++ b/test/testcase.c @@ -152,7 +152,7 @@ void hash_testcase_1w(int connfd) { int time_used = TIME_SUB_MS(tv_end, tv_begin); // ms - printf("array testcase --> time_used: %d, qps: %d\n", time_used, 9000 * 1000 / time_used); + printf("array testcase --> time_used: %d, qps: %d\n", time_used, 9*count * 1000 / time_used); }