简化协议,

/**
 * Request
 * Cmd: 	| OP(1) | argc(1) | repeat { arglen(4) | arg } |
 *
 * Response
 * Rsp:		| OP(1) | status(1) | datalen(4) | data |
 */

封装客户端进行批处理和单条命令测试。
This commit is contained in:
2026-01-06 19:16:12 +08:00
parent 0dc86f5aa5
commit 144b374aa2
13 changed files with 815 additions and 634 deletions

2
.gitignore vendored
View File

@@ -1,4 +1,6 @@
NtyCo/ NtyCo/
.vscode/
proactor copy.c proactor copy.c
ntyco copy.c ntyco copy.c

View File

@@ -6,8 +6,8 @@ TESTCASE_SRCS = testcase.c
TARGET = kvstore TARGET = kvstore
SUBDIR = ./NtyCo/ SUBDIR = ./NtyCo/
TESTCASE = testcase TESTCASE = testcase
TESTCASE2 = testcase2 TESTCASE2 = ./test/testcase
TESTCASE2_SRCS = testcase.c TESTCASE2_SRCS = ./test/testcase.c ./test/test_client.c
OBJS = $(SRCS:.c=.o) OBJS = $(SRCS:.c=.o)
@@ -22,18 +22,18 @@ ECHO:
@echo $(SUBDIR) @echo $(SUBDIR)
$(TARGET): $(OBJS) $(TARGET): $(OBJS)
$(CC) -o $@ $^ $(FLAGS) $(CC) -g -o $@ $^ $(FLAGS)
$(TESTCASE): $(TESTCASE_SRCS) $(TESTCASE): $(TESTCASE_SRCS)
$(CC) -o $@ $^ $(CC) -g -o $@ $^
$(TESTCASE2): $(TESTCASE2_SRCS) $(TESTCASE2): $(TESTCASE2_SRCS)
$(CC) -o $@ $^ $(CC) -g -o $@ $^
%.o: %.c %.o: %.c
$(CC) $(FLAGS) -c $^ -o $@ $(CC) $(FLAGS) -c $^ -g -o $@
clean: clean:
rm -rf $(OBJS) $(TARGET) $(TESTCASE) rm -rf $(OBJS) $(TARGET) $(TESTCASE) $(TESTCASE2)

View File

@@ -33,6 +33,8 @@ save() -> 全保存数据集。
这里需要在协议中定义消息长度而不是特殊字符做分隔来确定哪些数据是key哪些是value。 这里需要在协议中定义消息长度而不是特殊字符做分隔来确定哪些数据是key哪些是value。
还要修改底层不要用strlen和strcpy用memcpy。
### 面试题 ### 面试题
1. 为什么会实现kvstore使用场景在哪里 1. 为什么会实现kvstore使用场景在哪里

View File

@@ -62,195 +62,125 @@ int kvs_write_u32(uint8_t **pp, const uint8_t *end, uint32_t v) {
} }
// return: -1 fail, 0 half, >0 consumed // return: -1 fail, 0 half, >0 consumed
int kvs_parse_request(const uint8_t *msg, int length, kvs_request_t *req_out) { int kvs_parse_one_cmd(const uint8_t *request, int request_length, kvs_req_t *req_out){
if (!msg || length <= 0 || !req_out) return -1; if (!request || request_length <= 0 || !req_out) return -1;
memset(req_out, 0, sizeof(*req_out));
if(length < KVS_HDR_LEN) return 0; req_out->op = 0;
req_out->argc = 0;
req_out->args = NULL;
// header parser const uint8_t *p = request;
uint32_t magic_be; const uint8_t *end = request + (size_t)request_length;
memcpy(&magic_be, msg, 4);
uint32_t magic = ntohl(magic_be);
if (magic != KVS_MAGIC) return -1;
// // OP + ARGC
uint8_t type = *(msg+4); if (kvs_need(p, end, 2)) {
return 0; // NEED_MORE
}
uint32_t payloadlen_be; uint8_t op = 0, argc = 0;
memcpy(&payloadlen_be, msg+5, 4); if (kvs_read_u8(&p, end, &op) < 0) return -1;
uint32_t payloadlen = ntohl(payloadlen_be); if (kvs_read_u8(&p, end, &argc) < 0) return -1;
uint32_t reqId_be; if (argc > KVS_MAX_ARGC) return -1;
memcpy(&reqId_be, msg + 9, 4);
uint32_t reqId = ntohl(reqId_be);
uint8_t flag1 = *(msg + 13); // 先扫描一遍确认整条命令数据都在 buffer 里
const uint8_t *scan = p;
uint16_t flag2_be; uint32_t lens[KVS_MAX_ARGC];
memcpy(&flag2_be, msg + 14, 2);
uint16_t flag2 = ntohs(flag2_be);
uint32_t totalLen = (uint32_t)KVS_HDR_LEN + payloadlen;
if ((uint32_t)length < totalLen) return 0; // 半包
// payload parser
const uint8_t *ptr = msg + KVS_HDR_LEN;
const uint8_t *end = ptr + payloadlen;
uint32_t opcount = 0;
if(kvs_read_u32(&ptr, end, &opcount) < 0) return -1;
if (opcount > KVS_MAX_OPCOUNT) return -1;
kvs_op_t *ops = (kvs_op_t *)kvs_malloc(sizeof(kvs_op_t) * opcount);
if (!ops) return -1;
memset(ops, 0, sizeof(kvs_op_t) * opcount);
// operator parser
for (uint32_t i = 0; i < opcount; i++) {
uint8_t op = 0;
uint32_t argc = 0;
if (kvs_read_u8(&ptr, end, &op) < 0) goto FAIL;
if (kvs_read_u32(&ptr, end, &argc) < 0) goto FAIL;
if (argc > 16) goto FAIL;
char **argv = NULL;
if (argc > 0) { if (argc > 0) {
argv = (char **)kvs_malloc(sizeof(char*) * argc); for (uint8_t i = 0; i < argc; i++) {
if (!argv) goto FAIL; if (kvs_need(scan, end, 4)) {
memset(argv, 0, sizeof(char*) * argc); return 0; // NEED_MORE
} }
uint32_t alen = 0;
if (kvs_read_u32(&scan, end, &alen) < 0) return -1;
// operator args parser // 防御:单个参数长度限制
for (uint32_t a = 0; a < argc; a++) { if (alen > KVS_MAX_ARGLEN) return -1;
uint32_t arglen = 0;
if (kvs_read_u32(&ptr, end, &arglen) < 0) goto FAIL;
if (kvs_need(ptr, end, arglen) < 0) goto FAIL;
char *s = (char *)kvs_malloc((size_t)arglen + 1); // 防御scan + alen 越界 / 半包
if (!s) goto FAIL; if (kvs_need(scan, end, (size_t)alen)) {
return 0; // NEED_MORE
memcpy(s, ptr, arglen);
s[arglen] = '\0';
argv[a] = s;
ptr += arglen;
} }
lens[i] = alen;
ops[i].op = op; scan += alen;
ops[i].argc = argc;
ops[i].argv = argv;
}
req_out->magic = magic;
req_out->type = type;
req_out->payloadLen = payloadlen;
req_out->reqId = reqId;
req_out->flag1 = flag1;
req_out->flag2 = flag2;
req_out->opcount = opcount;
req_out->ops = ops;
req_out->consumed = totalLen;
return (int)totalLen;
FAIL:
if (ops) {
for (uint32_t i = 0; i < opcount; i++) {
if (ops[i].argv) {
for (uint32_t a = 0; a < ops[i].argc; a++) {
if (ops[i].argv[a]) kvs_free(ops[i].argv[a]);
}
kvs_free(ops[i].argv);
} }
} }
kvs_free(ops);
size_t total_len = (size_t)(scan - request);
if (total_len > KVS_MAX_CMD_BYTES) return -1;
req_out->op = op;
req_out->argc = argc;
if (argc == 0) {
return (int)total_len;
} }
kvs_arg_t *args = (kvs_arg_t *)kvs_malloc((size_t)argc * sizeof(kvs_arg_t));
if (!args) {
kvs_free_request(req_out);
return -1;
}
memset(args, 0, (size_t)argc * sizeof(kvs_arg_t));
for (uint8_t i = 0; i < argc; i++) {
uint32_t alen = 0;
if (kvs_read_u32(&p, end, &alen) < 0) {
kvs_free(args);
kvs_free_request(req_out);
return -1; return -1;
} }
void kvs_free_request(kvs_request_t *req) { // alen 与 lens[i] 应当一致(扫描时读过),不一致说明解析器/输入异常
if (alen != lens[i]) {
kvs_free(args);
kvs_free_request(req_out);
return -1;
}
args[i].len = alen;
args[i].data = p; // 直接指向输入 buffer零拷贝
p += alen;
}
req_out->args = args;
return (int)(p - request);
}
void kvs_free_request(kvs_req_t *req) {
if (!req) return; if (!req) return;
if (req->ops) { if (req->args) {
for (uint32_t i = 0; i < req->opcount; i++) { kvs_free(req->args);
if (req->ops[i].argv) { req->args = NULL;
for (uint32_t a = 0; a < req->ops[i].argc; a++) {
if (req->ops[i].argv[a]) kvs_free(req->ops[i].argv[a]);
} }
kvs_free(req->ops[i].argv); req->op = 0;
} req->argc = 0;
}
kvs_free(req->ops);
}
memset(req, 0, sizeof(*req));
} }
int kvs_execute_request(const kvs_request_t *req, kvs_response_t *results /* size opcount */) { int kvs_build_one_rsp(const kvs_rsp_t *results, uint8_t *response, size_t response_cap){
if (!req || !results) return -1; if (!results || !response) return -1;
for (uint32_t i = 0; i < req->opcount; i++) { const uint8_t *end = response + response_cap;
uint8_t status = KVS_STATUS_ERROR; uint8_t *p = response;
const char *data = NULL;
uint32_t dlen = 0;
kvs_exec_one_op(req->ops[i].op, req->ops[i].argc, req->ops[i].argv, // 计算所需长度1 + 1 + 4 + dlen
&status, &data, &dlen); // 注意防止 size_t 溢出
size_t need = 1u + 1u + 4u + (size_t)results->dlen;
if (need > response_cap) return -1;
results[i].status = status; if (kvs_write_u8(&p, end, results->op) < 0) return -1;
results[i].datalen = dlen; if (kvs_write_u8(&p, end, results->status) < 0) return -1;
results[i].data_ptr = data; if (kvs_write_u32(&p, end, results->dlen) < 0) return -1;
}
return 0; if (results->dlen > 0) {
if (!results->data) return -1; // 有长度却没指针,视为错误
if (kvs_need(p, end, (size_t)results->dlen) < 0) return -1;
memcpy(p, results->data, results->dlen);
p += results->dlen;
} }
return (int)(p - response);
int kvs_build_response(const kvs_request_t *req,
const kvs_response_t *results,
uint8_t *response, int response_cap) {
if (!req || !results || !response || response_cap < KVS_HDR_LEN) return -1;
uint8_t *out = response;
uint8_t *w = out + KVS_HDR_LEN;
const uint8_t *w_end = out + response_cap;
// payload: opcount
if (kvs_write_u32(&w, w_end, req->opcount) < 0) return -1;
for (uint32_t i = 0; i < req->opcount; i++) {
if (kvs_write_u8(&w, w_end, results[i].status) < 0) return -1;
if (kvs_write_u32(&w, w_end, results[i].datalen) < 0) return -1;
if (results[i].datalen > 0) {
if (kvs_need(w, w_end, results[i].datalen) < 0) return -1;
memcpy(w, results[i].data_ptr, results[i].datalen);
w += results[i].datalen;
}
}
uint32_t respPayloadLen = (uint32_t)(w - (out + KVS_HDR_LEN));
uint32_t total = KVS_HDR_LEN + respPayloadLen;
// header
{
uint32_t be = htonl(KVS_MAGIC);
memcpy(out + 0, &be, 4);
}
out[4] = (uint8_t)KVS_TYPE_RESP;
{
uint32_t be = htonl(respPayloadLen);
memcpy(out + 5, &be, 4);
}
{
uint32_t be = htonl(req->reqId);
memcpy(out + 9, &be, 4);
}
out[13] = req->flag1;
{
uint16_t be = htons(req->flag2);
memcpy(out + 14, &be, 2);
}
return (int)total;
} }

View File

@@ -15,25 +15,19 @@ int kvs_write_u32(uint8_t **pp, const uint8_t *end, uint32_t v);
/** /**
* Header: | magic(4) | type(1) | payloadLen(4) | reqId(4) | flag1(1) | flag2(2) |
*
* Request * Request
* Payload: | opcount(4) | repeat Cmd | * Cmd: | OP(1) | argc(1) | repeat { arglen(4) | arg } |
* Cmd: | OP(1) | argc(4) | repeat Arg |
* Arg: | arglen(4) | arg |
* *
* Response * Response
* Payload: | opcount(4) | repeat Cmd | * Rsp: | OP(1) | status(1) | datalen(4) | data |
* Cmd: | status(1) | datalen(4) | data |
*/ */
#define KVS_HDR_LEN 16 // | magic(4) | type(1) | payloadLen(4) | reqId(4) | flag1(1) | flag2(2) | #define KVS_MAX_CMDS_PER_CALL 64
#define KVS_MAGIC 0x4B565331u // 1MB
#define KVS_TYPE_REQ 1 #define KVS_MAX_RESPONSE (1024u * 1024u)
#define KVS_TYPE_RESP 2 #define KVS_MAX_ARGC 64
#define KVS_MAX_ARGLEN (1024u * 1024u)
#define KVS_MAX_OPCOUNT 1024 #define KVS_MAX_CMD_BYTES (4u * 1024u * 1024u)
#define KVS_MAX_RESPONSE (64 * 1024)
enum { enum {
KVS_STATUS_OK = 0, KVS_STATUS_OK = 0,
@@ -43,43 +37,34 @@ enum {
KVS_STATUS_BADREQ = 4 KVS_STATUS_BADREQ = 4
}; };
typedef enum {
KVS_OK = 1,
KVS_NEED_MORE = 0,
KVS_ERROR = -1
}kvs_rc_t;
typedef struct { typedef struct kvs_arg_s{
uint32_t len;
const uint8_t *data;
} kvs_arg_t;
typedef struct kvs_req_s{
uint8_t op; uint8_t op;
uint32_t argc; uint8_t argc;
char **argv; kvs_arg_t *args;
} kvs_op_t; }kvs_req_t;
typedef struct { typedef struct kvs_rsp_s{
uint32_t magic; uint8_t op;
uint8_t type;
uint32_t payloadLen;
uint32_t reqId;
uint8_t flag1;
uint16_t flag2;
// payload ops
uint32_t opcount;
kvs_op_t *ops;
// consumed bytes (header+payload)
uint32_t consumed;
} kvs_request_t;
typedef struct {
uint8_t status; uint8_t status;
uint32_t datalen; uint32_t dlen;
const char *data_ptr; const uint8_t *data;
} kvs_response_t; } kvs_rsp_t;
int kvs_parse_one_cmd(const uint8_t *request, int request_length, kvs_req_t *req_out);
void kvs_free_request(kvs_req_t *req);
int kvs_execute_one_cmd(const kvs_req_t *req, kvs_rsp_t *rsp_out);
int kvs_build_one_rsp(const kvs_rsp_t *results, uint8_t *response, size_t response_cap);
int kvs_parse_request(const uint8_t *msg, int length, kvs_request_t *req_out);
int kvs_execute_request(const kvs_request_t *req, kvs_response_t *results);
int kvs_build_response(const kvs_request_t *req,
const kvs_response_t *results,
uint8_t *response, int response_cap);
void kvs_free_request(kvs_request_t *req);
void kvs_exec_one_op(uint8_t op, uint32_t argc, char **argv,
uint8_t *status_out,
const char **data_out, uint32_t *dlen_out);
#endif #endif

243
kvstore.c
View File

@@ -263,18 +263,22 @@ int kvs_filter_protocol(char **tokens, int count, char *response) {
} }
/** /**
* 输入:op argc argv * 输入:req
* 输出 status * 输出rsp
*/ */
void kvs_exec_one_op(uint8_t op, uint32_t argc, char **argv, int kvs_execute_one_cmd(const kvs_req_t *req, kvs_rsp_t *rsp_out) {
uint8_t *status_out, if(!req || !rsp_out) return -1;
const char **data_out, uint32_t *dlen_out) { rsp_out->op = req->op;
*status_out = KVS_STATUS_ERROR; rsp_out->status = KVS_STATUS_ERROR;
*data_out = NULL; rsp_out->data = NULL;
*dlen_out = 0; rsp_out->dlen = 0;
const char *key = (argc >= 1) ? argv[0] : NULL; int argc = req->argc;
const char *val = (argc >= 2) ? argv[1] : NULL; int op = req->op;
kvs_arg_t *argv = req->args;
const char *key = (argc >= 1) ? argv[0].data : NULL;
const char *val = (argc >= 2) ? argv[1].data : NULL;
// 基本参数校验(按你原有命令语义) // 基本参数校验(按你原有命令语义)
switch (op) { switch (op) {
@@ -284,7 +288,7 @@ int kvs_filter_protocol(char **tokens, int count, char *response) {
case KVS_CMD_RMOD: case KVS_CMD_RMOD:
case KVS_CMD_HSET: case KVS_CMD_HSET:
case KVS_CMD_HMOD: case KVS_CMD_HMOD:
if (argc != 2 || !key || !val) { *status_out = KVS_STATUS_BADREQ; return; } if (argc != 2 || !key || !val) { rsp_out->status = KVS_STATUS_BADREQ; return -1; }
break; break;
case KVS_CMD_GET: case KVS_CMD_GET:
case KVS_CMD_DEL: case KVS_CMD_DEL:
@@ -295,11 +299,11 @@ int kvs_filter_protocol(char **tokens, int count, char *response) {
case KVS_CMD_HGET: case KVS_CMD_HGET:
case KVS_CMD_HDEL: case KVS_CMD_HDEL:
case KVS_CMD_HEXIST: case KVS_CMD_HEXIST:
if (argc != 1 || !key) { *status_out = KVS_STATUS_BADREQ; return; } if (argc != 1 || !key) { rsp_out->status = KVS_STATUS_BADREQ; return -1; }
break; break;
default: default:
*status_out = KVS_STATUS_BADREQ; rsp_out->status = KVS_STATUS_BADREQ;
return; return -1;
} }
int ret = 0; int ret = 0;
@@ -309,147 +313,184 @@ int kvs_filter_protocol(char **tokens, int count, char *response) {
#if ENABLE_ARRAY #if ENABLE_ARRAY
case KVS_CMD_SET: case KVS_CMD_SET:
ret = kvs_array_set(&global_array, (char*)key, (char*)val); ret = kvs_array_set(&global_array, (char*)key, (char*)val);
if (ret < 0) *status_out = KVS_STATUS_ERROR; if (ret < 0) rsp_out->status = KVS_STATUS_ERROR;
else if (ret == 0) *status_out = KVS_STATUS_OK; else if (ret == 0) rsp_out->status = KVS_STATUS_OK;
else *status_out = KVS_STATUS_EXIST; else rsp_out->status = KVS_STATUS_EXIST;
return; return 0;
case KVS_CMD_GET: case KVS_CMD_GET:
result = kvs_array_get(&global_array, (char*)key); result = kvs_array_get(&global_array, (char*)key);
if (!result) { *status_out = KVS_STATUS_NO_EXIST; return; } if (!result) { rsp_out->status = KVS_STATUS_NO_EXIST; return 0; }
*status_out = KVS_STATUS_OK; rsp_out->status = KVS_STATUS_OK;
*data_out = result; rsp_out->data = result;
*dlen_out = (uint32_t)strlen(result); rsp_out->dlen = (uint32_t)strlen(result);
return; return 0;
case KVS_CMD_DEL: case KVS_CMD_DEL:
ret = kvs_array_del(&global_array, (char*)key); ret = kvs_array_del(&global_array, (char*)key);
if (ret < 0) *status_out = KVS_STATUS_ERROR; if (ret < 0) rsp_out->status = KVS_STATUS_ERROR;
else if (ret == 0) *status_out = KVS_STATUS_OK; else if (ret == 0) rsp_out->status = KVS_STATUS_OK;
else *status_out = KVS_STATUS_NO_EXIST; else rsp_out->status = KVS_STATUS_NO_EXIST;
return; return 0;
case KVS_CMD_MOD: case KVS_CMD_MOD:
ret = kvs_array_mod(&global_array, (char*)key, (char*)val); ret = kvs_array_mod(&global_array, (char*)key, (char*)val);
if (ret < 0) *status_out = KVS_STATUS_ERROR; if (ret < 0) rsp_out->status = KVS_STATUS_ERROR;
else if (ret == 0) *status_out = KVS_STATUS_OK; else if (ret == 0) rsp_out->status = KVS_STATUS_OK;
else *status_out = KVS_STATUS_NO_EXIST; else rsp_out->status = KVS_STATUS_NO_EXIST;
return; return 0;
case KVS_CMD_EXIST: case KVS_CMD_EXIST:
ret = kvs_array_exist(&global_array, (char*)key); ret = kvs_array_exist(&global_array, (char*)key);
*status_out = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST; rsp_out->status = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST;
return; return 0;
#endif #endif
#if ENABLE_RBTREE #if ENABLE_RBTREE
case KVS_CMD_RSET: case KVS_CMD_RSET:
ret = kvs_rbtree_set(&global_rbtree, (char*)key, (char*)val); ret = kvs_rbtree_set(&global_rbtree, (char*)key, (char*)val);
if (ret < 0) *status_out = KVS_STATUS_ERROR; if (ret < 0) rsp_out->status = KVS_STATUS_ERROR;
else if (ret == 0) *status_out = KVS_STATUS_OK; else if (ret == 0) rsp_out->status = KVS_STATUS_OK;
else *status_out = KVS_STATUS_EXIST; else rsp_out->status = KVS_STATUS_EXIST;
return; return 0;
case KVS_CMD_RGET: case KVS_CMD_RGET:
result = kvs_rbtree_get(&global_rbtree, (char*)key); result = kvs_rbtree_get(&global_rbtree, (char*)key);
if (!result) { *status_out = KVS_STATUS_NO_EXIST; return; } if (!result) { rsp_out->status = KVS_STATUS_NO_EXIST; return 0; }
*status_out = KVS_STATUS_OK; rsp_out->status = KVS_STATUS_OK;
*data_out = result; rsp_out->data = result;
*dlen_out = (uint32_t)strlen(result); rsp_out->dlen = (uint32_t)strlen(result);
return; return 0;
case KVS_CMD_RDEL: case KVS_CMD_RDEL:
ret = kvs_rbtree_del(&global_rbtree, (char*)key); ret = kvs_rbtree_del(&global_rbtree, (char*)key);
if (ret < 0) *status_out = KVS_STATUS_ERROR; if (ret < 0) rsp_out->status = KVS_STATUS_ERROR;
else if (ret == 0) *status_out = KVS_STATUS_OK; else if (ret == 0) rsp_out->status = KVS_STATUS_OK;
else *status_out = KVS_STATUS_NO_EXIST; else rsp_out->status = KVS_STATUS_NO_EXIST;
return; return 0;
case KVS_CMD_RMOD: case KVS_CMD_RMOD:
ret = kvs_rbtree_mod(&global_rbtree, (char*)key, (char*)val); ret = kvs_rbtree_mod(&global_rbtree, (char*)key, (char*)val);
if (ret < 0) *status_out = KVS_STATUS_ERROR; if (ret < 0) rsp_out->status = KVS_STATUS_ERROR;
else if (ret == 0) *status_out = KVS_STATUS_OK; else if (ret == 0) rsp_out->status = KVS_STATUS_OK;
else *status_out = KVS_STATUS_NO_EXIST; else rsp_out->status = KVS_STATUS_NO_EXIST;
return; return 0;
case KVS_CMD_REXIST: case KVS_CMD_REXIST:
ret = kvs_rbtree_exist(&global_rbtree, (char*)key); ret = kvs_rbtree_exist(&global_rbtree, (char*)key);
*status_out = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST; rsp_out->status = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST;
return; return 0;
#endif #endif
#if ENABLE_HASH #if ENABLE_HASH
case KVS_CMD_HSET: case KVS_CMD_HSET:
ret = kvs_hash_set(&global_hash, (char*)key, (char*)val); ret = kvs_hash_set(&global_hash, (char*)key, (char*)val);
if (ret < 0) *status_out = KVS_STATUS_ERROR; if (ret < 0) rsp_out->status = KVS_STATUS_ERROR;
else if (ret == 0) *status_out = KVS_STATUS_OK; else if (ret == 0) rsp_out->status = KVS_STATUS_OK;
else *status_out = KVS_STATUS_EXIST; else rsp_out->status = KVS_STATUS_EXIST;
return; return 0;
case KVS_CMD_HGET: case KVS_CMD_HGET:
result = kvs_hash_get(&global_hash, (char*)key); result = kvs_hash_get(&global_hash, (char*)key);
if (!result) { *status_out = KVS_STATUS_NO_EXIST; return; } if (!result) { rsp_out->status = KVS_STATUS_NO_EXIST; return 0; }
*status_out = KVS_STATUS_OK; rsp_out->status = KVS_STATUS_OK;
*data_out = result; rsp_out->data = result;
*dlen_out = (uint32_t)strlen(result); rsp_out->dlen = (uint32_t)strlen(result);
return; return 0;
case KVS_CMD_HDEL: case KVS_CMD_HDEL:
ret = kvs_hash_del(&global_hash, (char*)key); ret = kvs_hash_del(&global_hash, (char*)key);
if (ret < 0) *status_out = KVS_STATUS_ERROR; if (ret < 0) rsp_out->status = KVS_STATUS_ERROR;
else if (ret == 0) *status_out = KVS_STATUS_OK; else if (ret == 0) rsp_out->status = KVS_STATUS_OK;
else *status_out = KVS_STATUS_NO_EXIST; else rsp_out->status = KVS_STATUS_NO_EXIST;
return; return 0;
case KVS_CMD_HMOD: case KVS_CMD_HMOD:
ret = kvs_hash_mod(&global_hash, (char*)key, (char*)val); ret = kvs_hash_mod(&global_hash, (char*)key, (char*)val);
if (ret < 0) *status_out = KVS_STATUS_ERROR; if (ret < 0) rsp_out->status = KVS_STATUS_ERROR;
else if (ret == 0) *status_out = KVS_STATUS_OK; else if (ret == 0) rsp_out->status = KVS_STATUS_OK;
else *status_out = KVS_STATUS_NO_EXIST; else rsp_out->status = KVS_STATUS_NO_EXIST;
return; return 0;
case KVS_CMD_HEXIST: case KVS_CMD_HEXIST:
ret = kvs_hash_exist(&global_hash, (char*)key); ret = kvs_hash_exist(&global_hash, (char*)key);
*status_out = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST; rsp_out->status = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST;
return; return 0;
#endif #endif
default: default:
*status_out = KVS_STATUS_BADREQ; rsp_out->status = KVS_STATUS_BADREQ;
return;
}
}
#if NEW_KVSTORE
// int kvs_protocol(char *msg, int length, char *response, int *response_len){
int kvs_protocol(char *msg, int length, char *response){
if (!msg || length <= 0 || !response) return 0;
int response_len = 0;
kvs_request_t req;
int consumed = kvs_parse_request((const uint8_t *)msg, length, &req);
if (consumed < 0) return 0; // 解析失败
else if(consumed == 0) return 0; // 半包
kvs_response_t *results = (kvs_response_t *)kvs_malloc(sizeof(kvs_response_t) * req.opcount);
if (!results) { kvs_free_request(&req); return -1; }
memset(results, 0, sizeof(kvs_response_t) * req.opcount);
if (kvs_execute_request(&req, results) < 0) {
kvs_free(results);
kvs_free_request(&req);
return -1; return -1;
} }
int resp_len = kvs_build_response(&req, results, (uint8_t *)response, KVS_MAX_RESPONSE); return -1;
kvs_free(results); }
#if NEW_KVSTORE
/**
* input : request request_length
* output : response response_length consumed_out
* return : -1 error, =0 半包, 1 成功
*/
int kvs_protocol(char *request, int request_length, int *consumed_out, char *response, int *response_length){
if (!request || request_length <= 0 || !consumed_out || !response || !response_length) return KVS_NEED_MORE;
int consumed = 0;
int out_len = 0;
int budget = KVS_MAX_CMDS_PER_CALL;
while(consumed < request_length && (budget-- > 0)){
kvs_req_t req;
memset(&req, 0, sizeof(kvs_req_t));
const uint8_t *p = request+consumed;
int remain = request_length - consumed;
int len = kvs_parse_one_cmd(p, remain, &req);
if(len < 0){
kvs_free_request(&req); kvs_free_request(&req);
*consumed_out = consumed;
*response_length = out_len;
return KVS_ERROR;
}
else if(len == 0){
kvs_free_request(&req);
break;
}
if (resp_len < 0) return 0; // error kvs_rsp_t rsp;
memset(&rsp, 0, sizeof(kvs_rsp_t));
response_len = resp_len; if (kvs_execute_one_cmd(&req, &rsp) < 0){
return response_len; kvs_free_request(&req);
*consumed_out = consumed;
*response_length = out_len;
return KVS_ERROR;
}
if (out_len >= KVS_MAX_RESPONSE) {
kvs_free_request(&req);
*consumed_out = consumed;
*response_length = out_len;
return KVS_ERROR;
}
int resp_len = kvs_build_one_rsp(&rsp, (uint8_t *)response+out_len, KVS_MAX_RESPONSE-out_len);
kvs_free_request(&req);
if (resp_len < 0) {
*consumed_out = consumed;
*response_length = out_len;
return KVS_ERROR;
}
// printf("resp_len:%d\n", resp_len);
// printf("consumed:%d\n", len);
out_len += resp_len;
consumed += len;
}
*consumed_out = consumed;
*response_length = out_len;
if (consumed == 0 && out_len == 0) return KVS_NEED_MORE;
return KVS_OK;
} }
#else #else
/* /*

View File

@@ -34,8 +34,8 @@
#define NEW_KVSTORE 1 #define NEW_KVSTORE 1
typedef int (*msg_handler)(char *msg, int length, char *response); // typedef int (*msg_handler)(char *msg, int length, char *response);
typedef int (*msg_handler)(char *request, int request_length, int *consumed_out, char *response, int *response_length);
extern int reactor_start(unsigned short port, msg_handler handler); extern int reactor_start(unsigned short port, msg_handler handler);
extern int proactor_start(unsigned short port, msg_handler handler); extern int proactor_start(unsigned short port, msg_handler handler);

View File

@@ -27,13 +27,15 @@
#if ENABLE_KVSTORE #if ENABLE_KVSTORE
typedef int (*msg_handler)(char *msg, int length, char *response); // typedef int (*msg_handler)(char *msg, int length, char *response);
typedef int (*msg_handler)(char *request, int request_length, int *consumed_out, char *response, int *response_length);
static msg_handler kvs_handler; static msg_handler kvs_handler;
int kvs_request(struct conn *c) { int kvs_request(struct conn *c) {
int consumed_out;
c->wlength = kvs_handler(c->rbuffer, c->rlength, c->wbuffer); int ret = kvs_handler(c->rbuffer, c->rlength, &consumed_out, c->wbuffer, &c->wlength);
} }

221
test/test_client.c Normal file
View File

@@ -0,0 +1,221 @@
#include "test_client.h"
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
int kvs_need(const uint8_t *p, const uint8_t *end, size_t n) {
return (p + n <= end) ? 0 : -1;
}
// 注意u8类型不需要ntoh或者hton
int kvs_read_u8(const uint8_t **pp, const uint8_t *end, uint8_t *out) {
const uint8_t *p = *pp;
if (kvs_need(p, end, 1) < 0) return -1;
*out = *p;
*pp = p + 1;
return 0;
}
int kvs_read_u16(const uint8_t **pp, const uint8_t *end, uint16_t *out) {
const uint8_t *p = *pp;
if (kvs_need(p, end, 2) < 0) return -1;
uint16_t v;
memcpy(&v, p, 2);
*out = ntohs(v);
*pp = p + 2;
return 0;
}
int kvs_read_u32(const uint8_t **pp, const uint8_t *end, uint32_t *out) {
const uint8_t *p = *pp;
if (kvs_need(p, end, 4) < 0) return -1;
uint32_t v;
memcpy(&v, p, 4);
*out = ntohl(v);
*pp = p + 4;
return 0;
}
int kvs_write_u8(uint8_t **pp, const uint8_t *end, uint8_t v) {
uint8_t *p = *pp;
if (kvs_need(p, end, 1) < 0) return -1;
*p = v;
*pp = p + 1;
return 0;
}
int kvs_write_u16(uint8_t **pp, const uint8_t *end, uint16_t v) {
uint8_t *p = *pp;
if (kvs_need(p, end, 2) < 0) return -1;
uint16_t be = htons(v);
memcpy(p, &be, 2);
*pp = p + 2;
return 0;
}
int kvs_write_u32(uint8_t **pp, const uint8_t *end, uint32_t v) {
uint8_t *p = *pp;
if (kvs_need(p, end, 4) < 0) return -1;
uint32_t be = htonl(v);
memcpy(p, &be, 4);
*pp = p + 4;
return 0;
}
int getcmd(uint8_t op, const char *key, const char *value, uint8_t *buf){
if(!key || !buf) return -1;
uint8_t *end = buf + CMD_SIZE;
uint8_t *p = buf;
uint8_t argc = ((value == NULL)?1 : 2);
if (kvs_write_u8(&p, end, op) < 0) return -1;
if (kvs_write_u8(&p, end, argc) < 0) return -1;
// 写入 key
int keylen = strlen(key);
if (kvs_write_u32(&p, end, keylen) < 0) return -1;
if (kvs_need(p, end, keylen) < 0) return -1;
if (keylen > 0) {
memcpy(p, key, keylen);
p += keylen;
}
if(value){
int vallen = strlen(value);
if (kvs_write_u32(&p, end, vallen) < 0) return -1;
if (kvs_need(p, end, vallen) < 0) return -1;
if (vallen > 0) {
memcpy(p, value, vallen);
p += vallen;
}
}
return (p - buf);
}
int parse_response(const uint8_t *buf, int buflen, kvs_response_t *rsp) {
const uint8_t *p = buf;
const uint8_t *end = buf + buflen;
// 读取 OP
if (kvs_read_u8(&p, end, &rsp->op) < 0) {
fprintf(stderr, "Failed to read op\n");
return -1;
}
// 读取 status
if (kvs_read_u8(&p, end, &rsp->status) < 0) {
fprintf(stderr, "Failed to read status\n");
return -1;
}
// 读取 datalen
if (kvs_read_u32(&p, end, &rsp->datalen) < 0) {
fprintf(stderr, "Failed to read datalen\n");
return -1;
}
// 检查数据长度
if (kvs_need(p, end, rsp->datalen) < 0) {
fprintf(stderr, "Data length mismatch: expected %u bytes, but only %ld available\n",
rsp->datalen, end - p);
return -1;
}
// 指向数据部分
rsp->data = (uint8_t *)p;
return (p - buf) + rsp->datalen;
}
void print_response(const char *cmd_name, const kvs_response_t *rsp) {
printf("\n=== %s Response ===\n", cmd_name);
printf("OP: %u\n", rsp->op);
printf("Status: %u ", rsp->status);
switch (rsp->status) {
case KVS_STATUS_OK:
printf("(OK)\n");
break;
case KVS_STATUS_ERROR:
printf("(ERROR)\n");
break;
case KVS_STATUS_NO_EXIST:
printf("(NO_EXIST)\n");
break;
case KVS_STATUS_EXIST:
printf("(EXISTS)\n");
break;
default:
printf("(UNKNOWN)\n");
break;
}
printf("Data Length: %u\n", rsp->datalen);
if (rsp->datalen > 0 && rsp->data != NULL) {
printf("Data: ");
// 尝试以字符串形式打印(如果是可打印字符)
int is_printable = 1;
for (uint32_t i = 0; i < rsp->datalen; i++) {
if (rsp->data[i] < 32 || rsp->data[i] > 126) {
is_printable = 0;
break;
}
}
if (is_printable) {
printf("\"");
for (uint32_t i = 0; i < rsp->datalen; i++) {
printf("%c", rsp->data[i]);
}
printf("\"\n");
} else {
// 以十六进制打印
printf("0x");
for (uint32_t i = 0; i < rsp->datalen; i++) {
printf("%02x", rsp->data[i]);
}
printf("\n");
}
} else {
printf("Data: (empty)\n");
}
printf("==================\n");
}
int verify_response(const kvs_response_t *rsp, uint8_t expected_op,
uint8_t expected_status, const char *expected_data) {
if (rsp->op != expected_op) {
printf("❌ OP mismatch: expected %u, got %u\n", expected_op, rsp->op);
return 0;
}
if (rsp->status != expected_status) {
printf("❌ Status mismatch: expected %u, got %u\n", expected_status, rsp->status);
return 0;
}
if (expected_data != NULL) {
uint32_t expected_len = strlen(expected_data);
if (rsp->datalen != expected_len) {
printf("❌ Data length mismatch: expected %u, got %u\n", expected_len, rsp->datalen);
return 0;
}
if (memcmp(rsp->data, expected_data, expected_len) != 0) {
printf("❌ Data content mismatch\n");
return 0;
}
}
return 1;
}

154
test/test_client.h Normal file
View File

@@ -0,0 +1,154 @@
/**
* Request
* Cmd: | OP(1) | argc(1) | repeat { arglen(4) | arg } |
*
* Response
* Rsp: | OP(1) | status(1) | datalen(4) | data |
*/
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#define CMD_SIZE (4096)
#define BATCH_SIZE (65536)
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
#define PRESP print_response
typedef enum {
KVS_STATUS_OK = 0,
KVS_STATUS_ERROR = 1,
KVS_STATUS_NO_EXIST = 2,
KVS_STATUS_EXIST = 3,
KVS_STATUS_BADREQ = 4
}rsp_ret_status_e;
enum {
KVS_CMD_START = 0,
// array
KVS_CMD_SET = KVS_CMD_START,
KVS_CMD_GET,
KVS_CMD_DEL,
KVS_CMD_MOD,
KVS_CMD_EXIST,
// rbtree
KVS_CMD_RSET,
KVS_CMD_RGET,
KVS_CMD_RDEL,
KVS_CMD_RMOD,
KVS_CMD_REXIST,
// hash
KVS_CMD_HSET,
KVS_CMD_HGET,
KVS_CMD_HDEL,
KVS_CMD_HMOD,
KVS_CMD_HEXIST,
KVS_CMD_COUNT,
};
typedef struct {
uint8_t op;
uint8_t status;
uint32_t datalen;
uint8_t *data;
} kvs_response_t;
int kvs_need(const uint8_t *p, const uint8_t *end, size_t n);
int kvs_read_u8(const uint8_t **pp, const uint8_t *end, uint8_t *out);
int kvs_read_u16(const uint8_t **pp, const uint8_t *end, uint16_t *out);
int kvs_read_u32(const uint8_t **pp, const uint8_t *end, uint32_t *out);
int kvs_write_u8(uint8_t **pp, const uint8_t *end, uint8_t v);
int kvs_write_u16(uint8_t **pp, const uint8_t *end, uint16_t v);
int kvs_write_u32(uint8_t **pp, const uint8_t *end, uint32_t v);
int getcmd(uint8_t op, const char *key, const char *value, uint8_t *buf);
int parse_response(const uint8_t *buf, int buflen, kvs_response_t *rsp);
void print_response(const char *cmd_name, const kvs_response_t *rsp);
int verify_response(const kvs_response_t *rsp, uint8_t expected_op,
uint8_t expected_status, const char *expected_data);
#define KVS_BATCH_MAX 64
typedef struct {
uint8_t buf[BATCH_SIZE];
int len; // 当前已写入的 batch 字节数
int cnt; // 当前 batch 里命令条数
int cmd_len[KVS_BATCH_MAX];
} kvs_batch_t;
static void kvs_batch_init(kvs_batch_t *b)
{
b->len = 0;
b->cnt = 0;
memset(b->cmd_len, 0, sizeof(b->cmd_len));
}
/**
* 用 getcmd() 生成单条命令,然后 append 到 batch buffer
* 返回0 成功,-1 失败(太多条 or buffer 不够)
*/
static int kvs_batch_add(kvs_batch_t *b, uint8_t op, const char *key, const char *value)
{
if (b->cnt >= KVS_BATCH_MAX) return -1;
uint8_t tmp[CMD_SIZE];
int n = getcmd(op, key, value, tmp); // 你提供的函数
if (n <= 0) return -1;
if (b->len + n > (int)sizeof(b->buf)) return -1;
memcpy(b->buf + b->len, tmp, n);
b->cmd_len[b->cnt] = n;
b->cnt++;
b->len += n;
return 0;
}
/**
* 一次性发送 batch
* 返回:发送字节数,<0 表示失败
*/
static int kvs_batch_send(int fd, const kvs_batch_t *b)
{
return (int)send(fd, b->buf, b->len, 0);
}
/**
* 一次 recv 收回所有响应,然后批量解析为 rsp 数组
*
* 返回:成功解析出的响应条数(期望是 b->cnt
*/
static int kvs_batch_recv_parse(int fd,
const kvs_batch_t *b,
kvs_response_t *rsps, // 输出数组,长度 >= b->cnt
uint8_t *recvbuf,
int recvbuf_cap)
{
int nrecv = (int)recv(fd, recvbuf, recvbuf_cap, 0);
if (nrecv <= 0) return -1;
int off = 0;
int parsed = 0;
while (parsed < b->cnt && off < nrecv) {
int consumed = parse_response(recvbuf + off, nrecv - off, &rsps[parsed]);
if (consumed <= 0) break; // 不够解析/失败,简单处理:直接退出
off += consumed;
parsed++;
}
return parsed;
}

143
test/testcase.c Normal file
View File

@@ -0,0 +1,143 @@
#include "test_client.h"
#include <arpa/inet.h>
int connect_tcpserver(const char *ip, unsigned short port) {
int connfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(struct sockaddr_in));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = inet_addr(ip);
server_addr.sin_port = htons(port);
if (0 != connect(connfd, (struct sockaddr*)&server_addr, sizeof(struct sockaddr_in))) {
perror("connect");
return -1;
}
return connfd;
}
int send_msg(int connfd, char *msg, int length) {
int res = send(connfd, msg, length, 0);
if (res < 0) {
perror("send");
exit(1);
}
return res;
}
int recv_msg(int connfd, char *msg, int length) {
int res = recv(connfd, msg, length, 0);
if (res < 0) {
perror("recv");
exit(1);
}
return res;
}
void testcase(int connfd, uint8_t op, const char* key, const char* value, rsp_ret_status_e st, const char* rsp_value, const char* command_name){
uint8_t buf[CMD_SIZE];
uint8_t result[CMD_SIZE];
kvs_response_t rsp;
int len, recv_len;
len = getcmd(op, key, value, buf);
send_msg(connfd, buf, len);
recv_len = recv_msg(connfd, result, CMD_SIZE);
if (parse_response(result, recv_len, &rsp) > 0) {
PRESP(command_name, &rsp);
if(!verify_response(&rsp, op, st, rsp_value)) printf("%s\n", command_name);
}else{
printf("parser error\n");
}
return ;
}
void array_testcase_1w(int connfd) {
int count = 1;
int i = 0;
struct timeval tv_begin;
gettimeofday(&tv_begin, NULL);
for (i = 0;i < count;i ++) {
testcase(connfd, KVS_CMD_SET, "nage", "lian", KVS_STATUS_OK, NULL, "SET NAME");
testcase(connfd, KVS_CMD_GET, "nage", NULL, KVS_STATUS_OK, "lian", "GET NAME");
testcase(connfd, KVS_CMD_MOD, "nage", "liu", KVS_STATUS_OK, NULL, "MOD NAME");
testcase(connfd, KVS_CMD_GET, "nage", NULL, KVS_STATUS_OK, "liu", "GET NAME");
testcase(connfd, KVS_CMD_EXIST, "nage", NULL, KVS_STATUS_EXIST, NULL, "EXIST NAME");
testcase(connfd, KVS_CMD_DEL, "nage", NULL, KVS_STATUS_OK, NULL, "DEL NAME");
testcase(connfd, KVS_CMD_EXIST, "nage", NULL, KVS_STATUS_NO_EXIST, NULL, "NOT EXIST NAME");
}
struct timeval tv_end;
gettimeofday(&tv_end, NULL);
int time_used = TIME_SUB_MS(tv_end, tv_begin); // ms
printf("array testcase --> time_used: %d, qps: %d\n", time_used, 70000 * 1000 / time_used);
}
void do_batch_example(int fd)
{
kvs_batch_t batch;
kvs_batch_init(&batch);
// 组 batch最多 64 条)
kvs_batch_add(&batch, KVS_CMD_SET, "k1", "v1");
kvs_batch_add(&batch, KVS_CMD_SET, "k2", "v2");
kvs_batch_add(&batch, KVS_CMD_GET, "k1", NULL);
kvs_batch_add(&batch, KVS_CMD_GET, "k2", NULL);
// 一次性发送
kvs_batch_send(fd, &batch);
// 一次性 recv + parse
uint8_t recvbuf[BATCH_SIZE];
kvs_response_t rsps[KVS_BATCH_MAX];
int nrsp = kvs_batch_recv_parse(fd, &batch, rsps, recvbuf, sizeof(recvbuf));
// 打印/处理
for (int i = 0; i < nrsp; i++) {
PRESP("BATCH", &rsps[i]);
}
printf("%d\n", nrsp);
testcase(fd, KVS_CMD_GET, "k1", NULL, KVS_STATUS_OK, "v1", "GET k1");
testcase(fd, KVS_CMD_GET, "k2", NULL, KVS_STATUS_OK, "v2", "GET k2");
}
int main(int argc, char *argv[]) {
if (argc != 3) {
printf("arg error\n");
return -1;
}
char *ip = argv[1];
int port = atoi(argv[2]);
int connfd = connect_tcpserver(ip, port);
// array_testcase_1w(connfd);
do_batch_example(connfd);
return 0;
}

BIN
testcase2

Binary file not shown.

View File

@@ -1,299 +0,0 @@
// kvs_client_min.c
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#define KVS_MAGIC_U32 0x4B565331u // 'KVS1'
#define KVS_HDR_LEN 16
#define KVS_TYPE_REQ 1
#define KVS_TYPE_RESP 2
// OP mapping (match your server enum)
#define OP_SET 0
#define OP_GET 1
#define OP_DEL 2
#define OP_MOD 3
#define OP_EXIST 4
static int write_u8(uint8_t **pp, uint8_t *end, uint8_t v) {
if (*pp + 1 > end) return -1;
**pp = v;
(*pp)++;
return 0;
}
static int write_u32(uint8_t **pp, uint8_t *end, uint32_t v) {
if (*pp + 4 > end) return -1;
uint32_t be = htonl(v);
memcpy(*pp, &be, 4);
(*pp) += 4;
return 0;
}
static int write_bytes(uint8_t **pp, uint8_t *end, const void *buf, size_t n) {
if (*pp + n > end) return -1;
memcpy(*pp, buf, n);
(*pp) += n;
return 0;
}
static int read_u8(const uint8_t **pp, const uint8_t *end, uint8_t *out) {
if (*pp + 1 > end) return -1;
*out = **pp;
(*pp)++;
return 0;
}
static int read_u32(const uint8_t **pp, const uint8_t *end, uint32_t *out) {
if (*pp + 4 > end) return -1;
uint32_t be;
memcpy(&be, *pp, 4);
*out = ntohl(be);
(*pp) += 4;
return 0;
}
static ssize_t send_all(int fd, const void *buf, size_t len) {
const uint8_t *p = (const uint8_t*)buf;
size_t sent = 0;
while (sent < len) {
ssize_t n = send(fd, p + sent, len - sent, 0);
if (n < 0) {
if (errno == EINTR) continue;
return -1;
}
if (n == 0) return -1;
sent += (size_t)n;
}
return (ssize_t)sent;
}
static ssize_t recv_all(int fd, void *buf, size_t len) {
uint8_t *p = (uint8_t*)buf;
size_t recvd = 0;
while (recvd < len) {
ssize_t n = recv(fd, p + recvd, len - recvd, 0);
if (n < 0) {
if (errno == EINTR) continue;
return -1;
}
if (n == 0) return -1;
recvd += (size_t)n;
}
return (ssize_t)recvd;
}
// Build one request packet: header + payload(opcount + cmds...)
static int build_req_packet(uint32_t reqId,
uint8_t opcount,
uint8_t op,
int argc, const char *argv[],
uint8_t *out, size_t out_cap, size_t *out_len) {
if (!out || out_cap < KVS_HDR_LEN || !out_len) return -1;
uint8_t *w = out + KVS_HDR_LEN;
uint8_t *end = out + out_cap;
// payload: opcount(4)
if (write_u32(&w, end, (uint32_t)opcount) < 0) return -1;
// one cmd (repeat if you want multiple, keep minimal here)
if (write_u8(&w, end, op) < 0) return -1;
if (write_u32(&w, end, (uint32_t)argc) < 0) return -1;
for (int i = 0; i < argc; i++) {
uint32_t alen = (uint32_t)strlen(argv[i]);
if (write_u32(&w, end, alen) < 0) return -1;
if (write_bytes(&w, end, argv[i], alen) < 0) return -1;
}
uint32_t payloadLen = (uint32_t)(w - (out + KVS_HDR_LEN));
uint32_t magic_be = htonl(KVS_MAGIC_U32);
uint32_t payload_be = htonl(payloadLen);
uint32_t reqid_be = htonl(reqId);
// header layout:
// | magic(4) | type(1) | payloadLen(4) | reqId(4) | flag1(1) | flag2(2) |
memcpy(out + 0, &magic_be, 4);
out[4] = KVS_TYPE_REQ;
memcpy(out + 5, &payload_be, 4);
memcpy(out + 9, &reqid_be, 4);
out[13] = 0; // flag1
out[14] = 0; out[15] = 0; // flag2
*out_len = (size_t)(KVS_HDR_LEN + payloadLen);
return 0;
}
/**
* KVS_STATUS_OK = 0,
KVS_STATUS_ERROR = 1,
KVS_STATUS_NO_EXIST = 2,
KVS_STATUS_EXIST = 3,
KVS_STATUS_BADREQ = 4
*/
const char *rsp_status[5] = {"OK", "ERROR", "NO_EXIST", "EXIST", "ERROR"};
static int recv_and_print_resp(int fd) {
uint8_t hdr[KVS_HDR_LEN];
if (recv_all(fd, hdr, KVS_HDR_LEN) < 0) {
perror("recv header");
return -1;
}
uint32_t magic_be, payload_be, reqid_be;
memcpy(&magic_be, hdr + 0, 4);
memcpy(&payload_be, hdr + 5, 4);
memcpy(&reqid_be, hdr + 9, 4);
uint32_t magic = ntohl(magic_be);
uint8_t type = hdr[4];
uint32_t payloadLen = ntohl(payload_be);
uint32_t reqId = ntohl(reqid_be);
if (magic != KVS_MAGIC_U32 || type != KVS_TYPE_RESP) {
fprintf(stderr, "bad response header: magic=0x%x type=%u\n", magic, type);
return -1;
}
uint8_t *payload = (uint8_t*)malloc(payloadLen);
if (!payload) return -1;
if (payloadLen > 0) {
if (recv_all(fd, payload, payloadLen) < 0) {
perror("recv payload");
free(payload);
return -1;
}
}
// parse response payload:
// | opcount(4) | repeat Cmd |
// Cmd: | status(1) | datalen(4) | data |
const uint8_t *p = payload;
const uint8_t *end = payload + payloadLen;
uint32_t opcount = 0;
if (read_u32(&p, end, &opcount) < 0) {
fprintf(stderr, "resp parse failed\n");
free(payload);
return -1;
}
printf("RESP reqId=%u opcount=%u\n", reqId, opcount);
for (uint32_t i = 0; i < opcount; i++) {
uint8_t status = 0;
uint32_t dlen = 0;
if (read_u8(&p, end, &status) < 0) goto bad;
if (read_u32(&p, end, &dlen) < 0) goto bad;
if (p + dlen > end) goto bad;
printf(" op[%u]: status=%s datalen=%u", i, rsp_status[status], dlen);
if (dlen > 0) {
// printf(" data=\"%.*s\"", (int)dlen, (const char*)p);
printf(" data=\"");
for (int i = 0; i < dlen; i++) {
printf("%02X", (unsigned char)p[i]);
if (i + 1 < dlen)
printf(" ");
}
printf("\"");
}
printf("\n");
p += dlen;
}
free(payload);
return 0;
bad:
fprintf(stderr, "resp parse failed (truncated)\n");
free(payload);
return -1;
}
uint8_t buf[4096];
size_t pkt_len = 0;
uint32_t reqId = 1;
int fd;
int testcase(int op, const char *args[], int argnum){
if (build_req_packet(reqId++, 1, op, argnum, args, buf, sizeof(buf), &pkt_len) < 0) {
fprintf(stderr, "build failed\n");
close(fd);
return 1;
}
if (send_all(fd, buf, pkt_len) < 0) { perror("send SET"); close(fd); return 1; }
if (recv_and_print_resp(fd) < 0) { close(fd); return 1; }
return 0;
}
#define BUF_CAP 65536
int main(int argc, char **argv) {
if (argc != 3) {
fprintf(stderr, "Usage: %s <ip> <port>\n", argv[0]);
return 1;
}
const char *ip = argv[1];
int port = atoi(argv[2]);
fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) { perror("socket"); return 1; }
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons((uint16_t)port);
if (inet_pton(AF_INET, ip, &addr.sin_addr) != 1) {
fprintf(stderr, "bad ip\n");
close(fd);
return 1;
}
if (connect(fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
perror("connect");
close(fd);
return 1;
}
size_t pkt_len = 0;
uint32_t reqId = 1;
// ---- SET key value ----
{
const char *args[] = {"foo", "bar"};
printf("SET ");
for (const unsigned char *p = (const unsigned char *)args[1]; *p; p++) {
printf("%02X ", *p);
}
printf("\n");
testcase(OP_SET, args, 2);
}
// ---- GET key ----
{
const char *args[] = {"foo"};
testcase(OP_GET, args, 1);
}
// ---- SET key ----
{
const char *args[] = {"text", "\r\n\0aaaa\r\n\0"};
printf("SET ");
for (const unsigned char *p = (const unsigned char *)args[1]; *p; p++) {
printf("%02X ", *p);
}
printf("\n");
testcase(OP_SET, args, 2);
}
// ---- GET key ----
{
const char *args[] = {"text"};
testcase(OP_GET, args, 1);
}
close(fd);
return 0;
}