diff --git a/.gitignore b/.gitignore index 5dd3692..22a7c70 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ NtyCo/ +.vscode/ + proactor copy.c ntyco copy.c -reactor copy.c \ No newline at end of file +reactor copy.c diff --git a/Makefile b/Makefile index ac0e9d7..a9fe9fb 100644 --- a/Makefile +++ b/Makefile @@ -6,8 +6,8 @@ TESTCASE_SRCS = testcase.c TARGET = kvstore SUBDIR = ./NtyCo/ TESTCASE = testcase -TESTCASE2 = testcase2 -TESTCASE2_SRCS = testcase.c +TESTCASE2 = ./test/testcase +TESTCASE2_SRCS = ./test/testcase.c ./test/test_client.c OBJS = $(SRCS:.c=.o) @@ -22,18 +22,18 @@ ECHO: @echo $(SUBDIR) $(TARGET): $(OBJS) - $(CC) -o $@ $^ $(FLAGS) + $(CC) -g -o $@ $^ $(FLAGS) $(TESTCASE): $(TESTCASE_SRCS) - $(CC) -o $@ $^ + $(CC) -g -o $@ $^ $(TESTCASE2): $(TESTCASE2_SRCS) - $(CC) -o $@ $^ + $(CC) -g -o $@ $^ %.o: %.c - $(CC) $(FLAGS) -c $^ -o $@ + $(CC) $(FLAGS) -c $^ -g -o $@ clean: - rm -rf $(OBJS) $(TARGET) $(TESTCASE) + rm -rf $(OBJS) $(TARGET) $(TESTCASE) $(TESTCASE2) diff --git a/README.md b/README.md index 4cd05bf..983088f 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,8 @@ save() -> 全保存数据集。 这里需要在协议中定义消息长度而不是特殊字符做分隔来确定哪些数据是key哪些是value。 +还要修改底层,不要用strlen和strcpy,用memcpy。 + ### 面试题 1. 为什么会实现kvstore,使用场景在哪里? diff --git a/kvs_rw_tools.c b/kvs_rw_tools.c index 4cd8345..a324b0d 100644 --- a/kvs_rw_tools.c +++ b/kvs_rw_tools.c @@ -62,195 +62,125 @@ int kvs_write_u32(uint8_t **pp, const uint8_t *end, uint32_t v) { } // return: -1 fail, 0 half, >0 consumed -int kvs_parse_request(const uint8_t *msg, int length, kvs_request_t *req_out) { - if (!msg || length <= 0 || !req_out) return -1; - memset(req_out, 0, sizeof(*req_out)); +int kvs_parse_one_cmd(const uint8_t *request, int request_length, kvs_req_t *req_out){ + if (!request || request_length <= 0 || !req_out) return -1; + + req_out->op = 0; + req_out->argc = 0; + req_out->args = NULL; - if(length < KVS_HDR_LEN) return 0; + const uint8_t *p = request; + const uint8_t *end = request + (size_t)request_length; - // header parser - uint32_t magic_be; - memcpy(&magic_be, msg, 4); - uint32_t magic = ntohl(magic_be); - if (magic != KVS_MAGIC) return -1; + // OP + ARGC + if (kvs_need(p, end, 2)) { + return 0; // NEED_MORE + } - // - uint8_t type = *(msg+4); + uint8_t op = 0, argc = 0; + if (kvs_read_u8(&p, end, &op) < 0) return -1; + if (kvs_read_u8(&p, end, &argc) < 0) return -1; - uint32_t payloadlen_be; - memcpy(&payloadlen_be, msg+5, 4); - uint32_t payloadlen = ntohl(payloadlen_be); + if (argc > KVS_MAX_ARGC) return -1; - uint32_t reqId_be; - memcpy(&reqId_be, msg + 9, 4); - uint32_t reqId = ntohl(reqId_be); - - uint8_t flag1 = *(msg + 13); - - uint16_t flag2_be; - memcpy(&flag2_be, msg + 14, 2); - uint16_t flag2 = ntohs(flag2_be); - - uint32_t totalLen = (uint32_t)KVS_HDR_LEN + payloadlen; - if ((uint32_t)length < totalLen) return 0; // 半包 - - - // payload parser - const uint8_t *ptr = msg + KVS_HDR_LEN; - const uint8_t *end = ptr + payloadlen; - - uint32_t opcount = 0; - if(kvs_read_u32(&ptr, end, &opcount) < 0) return -1; - if (opcount > KVS_MAX_OPCOUNT) return -1; - - kvs_op_t *ops = (kvs_op_t *)kvs_malloc(sizeof(kvs_op_t) * opcount); - if (!ops) return -1; - memset(ops, 0, sizeof(kvs_op_t) * opcount); - - // operator parser - for (uint32_t i = 0; i < opcount; i++) { - uint8_t op = 0; - uint32_t argc = 0; - - if (kvs_read_u8(&ptr, end, &op) < 0) goto FAIL; - if (kvs_read_u32(&ptr, end, &argc) < 0) goto FAIL; - if (argc > 16) goto FAIL; - - char **argv = NULL; - if (argc > 0) { - argv = (char **)kvs_malloc(sizeof(char*) * argc); - if (!argv) goto FAIL; - memset(argv, 0, sizeof(char*) * argc); - } - - // operator args parser - for (uint32_t a = 0; a < argc; a++) { - uint32_t arglen = 0; - if (kvs_read_u32(&ptr, end, &arglen) < 0) goto FAIL; - if (kvs_need(ptr, end, arglen) < 0) goto FAIL; - - char *s = (char *)kvs_malloc((size_t)arglen + 1); - if (!s) goto FAIL; - - memcpy(s, ptr, arglen); - s[arglen] = '\0'; - argv[a] = s; - ptr += arglen; - } - - ops[i].op = op; - ops[i].argc = argc; - ops[i].argv = argv; - } - - req_out->magic = magic; - req_out->type = type; - req_out->payloadLen = payloadlen; - req_out->reqId = reqId; - req_out->flag1 = flag1; - req_out->flag2 = flag2; - req_out->opcount = opcount; - req_out->ops = ops; - req_out->consumed = totalLen; - - return (int)totalLen; - -FAIL: - if (ops) { - for (uint32_t i = 0; i < opcount; i++) { - if (ops[i].argv) { - for (uint32_t a = 0; a < ops[i].argc; a++) { - if (ops[i].argv[a]) kvs_free(ops[i].argv[a]); - } - kvs_free(ops[i].argv); + // 先扫描一遍确认整条命令数据都在 buffer 里 + const uint8_t *scan = p; + uint32_t lens[KVS_MAX_ARGC]; + if (argc > 0) { + for (uint8_t i = 0; i < argc; i++) { + if (kvs_need(scan, end, 4)) { + return 0; // NEED_MORE } - } - kvs_free(ops); - } - return -1; -} + uint32_t alen = 0; + if (kvs_read_u32(&scan, end, &alen) < 0) return -1; -void kvs_free_request(kvs_request_t *req) { - if (!req) return; - if (req->ops) { - for (uint32_t i = 0; i < req->opcount; i++) { - if (req->ops[i].argv) { - for (uint32_t a = 0; a < req->ops[i].argc; a++) { - if (req->ops[i].argv[a]) kvs_free(req->ops[i].argv[a]); - } - kvs_free(req->ops[i].argv); + // 防御:单个参数长度限制 + if (alen > KVS_MAX_ARGLEN) return -1; + + // 防御:scan + alen 越界 / 半包 + if (kvs_need(scan, end, (size_t)alen)) { + return 0; // NEED_MORE } - } - kvs_free(req->ops); + lens[i] = alen; + scan += alen; + } } - memset(req, 0, sizeof(*req)); + + size_t total_len = (size_t)(scan - request); + if (total_len > KVS_MAX_CMD_BYTES) return -1; + + req_out->op = op; + req_out->argc = argc; + + if (argc == 0) { + return (int)total_len; + } + + kvs_arg_t *args = (kvs_arg_t *)kvs_malloc((size_t)argc * sizeof(kvs_arg_t)); + if (!args) { + kvs_free_request(req_out); + return -1; + } + memset(args, 0, (size_t)argc * sizeof(kvs_arg_t)); + + for (uint8_t i = 0; i < argc; i++) { + uint32_t alen = 0; + if (kvs_read_u32(&p, end, &alen) < 0) { + kvs_free(args); + kvs_free_request(req_out); + return -1; + } + + // alen 与 lens[i] 应当一致(扫描时读过),不一致说明解析器/输入异常 + if (alen != lens[i]) { + kvs_free(args); + kvs_free_request(req_out); + return -1; + } + + args[i].len = alen; + args[i].data = p; // 直接指向输入 buffer(零拷贝) + p += alen; + } + + + req_out->args = args; + + return (int)(p - request); } -int kvs_execute_request(const kvs_request_t *req, kvs_response_t *results /* size opcount */) { - if (!req || !results) return -1; - - for (uint32_t i = 0; i < req->opcount; i++) { - uint8_t status = KVS_STATUS_ERROR; - const char *data = NULL; - uint32_t dlen = 0; - - kvs_exec_one_op(req->ops[i].op, req->ops[i].argc, req->ops[i].argv, - &status, &data, &dlen); - - results[i].status = status; - results[i].datalen = dlen; - results[i].data_ptr = data; - } - return 0; +void kvs_free_request(kvs_req_t *req) { + if (!req) return; + if (req->args) { + kvs_free(req->args); + req->args = NULL; + } + req->op = 0; + req->argc = 0; } +int kvs_build_one_rsp(const kvs_rsp_t *results, uint8_t *response, size_t response_cap){ + if (!results || !response) return -1; - int kvs_build_response(const kvs_request_t *req, - const kvs_response_t *results, - uint8_t *response, int response_cap) { - if (!req || !results || !response || response_cap < KVS_HDR_LEN) return -1; + const uint8_t *end = response + response_cap; + uint8_t *p = response; - uint8_t *out = response; - uint8_t *w = out + KVS_HDR_LEN; - const uint8_t *w_end = out + response_cap; + // 计算所需长度:1 + 1 + 4 + dlen + // 注意防止 size_t 溢出 + size_t need = 1u + 1u + 4u + (size_t)results->dlen; + if (need > response_cap) return -1; - // payload: opcount - if (kvs_write_u32(&w, w_end, req->opcount) < 0) return -1; + if (kvs_write_u8(&p, end, results->op) < 0) return -1; + if (kvs_write_u8(&p, end, results->status) < 0) return -1; + if (kvs_write_u32(&p, end, results->dlen) < 0) return -1; - for (uint32_t i = 0; i < req->opcount; i++) { - if (kvs_write_u8(&w, w_end, results[i].status) < 0) return -1; - if (kvs_write_u32(&w, w_end, results[i].datalen) < 0) return -1; + if (results->dlen > 0) { + if (!results->data) return -1; // 有长度却没指针,视为错误 + if (kvs_need(p, end, (size_t)results->dlen) < 0) return -1; + memcpy(p, results->data, results->dlen); + p += results->dlen; + } - if (results[i].datalen > 0) { - if (kvs_need(w, w_end, results[i].datalen) < 0) return -1; - memcpy(w, results[i].data_ptr, results[i].datalen); - w += results[i].datalen; - } - } - - uint32_t respPayloadLen = (uint32_t)(w - (out + KVS_HDR_LEN)); - uint32_t total = KVS_HDR_LEN + respPayloadLen; - - // header - { - uint32_t be = htonl(KVS_MAGIC); - memcpy(out + 0, &be, 4); - } - out[4] = (uint8_t)KVS_TYPE_RESP; - { - uint32_t be = htonl(respPayloadLen); - memcpy(out + 5, &be, 4); - } - { - uint32_t be = htonl(req->reqId); - memcpy(out + 9, &be, 4); - } - out[13] = req->flag1; - { - uint16_t be = htons(req->flag2); - memcpy(out + 14, &be, 2); - } - - return (int)total; + return (int)(p - response); } diff --git a/kvs_rw_tools.h b/kvs_rw_tools.h index 90dd3ac..ef895cd 100644 --- a/kvs_rw_tools.h +++ b/kvs_rw_tools.h @@ -14,26 +14,20 @@ int kvs_write_u32(uint8_t **pp, const uint8_t *end, uint32_t v); -/** - * Header: | magic(4) | type(1) | payloadLen(4) | reqId(4) | flag1(1) | flag2(2) | - * + /** * Request - * Payload: | opcount(4) | repeat Cmd | - * Cmd: | OP(1) | argc(4) | repeat Arg | - * Arg: | arglen(4) | arg | + * Cmd: | OP(1) | argc(1) | repeat { arglen(4) | arg } | * * Response - * Payload: | opcount(4) | repeat Cmd | - * Cmd: | status(1) | datalen(4) | data | + * Rsp: | OP(1) | status(1) | datalen(4) | data | */ -#define KVS_HDR_LEN 16 // | magic(4) | type(1) | payloadLen(4) | reqId(4) | flag1(1) | flag2(2) | -#define KVS_MAGIC 0x4B565331u -#define KVS_TYPE_REQ 1 -#define KVS_TYPE_RESP 2 - -#define KVS_MAX_OPCOUNT 1024 -#define KVS_MAX_RESPONSE (64 * 1024) +#define KVS_MAX_CMDS_PER_CALL 64 +// 1MB +#define KVS_MAX_RESPONSE (1024u * 1024u) +#define KVS_MAX_ARGC 64 +#define KVS_MAX_ARGLEN (1024u * 1024u) +#define KVS_MAX_CMD_BYTES (4u * 1024u * 1024u) enum { KVS_STATUS_OK = 0, @@ -43,43 +37,34 @@ enum { KVS_STATUS_BADREQ = 4 }; +typedef enum { + KVS_OK = 1, + KVS_NEED_MORE = 0, + KVS_ERROR = -1 +}kvs_rc_t; -typedef struct { - uint8_t op; - uint32_t argc; - char **argv; -} kvs_op_t; +typedef struct kvs_arg_s{ + uint32_t len; + const uint8_t *data; +} kvs_arg_t; -typedef struct { - uint32_t magic; - uint8_t type; - uint32_t payloadLen; - uint32_t reqId; - uint8_t flag1; - uint16_t flag2; +typedef struct kvs_req_s{ + uint8_t op; + uint8_t argc; + kvs_arg_t *args; +}kvs_req_t; - // payload ops - uint32_t opcount; - kvs_op_t *ops; - - // consumed bytes (header+payload) - uint32_t consumed; -} kvs_request_t; - -typedef struct { +typedef struct kvs_rsp_s{ + uint8_t op; uint8_t status; - uint32_t datalen; - const char *data_ptr; -} kvs_response_t; + uint32_t dlen; + const uint8_t *data; +} kvs_rsp_t; + +int kvs_parse_one_cmd(const uint8_t *request, int request_length, kvs_req_t *req_out); +void kvs_free_request(kvs_req_t *req); +int kvs_execute_one_cmd(const kvs_req_t *req, kvs_rsp_t *rsp_out); +int kvs_build_one_rsp(const kvs_rsp_t *results, uint8_t *response, size_t response_cap); -int kvs_parse_request(const uint8_t *msg, int length, kvs_request_t *req_out); -int kvs_execute_request(const kvs_request_t *req, kvs_response_t *results); -int kvs_build_response(const kvs_request_t *req, - const kvs_response_t *results, - uint8_t *response, int response_cap); -void kvs_free_request(kvs_request_t *req); -void kvs_exec_one_op(uint8_t op, uint32_t argc, char **argv, - uint8_t *status_out, - const char **data_out, uint32_t *dlen_out); #endif diff --git a/kvstore.c b/kvstore.c index 7554d7e..a887e42 100644 --- a/kvstore.c +++ b/kvstore.c @@ -263,18 +263,22 @@ int kvs_filter_protocol(char **tokens, int count, char *response) { } /** - * 输入:op argc argv - * 输出 status + * 输入:req + * 输出:rsp */ - void kvs_exec_one_op(uint8_t op, uint32_t argc, char **argv, - uint8_t *status_out, - const char **data_out, uint32_t *dlen_out) { - *status_out = KVS_STATUS_ERROR; - *data_out = NULL; - *dlen_out = 0; + int kvs_execute_one_cmd(const kvs_req_t *req, kvs_rsp_t *rsp_out) { + if(!req || !rsp_out) return -1; + rsp_out->op = req->op; + rsp_out->status = KVS_STATUS_ERROR; + rsp_out->data = NULL; + rsp_out->dlen = 0; - const char *key = (argc >= 1) ? argv[0] : NULL; - const char *val = (argc >= 2) ? argv[1] : NULL; + int argc = req->argc; + int op = req->op; + kvs_arg_t *argv = req->args; + + const char *key = (argc >= 1) ? argv[0].data : NULL; + const char *val = (argc >= 2) ? argv[1].data : NULL; // 基本参数校验(按你原有命令语义) switch (op) { @@ -284,7 +288,7 @@ int kvs_filter_protocol(char **tokens, int count, char *response) { case KVS_CMD_RMOD: case KVS_CMD_HSET: case KVS_CMD_HMOD: - if (argc != 2 || !key || !val) { *status_out = KVS_STATUS_BADREQ; return; } + if (argc != 2 || !key || !val) { rsp_out->status = KVS_STATUS_BADREQ; return -1; } break; case KVS_CMD_GET: case KVS_CMD_DEL: @@ -295,11 +299,11 @@ int kvs_filter_protocol(char **tokens, int count, char *response) { case KVS_CMD_HGET: case KVS_CMD_HDEL: case KVS_CMD_HEXIST: - if (argc != 1 || !key) { *status_out = KVS_STATUS_BADREQ; return; } + if (argc != 1 || !key) { rsp_out->status = KVS_STATUS_BADREQ; return -1; } break; default: - *status_out = KVS_STATUS_BADREQ; - return; + rsp_out->status = KVS_STATUS_BADREQ; + return -1; } int ret = 0; @@ -309,147 +313,184 @@ int kvs_filter_protocol(char **tokens, int count, char *response) { #if ENABLE_ARRAY case KVS_CMD_SET: ret = kvs_array_set(&global_array, (char*)key, (char*)val); - if (ret < 0) *status_out = KVS_STATUS_ERROR; - else if (ret == 0) *status_out = KVS_STATUS_OK; - else *status_out = KVS_STATUS_EXIST; - return; + if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; + else if (ret == 0) rsp_out->status = KVS_STATUS_OK; + else rsp_out->status = KVS_STATUS_EXIST; + return 0; case KVS_CMD_GET: result = kvs_array_get(&global_array, (char*)key); - if (!result) { *status_out = KVS_STATUS_NO_EXIST; return; } - *status_out = KVS_STATUS_OK; - *data_out = result; - *dlen_out = (uint32_t)strlen(result); - return; + if (!result) { rsp_out->status = KVS_STATUS_NO_EXIST; return 0; } + rsp_out->status = KVS_STATUS_OK; + rsp_out->data = result; + rsp_out->dlen = (uint32_t)strlen(result); + return 0; case KVS_CMD_DEL: ret = kvs_array_del(&global_array, (char*)key); - if (ret < 0) *status_out = KVS_STATUS_ERROR; - else if (ret == 0) *status_out = KVS_STATUS_OK; - else *status_out = KVS_STATUS_NO_EXIST; - return; + if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; + else if (ret == 0) rsp_out->status = KVS_STATUS_OK; + else rsp_out->status = KVS_STATUS_NO_EXIST; + return 0; case KVS_CMD_MOD: ret = kvs_array_mod(&global_array, (char*)key, (char*)val); - if (ret < 0) *status_out = KVS_STATUS_ERROR; - else if (ret == 0) *status_out = KVS_STATUS_OK; - else *status_out = KVS_STATUS_NO_EXIST; - return; + if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; + else if (ret == 0) rsp_out->status = KVS_STATUS_OK; + else rsp_out->status = KVS_STATUS_NO_EXIST; + return 0; case KVS_CMD_EXIST: ret = kvs_array_exist(&global_array, (char*)key); - *status_out = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST; - return; + rsp_out->status = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST; + return 0; #endif #if ENABLE_RBTREE case KVS_CMD_RSET: ret = kvs_rbtree_set(&global_rbtree, (char*)key, (char*)val); - if (ret < 0) *status_out = KVS_STATUS_ERROR; - else if (ret == 0) *status_out = KVS_STATUS_OK; - else *status_out = KVS_STATUS_EXIST; - return; + if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; + else if (ret == 0) rsp_out->status = KVS_STATUS_OK; + else rsp_out->status = KVS_STATUS_EXIST; + return 0; case KVS_CMD_RGET: result = kvs_rbtree_get(&global_rbtree, (char*)key); - if (!result) { *status_out = KVS_STATUS_NO_EXIST; return; } - *status_out = KVS_STATUS_OK; - *data_out = result; - *dlen_out = (uint32_t)strlen(result); - return; + if (!result) { rsp_out->status = KVS_STATUS_NO_EXIST; return 0; } + rsp_out->status = KVS_STATUS_OK; + rsp_out->data = result; + rsp_out->dlen = (uint32_t)strlen(result); + return 0; case KVS_CMD_RDEL: ret = kvs_rbtree_del(&global_rbtree, (char*)key); - if (ret < 0) *status_out = KVS_STATUS_ERROR; - else if (ret == 0) *status_out = KVS_STATUS_OK; - else *status_out = KVS_STATUS_NO_EXIST; - return; + if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; + else if (ret == 0) rsp_out->status = KVS_STATUS_OK; + else rsp_out->status = KVS_STATUS_NO_EXIST; + return 0; case KVS_CMD_RMOD: ret = kvs_rbtree_mod(&global_rbtree, (char*)key, (char*)val); - if (ret < 0) *status_out = KVS_STATUS_ERROR; - else if (ret == 0) *status_out = KVS_STATUS_OK; - else *status_out = KVS_STATUS_NO_EXIST; - return; + if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; + else if (ret == 0) rsp_out->status = KVS_STATUS_OK; + else rsp_out->status = KVS_STATUS_NO_EXIST; + return 0; case KVS_CMD_REXIST: ret = kvs_rbtree_exist(&global_rbtree, (char*)key); - *status_out = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST; - return; + rsp_out->status = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST; + return 0; #endif #if ENABLE_HASH case KVS_CMD_HSET: ret = kvs_hash_set(&global_hash, (char*)key, (char*)val); - if (ret < 0) *status_out = KVS_STATUS_ERROR; - else if (ret == 0) *status_out = KVS_STATUS_OK; - else *status_out = KVS_STATUS_EXIST; - return; + if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; + else if (ret == 0) rsp_out->status = KVS_STATUS_OK; + else rsp_out->status = KVS_STATUS_EXIST; + return 0; case KVS_CMD_HGET: result = kvs_hash_get(&global_hash, (char*)key); - if (!result) { *status_out = KVS_STATUS_NO_EXIST; return; } - *status_out = KVS_STATUS_OK; - *data_out = result; - *dlen_out = (uint32_t)strlen(result); - return; + if (!result) { rsp_out->status = KVS_STATUS_NO_EXIST; return 0; } + rsp_out->status = KVS_STATUS_OK; + rsp_out->data = result; + rsp_out->dlen = (uint32_t)strlen(result); + return 0; case KVS_CMD_HDEL: ret = kvs_hash_del(&global_hash, (char*)key); - if (ret < 0) *status_out = KVS_STATUS_ERROR; - else if (ret == 0) *status_out = KVS_STATUS_OK; - else *status_out = KVS_STATUS_NO_EXIST; - return; + if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; + else if (ret == 0) rsp_out->status = KVS_STATUS_OK; + else rsp_out->status = KVS_STATUS_NO_EXIST; + return 0; case KVS_CMD_HMOD: ret = kvs_hash_mod(&global_hash, (char*)key, (char*)val); - if (ret < 0) *status_out = KVS_STATUS_ERROR; - else if (ret == 0) *status_out = KVS_STATUS_OK; - else *status_out = KVS_STATUS_NO_EXIST; - return; + if (ret < 0) rsp_out->status = KVS_STATUS_ERROR; + else if (ret == 0) rsp_out->status = KVS_STATUS_OK; + else rsp_out->status = KVS_STATUS_NO_EXIST; + return 0; case KVS_CMD_HEXIST: ret = kvs_hash_exist(&global_hash, (char*)key); - *status_out = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST; - return; + rsp_out->status = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST; + return 0; #endif default: - *status_out = KVS_STATUS_BADREQ; - return; - } -} - -#if NEW_KVSTORE -// int kvs_protocol(char *msg, int length, char *response, int *response_len){ -int kvs_protocol(char *msg, int length, char *response){ - if (!msg || length <= 0 || !response) return 0; - int response_len = 0; - - kvs_request_t req; - int consumed = kvs_parse_request((const uint8_t *)msg, length, &req); - - if (consumed < 0) return 0; // 解析失败 - else if(consumed == 0) return 0; // 半包 - - - kvs_response_t *results = (kvs_response_t *)kvs_malloc(sizeof(kvs_response_t) * req.opcount); - if (!results) { kvs_free_request(&req); return -1; } - memset(results, 0, sizeof(kvs_response_t) * req.opcount); - - if (kvs_execute_request(&req, results) < 0) { - kvs_free(results); - kvs_free_request(&req); + rsp_out->status = KVS_STATUS_BADREQ; return -1; } - int resp_len = kvs_build_response(&req, results, (uint8_t *)response, KVS_MAX_RESPONSE); - kvs_free(results); - kvs_free_request(&req); + return -1; +} - if (resp_len < 0) return 0; // error +#if NEW_KVSTORE +/** + * input : request request_length + * output : response response_length consumed_out + * return : -1 error, =0 半包, 1 成功 + */ +int kvs_protocol(char *request, int request_length, int *consumed_out, char *response, int *response_length){ + if (!request || request_length <= 0 || !consumed_out || !response || !response_length) return KVS_NEED_MORE; + int consumed = 0; + int out_len = 0; + int budget = KVS_MAX_CMDS_PER_CALL; - response_len = resp_len; - return response_len; + while(consumed < request_length && (budget-- > 0)){ + kvs_req_t req; + memset(&req, 0, sizeof(kvs_req_t)); + + const uint8_t *p = request+consumed; + int remain = request_length - consumed; + + int len = kvs_parse_one_cmd(p, remain, &req); + if(len < 0){ + kvs_free_request(&req); + *consumed_out = consumed; + *response_length = out_len; + return KVS_ERROR; + } + else if(len == 0){ + kvs_free_request(&req); + break; + } + + kvs_rsp_t rsp; + memset(&rsp, 0, sizeof(kvs_rsp_t)); + + if (kvs_execute_one_cmd(&req, &rsp) < 0){ + kvs_free_request(&req); + *consumed_out = consumed; + *response_length = out_len; + return KVS_ERROR; + } + + if (out_len >= KVS_MAX_RESPONSE) { + kvs_free_request(&req); + *consumed_out = consumed; + *response_length = out_len; + return KVS_ERROR; + } + + int resp_len = kvs_build_one_rsp(&rsp, (uint8_t *)response+out_len, KVS_MAX_RESPONSE-out_len); + kvs_free_request(&req); + if (resp_len < 0) { + *consumed_out = consumed; + *response_length = out_len; + return KVS_ERROR; + } + + // printf("resp_len:%d\n", resp_len); + // printf("consumed:%d\n", len); + out_len += resp_len; + consumed += len; + } + + *consumed_out = consumed; + *response_length = out_len; + if (consumed == 0 && out_len == 0) return KVS_NEED_MORE; + return KVS_OK; } #else /* diff --git a/kvstore.h b/kvstore.h index 55f7cb9..c894e64 100644 --- a/kvstore.h +++ b/kvstore.h @@ -34,8 +34,8 @@ #define NEW_KVSTORE 1 -typedef int (*msg_handler)(char *msg, int length, char *response); - +// typedef int (*msg_handler)(char *msg, int length, char *response); +typedef int (*msg_handler)(char *request, int request_length, int *consumed_out, char *response, int *response_length); extern int reactor_start(unsigned short port, msg_handler handler); extern int proactor_start(unsigned short port, msg_handler handler); diff --git a/reactor.c b/reactor.c index 1f257f3..1bc9e52 100644 --- a/reactor.c +++ b/reactor.c @@ -27,13 +27,15 @@ #if ENABLE_KVSTORE -typedef int (*msg_handler)(char *msg, int length, char *response); +// typedef int (*msg_handler)(char *msg, int length, char *response); +typedef int (*msg_handler)(char *request, int request_length, int *consumed_out, char *response, int *response_length); + static msg_handler kvs_handler; int kvs_request(struct conn *c) { - - c->wlength = kvs_handler(c->rbuffer, c->rlength, c->wbuffer); + int consumed_out; + int ret = kvs_handler(c->rbuffer, c->rlength, &consumed_out, c->wbuffer, &c->wlength); } diff --git a/test/test_client.c b/test/test_client.c new file mode 100644 index 0000000..aba1442 --- /dev/null +++ b/test/test_client.c @@ -0,0 +1,221 @@ + +#include "test_client.h" +#include +#include +#include + +int kvs_need(const uint8_t *p, const uint8_t *end, size_t n) { + return (p + n <= end) ? 0 : -1; +} + +// 注意u8类型不需要ntoh或者hton +int kvs_read_u8(const uint8_t **pp, const uint8_t *end, uint8_t *out) { + const uint8_t *p = *pp; + if (kvs_need(p, end, 1) < 0) return -1; + *out = *p; + *pp = p + 1; + return 0; +} + +int kvs_read_u16(const uint8_t **pp, const uint8_t *end, uint16_t *out) { + const uint8_t *p = *pp; + if (kvs_need(p, end, 2) < 0) return -1; + uint16_t v; + memcpy(&v, p, 2); + *out = ntohs(v); + *pp = p + 2; + return 0; +} + +int kvs_read_u32(const uint8_t **pp, const uint8_t *end, uint32_t *out) { + const uint8_t *p = *pp; + if (kvs_need(p, end, 4) < 0) return -1; + uint32_t v; + memcpy(&v, p, 4); + *out = ntohl(v); + *pp = p + 4; + return 0; +} + +int kvs_write_u8(uint8_t **pp, const uint8_t *end, uint8_t v) { + uint8_t *p = *pp; + if (kvs_need(p, end, 1) < 0) return -1; + *p = v; + *pp = p + 1; + return 0; +} + +int kvs_write_u16(uint8_t **pp, const uint8_t *end, uint16_t v) { + uint8_t *p = *pp; + if (kvs_need(p, end, 2) < 0) return -1; + uint16_t be = htons(v); + memcpy(p, &be, 2); + *pp = p + 2; + return 0; +} + +int kvs_write_u32(uint8_t **pp, const uint8_t *end, uint32_t v) { + uint8_t *p = *pp; + if (kvs_need(p, end, 4) < 0) return -1; + uint32_t be = htonl(v); + memcpy(p, &be, 4); + *pp = p + 4; + return 0; +} + + + +int getcmd(uint8_t op, const char *key, const char *value, uint8_t *buf){ + if(!key || !buf) return -1; + uint8_t *end = buf + CMD_SIZE; + uint8_t *p = buf; + uint8_t argc = ((value == NULL)?1 : 2); + + + if (kvs_write_u8(&p, end, op) < 0) return -1; + if (kvs_write_u8(&p, end, argc) < 0) return -1; + + + // 写入 key + int keylen = strlen(key); + if (kvs_write_u32(&p, end, keylen) < 0) return -1; + if (kvs_need(p, end, keylen) < 0) return -1; + if (keylen > 0) { + memcpy(p, key, keylen); + p += keylen; + } + + if(value){ + int vallen = strlen(value); + if (kvs_write_u32(&p, end, vallen) < 0) return -1; + if (kvs_need(p, end, vallen) < 0) return -1; + if (vallen > 0) { + memcpy(p, value, vallen); + p += vallen; + } + } + + return (p - buf); +} + +int parse_response(const uint8_t *buf, int buflen, kvs_response_t *rsp) { + const uint8_t *p = buf; + const uint8_t *end = buf + buflen; + + // 读取 OP + if (kvs_read_u8(&p, end, &rsp->op) < 0) { + fprintf(stderr, "Failed to read op\n"); + return -1; + } + + // 读取 status + if (kvs_read_u8(&p, end, &rsp->status) < 0) { + fprintf(stderr, "Failed to read status\n"); + return -1; + } + + // 读取 datalen + if (kvs_read_u32(&p, end, &rsp->datalen) < 0) { + fprintf(stderr, "Failed to read datalen\n"); + return -1; + } + + // 检查数据长度 + if (kvs_need(p, end, rsp->datalen) < 0) { + fprintf(stderr, "Data length mismatch: expected %u bytes, but only %ld available\n", + rsp->datalen, end - p); + return -1; + } + + // 指向数据部分 + rsp->data = (uint8_t *)p; + + return (p - buf) + rsp->datalen; +} + + +void print_response(const char *cmd_name, const kvs_response_t *rsp) { + printf("\n=== %s Response ===\n", cmd_name); + printf("OP: %u\n", rsp->op); + printf("Status: %u ", rsp->status); + + switch (rsp->status) { + case KVS_STATUS_OK: + printf("(OK)\n"); + break; + case KVS_STATUS_ERROR: + printf("(ERROR)\n"); + break; + case KVS_STATUS_NO_EXIST: + printf("(NO_EXIST)\n"); + break; + case KVS_STATUS_EXIST: + printf("(EXISTS)\n"); + break; + default: + printf("(UNKNOWN)\n"); + break; + } + + printf("Data Length: %u\n", rsp->datalen); + + if (rsp->datalen > 0 && rsp->data != NULL) { + printf("Data: "); + // 尝试以字符串形式打印(如果是可打印字符) + int is_printable = 1; + for (uint32_t i = 0; i < rsp->datalen; i++) { + if (rsp->data[i] < 32 || rsp->data[i] > 126) { + is_printable = 0; + break; + } + } + + if (is_printable) { + printf("\""); + for (uint32_t i = 0; i < rsp->datalen; i++) { + printf("%c", rsp->data[i]); + } + printf("\"\n"); + } else { + // 以十六进制打印 + printf("0x"); + for (uint32_t i = 0; i < rsp->datalen; i++) { + printf("%02x", rsp->data[i]); + } + printf("\n"); + } + } else { + printf("Data: (empty)\n"); + } + printf("==================\n"); +} + +int verify_response(const kvs_response_t *rsp, uint8_t expected_op, + uint8_t expected_status, const char *expected_data) { + if (rsp->op != expected_op) { + printf("❌ OP mismatch: expected %u, got %u\n", expected_op, rsp->op); + return 0; + } + + if (rsp->status != expected_status) { + printf("❌ Status mismatch: expected %u, got %u\n", expected_status, rsp->status); + return 0; + } + + if (expected_data != NULL) { + uint32_t expected_len = strlen(expected_data); + if (rsp->datalen != expected_len) { + printf("❌ Data length mismatch: expected %u, got %u\n", expected_len, rsp->datalen); + return 0; + } + + if (memcmp(rsp->data, expected_data, expected_len) != 0) { + printf("❌ Data content mismatch\n"); + return 0; + } + } + + return 1; +} + + diff --git a/test/test_client.h b/test/test_client.h new file mode 100644 index 0000000..6541dcc --- /dev/null +++ b/test/test_client.h @@ -0,0 +1,154 @@ + + /** + * Request + * Cmd: | OP(1) | argc(1) | repeat { arglen(4) | arg } | + * + * Response + * Rsp: | OP(1) | status(1) | datalen(4) | data | + */ +#include +#include +#include +#include +#include +#include +#include + +#define CMD_SIZE (4096) +#define BATCH_SIZE (65536) +#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000) + +#define PRESP print_response + + +typedef enum { + KVS_STATUS_OK = 0, + KVS_STATUS_ERROR = 1, + KVS_STATUS_NO_EXIST = 2, + KVS_STATUS_EXIST = 3, + KVS_STATUS_BADREQ = 4 +}rsp_ret_status_e; + +enum { + KVS_CMD_START = 0, + // array + KVS_CMD_SET = KVS_CMD_START, + KVS_CMD_GET, + KVS_CMD_DEL, + KVS_CMD_MOD, + KVS_CMD_EXIST, + // rbtree + KVS_CMD_RSET, + KVS_CMD_RGET, + KVS_CMD_RDEL, + KVS_CMD_RMOD, + KVS_CMD_REXIST, + // hash + KVS_CMD_HSET, + KVS_CMD_HGET, + KVS_CMD_HDEL, + KVS_CMD_HMOD, + KVS_CMD_HEXIST, + + KVS_CMD_COUNT, +}; + +typedef struct { + uint8_t op; + uint8_t status; + uint32_t datalen; + uint8_t *data; +} kvs_response_t; + +int kvs_need(const uint8_t *p, const uint8_t *end, size_t n); +int kvs_read_u8(const uint8_t **pp, const uint8_t *end, uint8_t *out); +int kvs_read_u16(const uint8_t **pp, const uint8_t *end, uint16_t *out); +int kvs_read_u32(const uint8_t **pp, const uint8_t *end, uint32_t *out); + +int kvs_write_u8(uint8_t **pp, const uint8_t *end, uint8_t v); +int kvs_write_u16(uint8_t **pp, const uint8_t *end, uint16_t v); +int kvs_write_u32(uint8_t **pp, const uint8_t *end, uint32_t v); + +int getcmd(uint8_t op, const char *key, const char *value, uint8_t *buf); + +int parse_response(const uint8_t *buf, int buflen, kvs_response_t *rsp); +void print_response(const char *cmd_name, const kvs_response_t *rsp); +int verify_response(const kvs_response_t *rsp, uint8_t expected_op, + uint8_t expected_status, const char *expected_data); + + + + +#define KVS_BATCH_MAX 64 + +typedef struct { + uint8_t buf[BATCH_SIZE]; + int len; // 当前已写入的 batch 字节数 + int cnt; // 当前 batch 里命令条数 + int cmd_len[KVS_BATCH_MAX]; +} kvs_batch_t; + +static void kvs_batch_init(kvs_batch_t *b) +{ + b->len = 0; + b->cnt = 0; + memset(b->cmd_len, 0, sizeof(b->cmd_len)); +} + +/** + * 用 getcmd() 生成单条命令,然后 append 到 batch buffer + * 返回:0 成功,-1 失败(太多条 or buffer 不够) + */ +static int kvs_batch_add(kvs_batch_t *b, uint8_t op, const char *key, const char *value) +{ + if (b->cnt >= KVS_BATCH_MAX) return -1; + + uint8_t tmp[CMD_SIZE]; + int n = getcmd(op, key, value, tmp); // 你提供的函数 + if (n <= 0) return -1; + + if (b->len + n > (int)sizeof(b->buf)) return -1; + + memcpy(b->buf + b->len, tmp, n); + b->cmd_len[b->cnt] = n; + b->cnt++; + b->len += n; + return 0; +} + +/** + * 一次性发送 batch + * 返回:发送字节数,<0 表示失败 + */ +static int kvs_batch_send(int fd, const kvs_batch_t *b) +{ + return (int)send(fd, b->buf, b->len, 0); +} + +/** + * 一次 recv 收回所有响应,然后批量解析为 rsp 数组 + * + * 返回:成功解析出的响应条数(期望是 b->cnt) + */ +static int kvs_batch_recv_parse(int fd, + const kvs_batch_t *b, + kvs_response_t *rsps, // 输出数组,长度 >= b->cnt + uint8_t *recvbuf, + int recvbuf_cap) +{ + int nrecv = (int)recv(fd, recvbuf, recvbuf_cap, 0); + if (nrecv <= 0) return -1; + + int off = 0; + int parsed = 0; + + while (parsed < b->cnt && off < nrecv) { + int consumed = parse_response(recvbuf + off, nrecv - off, &rsps[parsed]); + if (consumed <= 0) break; // 不够解析/失败,简单处理:直接退出 + + off += consumed; + parsed++; + } + + return parsed; +} \ No newline at end of file diff --git a/test/testcase.c b/test/testcase.c new file mode 100644 index 0000000..836514b --- /dev/null +++ b/test/testcase.c @@ -0,0 +1,143 @@ + +#include "test_client.h" +#include + + +int connect_tcpserver(const char *ip, unsigned short port) { + + int connfd = socket(AF_INET, SOCK_STREAM, 0); + + struct sockaddr_in server_addr; + memset(&server_addr, 0, sizeof(struct sockaddr_in)); + + server_addr.sin_family = AF_INET; + server_addr.sin_addr.s_addr = inet_addr(ip); + server_addr.sin_port = htons(port); + + if (0 != connect(connfd, (struct sockaddr*)&server_addr, sizeof(struct sockaddr_in))) { + perror("connect"); + return -1; + } + + return connfd; + +} + + +int send_msg(int connfd, char *msg, int length) { + + int res = send(connfd, msg, length, 0); + if (res < 0) { + perror("send"); + exit(1); + } + return res; +} + +int recv_msg(int connfd, char *msg, int length) { + + int res = recv(connfd, msg, length, 0); + if (res < 0) { + perror("recv"); + exit(1); + } + return res; + +} + +void testcase(int connfd, uint8_t op, const char* key, const char* value, rsp_ret_status_e st, const char* rsp_value, const char* command_name){ + uint8_t buf[CMD_SIZE]; + uint8_t result[CMD_SIZE]; + kvs_response_t rsp; + int len, recv_len; + + len = getcmd(op, key, value, buf); + send_msg(connfd, buf, len); + recv_len = recv_msg(connfd, result, CMD_SIZE); + if (parse_response(result, recv_len, &rsp) > 0) { + PRESP(command_name, &rsp); + if(!verify_response(&rsp, op, st, rsp_value)) printf("%s\n", command_name); + }else{ + printf("parser error\n"); + } + + return ; +} + + +void array_testcase_1w(int connfd) { + + int count = 1; + int i = 0; + + struct timeval tv_begin; + gettimeofday(&tv_begin, NULL); + + for (i = 0;i < count;i ++) { + testcase(connfd, KVS_CMD_SET, "nage", "lian", KVS_STATUS_OK, NULL, "SET NAME"); + testcase(connfd, KVS_CMD_GET, "nage", NULL, KVS_STATUS_OK, "lian", "GET NAME"); + testcase(connfd, KVS_CMD_MOD, "nage", "liu", KVS_STATUS_OK, NULL, "MOD NAME"); + testcase(connfd, KVS_CMD_GET, "nage", NULL, KVS_STATUS_OK, "liu", "GET NAME"); + + testcase(connfd, KVS_CMD_EXIST, "nage", NULL, KVS_STATUS_EXIST, NULL, "EXIST NAME"); + testcase(connfd, KVS_CMD_DEL, "nage", NULL, KVS_STATUS_OK, NULL, "DEL NAME"); + testcase(connfd, KVS_CMD_EXIST, "nage", NULL, KVS_STATUS_NO_EXIST, NULL, "NOT EXIST NAME"); + } + + struct timeval tv_end; + gettimeofday(&tv_end, NULL); + + int time_used = TIME_SUB_MS(tv_end, tv_begin); // ms + + printf("array testcase --> time_used: %d, qps: %d\n", time_used, 70000 * 1000 / time_used); + +} + +void do_batch_example(int fd) +{ + kvs_batch_t batch; + kvs_batch_init(&batch); + + // 组 batch(最多 64 条) + kvs_batch_add(&batch, KVS_CMD_SET, "k1", "v1"); + kvs_batch_add(&batch, KVS_CMD_SET, "k2", "v2"); + kvs_batch_add(&batch, KVS_CMD_GET, "k1", NULL); + kvs_batch_add(&batch, KVS_CMD_GET, "k2", NULL); + + // 一次性发送 + kvs_batch_send(fd, &batch); + + // 一次性 recv + parse + uint8_t recvbuf[BATCH_SIZE]; + kvs_response_t rsps[KVS_BATCH_MAX]; + + int nrsp = kvs_batch_recv_parse(fd, &batch, rsps, recvbuf, sizeof(recvbuf)); + + // 打印/处理 + for (int i = 0; i < nrsp; i++) { + PRESP("BATCH", &rsps[i]); + } + + printf("%d\n", nrsp); + + testcase(fd, KVS_CMD_GET, "k1", NULL, KVS_STATUS_OK, "v1", "GET k1"); + testcase(fd, KVS_CMD_GET, "k2", NULL, KVS_STATUS_OK, "v2", "GET k2"); +} + + +int main(int argc, char *argv[]) { + if (argc != 3) { + printf("arg error\n"); + return -1; + } + + char *ip = argv[1]; + int port = atoi(argv[2]); + + int connfd = connect_tcpserver(ip, port); + + // array_testcase_1w(connfd); + do_batch_example(connfd); + + return 0; +} \ No newline at end of file diff --git a/testcase2 b/testcase2 deleted file mode 100755 index fa5a27c..0000000 Binary files a/testcase2 and /dev/null differ diff --git a/testcase2.c b/testcase2.c deleted file mode 100644 index 877d33c..0000000 --- a/testcase2.c +++ /dev/null @@ -1,299 +0,0 @@ -// kvs_client_min.c -#include -#include -#include -#include -#include -#include -#include -#include - -#define KVS_MAGIC_U32 0x4B565331u // 'KVS1' -#define KVS_HDR_LEN 16 -#define KVS_TYPE_REQ 1 -#define KVS_TYPE_RESP 2 - -// OP mapping (match your server enum) -#define OP_SET 0 -#define OP_GET 1 -#define OP_DEL 2 -#define OP_MOD 3 -#define OP_EXIST 4 - -static int write_u8(uint8_t **pp, uint8_t *end, uint8_t v) { - if (*pp + 1 > end) return -1; - **pp = v; - (*pp)++; - return 0; -} -static int write_u32(uint8_t **pp, uint8_t *end, uint32_t v) { - if (*pp + 4 > end) return -1; - uint32_t be = htonl(v); - memcpy(*pp, &be, 4); - (*pp) += 4; - return 0; -} -static int write_bytes(uint8_t **pp, uint8_t *end, const void *buf, size_t n) { - if (*pp + n > end) return -1; - memcpy(*pp, buf, n); - (*pp) += n; - return 0; -} - -static int read_u8(const uint8_t **pp, const uint8_t *end, uint8_t *out) { - if (*pp + 1 > end) return -1; - *out = **pp; - (*pp)++; - return 0; -} -static int read_u32(const uint8_t **pp, const uint8_t *end, uint32_t *out) { - if (*pp + 4 > end) return -1; - uint32_t be; - memcpy(&be, *pp, 4); - *out = ntohl(be); - (*pp) += 4; - return 0; -} - -static ssize_t send_all(int fd, const void *buf, size_t len) { - const uint8_t *p = (const uint8_t*)buf; - size_t sent = 0; - while (sent < len) { - ssize_t n = send(fd, p + sent, len - sent, 0); - if (n < 0) { - if (errno == EINTR) continue; - return -1; - } - if (n == 0) return -1; - sent += (size_t)n; - } - return (ssize_t)sent; -} - -static ssize_t recv_all(int fd, void *buf, size_t len) { - uint8_t *p = (uint8_t*)buf; - size_t recvd = 0; - while (recvd < len) { - ssize_t n = recv(fd, p + recvd, len - recvd, 0); - if (n < 0) { - if (errno == EINTR) continue; - return -1; - } - if (n == 0) return -1; - recvd += (size_t)n; - } - return (ssize_t)recvd; -} - -// Build one request packet: header + payload(opcount + cmds...) -static int build_req_packet(uint32_t reqId, - uint8_t opcount, - uint8_t op, - int argc, const char *argv[], - uint8_t *out, size_t out_cap, size_t *out_len) { - if (!out || out_cap < KVS_HDR_LEN || !out_len) return -1; - - uint8_t *w = out + KVS_HDR_LEN; - uint8_t *end = out + out_cap; - - // payload: opcount(4) - if (write_u32(&w, end, (uint32_t)opcount) < 0) return -1; - - // one cmd (repeat if you want multiple, keep minimal here) - if (write_u8(&w, end, op) < 0) return -1; - if (write_u32(&w, end, (uint32_t)argc) < 0) return -1; - for (int i = 0; i < argc; i++) { - uint32_t alen = (uint32_t)strlen(argv[i]); - if (write_u32(&w, end, alen) < 0) return -1; - if (write_bytes(&w, end, argv[i], alen) < 0) return -1; - } - - uint32_t payloadLen = (uint32_t)(w - (out + KVS_HDR_LEN)); - uint32_t magic_be = htonl(KVS_MAGIC_U32); - uint32_t payload_be = htonl(payloadLen); - uint32_t reqid_be = htonl(reqId); - - // header layout: - // | magic(4) | type(1) | payloadLen(4) | reqId(4) | flag1(1) | flag2(2) | - memcpy(out + 0, &magic_be, 4); - out[4] = KVS_TYPE_REQ; - memcpy(out + 5, &payload_be, 4); - memcpy(out + 9, &reqid_be, 4); - out[13] = 0; // flag1 - out[14] = 0; out[15] = 0; // flag2 - - *out_len = (size_t)(KVS_HDR_LEN + payloadLen); - return 0; -} - -/** - * KVS_STATUS_OK = 0, - KVS_STATUS_ERROR = 1, - KVS_STATUS_NO_EXIST = 2, - KVS_STATUS_EXIST = 3, - KVS_STATUS_BADREQ = 4 - */ -const char *rsp_status[5] = {"OK", "ERROR", "NO_EXIST", "EXIST", "ERROR"}; -static int recv_and_print_resp(int fd) { - uint8_t hdr[KVS_HDR_LEN]; - - if (recv_all(fd, hdr, KVS_HDR_LEN) < 0) { - perror("recv header"); - return -1; - } - - uint32_t magic_be, payload_be, reqid_be; - memcpy(&magic_be, hdr + 0, 4); - memcpy(&payload_be, hdr + 5, 4); - memcpy(&reqid_be, hdr + 9, 4); - - uint32_t magic = ntohl(magic_be); - uint8_t type = hdr[4]; - uint32_t payloadLen = ntohl(payload_be); - uint32_t reqId = ntohl(reqid_be); - - if (magic != KVS_MAGIC_U32 || type != KVS_TYPE_RESP) { - fprintf(stderr, "bad response header: magic=0x%x type=%u\n", magic, type); - return -1; - } - - uint8_t *payload = (uint8_t*)malloc(payloadLen); - if (!payload) return -1; - - if (payloadLen > 0) { - if (recv_all(fd, payload, payloadLen) < 0) { - perror("recv payload"); - free(payload); - return -1; - } - } - - // parse response payload: - // | opcount(4) | repeat Cmd | - // Cmd: | status(1) | datalen(4) | data | - const uint8_t *p = payload; - const uint8_t *end = payload + payloadLen; - - uint32_t opcount = 0; - if (read_u32(&p, end, &opcount) < 0) { - fprintf(stderr, "resp parse failed\n"); - free(payload); - return -1; - } - - printf("RESP reqId=%u opcount=%u\n", reqId, opcount); - - for (uint32_t i = 0; i < opcount; i++) { - uint8_t status = 0; - uint32_t dlen = 0; - if (read_u8(&p, end, &status) < 0) goto bad; - if (read_u32(&p, end, &dlen) < 0) goto bad; - if (p + dlen > end) goto bad; - - printf(" op[%u]: status=%s datalen=%u", i, rsp_status[status], dlen); - if (dlen > 0) { - // printf(" data=\"%.*s\"", (int)dlen, (const char*)p); - printf(" data=\""); - for (int i = 0; i < dlen; i++) { - printf("%02X", (unsigned char)p[i]); - if (i + 1 < dlen) - printf(" "); - } - printf("\""); - } - printf("\n"); - p += dlen; - } - - free(payload); - return 0; - -bad: - fprintf(stderr, "resp parse failed (truncated)\n"); - free(payload); - return -1; -} - -uint8_t buf[4096]; -size_t pkt_len = 0; -uint32_t reqId = 1; -int fd; - -int testcase(int op, const char *args[], int argnum){ - if (build_req_packet(reqId++, 1, op, argnum, args, buf, sizeof(buf), &pkt_len) < 0) { - fprintf(stderr, "build failed\n"); - close(fd); - return 1; - } - if (send_all(fd, buf, pkt_len) < 0) { perror("send SET"); close(fd); return 1; } - if (recv_and_print_resp(fd) < 0) { close(fd); return 1; } - return 0; -} - -#define BUF_CAP 65536 -int main(int argc, char **argv) { - if (argc != 3) { - fprintf(stderr, "Usage: %s \n", argv[0]); - return 1; - } - const char *ip = argv[1]; - int port = atoi(argv[2]); - - fd = socket(AF_INET, SOCK_STREAM, 0); - if (fd < 0) { perror("socket"); return 1; } - - struct sockaddr_in addr; - memset(&addr, 0, sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_port = htons((uint16_t)port); - if (inet_pton(AF_INET, ip, &addr.sin_addr) != 1) { - fprintf(stderr, "bad ip\n"); - close(fd); - return 1; - } - if (connect(fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) { - perror("connect"); - close(fd); - return 1; - } - - size_t pkt_len = 0; - uint32_t reqId = 1; - - // ---- SET key value ---- - { - const char *args[] = {"foo", "bar"}; - printf("SET "); - for (const unsigned char *p = (const unsigned char *)args[1]; *p; p++) { - printf("%02X ", *p); - } - printf("\n"); - testcase(OP_SET, args, 2); - } - - // ---- GET key ---- - { - const char *args[] = {"foo"}; - testcase(OP_GET, args, 1); - } - - // ---- SET key ---- - { - const char *args[] = {"text", "\r\n\0aaaa\r\n\0"}; - printf("SET "); - for (const unsigned char *p = (const unsigned char *)args[1]; *p; p++) { - printf("%02X ", *p); - } - printf("\n"); - testcase(OP_SET, args, 2); - } - - // ---- GET key ---- - { - const char *args[] = {"text"}; - testcase(OP_GET, args, 1); - } - - close(fd); - return 0; -}