协议定义与实现, 协议支持 批处理、特殊字符如\r\n\0。与单条命令测试。

/**
 * Header: 	| magic(4) | payloadLen(4) |
 *
 * Request
 * Payload: | opcount(4) | repeat Cmd |
 * Cmd: 	| OP(1) | argc(4) | repeat Arg |
 * Arg:		| arglen(4) | arg |
 *
 * Response
 * Payload: | opcount(4) | repeat Rsp |
 * Rsp:		| OP(1) | status(1) | datalen(4) | data |
 */

 kvstore层,先解析,再执行,最后构建返回体。
 一个是半包问题,没有处理。
 另一个是感觉协议结构有点麻烦,
This commit is contained in:
2026-01-05 23:20:37 +08:00
parent 7524c57442
commit 0dc86f5aa5
10 changed files with 890 additions and 9 deletions

5
.gitignore vendored Normal file
View File

@@ -0,0 +1,5 @@
NtyCo/
proactor copy.c
ntyco copy.c
reactor copy.c

View File

@@ -1,16 +1,19 @@
CC = gcc CC = gcc
FLAGS = -I ./NtyCo/core/ -L ./NtyCo/ -lntyco -lpthread -luring -ldl FLAGS = -I ./NtyCo/core/ -L ./NtyCo/ -lntyco -lpthread -luring -ldl
SRCS = kvstore.c ntyco.c proactor.c kvs_array.c kvs_rbtree.c kvs_hash.c SRCS = kvstore.c ntyco.c proactor.c reactor.c kvs_array.c kvs_rbtree.c kvs_hash.c kvs_rw_tools.c
TESTCASE_SRCS = testcase.c TESTCASE_SRCS = testcase.c
TARGET = kvstore TARGET = kvstore
SUBDIR = ./NtyCo/ SUBDIR = ./NtyCo/
TESTCASE = testcase TESTCASE = testcase
TESTCASE2 = testcase2
TESTCASE2_SRCS = testcase.c
OBJS = $(SRCS:.c=.o) OBJS = $(SRCS:.c=.o)
all: $(SUBDIR) $(TARGET) $(TESTCASE) all: $(SUBDIR) $(TARGET) $(TESTCASE) $(TESTCASE2)
$(SUBDIR): ECHO $(SUBDIR): ECHO
make -C $@ make -C $@
@@ -24,6 +27,9 @@ $(TARGET): $(OBJS)
$(TESTCASE): $(TESTCASE_SRCS) $(TESTCASE): $(TESTCASE_SRCS)
$(CC) -o $@ $^ $(CC) -o $@ $^
$(TESTCASE2): $(TESTCASE2_SRCS)
$(CC) -o $@ $^
%.o: %.c %.o: %.c
$(CC) $(FLAGS) -c $^ -o $@ $(CC) $(FLAGS) -c $^ -o $@

View File

@@ -1,5 +1,38 @@
# 9.1 Kvstore # 9.1 Kvstore
#### 目前KV存储没有持久化down 之后 就清空。
1. 全量持久化。
save() -> 全保存数据集。
2. 增量持久化。
每执行一条,把命令写入文件里。下次允许把日志重新运行。
#### 目前KV存储 key value 都是调用的 malloc 都是系统的内存分配。
1. 会出现大量的内存碎片,实现一个内存池,管理内存数据。
2. 对比有内存池和没有内存池的性能差异,以及开源内存池 jemalloc 的性能差别。
3. 性能测:
1. qps 有没有内存池。
2.虚拟内存的占用情况 htop。插入百万条数据集(KV*100wset 200w del 100w delete 200w set 100w)。
#### 目前的kv存储是单指令的一条指令一交互。
从单挑指令实现批量处理。
对于同一个连接,目前缺少对以下情况的处理:
1. 收到半包
2. 收到多条消息
对不同连接隔离他们的消息到不同的缓冲区要不先不考虑不同的连接的情况Nytoco代码不是很熟。
如果read->execute->send->read同步循环不会出现缓冲区被覆盖的情况。但是可能会有半包或者两个命令都抵达了才触发事件。所以应用层需要解析到完整请求才执行下一步。
#### 目前KV存储是一个单点服务实现一个主从同步的功能。
主从同步启动两个KVstore。在a里set在b里get。
#### 目前的Key和Value都是单个单词的实现支持特殊字符。
如:空格回车换行。支持插入博客。
这里需要在协议中定义消息长度而不是特殊字符做分隔来确定哪些数据是key哪些是value。
### 面试题 ### 面试题
1. 为什么会实现kvstore使用场景在哪里 1. 为什么会实现kvstore使用场景在哪里

256
kvs_rw_tools.c Normal file
View File

@@ -0,0 +1,256 @@
#include "kvstore.h"
#include "kvs_rw_tools.h"
#include <arpa/inet.h>
int kvs_need(const uint8_t *p, const uint8_t *end, size_t n) {
return (p + n <= end) ? 0 : -1;
}
// 注意u8类型不需要ntoh或者hton
int kvs_read_u8(const uint8_t **pp, const uint8_t *end, uint8_t *out) {
const uint8_t *p = *pp;
if (kvs_need(p, end, 1) < 0) return -1;
*out = *p;
*pp = p + 1;
return 0;
}
int kvs_read_u16(const uint8_t **pp, const uint8_t *end, uint16_t *out) {
const uint8_t *p = *pp;
if (kvs_need(p, end, 2) < 0) return -1;
uint16_t v;
memcpy(&v, p, 2);
*out = ntohs(v);
*pp = p + 2;
return 0;
}
int kvs_read_u32(const uint8_t **pp, const uint8_t *end, uint32_t *out) {
const uint8_t *p = *pp;
if (kvs_need(p, end, 4) < 0) return -1;
uint32_t v;
memcpy(&v, p, 4);
*out = ntohl(v);
*pp = p + 4;
return 0;
}
int kvs_write_u8(uint8_t **pp, const uint8_t *end, uint8_t v) {
uint8_t *p = *pp;
if (kvs_need(p, end, 1) < 0) return -1;
*p = v;
*pp = p + 1;
return 0;
}
int kvs_write_u16(uint8_t **pp, const uint8_t *end, uint16_t v) {
uint8_t *p = *pp;
if (kvs_need(p, end, 2) < 0) return -1;
uint16_t be = htons(v);
memcpy(p, &be, 2);
*pp = p + 2;
return 0;
}
int kvs_write_u32(uint8_t **pp, const uint8_t *end, uint32_t v) {
uint8_t *p = *pp;
if (kvs_need(p, end, 4) < 0) return -1;
uint32_t be = htonl(v);
memcpy(p, &be, 4);
*pp = p + 4;
return 0;
}
// return: -1 fail, 0 half, >0 consumed
int kvs_parse_request(const uint8_t *msg, int length, kvs_request_t *req_out) {
if (!msg || length <= 0 || !req_out) return -1;
memset(req_out, 0, sizeof(*req_out));
if(length < KVS_HDR_LEN) return 0;
// header parser
uint32_t magic_be;
memcpy(&magic_be, msg, 4);
uint32_t magic = ntohl(magic_be);
if (magic != KVS_MAGIC) return -1;
//
uint8_t type = *(msg+4);
uint32_t payloadlen_be;
memcpy(&payloadlen_be, msg+5, 4);
uint32_t payloadlen = ntohl(payloadlen_be);
uint32_t reqId_be;
memcpy(&reqId_be, msg + 9, 4);
uint32_t reqId = ntohl(reqId_be);
uint8_t flag1 = *(msg + 13);
uint16_t flag2_be;
memcpy(&flag2_be, msg + 14, 2);
uint16_t flag2 = ntohs(flag2_be);
uint32_t totalLen = (uint32_t)KVS_HDR_LEN + payloadlen;
if ((uint32_t)length < totalLen) return 0; // 半包
// payload parser
const uint8_t *ptr = msg + KVS_HDR_LEN;
const uint8_t *end = ptr + payloadlen;
uint32_t opcount = 0;
if(kvs_read_u32(&ptr, end, &opcount) < 0) return -1;
if (opcount > KVS_MAX_OPCOUNT) return -1;
kvs_op_t *ops = (kvs_op_t *)kvs_malloc(sizeof(kvs_op_t) * opcount);
if (!ops) return -1;
memset(ops, 0, sizeof(kvs_op_t) * opcount);
// operator parser
for (uint32_t i = 0; i < opcount; i++) {
uint8_t op = 0;
uint32_t argc = 0;
if (kvs_read_u8(&ptr, end, &op) < 0) goto FAIL;
if (kvs_read_u32(&ptr, end, &argc) < 0) goto FAIL;
if (argc > 16) goto FAIL;
char **argv = NULL;
if (argc > 0) {
argv = (char **)kvs_malloc(sizeof(char*) * argc);
if (!argv) goto FAIL;
memset(argv, 0, sizeof(char*) * argc);
}
// operator args parser
for (uint32_t a = 0; a < argc; a++) {
uint32_t arglen = 0;
if (kvs_read_u32(&ptr, end, &arglen) < 0) goto FAIL;
if (kvs_need(ptr, end, arglen) < 0) goto FAIL;
char *s = (char *)kvs_malloc((size_t)arglen + 1);
if (!s) goto FAIL;
memcpy(s, ptr, arglen);
s[arglen] = '\0';
argv[a] = s;
ptr += arglen;
}
ops[i].op = op;
ops[i].argc = argc;
ops[i].argv = argv;
}
req_out->magic = magic;
req_out->type = type;
req_out->payloadLen = payloadlen;
req_out->reqId = reqId;
req_out->flag1 = flag1;
req_out->flag2 = flag2;
req_out->opcount = opcount;
req_out->ops = ops;
req_out->consumed = totalLen;
return (int)totalLen;
FAIL:
if (ops) {
for (uint32_t i = 0; i < opcount; i++) {
if (ops[i].argv) {
for (uint32_t a = 0; a < ops[i].argc; a++) {
if (ops[i].argv[a]) kvs_free(ops[i].argv[a]);
}
kvs_free(ops[i].argv);
}
}
kvs_free(ops);
}
return -1;
}
void kvs_free_request(kvs_request_t *req) {
if (!req) return;
if (req->ops) {
for (uint32_t i = 0; i < req->opcount; i++) {
if (req->ops[i].argv) {
for (uint32_t a = 0; a < req->ops[i].argc; a++) {
if (req->ops[i].argv[a]) kvs_free(req->ops[i].argv[a]);
}
kvs_free(req->ops[i].argv);
}
}
kvs_free(req->ops);
}
memset(req, 0, sizeof(*req));
}
int kvs_execute_request(const kvs_request_t *req, kvs_response_t *results /* size opcount */) {
if (!req || !results) return -1;
for (uint32_t i = 0; i < req->opcount; i++) {
uint8_t status = KVS_STATUS_ERROR;
const char *data = NULL;
uint32_t dlen = 0;
kvs_exec_one_op(req->ops[i].op, req->ops[i].argc, req->ops[i].argv,
&status, &data, &dlen);
results[i].status = status;
results[i].datalen = dlen;
results[i].data_ptr = data;
}
return 0;
}
int kvs_build_response(const kvs_request_t *req,
const kvs_response_t *results,
uint8_t *response, int response_cap) {
if (!req || !results || !response || response_cap < KVS_HDR_LEN) return -1;
uint8_t *out = response;
uint8_t *w = out + KVS_HDR_LEN;
const uint8_t *w_end = out + response_cap;
// payload: opcount
if (kvs_write_u32(&w, w_end, req->opcount) < 0) return -1;
for (uint32_t i = 0; i < req->opcount; i++) {
if (kvs_write_u8(&w, w_end, results[i].status) < 0) return -1;
if (kvs_write_u32(&w, w_end, results[i].datalen) < 0) return -1;
if (results[i].datalen > 0) {
if (kvs_need(w, w_end, results[i].datalen) < 0) return -1;
memcpy(w, results[i].data_ptr, results[i].datalen);
w += results[i].datalen;
}
}
uint32_t respPayloadLen = (uint32_t)(w - (out + KVS_HDR_LEN));
uint32_t total = KVS_HDR_LEN + respPayloadLen;
// header
{
uint32_t be = htonl(KVS_MAGIC);
memcpy(out + 0, &be, 4);
}
out[4] = (uint8_t)KVS_TYPE_RESP;
{
uint32_t be = htonl(respPayloadLen);
memcpy(out + 5, &be, 4);
}
{
uint32_t be = htonl(req->reqId);
memcpy(out + 9, &be, 4);
}
out[13] = req->flag1;
{
uint16_t be = htons(req->flag2);
memcpy(out + 14, &be, 2);
}
return (int)total;
}

85
kvs_rw_tools.h Normal file
View File

@@ -0,0 +1,85 @@
#ifndef __KVS_RW_TOOLS_H__
#define __KVS_RW_TOOLS_H__
#include <stdint.h>
#include <stdio.h>
int kvs_need(const uint8_t *p, const uint8_t *end, size_t n);
int kvs_read_u8(const uint8_t **pp, const uint8_t *end, uint8_t *out);
int kvs_read_u16(const uint8_t **pp, const uint8_t *end, uint16_t *out);
int kvs_read_u32(const uint8_t **pp, const uint8_t *end, uint32_t *out);
int kvs_write_u8(uint8_t **pp, const uint8_t *end, uint8_t v);
int kvs_write_u16(uint8_t **pp, const uint8_t *end, uint16_t v);
int kvs_write_u32(uint8_t **pp, const uint8_t *end, uint32_t v);
/**
* Header: | magic(4) | type(1) | payloadLen(4) | reqId(4) | flag1(1) | flag2(2) |
*
* Request
* Payload: | opcount(4) | repeat Cmd |
* Cmd: | OP(1) | argc(4) | repeat Arg |
* Arg: | arglen(4) | arg |
*
* Response
* Payload: | opcount(4) | repeat Cmd |
* Cmd: | status(1) | datalen(4) | data |
*/
#define KVS_HDR_LEN 16 // | magic(4) | type(1) | payloadLen(4) | reqId(4) | flag1(1) | flag2(2) |
#define KVS_MAGIC 0x4B565331u
#define KVS_TYPE_REQ 1
#define KVS_TYPE_RESP 2
#define KVS_MAX_OPCOUNT 1024
#define KVS_MAX_RESPONSE (64 * 1024)
enum {
KVS_STATUS_OK = 0,
KVS_STATUS_ERROR = 1,
KVS_STATUS_NO_EXIST = 2,
KVS_STATUS_EXIST = 3,
KVS_STATUS_BADREQ = 4
};
typedef struct {
uint8_t op;
uint32_t argc;
char **argv;
} kvs_op_t;
typedef struct {
uint32_t magic;
uint8_t type;
uint32_t payloadLen;
uint32_t reqId;
uint8_t flag1;
uint16_t flag2;
// payload ops
uint32_t opcount;
kvs_op_t *ops;
// consumed bytes (header+payload)
uint32_t consumed;
} kvs_request_t;
typedef struct {
uint8_t status;
uint32_t datalen;
const char *data_ptr;
} kvs_response_t;
int kvs_parse_request(const uint8_t *msg, int length, kvs_request_t *req_out);
int kvs_execute_request(const kvs_request_t *req, kvs_response_t *results);
int kvs_build_response(const kvs_request_t *req,
const kvs_response_t *results,
uint8_t *response, int response_cap);
void kvs_free_request(kvs_request_t *req);
void kvs_exec_one_op(uint8_t op, uint32_t argc, char **argv,
uint8_t *status_out,
const char **data_out, uint32_t *dlen_out);
#endif

197
kvstore.c
View File

@@ -3,7 +3,7 @@
#include "kvstore.h" #include "kvstore.h"
#include "kvs_rw_tools.h"
#if ENABLE_ARRAY #if ENABLE_ARRAY
extern kvs_array_t global_array; extern kvs_array_t global_array;
@@ -262,7 +262,196 @@ int kvs_filter_protocol(char **tokens, int count, char *response) {
return length; return length;
} }
/**
* 输入op argc argv
* 输出 status
*/
void kvs_exec_one_op(uint8_t op, uint32_t argc, char **argv,
uint8_t *status_out,
const char **data_out, uint32_t *dlen_out) {
*status_out = KVS_STATUS_ERROR;
*data_out = NULL;
*dlen_out = 0;
const char *key = (argc >= 1) ? argv[0] : NULL;
const char *val = (argc >= 2) ? argv[1] : NULL;
// 基本参数校验(按你原有命令语义)
switch (op) {
case KVS_CMD_SET:
case KVS_CMD_MOD:
case KVS_CMD_RSET:
case KVS_CMD_RMOD:
case KVS_CMD_HSET:
case KVS_CMD_HMOD:
if (argc != 2 || !key || !val) { *status_out = KVS_STATUS_BADREQ; return; }
break;
case KVS_CMD_GET:
case KVS_CMD_DEL:
case KVS_CMD_EXIST:
case KVS_CMD_RGET:
case KVS_CMD_RDEL:
case KVS_CMD_REXIST:
case KVS_CMD_HGET:
case KVS_CMD_HDEL:
case KVS_CMD_HEXIST:
if (argc != 1 || !key) { *status_out = KVS_STATUS_BADREQ; return; }
break;
default:
*status_out = KVS_STATUS_BADREQ;
return;
}
int ret = 0;
const char *result = NULL;
switch (op) {
#if ENABLE_ARRAY
case KVS_CMD_SET:
ret = kvs_array_set(&global_array, (char*)key, (char*)val);
if (ret < 0) *status_out = KVS_STATUS_ERROR;
else if (ret == 0) *status_out = KVS_STATUS_OK;
else *status_out = KVS_STATUS_EXIST;
return;
case KVS_CMD_GET:
result = kvs_array_get(&global_array, (char*)key);
if (!result) { *status_out = KVS_STATUS_NO_EXIST; return; }
*status_out = KVS_STATUS_OK;
*data_out = result;
*dlen_out = (uint32_t)strlen(result);
return;
case KVS_CMD_DEL:
ret = kvs_array_del(&global_array, (char*)key);
if (ret < 0) *status_out = KVS_STATUS_ERROR;
else if (ret == 0) *status_out = KVS_STATUS_OK;
else *status_out = KVS_STATUS_NO_EXIST;
return;
case KVS_CMD_MOD:
ret = kvs_array_mod(&global_array, (char*)key, (char*)val);
if (ret < 0) *status_out = KVS_STATUS_ERROR;
else if (ret == 0) *status_out = KVS_STATUS_OK;
else *status_out = KVS_STATUS_NO_EXIST;
return;
case KVS_CMD_EXIST:
ret = kvs_array_exist(&global_array, (char*)key);
*status_out = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST;
return;
#endif
#if ENABLE_RBTREE
case KVS_CMD_RSET:
ret = kvs_rbtree_set(&global_rbtree, (char*)key, (char*)val);
if (ret < 0) *status_out = KVS_STATUS_ERROR;
else if (ret == 0) *status_out = KVS_STATUS_OK;
else *status_out = KVS_STATUS_EXIST;
return;
case KVS_CMD_RGET:
result = kvs_rbtree_get(&global_rbtree, (char*)key);
if (!result) { *status_out = KVS_STATUS_NO_EXIST; return; }
*status_out = KVS_STATUS_OK;
*data_out = result;
*dlen_out = (uint32_t)strlen(result);
return;
case KVS_CMD_RDEL:
ret = kvs_rbtree_del(&global_rbtree, (char*)key);
if (ret < 0) *status_out = KVS_STATUS_ERROR;
else if (ret == 0) *status_out = KVS_STATUS_OK;
else *status_out = KVS_STATUS_NO_EXIST;
return;
case KVS_CMD_RMOD:
ret = kvs_rbtree_mod(&global_rbtree, (char*)key, (char*)val);
if (ret < 0) *status_out = KVS_STATUS_ERROR;
else if (ret == 0) *status_out = KVS_STATUS_OK;
else *status_out = KVS_STATUS_NO_EXIST;
return;
case KVS_CMD_REXIST:
ret = kvs_rbtree_exist(&global_rbtree, (char*)key);
*status_out = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST;
return;
#endif
#if ENABLE_HASH
case KVS_CMD_HSET:
ret = kvs_hash_set(&global_hash, (char*)key, (char*)val);
if (ret < 0) *status_out = KVS_STATUS_ERROR;
else if (ret == 0) *status_out = KVS_STATUS_OK;
else *status_out = KVS_STATUS_EXIST;
return;
case KVS_CMD_HGET:
result = kvs_hash_get(&global_hash, (char*)key);
if (!result) { *status_out = KVS_STATUS_NO_EXIST; return; }
*status_out = KVS_STATUS_OK;
*data_out = result;
*dlen_out = (uint32_t)strlen(result);
return;
case KVS_CMD_HDEL:
ret = kvs_hash_del(&global_hash, (char*)key);
if (ret < 0) *status_out = KVS_STATUS_ERROR;
else if (ret == 0) *status_out = KVS_STATUS_OK;
else *status_out = KVS_STATUS_NO_EXIST;
return;
case KVS_CMD_HMOD:
ret = kvs_hash_mod(&global_hash, (char*)key, (char*)val);
if (ret < 0) *status_out = KVS_STATUS_ERROR;
else if (ret == 0) *status_out = KVS_STATUS_OK;
else *status_out = KVS_STATUS_NO_EXIST;
return;
case KVS_CMD_HEXIST:
ret = kvs_hash_exist(&global_hash, (char*)key);
*status_out = (ret == 0) ? KVS_STATUS_EXIST : KVS_STATUS_NO_EXIST;
return;
#endif
default:
*status_out = KVS_STATUS_BADREQ;
return;
}
}
#if NEW_KVSTORE
// int kvs_protocol(char *msg, int length, char *response, int *response_len){
int kvs_protocol(char *msg, int length, char *response){
if (!msg || length <= 0 || !response) return 0;
int response_len = 0;
kvs_request_t req;
int consumed = kvs_parse_request((const uint8_t *)msg, length, &req);
if (consumed < 0) return 0; // 解析失败
else if(consumed == 0) return 0; // 半包
kvs_response_t *results = (kvs_response_t *)kvs_malloc(sizeof(kvs_response_t) * req.opcount);
if (!results) { kvs_free_request(&req); return -1; }
memset(results, 0, sizeof(kvs_response_t) * req.opcount);
if (kvs_execute_request(&req, results) < 0) {
kvs_free(results);
kvs_free_request(&req);
return -1;
}
int resp_len = kvs_build_response(&req, results, (uint8_t *)response, KVS_MAX_RESPONSE);
kvs_free(results);
kvs_free_request(&req);
if (resp_len < 0) return 0; // error
response_len = resp_len;
return response_len;
}
#else
/* /*
* msg: request message * msg: request message
* length: length of request message * length: length of request message
@@ -288,7 +477,7 @@ int kvs_protocol(char *msg, int length, char *response) { //
return kvs_filter_protocol(tokens, count, response); return kvs_filter_protocol(tokens, count, response);
} }
#endif
int init_kvengine(void) { int init_kvengine(void) {
@@ -337,9 +526,9 @@ int main(int argc, char *argv[]) {
#if (NETWORK_SELECT == NETWORK_REACTOR) #if (NETWORK_SELECT == NETWORK_REACTOR)
reactor_start(port, kvs_protocol); // reactor_start(port, kvs_protocol); //
#elif (NETWORK_SELECT == NETWORK_PROACTOR) #elif (NETWORK_SELECT == NETWORK_PROACTOR)
ntyco_start(port, kvs_protocol);
#elif (NETWORK_SELECT == NETWORK_NTYCO)
proactor_start(port, kvs_protocol); proactor_start(port, kvs_protocol);
#elif (NETWORK_SELECT == NETWORK_NTYCO)
ntyco_start(port, kvs_protocol);
#endif #endif
dest_kvengine(); dest_kvengine();

View File

@@ -10,15 +10,20 @@
#include <stdlib.h> #include <stdlib.h>
#include <assert.h> #include <assert.h>
#include <stddef.h> #include <stddef.h>
#include <stdint.h>
#define NETWORK_REACTOR 0 #define NETWORK_REACTOR 0
#define NETWORK_PROACTOR 1 #define NETWORK_PROACTOR 1
#define NETWORK_NTYCO 2 #define NETWORK_NTYCO 2
#define NETWORK_SELECT NETWORK_PROACTOR #define NETWORK_SELECT NETWORK_REACTOR
// 8MB
#define MAX_PAYLOAD_LEN 8388608
#define MAX_OPCOUNT 1024
// 4MB
#define MAX_ARG_LEN 4194304
#define KVS_MAX_TOKENS 128 #define KVS_MAX_TOKENS 128
@@ -26,6 +31,8 @@
#define ENABLE_RBTREE 1 #define ENABLE_RBTREE 1
#define ENABLE_HASH 1 #define ENABLE_HASH 1
#define NEW_KVSTORE 1
typedef int (*msg_handler)(char *msg, int length, char *response); typedef int (*msg_handler)(char *msg, int length, char *response);

View File

@@ -170,7 +170,8 @@ int proactor_start(unsigned short port, msg_handler handler) {
set_event_send(&ring, result.fd, response, ret, 0); set_event_send(&ring, result.fd, response, ret, 0);
} }
} else if (result.event == EVENT_WRITE) { } else if (result.event == EVENT_WRITE) {
//
int ret = entries->res; int ret = entries->res;
//printf("set_event_send ret: %d, %s\n", ret, buffer); //printf("set_event_send ret: %d, %s\n", ret, buffer);

BIN
testcase2 Executable file

Binary file not shown.

299
testcase2.c Normal file
View File

@@ -0,0 +1,299 @@
// kvs_client_min.c
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#define KVS_MAGIC_U32 0x4B565331u // 'KVS1'
#define KVS_HDR_LEN 16
#define KVS_TYPE_REQ 1
#define KVS_TYPE_RESP 2
// OP mapping (match your server enum)
#define OP_SET 0
#define OP_GET 1
#define OP_DEL 2
#define OP_MOD 3
#define OP_EXIST 4
static int write_u8(uint8_t **pp, uint8_t *end, uint8_t v) {
if (*pp + 1 > end) return -1;
**pp = v;
(*pp)++;
return 0;
}
static int write_u32(uint8_t **pp, uint8_t *end, uint32_t v) {
if (*pp + 4 > end) return -1;
uint32_t be = htonl(v);
memcpy(*pp, &be, 4);
(*pp) += 4;
return 0;
}
static int write_bytes(uint8_t **pp, uint8_t *end, const void *buf, size_t n) {
if (*pp + n > end) return -1;
memcpy(*pp, buf, n);
(*pp) += n;
return 0;
}
static int read_u8(const uint8_t **pp, const uint8_t *end, uint8_t *out) {
if (*pp + 1 > end) return -1;
*out = **pp;
(*pp)++;
return 0;
}
static int read_u32(const uint8_t **pp, const uint8_t *end, uint32_t *out) {
if (*pp + 4 > end) return -1;
uint32_t be;
memcpy(&be, *pp, 4);
*out = ntohl(be);
(*pp) += 4;
return 0;
}
static ssize_t send_all(int fd, const void *buf, size_t len) {
const uint8_t *p = (const uint8_t*)buf;
size_t sent = 0;
while (sent < len) {
ssize_t n = send(fd, p + sent, len - sent, 0);
if (n < 0) {
if (errno == EINTR) continue;
return -1;
}
if (n == 0) return -1;
sent += (size_t)n;
}
return (ssize_t)sent;
}
static ssize_t recv_all(int fd, void *buf, size_t len) {
uint8_t *p = (uint8_t*)buf;
size_t recvd = 0;
while (recvd < len) {
ssize_t n = recv(fd, p + recvd, len - recvd, 0);
if (n < 0) {
if (errno == EINTR) continue;
return -1;
}
if (n == 0) return -1;
recvd += (size_t)n;
}
return (ssize_t)recvd;
}
// Build one request packet: header + payload(opcount + cmds...)
static int build_req_packet(uint32_t reqId,
uint8_t opcount,
uint8_t op,
int argc, const char *argv[],
uint8_t *out, size_t out_cap, size_t *out_len) {
if (!out || out_cap < KVS_HDR_LEN || !out_len) return -1;
uint8_t *w = out + KVS_HDR_LEN;
uint8_t *end = out + out_cap;
// payload: opcount(4)
if (write_u32(&w, end, (uint32_t)opcount) < 0) return -1;
// one cmd (repeat if you want multiple, keep minimal here)
if (write_u8(&w, end, op) < 0) return -1;
if (write_u32(&w, end, (uint32_t)argc) < 0) return -1;
for (int i = 0; i < argc; i++) {
uint32_t alen = (uint32_t)strlen(argv[i]);
if (write_u32(&w, end, alen) < 0) return -1;
if (write_bytes(&w, end, argv[i], alen) < 0) return -1;
}
uint32_t payloadLen = (uint32_t)(w - (out + KVS_HDR_LEN));
uint32_t magic_be = htonl(KVS_MAGIC_U32);
uint32_t payload_be = htonl(payloadLen);
uint32_t reqid_be = htonl(reqId);
// header layout:
// | magic(4) | type(1) | payloadLen(4) | reqId(4) | flag1(1) | flag2(2) |
memcpy(out + 0, &magic_be, 4);
out[4] = KVS_TYPE_REQ;
memcpy(out + 5, &payload_be, 4);
memcpy(out + 9, &reqid_be, 4);
out[13] = 0; // flag1
out[14] = 0; out[15] = 0; // flag2
*out_len = (size_t)(KVS_HDR_LEN + payloadLen);
return 0;
}
/**
* KVS_STATUS_OK = 0,
KVS_STATUS_ERROR = 1,
KVS_STATUS_NO_EXIST = 2,
KVS_STATUS_EXIST = 3,
KVS_STATUS_BADREQ = 4
*/
const char *rsp_status[5] = {"OK", "ERROR", "NO_EXIST", "EXIST", "ERROR"};
static int recv_and_print_resp(int fd) {
uint8_t hdr[KVS_HDR_LEN];
if (recv_all(fd, hdr, KVS_HDR_LEN) < 0) {
perror("recv header");
return -1;
}
uint32_t magic_be, payload_be, reqid_be;
memcpy(&magic_be, hdr + 0, 4);
memcpy(&payload_be, hdr + 5, 4);
memcpy(&reqid_be, hdr + 9, 4);
uint32_t magic = ntohl(magic_be);
uint8_t type = hdr[4];
uint32_t payloadLen = ntohl(payload_be);
uint32_t reqId = ntohl(reqid_be);
if (magic != KVS_MAGIC_U32 || type != KVS_TYPE_RESP) {
fprintf(stderr, "bad response header: magic=0x%x type=%u\n", magic, type);
return -1;
}
uint8_t *payload = (uint8_t*)malloc(payloadLen);
if (!payload) return -1;
if (payloadLen > 0) {
if (recv_all(fd, payload, payloadLen) < 0) {
perror("recv payload");
free(payload);
return -1;
}
}
// parse response payload:
// | opcount(4) | repeat Cmd |
// Cmd: | status(1) | datalen(4) | data |
const uint8_t *p = payload;
const uint8_t *end = payload + payloadLen;
uint32_t opcount = 0;
if (read_u32(&p, end, &opcount) < 0) {
fprintf(stderr, "resp parse failed\n");
free(payload);
return -1;
}
printf("RESP reqId=%u opcount=%u\n", reqId, opcount);
for (uint32_t i = 0; i < opcount; i++) {
uint8_t status = 0;
uint32_t dlen = 0;
if (read_u8(&p, end, &status) < 0) goto bad;
if (read_u32(&p, end, &dlen) < 0) goto bad;
if (p + dlen > end) goto bad;
printf(" op[%u]: status=%s datalen=%u", i, rsp_status[status], dlen);
if (dlen > 0) {
// printf(" data=\"%.*s\"", (int)dlen, (const char*)p);
printf(" data=\"");
for (int i = 0; i < dlen; i++) {
printf("%02X", (unsigned char)p[i]);
if (i + 1 < dlen)
printf(" ");
}
printf("\"");
}
printf("\n");
p += dlen;
}
free(payload);
return 0;
bad:
fprintf(stderr, "resp parse failed (truncated)\n");
free(payload);
return -1;
}
uint8_t buf[4096];
size_t pkt_len = 0;
uint32_t reqId = 1;
int fd;
int testcase(int op, const char *args[], int argnum){
if (build_req_packet(reqId++, 1, op, argnum, args, buf, sizeof(buf), &pkt_len) < 0) {
fprintf(stderr, "build failed\n");
close(fd);
return 1;
}
if (send_all(fd, buf, pkt_len) < 0) { perror("send SET"); close(fd); return 1; }
if (recv_and_print_resp(fd) < 0) { close(fd); return 1; }
return 0;
}
#define BUF_CAP 65536
int main(int argc, char **argv) {
if (argc != 3) {
fprintf(stderr, "Usage: %s <ip> <port>\n", argv[0]);
return 1;
}
const char *ip = argv[1];
int port = atoi(argv[2]);
fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) { perror("socket"); return 1; }
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons((uint16_t)port);
if (inet_pton(AF_INET, ip, &addr.sin_addr) != 1) {
fprintf(stderr, "bad ip\n");
close(fd);
return 1;
}
if (connect(fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
perror("connect");
close(fd);
return 1;
}
size_t pkt_len = 0;
uint32_t reqId = 1;
// ---- SET key value ----
{
const char *args[] = {"foo", "bar"};
printf("SET ");
for (const unsigned char *p = (const unsigned char *)args[1]; *p; p++) {
printf("%02X ", *p);
}
printf("\n");
testcase(OP_SET, args, 2);
}
// ---- GET key ----
{
const char *args[] = {"foo"};
testcase(OP_GET, args, 1);
}
// ---- SET key ----
{
const char *args[] = {"text", "\r\n\0aaaa\r\n\0"};
printf("SET ");
for (const unsigned char *p = (const unsigned char *)args[1]; *p; p++) {
printf("%02X ", *p);
}
printf("\n");
testcase(OP_SET, args, 2);
}
// ---- GET key ----
{
const char *args[] = {"text"};
testcase(OP_GET, args, 1);
}
close(fd);
return 0;
}