diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5dd3692 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +NtyCo/ + +proactor copy.c +ntyco copy.c +reactor copy.c \ No newline at end of file diff --git a/Makefile b/Makefile index 9db46e0..ac0e9d7 100644 --- a/Makefile +++ b/Makefile @@ -1,16 +1,19 @@ CC = gcc FLAGS = -I ./NtyCo/core/ -L ./NtyCo/ -lntyco -lpthread -luring -ldl -SRCS = kvstore.c ntyco.c proactor.c kvs_array.c kvs_rbtree.c kvs_hash.c +SRCS = kvstore.c ntyco.c proactor.c reactor.c kvs_array.c kvs_rbtree.c kvs_hash.c kvs_rw_tools.c TESTCASE_SRCS = testcase.c TARGET = kvstore SUBDIR = ./NtyCo/ TESTCASE = testcase +TESTCASE2 = testcase2 +TESTCASE2_SRCS = testcase.c + OBJS = $(SRCS:.c=.o) -all: $(SUBDIR) $(TARGET) $(TESTCASE) +all: $(SUBDIR) $(TARGET) $(TESTCASE) $(TESTCASE2) $(SUBDIR): ECHO make -C $@ @@ -24,6 +27,9 @@ $(TARGET): $(OBJS) $(TESTCASE): $(TESTCASE_SRCS) $(CC) -o $@ $^ +$(TESTCASE2): $(TESTCASE2_SRCS) + $(CC) -o $@ $^ + %.o: %.c $(CC) $(FLAGS) -c $^ -o $@ diff --git a/README.md b/README.md index 4eb4f4e..4cd05bf 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,38 @@ # 9.1 Kvstore +#### 目前KV存储没有持久化,down 之后 就清空。 +1. 全量持久化。 +save() -> 全保存数据集。 +2. 增量持久化。 +每执行一条,把命令写入文件里。下次允许把日志重新运行。 + +#### 目前KV存储 key value 都是调用的 malloc, 都是系统的内存分配。 +1. 会出现大量的内存碎片,实现一个内存池,管理内存数据。 +2. 对比有内存池和没有内存池的性能差异,以及开源内存池 jemalloc 的性能差别。 +3. 性能测: + 1. qps 有没有内存池。 + 2.虚拟内存的占用情况 htop。插入百万条数据集(KV*100w,set 200w del 100w delete 200w set 100w)。 + +#### 目前的kv存储是单指令的,一条指令一交互。 +从单挑指令实现批量处理。 + +对于同一个连接,目前缺少对以下情况的处理: +1. 收到半包 +2. 收到多条消息 + +对不同连接,隔离他们的消息到不同的缓冲区?要不先不考虑不同的连接的情况?Nytoco代码不是很熟。 + +如果read->execute->send->read同步循环,不会出现缓冲区被覆盖的情况。但是可能会有半包或者两个命令都抵达了,才触发事件。所以应用层需要解析到完整请求才执行下一步。 + + +#### 目前KV存储是一个单点服务,实现一个主从同步的功能。 +主从同步:启动两个KVstore。在a里set,在b里get。 + +#### 目前的Key和Value都是单个单词的,实现支持特殊字符。 +如:空格回车换行。支持插入博客。 + +这里需要在协议中定义消息长度而不是特殊字符做分隔来确定哪些数据是key哪些是value。 + ### 面试题 1. 为什么会实现kvstore,使用场景在哪里? diff --git a/kvs_rw_tools.c b/kvs_rw_tools.c new file mode 100644 index 0000000..4cd8345 --- /dev/null +++ b/kvs_rw_tools.c @@ -0,0 +1,256 @@ +#include "kvstore.h" +#include "kvs_rw_tools.h" +#include + +int kvs_need(const uint8_t *p, const uint8_t *end, size_t n) { + return (p + n <= end) ? 0 : -1; +} + +// 注意u8类型不需要ntoh或者hton +int kvs_read_u8(const uint8_t **pp, const uint8_t *end, uint8_t *out) { + const uint8_t *p = *pp; + if (kvs_need(p, end, 1) < 0) return -1; + *out = *p; + *pp = p + 1; + return 0; +} + +int kvs_read_u16(const uint8_t **pp, const uint8_t *end, uint16_t *out) { + const uint8_t *p = *pp; + if (kvs_need(p, end, 2) < 0) return -1; + uint16_t v; + memcpy(&v, p, 2); + *out = ntohs(v); + *pp = p + 2; + return 0; +} + +int kvs_read_u32(const uint8_t **pp, const uint8_t *end, uint32_t *out) { + const uint8_t *p = *pp; + if (kvs_need(p, end, 4) < 0) return -1; + uint32_t v; + memcpy(&v, p, 4); + *out = ntohl(v); + *pp = p + 4; + return 0; +} + +int kvs_write_u8(uint8_t **pp, const uint8_t *end, uint8_t v) { + uint8_t *p = *pp; + if (kvs_need(p, end, 1) < 0) return -1; + *p = v; + *pp = p + 1; + return 0; +} + +int kvs_write_u16(uint8_t **pp, const uint8_t *end, uint16_t v) { + uint8_t *p = *pp; + if (kvs_need(p, end, 2) < 0) return -1; + uint16_t be = htons(v); + memcpy(p, &be, 2); + *pp = p + 2; + return 0; +} + +int kvs_write_u32(uint8_t **pp, const uint8_t *end, uint32_t v) { + uint8_t *p = *pp; + if (kvs_need(p, end, 4) < 0) return -1; + uint32_t be = htonl(v); + memcpy(p, &be, 4); + *pp = p + 4; + return 0; +} + +// return: -1 fail, 0 half, >0 consumed +int kvs_parse_request(const uint8_t *msg, int length, kvs_request_t *req_out) { + if (!msg || length <= 0 || !req_out) return -1; + memset(req_out, 0, sizeof(*req_out)); + + if(length < KVS_HDR_LEN) return 0; + + // header parser + uint32_t magic_be; + memcpy(&magic_be, msg, 4); + uint32_t magic = ntohl(magic_be); + if (magic != KVS_MAGIC) return -1; + + // + uint8_t type = *(msg+4); + + uint32_t payloadlen_be; + memcpy(&payloadlen_be, msg+5, 4); + uint32_t payloadlen = ntohl(payloadlen_be); + + uint32_t reqId_be; + memcpy(&reqId_be, msg + 9, 4); + uint32_t reqId = ntohl(reqId_be); + + uint8_t flag1 = *(msg + 13); + + uint16_t flag2_be; + memcpy(&flag2_be, msg + 14, 2); + uint16_t flag2 = ntohs(flag2_be); + + uint32_t totalLen = (uint32_t)KVS_HDR_LEN + payloadlen; + if ((uint32_t)length < totalLen) return 0; // 半包 + + + // payload parser + const uint8_t *ptr = msg + KVS_HDR_LEN; + const uint8_t *end = ptr + payloadlen; + + uint32_t opcount = 0; + if(kvs_read_u32(&ptr, end, &opcount) < 0) return -1; + if (opcount > KVS_MAX_OPCOUNT) return -1; + + kvs_op_t *ops = (kvs_op_t *)kvs_malloc(sizeof(kvs_op_t) * opcount); + if (!ops) return -1; + memset(ops, 0, sizeof(kvs_op_t) * opcount); + + // operator parser + for (uint32_t i = 0; i < opcount; i++) { + uint8_t op = 0; + uint32_t argc = 0; + + if (kvs_read_u8(&ptr, end, &op) < 0) goto FAIL; + if (kvs_read_u32(&ptr, end, &argc) < 0) goto FAIL; + if (argc > 16) goto FAIL; + + char **argv = NULL; + if (argc > 0) { + argv = (char **)kvs_malloc(sizeof(char*) * argc); + if (!argv) goto FAIL; + memset(argv, 0, sizeof(char*) * argc); + } + + // operator args parser + for (uint32_t a = 0; a < argc; a++) { + uint32_t arglen = 0; + if (kvs_read_u32(&ptr, end, &arglen) < 0) goto FAIL; + if (kvs_need(ptr, end, arglen) < 0) goto FAIL; + + char *s = (char *)kvs_malloc((size_t)arglen + 1); + if (!s) goto FAIL; + + memcpy(s, ptr, arglen); + s[arglen] = '\0'; + argv[a] = s; + ptr += arglen; + } + + ops[i].op = op; + ops[i].argc = argc; + ops[i].argv = argv; + } + + req_out->magic = magic; + req_out->type = type; + req_out->payloadLen = payloadlen; + req_out->reqId = reqId; + req_out->flag1 = flag1; + req_out->flag2 = flag2; + req_out->opcount = opcount; + req_out->ops = ops; + req_out->consumed = totalLen; + + return (int)totalLen; + +FAIL: + if (ops) { + for (uint32_t i = 0; i < opcount; i++) { + if (ops[i].argv) { + for (uint32_t a = 0; a < ops[i].argc; a++) { + if (ops[i].argv[a]) kvs_free(ops[i].argv[a]); + } + kvs_free(ops[i].argv); + } + } + kvs_free(ops); + } + return -1; +} + +void kvs_free_request(kvs_request_t *req) { + if (!req) return; + if (req->ops) { + for (uint32_t i = 0; i < req->opcount; i++) { + if (req->ops[i].argv) { + for (uint32_t a = 0; a < req->ops[i].argc; a++) { + if (req->ops[i].argv[a]) kvs_free(req->ops[i].argv[a]); + } + kvs_free(req->ops[i].argv); + } + } + kvs_free(req->ops); + } + memset(req, 0, sizeof(*req)); +} + +int kvs_execute_request(const kvs_request_t *req, kvs_response_t *results /* size opcount */) { + if (!req || !results) return -1; + + for (uint32_t i = 0; i < req->opcount; i++) { + uint8_t status = KVS_STATUS_ERROR; + const char *data = NULL; + uint32_t dlen = 0; + + kvs_exec_one_op(req->ops[i].op, req->ops[i].argc, req->ops[i].argv, + &status, &data, &dlen); + + results[i].status = status; + results[i].datalen = dlen; + results[i].data_ptr = data; + } + return 0; +} + + + int kvs_build_response(const kvs_request_t *req, + const kvs_response_t *results, + uint8_t *response, int response_cap) { + if (!req || !results || !response || response_cap < KVS_HDR_LEN) return -1; + + uint8_t *out = response; + uint8_t *w = out + KVS_HDR_LEN; + const uint8_t *w_end = out + response_cap; + + // payload: opcount + if (kvs_write_u32(&w, w_end, req->opcount) < 0) return -1; + + for (uint32_t i = 0; i < req->opcount; i++) { + if (kvs_write_u8(&w, w_end, results[i].status) < 0) return -1; + if (kvs_write_u32(&w, w_end, results[i].datalen) < 0) return -1; + + if (results[i].datalen > 0) { + if (kvs_need(w, w_end, results[i].datalen) < 0) return -1; + memcpy(w, results[i].data_ptr, results[i].datalen); + w += results[i].datalen; + } + } + + uint32_t respPayloadLen = (uint32_t)(w - (out + KVS_HDR_LEN)); + uint32_t total = KVS_HDR_LEN + respPayloadLen; + + // header + { + uint32_t be = htonl(KVS_MAGIC); + memcpy(out + 0, &be, 4); + } + out[4] = (uint8_t)KVS_TYPE_RESP; + { + uint32_t be = htonl(respPayloadLen); + memcpy(out + 5, &be, 4); + } + { + uint32_t be = htonl(req->reqId); + memcpy(out + 9, &be, 4); + } + out[13] = req->flag1; + { + uint16_t be = htons(req->flag2); + memcpy(out + 14, &be, 2); + } + + return (int)total; +} + diff --git a/kvs_rw_tools.h b/kvs_rw_tools.h new file mode 100644 index 0000000..90dd3ac --- /dev/null +++ b/kvs_rw_tools.h @@ -0,0 +1,85 @@ +#ifndef __KVS_RW_TOOLS_H__ +#define __KVS_RW_TOOLS_H__ + +#include +#include +int kvs_need(const uint8_t *p, const uint8_t *end, size_t n); +int kvs_read_u8(const uint8_t **pp, const uint8_t *end, uint8_t *out); +int kvs_read_u16(const uint8_t **pp, const uint8_t *end, uint16_t *out); +int kvs_read_u32(const uint8_t **pp, const uint8_t *end, uint32_t *out); + +int kvs_write_u8(uint8_t **pp, const uint8_t *end, uint8_t v); +int kvs_write_u16(uint8_t **pp, const uint8_t *end, uint16_t v); +int kvs_write_u32(uint8_t **pp, const uint8_t *end, uint32_t v); + + + +/** + * Header: | magic(4) | type(1) | payloadLen(4) | reqId(4) | flag1(1) | flag2(2) | + * + * Request + * Payload: | opcount(4) | repeat Cmd | + * Cmd: | OP(1) | argc(4) | repeat Arg | + * Arg: | arglen(4) | arg | + * + * Response + * Payload: | opcount(4) | repeat Cmd | + * Cmd: | status(1) | datalen(4) | data | + */ + +#define KVS_HDR_LEN 16 // | magic(4) | type(1) | payloadLen(4) | reqId(4) | flag1(1) | flag2(2) | +#define KVS_MAGIC 0x4B565331u +#define KVS_TYPE_REQ 1 +#define KVS_TYPE_RESP 2 + +#define KVS_MAX_OPCOUNT 1024 +#define KVS_MAX_RESPONSE (64 * 1024) + +enum { + KVS_STATUS_OK = 0, + KVS_STATUS_ERROR = 1, + KVS_STATUS_NO_EXIST = 2, + KVS_STATUS_EXIST = 3, + KVS_STATUS_BADREQ = 4 +}; + + +typedef struct { + uint8_t op; + uint32_t argc; + char **argv; +} kvs_op_t; + +typedef struct { + uint32_t magic; + uint8_t type; + uint32_t payloadLen; + uint32_t reqId; + uint8_t flag1; + uint16_t flag2; + + // payload ops + uint32_t opcount; + kvs_op_t *ops; + + // consumed bytes (header+payload) + uint32_t consumed; +} kvs_request_t; + +typedef struct { + uint8_t status; + uint32_t datalen; + const char *data_ptr; +} kvs_response_t; + +int kvs_parse_request(const uint8_t *msg, int length, kvs_request_t *req_out); +int kvs_execute_request(const kvs_request_t *req, kvs_response_t *results); +int kvs_build_response(const kvs_request_t *req, + const kvs_response_t *results, + uint8_t *response, int response_cap); +void kvs_free_request(kvs_request_t *req); +void kvs_exec_one_op(uint8_t op, uint32_t argc, char **argv, + uint8_t *status_out, + const char **data_out, uint32_t *dlen_out); + +#endif diff --git a/kvstore.c b/kvstore.c index 47e4a70..7554d7e 100644 --- a/kvstore.c +++ b/kvstore.c @@ -3,7 +3,7 @@ #include "kvstore.h" - +#include "kvs_rw_tools.h" #if ENABLE_ARRAY extern kvs_array_t global_array; @@ -262,7 +262,196 @@ int kvs_filter_protocol(char **tokens, int count, char *response) { return length; } +/** + * 输入:op argc argv + * 输出 status + */ + void kvs_exec_one_op(uint8_t op, uint32_t argc, char **argv, + uint8_t *status_out, + const char **data_out, uint32_t *dlen_out) { + *status_out = KVS_STATUS_ERROR; + *data_out = NULL; + *dlen_out = 0; + const char *key = (argc >= 1) ? argv[0] : NULL; + const char *val = (argc >= 2) ? argv[1] : NULL; + + // 基本参数校验(按你原有命令语义) + switch (op) { + case KVS_CMD_SET: + case KVS_CMD_MOD: + case KVS_CMD_RSET: + case KVS_CMD_RMOD: + case KVS_CMD_HSET: + case KVS_CMD_HMOD: + if (argc != 2 || !key || !val) { *status_out = KVS_STATUS_BADREQ; return; } + break; + case KVS_CMD_GET: + case KVS_CMD_DEL: + case KVS_CMD_EXIST: + case KVS_CMD_RGET: + case KVS_CMD_RDEL: + case KVS_CMD_REXIST: + case KVS_CMD_HGET: + case KVS_CMD_HDEL: + case KVS_CMD_HEXIST: + if (argc != 1 || !key) { *status_out = KVS_STATUS_BADREQ; return; } + break; + default: + *status_out = KVS_STATUS_BADREQ; + return; + } + + int ret = 0; + const char *result = NULL; + + switch (op) { +#if ENABLE_ARRAY + case KVS_CMD_SET: + ret = kvs_array_set(&global_array, (char*)key, (char*)val); + if (ret < 0) *status_out = KVS_STATUS_ERROR; + else if (ret == 0) *status_out = KVS_STATUS_OK; + else *status_out = KVS_STATUS_EXIST; + return; + + case KVS_CMD_GET: + result = kvs_array_get(&global_array, (char*)key); + if (!result) { *status_out = KVS_STATUS_NO_EXIST; return; } + *status_out = KVS_STATUS_OK; + *data_out = result; + *dlen_out = (uint32_t)strlen(result); + return; + + case KVS_CMD_DEL: + ret = kvs_array_del(&global_array, (char*)key); + if (ret < 0) *status_out = KVS_STATUS_ERROR; + else if (ret == 0) *status_out = KVS_STATUS_OK; + else *status_out = KVS_STATUS_NO_EXIST; + return; + + case KVS_CMD_MOD: + ret = kvs_array_mod(&global_array, (char*)key, (char*)val); + if (ret < 0) *status_out = KVS_STATUS_ERROR; + else if (ret == 0) *status_out = KVS_STATUS_OK; + else *status_out = KVS_STATUS_NO_EXIST; + return; + + case KVS_CMD_EXIST: + ret = kvs_array_exist(&global_array, (char*)key); + *status_out = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST; + return; +#endif + +#if ENABLE_RBTREE + case KVS_CMD_RSET: + ret = kvs_rbtree_set(&global_rbtree, (char*)key, (char*)val); + if (ret < 0) *status_out = KVS_STATUS_ERROR; + else if (ret == 0) *status_out = KVS_STATUS_OK; + else *status_out = KVS_STATUS_EXIST; + return; + + case KVS_CMD_RGET: + result = kvs_rbtree_get(&global_rbtree, (char*)key); + if (!result) { *status_out = KVS_STATUS_NO_EXIST; return; } + *status_out = KVS_STATUS_OK; + *data_out = result; + *dlen_out = (uint32_t)strlen(result); + return; + + case KVS_CMD_RDEL: + ret = kvs_rbtree_del(&global_rbtree, (char*)key); + if (ret < 0) *status_out = KVS_STATUS_ERROR; + else if (ret == 0) *status_out = KVS_STATUS_OK; + else *status_out = KVS_STATUS_NO_EXIST; + return; + + case KVS_CMD_RMOD: + ret = kvs_rbtree_mod(&global_rbtree, (char*)key, (char*)val); + if (ret < 0) *status_out = KVS_STATUS_ERROR; + else if (ret == 0) *status_out = KVS_STATUS_OK; + else *status_out = KVS_STATUS_NO_EXIST; + return; + + case KVS_CMD_REXIST: + ret = kvs_rbtree_exist(&global_rbtree, (char*)key); + *status_out = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST; + return; +#endif + +#if ENABLE_HASH + case KVS_CMD_HSET: + ret = kvs_hash_set(&global_hash, (char*)key, (char*)val); + if (ret < 0) *status_out = KVS_STATUS_ERROR; + else if (ret == 0) *status_out = KVS_STATUS_OK; + else *status_out = KVS_STATUS_EXIST; + return; + + case KVS_CMD_HGET: + result = kvs_hash_get(&global_hash, (char*)key); + if (!result) { *status_out = KVS_STATUS_NO_EXIST; return; } + *status_out = KVS_STATUS_OK; + *data_out = result; + *dlen_out = (uint32_t)strlen(result); + return; + + case KVS_CMD_HDEL: + ret = kvs_hash_del(&global_hash, (char*)key); + if (ret < 0) *status_out = KVS_STATUS_ERROR; + else if (ret == 0) *status_out = KVS_STATUS_OK; + else *status_out = KVS_STATUS_NO_EXIST; + return; + + case KVS_CMD_HMOD: + ret = kvs_hash_mod(&global_hash, (char*)key, (char*)val); + if (ret < 0) *status_out = KVS_STATUS_ERROR; + else if (ret == 0) *status_out = KVS_STATUS_OK; + else *status_out = KVS_STATUS_NO_EXIST; + return; + + case KVS_CMD_HEXIST: + ret = kvs_hash_exist(&global_hash, (char*)key); + *status_out = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST; + return; +#endif + default: + *status_out = KVS_STATUS_BADREQ; + return; + } +} + +#if NEW_KVSTORE +// int kvs_protocol(char *msg, int length, char *response, int *response_len){ +int kvs_protocol(char *msg, int length, char *response){ + if (!msg || length <= 0 || !response) return 0; + int response_len = 0; + + kvs_request_t req; + int consumed = kvs_parse_request((const uint8_t *)msg, length, &req); + + if (consumed < 0) return 0; // 解析失败 + else if(consumed == 0) return 0; // 半包 + + + kvs_response_t *results = (kvs_response_t *)kvs_malloc(sizeof(kvs_response_t) * req.opcount); + if (!results) { kvs_free_request(&req); return -1; } + memset(results, 0, sizeof(kvs_response_t) * req.opcount); + + if (kvs_execute_request(&req, results) < 0) { + kvs_free(results); + kvs_free_request(&req); + return -1; + } + + int resp_len = kvs_build_response(&req, results, (uint8_t *)response, KVS_MAX_RESPONSE); + kvs_free(results); + kvs_free_request(&req); + + if (resp_len < 0) return 0; // error + + response_len = resp_len; + return response_len; +} +#else /* * msg: request message * length: length of request message @@ -288,7 +477,7 @@ int kvs_protocol(char *msg, int length, char *response) { // return kvs_filter_protocol(tokens, count, response); } - +#endif int init_kvengine(void) { @@ -337,9 +526,9 @@ int main(int argc, char *argv[]) { #if (NETWORK_SELECT == NETWORK_REACTOR) reactor_start(port, kvs_protocol); // #elif (NETWORK_SELECT == NETWORK_PROACTOR) - ntyco_start(port, kvs_protocol); -#elif (NETWORK_SELECT == NETWORK_NTYCO) proactor_start(port, kvs_protocol); +#elif (NETWORK_SELECT == NETWORK_NTYCO) + ntyco_start(port, kvs_protocol); #endif dest_kvengine(); diff --git a/kvstore.h b/kvstore.h index 0971c00..55f7cb9 100644 --- a/kvstore.h +++ b/kvstore.h @@ -10,15 +10,20 @@ #include #include #include +#include #define NETWORK_REACTOR 0 #define NETWORK_PROACTOR 1 #define NETWORK_NTYCO 2 -#define NETWORK_SELECT NETWORK_PROACTOR - +#define NETWORK_SELECT NETWORK_REACTOR +// 8MB +#define MAX_PAYLOAD_LEN 8388608 +#define MAX_OPCOUNT 1024 +// 4MB +#define MAX_ARG_LEN 4194304 #define KVS_MAX_TOKENS 128 @@ -26,6 +31,8 @@ #define ENABLE_RBTREE 1 #define ENABLE_HASH 1 +#define NEW_KVSTORE 1 + typedef int (*msg_handler)(char *msg, int length, char *response); diff --git a/proactor.c b/proactor.c index 2e4ff78..646c8d3 100755 --- a/proactor.c +++ b/proactor.c @@ -170,7 +170,8 @@ int proactor_start(unsigned short port, msg_handler handler) { set_event_send(&ring, result.fd, response, ret, 0); } - } else if (result.event == EVENT_WRITE) { // + } else if (result.event == EVENT_WRITE) { + // int ret = entries->res; //printf("set_event_send ret: %d, %s\n", ret, buffer); diff --git a/testcase2 b/testcase2 new file mode 100755 index 0000000..fa5a27c Binary files /dev/null and b/testcase2 differ diff --git a/testcase2.c b/testcase2.c new file mode 100644 index 0000000..877d33c --- /dev/null +++ b/testcase2.c @@ -0,0 +1,299 @@ +// kvs_client_min.c +#include +#include +#include +#include +#include +#include +#include +#include + +#define KVS_MAGIC_U32 0x4B565331u // 'KVS1' +#define KVS_HDR_LEN 16 +#define KVS_TYPE_REQ 1 +#define KVS_TYPE_RESP 2 + +// OP mapping (match your server enum) +#define OP_SET 0 +#define OP_GET 1 +#define OP_DEL 2 +#define OP_MOD 3 +#define OP_EXIST 4 + +static int write_u8(uint8_t **pp, uint8_t *end, uint8_t v) { + if (*pp + 1 > end) return -1; + **pp = v; + (*pp)++; + return 0; +} +static int write_u32(uint8_t **pp, uint8_t *end, uint32_t v) { + if (*pp + 4 > end) return -1; + uint32_t be = htonl(v); + memcpy(*pp, &be, 4); + (*pp) += 4; + return 0; +} +static int write_bytes(uint8_t **pp, uint8_t *end, const void *buf, size_t n) { + if (*pp + n > end) return -1; + memcpy(*pp, buf, n); + (*pp) += n; + return 0; +} + +static int read_u8(const uint8_t **pp, const uint8_t *end, uint8_t *out) { + if (*pp + 1 > end) return -1; + *out = **pp; + (*pp)++; + return 0; +} +static int read_u32(const uint8_t **pp, const uint8_t *end, uint32_t *out) { + if (*pp + 4 > end) return -1; + uint32_t be; + memcpy(&be, *pp, 4); + *out = ntohl(be); + (*pp) += 4; + return 0; +} + +static ssize_t send_all(int fd, const void *buf, size_t len) { + const uint8_t *p = (const uint8_t*)buf; + size_t sent = 0; + while (sent < len) { + ssize_t n = send(fd, p + sent, len - sent, 0); + if (n < 0) { + if (errno == EINTR) continue; + return -1; + } + if (n == 0) return -1; + sent += (size_t)n; + } + return (ssize_t)sent; +} + +static ssize_t recv_all(int fd, void *buf, size_t len) { + uint8_t *p = (uint8_t*)buf; + size_t recvd = 0; + while (recvd < len) { + ssize_t n = recv(fd, p + recvd, len - recvd, 0); + if (n < 0) { + if (errno == EINTR) continue; + return -1; + } + if (n == 0) return -1; + recvd += (size_t)n; + } + return (ssize_t)recvd; +} + +// Build one request packet: header + payload(opcount + cmds...) +static int build_req_packet(uint32_t reqId, + uint8_t opcount, + uint8_t op, + int argc, const char *argv[], + uint8_t *out, size_t out_cap, size_t *out_len) { + if (!out || out_cap < KVS_HDR_LEN || !out_len) return -1; + + uint8_t *w = out + KVS_HDR_LEN; + uint8_t *end = out + out_cap; + + // payload: opcount(4) + if (write_u32(&w, end, (uint32_t)opcount) < 0) return -1; + + // one cmd (repeat if you want multiple, keep minimal here) + if (write_u8(&w, end, op) < 0) return -1; + if (write_u32(&w, end, (uint32_t)argc) < 0) return -1; + for (int i = 0; i < argc; i++) { + uint32_t alen = (uint32_t)strlen(argv[i]); + if (write_u32(&w, end, alen) < 0) return -1; + if (write_bytes(&w, end, argv[i], alen) < 0) return -1; + } + + uint32_t payloadLen = (uint32_t)(w - (out + KVS_HDR_LEN)); + uint32_t magic_be = htonl(KVS_MAGIC_U32); + uint32_t payload_be = htonl(payloadLen); + uint32_t reqid_be = htonl(reqId); + + // header layout: + // | magic(4) | type(1) | payloadLen(4) | reqId(4) | flag1(1) | flag2(2) | + memcpy(out + 0, &magic_be, 4); + out[4] = KVS_TYPE_REQ; + memcpy(out + 5, &payload_be, 4); + memcpy(out + 9, &reqid_be, 4); + out[13] = 0; // flag1 + out[14] = 0; out[15] = 0; // flag2 + + *out_len = (size_t)(KVS_HDR_LEN + payloadLen); + return 0; +} + +/** + * KVS_STATUS_OK = 0, + KVS_STATUS_ERROR = 1, + KVS_STATUS_NO_EXIST = 2, + KVS_STATUS_EXIST = 3, + KVS_STATUS_BADREQ = 4 + */ +const char *rsp_status[5] = {"OK", "ERROR", "NO_EXIST", "EXIST", "ERROR"}; +static int recv_and_print_resp(int fd) { + uint8_t hdr[KVS_HDR_LEN]; + + if (recv_all(fd, hdr, KVS_HDR_LEN) < 0) { + perror("recv header"); + return -1; + } + + uint32_t magic_be, payload_be, reqid_be; + memcpy(&magic_be, hdr + 0, 4); + memcpy(&payload_be, hdr + 5, 4); + memcpy(&reqid_be, hdr + 9, 4); + + uint32_t magic = ntohl(magic_be); + uint8_t type = hdr[4]; + uint32_t payloadLen = ntohl(payload_be); + uint32_t reqId = ntohl(reqid_be); + + if (magic != KVS_MAGIC_U32 || type != KVS_TYPE_RESP) { + fprintf(stderr, "bad response header: magic=0x%x type=%u\n", magic, type); + return -1; + } + + uint8_t *payload = (uint8_t*)malloc(payloadLen); + if (!payload) return -1; + + if (payloadLen > 0) { + if (recv_all(fd, payload, payloadLen) < 0) { + perror("recv payload"); + free(payload); + return -1; + } + } + + // parse response payload: + // | opcount(4) | repeat Cmd | + // Cmd: | status(1) | datalen(4) | data | + const uint8_t *p = payload; + const uint8_t *end = payload + payloadLen; + + uint32_t opcount = 0; + if (read_u32(&p, end, &opcount) < 0) { + fprintf(stderr, "resp parse failed\n"); + free(payload); + return -1; + } + + printf("RESP reqId=%u opcount=%u\n", reqId, opcount); + + for (uint32_t i = 0; i < opcount; i++) { + uint8_t status = 0; + uint32_t dlen = 0; + if (read_u8(&p, end, &status) < 0) goto bad; + if (read_u32(&p, end, &dlen) < 0) goto bad; + if (p + dlen > end) goto bad; + + printf(" op[%u]: status=%s datalen=%u", i, rsp_status[status], dlen); + if (dlen > 0) { + // printf(" data=\"%.*s\"", (int)dlen, (const char*)p); + printf(" data=\""); + for (int i = 0; i < dlen; i++) { + printf("%02X", (unsigned char)p[i]); + if (i + 1 < dlen) + printf(" "); + } + printf("\""); + } + printf("\n"); + p += dlen; + } + + free(payload); + return 0; + +bad: + fprintf(stderr, "resp parse failed (truncated)\n"); + free(payload); + return -1; +} + +uint8_t buf[4096]; +size_t pkt_len = 0; +uint32_t reqId = 1; +int fd; + +int testcase(int op, const char *args[], int argnum){ + if (build_req_packet(reqId++, 1, op, argnum, args, buf, sizeof(buf), &pkt_len) < 0) { + fprintf(stderr, "build failed\n"); + close(fd); + return 1; + } + if (send_all(fd, buf, pkt_len) < 0) { perror("send SET"); close(fd); return 1; } + if (recv_and_print_resp(fd) < 0) { close(fd); return 1; } + return 0; +} + +#define BUF_CAP 65536 +int main(int argc, char **argv) { + if (argc != 3) { + fprintf(stderr, "Usage: %s \n", argv[0]); + return 1; + } + const char *ip = argv[1]; + int port = atoi(argv[2]); + + fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0) { perror("socket"); return 1; } + + struct sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_port = htons((uint16_t)port); + if (inet_pton(AF_INET, ip, &addr.sin_addr) != 1) { + fprintf(stderr, "bad ip\n"); + close(fd); + return 1; + } + if (connect(fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) { + perror("connect"); + close(fd); + return 1; + } + + size_t pkt_len = 0; + uint32_t reqId = 1; + + // ---- SET key value ---- + { + const char *args[] = {"foo", "bar"}; + printf("SET "); + for (const unsigned char *p = (const unsigned char *)args[1]; *p; p++) { + printf("%02X ", *p); + } + printf("\n"); + testcase(OP_SET, args, 2); + } + + // ---- GET key ---- + { + const char *args[] = {"foo"}; + testcase(OP_GET, args, 1); + } + + // ---- SET key ---- + { + const char *args[] = {"text", "\r\n\0aaaa\r\n\0"}; + printf("SET "); + for (const unsigned char *p = (const unsigned char *)args[1]; *p; p++) { + printf("%02X ", *p); + } + printf("\n"); + testcase(OP_SET, args, 2); + } + + // ---- GET key ---- + { + const char *args[] = {"text"}; + testcase(OP_GET, args, 1); + } + + close(fd); + return 0; +}