/*
 * Extendible-hashing key/value table with binary-safe keys and values.
 *
 * A directory of 2^global_depth slots maps the low bits of a key's hash to a
 * bucket. Several slots may share one bucket; a bucket's local_depth records
 * how many low hash bits all of its entries have in common. Each bucket holds
 * a singly linked chain of nodes; a node stores its header followed inline by
 * the key bytes and then the value bytes.
 */
#include "kvstore.h"
#include "kvs_rw_tools.h"
#include "memory/alloc_dispatch.h"
#include "diskuring/diskuring.h"

#include <arpa/inet.h> /* htonl, ntohl */
#include <fcntl.h>     /* open, O_* flags */
#include <stdint.h>    /* uint32_t, UINT32_MAX, SIZE_MAX */
#include <string.h>    /* memcpy, memcmp, memset */
#include <unistd.h>    /* close, usleep */

#define KVS_HASH_INITIAL_GLOBAL_DEPTH 1u
#define KVS_HASH_BUCKET_SPLIT_THRESHOLD 64u
#define KVS_HASH_MAX_GLOBAL_DEPTH 31u

kvs_hash_t global_hash;

/* 32-bit FNV-1a hash over key_len bytes starting at key. */
static uint32_t _hash_u32(const void *key, uint32_t key_len) {
    const uint8_t *p = (const uint8_t *)key;
    uint32_t hash = 2166136261u;

    for (uint32_t i = 0; i < key_len; i++) {
        hash ^= p[i];
        hash *= 16777619u;
    }
    return hash;
}

/* Directory slot for a hash value: its low global_depth bits (0 when depth is 0). */
static inline uint32_t _dir_index(uint32_t hashv, uint32_t global_depth) {
    if (global_depth == 0) {
        return 0;
    }
    return hashv & ((1u << global_depth) - 1u);
}

/* Allocates an empty bucket with the given local depth; NULL on allocation failure. */
static hashbucket_t *_bucket_create(uint32_t local_depth) {
    hashbucket_t *bucket = (hashbucket_t *)kvs_malloc(sizeof(hashbucket_t));

    if (!bucket) {
        return NULL;
    }
    bucket->head = NULL;
    bucket->local_depth = local_depth;
    bucket->item_count = 0;
    bucket->next_all = NULL;
    return bucket;
}

/*
 * Prepends bucket to the table's list of all buckets. That list is what
 * kvs_hash_destroy and kvs_hash_save walk, so each bucket is visited once
 * even though many directory slots may point at it.
 */
static void _bucket_list_push(kvs_hash_t *hash, hashbucket_t *bucket) {
    bucket->next_all = hash->bucket_list;
    hash->bucket_list = bucket;
}

/* Key bytes are stored immediately after the node header. */
static inline uint8_t *_get_node_key(const hashnode_t *node) {
    return (uint8_t *)node + sizeof(hashnode_t);
}

/* Value bytes are stored immediately after the key bytes. */
static inline uint8_t *_get_node_value(const hashnode_t *node) {
    return (uint8_t *)node + sizeof(hashnode_t) + node->key_len;
}

/* Returns 1 when node's key has the same length and bytes as key, else 0. */
static int _key_equal(const hashnode_t *node, const void *key, uint32_t key_len) {
    if (!node || !key) {
        return 0;
    }
    if (node->key_len != key_len) {
        return 0;
    }
    return memcmp(_get_node_key(node), key, key_len) == 0;
}

/*
 * Allocates a node holding copies of key and value in one contiguous block.
 * Returns NULL when key is NULL/empty, when the combined size does not fit in
 * size_t, or when allocation fails. If value is NULL while value_len > 0 the
 * value area is left uninitialised (callers in this file reject that case
 * before getting here).
 */
static hashnode_t *_create_node(const void *key, uint32_t key_len, const void *value, uint32_t value_len) {
    size_t total_size = sizeof(hashnode_t);
    hashnode_t *node;
    uint8_t *data_ptr;

    if (!key || key_len == 0) {
        return NULL;
    }

    /* Checked addition: where size_t is 32 bits the sum of two 32-bit lengths
     * could wrap and yield an allocation smaller than the bytes copied below. */
    if ((size_t)key_len > SIZE_MAX - total_size) {
        return NULL;
    }
    total_size += (size_t)key_len;
    if ((size_t)value_len > SIZE_MAX - total_size) {
        return NULL;
    }
    total_size += (size_t)value_len;

    node = (hashnode_t *)kvs_malloc(total_size);
    if (!node) {
        return NULL;
    }

    memset(node, 0, sizeof(hashnode_t));
    node->key_len = key_len;
    node->value_len = value_len;
    node->next = NULL;

    data_ptr = (uint8_t *)node + sizeof(hashnode_t);
    memcpy(data_ptr, key, key_len);
    if (value_len > 0 && value) {
        memcpy(data_ptr + key_len, value, value_len);
    }
    return node;
}

/*
 * Searches bucket's chain for key. Returns the matching node or NULL.
 * When out_indirect is non-NULL it receives the address of the link that
 * points at the match (NULL when nothing matched), so the caller can unlink
 * or replace the node without a second traversal.
 */
static hashnode_t *_find_node(hashbucket_t *bucket, const void *key, uint32_t key_len, hashnode_t ***out_indirect) {
    hashnode_t **indirect;

    if (out_indirect) {
        *out_indirect = NULL;
    }
    if (!bucket || !key || key_len == 0) {
        return NULL;
    }

    indirect = &bucket->head;
    while (*indirect) {
        if (_key_equal(*indirect, key, key_len)) {
            if (out_indirect) {
                *out_indirect = indirect;
            }
            return *indirect;
        }
        indirect = &(*indirect)->next;
    }
    return NULL;
}

/*
 * Replaces the value of the node referenced through *node_ptr.
 * A same-length value is overwritten in place. Otherwise a new node is
 * allocated, spliced into the chain in the old node's position, and the old
 * node is freed, which invalidates any pointer previously handed out by
 * kvs_hash_get_bin for that entry. Returns 0 on success, -1 on bad arguments
 * or allocation failure (the old node is left untouched on failure).
 */
static int _update_node_value(hashnode_t **node_ptr, const void *value, uint32_t value_len) {
    hashnode_t *old_node;
    hashnode_t *new_node;

    if (!node_ptr || !*node_ptr) {
        return -1;
    }
    if (value_len > 0 && !value) {
        return -1;
    }

    old_node = *node_ptr;
    if (old_node->value_len == value_len) {
        if (value_len > 0) {
            memcpy(_get_node_value(old_node), value, value_len);
        }
        return 0;
    }

    new_node = _create_node(_get_node_key(old_node), old_node->key_len, value, value_len);
    if (!new_node) {
        return -1;
    }
    new_node->next = old_node->next;
    *node_ptr = new_node;
    kvs_free(old_node);
    return 0;
}

/*
 * Doubles the directory and increments global_depth. Slot i and slot
 * i + old_size of the new directory both refer to the bucket that slot i
 * referred to before.
 * Returns 0 on success, -1 on invalid table, 1 when the directory cannot
 * grow (depth limit reached, size would overflow, or allocation failed).
 */
static int _double_directory(kvs_hash_t *hash) {
    uint32_t old_size;
    uint32_t new_size;
    hashbucket_t **new_dir;
    uint32_t i;

    if (!hash || !hash->directory) {
        return -1;
    }
    if (hash->global_depth >= KVS_HASH_MAX_GLOBAL_DEPTH) {
        return 1;
    }

    old_size = hash->dir_size;
    if (old_size == 0 || old_size > UINT32_MAX / 2u) {
        return 1;
    }
    new_size = old_size << 1;

    /* The byte count must fit in size_t (matters where size_t is 32 bits). */
    if ((size_t)new_size > SIZE_MAX / sizeof(hashbucket_t *)) {
        return 1;
    }

    new_dir = (hashbucket_t **)kvs_malloc(sizeof(hashbucket_t *) * new_size);
    if (!new_dir) {
        return 1;
    }

    for (i = 0; i < old_size; i++) {
        new_dir[i] = hash->directory[i];
        new_dir[i + old_size] = hash->directory[i];
    }

    kvs_free(hash->directory);
    hash->directory = new_dir;
    hash->dir_size = new_size;
    hash->global_depth++;
    return 0;
}

/*
 * Splits the bucket referenced by directory slot dir_idx into itself plus a
 * new sibling one level deeper, doubling the directory first when the bucket
 * is already at global depth. Directory slots that pointed at the old bucket
 * and have the newly significant bit set are redirected to the sibling, then
 * every node is re-linked into whichever of the two buckets its hash selects.
 * Returns 0 on success, -1 on invalid arguments or bucket allocation failure,
 * 1 when no further split is possible (depth limit or directory cannot grow).
 */
static int _split_bucket(kvs_hash_t *hash, uint32_t dir_idx) {
    hashbucket_t *old_bucket;
    hashbucket_t *new_bucket;
    hashnode_t *node;
    uint32_t split_bit;
    uint32_t i;

    if (!hash || !hash->directory || dir_idx >= hash->dir_size) {
        return -1;
    }
    old_bucket = hash->directory[dir_idx];
    if (!old_bucket) {
        return -1;
    }
    if (old_bucket->local_depth >= KVS_HASH_MAX_GLOBAL_DEPTH) {
        return 1;
    }

    if (old_bucket->local_depth == hash->global_depth) {
        int rc = _double_directory(hash);
        if (rc != 0) {
            return rc;
        }
    }

    new_bucket = _bucket_create(old_bucket->local_depth + 1);
    if (!new_bucket) {
        return -1;
    }
    _bucket_list_push(hash, new_bucket);

    old_bucket->local_depth++;
    /* The bit that distinguishes the two halves is the one just added. */
    split_bit = 1u << (old_bucket->local_depth - 1u);

    for (i = 0; i < hash->dir_size; i++) {
        if (hash->directory[i] == old_bucket && (i & split_bit)) {
            hash->directory[i] = new_bucket;
        }
    }

    /* Detach the whole chain, then push each node onto its target bucket. */
    node = old_bucket->head;
    old_bucket->head = NULL;
    old_bucket->item_count = 0;

    while (node) {
        hashnode_t *next = node->next;
        uint32_t idx = _dir_index(_hash_u32(_get_node_key(node), node->key_len), hash->global_depth);
        hashbucket_t *target = hash->directory[idx];

        node->next = target->head;
        target->head = node;
        target->item_count++;
        node = next;
    }
    return 0;
}

/*
 * Reports whether splitting bucket can ever move an existing entry away from
 * the key about to be inserted (whose hash is incoming_hash).
 *
 * Only hash bits [local_depth, KVS_HASH_MAX_GLOBAL_DEPTH) can still be used
 * by future splits. If every stored key agrees with the incoming key on all
 * of those bits, no number of splits will relieve the bucket; splitting would
 * only keep doubling the directory until the depth limit or an allocation
 * failure. Returns 1 when at least one entry differs, 0 otherwise.
 */
static int _split_can_separate(const hashbucket_t *bucket, uint32_t incoming_hash) {
    const hashnode_t *node;
    uint32_t usable_mask;

    if (bucket->local_depth >= KVS_HASH_MAX_GLOBAL_DEPTH) {
        return 0;
    }
    usable_mask = ((1u << KVS_HASH_MAX_GLOBAL_DEPTH) - 1u) & ~((1u << bucket->local_depth) - 1u);

    for (node = bucket->head; node != NULL; node = node->next) {
        uint32_t h = _hash_u32(_get_node_key(node), node->key_len);
        if ((h ^ incoming_hash) & usable_mask) {
            return 1;
        }
    }
    return 0;
}

/*
 * Initialises *hash as an empty table: a directory of
 * 2^KVS_HASH_INITIAL_GLOBAL_DEPTH slots that all refer to one bucket of local
 * depth 0. Any previous contents of *hash are overwritten without being
 * released. Returns 0 on success, -1 on NULL argument or allocation failure.
 */
int kvs_hash_create(kvs_hash_t *hash) {
    uint32_t init_depth;
    uint32_t i;
    hashbucket_t *initial_bucket;

    if (!hash) {
        return -1;
    }
    memset(hash, 0, sizeof(*hash));

    init_depth = KVS_HASH_INITIAL_GLOBAL_DEPTH;
    if (init_depth > KVS_HASH_MAX_GLOBAL_DEPTH) {
        init_depth = KVS_HASH_MAX_GLOBAL_DEPTH;
    }

    hash->global_depth = init_depth;
    hash->dir_size = 1u << init_depth;
    if (hash->dir_size == 0) {
        return -1;
    }

    hash->directory = (hashbucket_t **)kvs_malloc(sizeof(hashbucket_t *) * hash->dir_size);
    if (!hash->directory) {
        return -1;
    }
    memset(hash->directory, 0, sizeof(hashbucket_t *) * hash->dir_size);

    initial_bucket = _bucket_create(0);
    if (!initial_bucket) {
        kvs_free(hash->directory);
        hash->directory = NULL;
        hash->dir_size = 0;
        return -1;
    }
    _bucket_list_push(hash, initial_bucket);

    for (i = 0; i < hash->dir_size; i++) {
        hash->directory[i] = initial_bucket;
    }
    hash->count = 0;
    return 0;
}

/*
 * Frees every node, every bucket and the directory, then zeroes *hash.
 * Buckets are reached through bucket_list rather than the directory so that
 * a bucket shared by several slots is freed exactly once.
 */
void kvs_hash_destroy(kvs_hash_t *hash) {
    hashbucket_t *bucket;

    if (!hash) {
        return;
    }

    bucket = hash->bucket_list;
    while (bucket) {
        hashbucket_t *next_bucket = bucket->next_all;
        hashnode_t *node = bucket->head;

        while (node) {
            hashnode_t *next = node->next;
            kvs_free(node);
            node = next;
        }
        kvs_free(bucket);
        bucket = next_bucket;
    }

    kvs_free(hash->directory);
    memset(hash, 0, sizeof(*hash));
}

/*
 * Inserts key -> value, or replaces the value when key already exists.
 * Before inserting a new key into a bucket that has reached
 * KVS_HASH_BUCKET_SPLIT_THRESHOLD entries, the bucket is split repeatedly
 * until the target bucket is below the threshold, no further split is
 * possible, or splitting could not separate anything; in the latter two
 * cases the key is inserted into the over-full bucket anyway.
 * Returns 0 on success, -1 on invalid arguments or an empty directory slot,
 * -2 when an allocation (node, bucket) or the value update fails.
 */
int kvs_hash_set_bin(kvs_hash_t *hash, const void *key, uint32_t key_len, const void *value, uint32_t value_len) {
    uint32_t hashv;
    uint32_t idx;
    hashbucket_t *bucket;
    hashnode_t **indirect = NULL;
    hashnode_t *found;
    hashnode_t *new_node;

    if (!hash || !hash->directory || !key || key_len == 0) {
        return -1;
    }
    if (value_len > 0 && !value) {
        return -1;
    }

    hashv = _hash_u32(key, key_len);
    idx = _dir_index(hashv, hash->global_depth);
    bucket = hash->directory[idx];
    if (!bucket) {
        return -1;
    }

    found = _find_node(bucket, key, key_len, &indirect);
    if (found) {
        return (_update_node_value(indirect, value, value_len) == 0) ? 0 : -2;
    }

    while (bucket->item_count >= KVS_HASH_BUCKET_SPLIT_THRESHOLD) {
        int split_rc;

        /* Keys whose hashes agree on every remaining usable bit can never be
         * separated; without this check the loop would double the directory
         * up to the depth limit (or until allocation fails) for no gain. */
        if (!_split_can_separate(bucket, hashv)) {
            break;
        }

        split_rc = _split_bucket(hash, idx);
        if (split_rc < 0) {
            return -2;
        }
        if (split_rc > 0) {
            break;
        }
        /* global_depth may have grown, so recompute the slot and bucket. */
        idx = _dir_index(hashv, hash->global_depth);
        bucket = hash->directory[idx];
    }

    new_node = _create_node(key, key_len, value, value_len);
    if (!new_node) {
        return -2;
    }

    new_node->next = bucket->head;
    bucket->head = new_node;
    bucket->item_count++;
    hash->count++;
    return 0;
}

/*
 * Looks up key and returns a pointer to the value bytes stored inside the
 * table (not a copy); *out_value_len receives the value length.
 * Returns NULL when out_value_len is NULL, arguments are invalid, the key is
 * absent, or the stored value is empty; in all but the last case
 * *out_value_len (when writable) is 0. The returned pointer refers to node
 * storage that is freed by a delete, by a destroy, or by an update that
 * changes the value length.
 */
void *kvs_hash_get_bin(kvs_hash_t *hash, const void *key, uint32_t key_len, uint32_t *out_value_len) {
    uint32_t idx;
    hashbucket_t *bucket;
    hashnode_t *node;

    if (!out_value_len) {
        return NULL;
    }
    *out_value_len = 0;

    if (!hash || !hash->directory || !key || key_len == 0) {
        return NULL;
    }

    idx = _dir_index(_hash_u32(key, key_len), hash->global_depth);
    bucket = hash->directory[idx];
    if (!bucket) {
        return NULL;
    }

    node = _find_node(bucket, key, key_len, NULL);
    if (!node) {
        return NULL;
    }

    *out_value_len = node->value_len;
    return (node->value_len > 0) ? _get_node_value(node) : NULL;
}

/*
 * Looks up key and returns a freshly allocated copy of its value in *out_buf
 * (allocated with kvs_malloc; the caller releases it) and its length in
 * *out_len. An empty stored value yields *out_buf == NULL, *out_len == 0.
 * Returns 0 on success, 1 when the key is absent, -1 on invalid arguments or
 * an empty directory slot, -2 on allocation failure.
 */
int kvs_hash_get_copy_bin(kvs_hash_t *hash, const void *key, uint32_t key_len, void **out_buf, uint32_t *out_len) {
    uint32_t idx;
    hashbucket_t *bucket;
    hashnode_t *node;
    void *copy;

    if (!out_buf || !out_len) {
        return -1;
    }
    *out_buf = NULL;
    *out_len = 0;

    if (!hash || !hash->directory || !key || key_len == 0) {
        return -1;
    }

    idx = _dir_index(_hash_u32(key, key_len), hash->global_depth);
    bucket = hash->directory[idx];
    if (!bucket) {
        return -1;
    }

    node = _find_node(bucket, key, key_len, NULL);
    if (!node) {
        return 1;
    }

    *out_len = node->value_len;
    if (node->value_len == 0) {
        return 0;
    }

    copy = kvs_malloc(node->value_len);
    if (!copy) {
        *out_len = 0;
        return -2;
    }
    memcpy(copy, _get_node_value(node), node->value_len);
    *out_buf = copy;
    return 0;
}

/*
 * Replaces the value of an existing key; never inserts.
 * Returns 0 on success, 1 when the key is absent, -1 on invalid arguments or
 * an empty directory slot, -2 when the update fails.
 */
int kvs_hash_mod_bin(kvs_hash_t *hash, const void *key, uint32_t key_len, const void *value, uint32_t value_len) {
    uint32_t idx;
    hashbucket_t *bucket;
    hashnode_t **indirect = NULL;
    hashnode_t *node;

    if (!hash || !hash->directory || !key || key_len == 0) {
        return -1;
    }
    if (value_len > 0 && !value) {
        return -1;
    }

    idx = _dir_index(_hash_u32(key, key_len), hash->global_depth);
    bucket = hash->directory[idx];
    if (!bucket) {
        return -1;
    }

    node = _find_node(bucket, key, key_len, &indirect);
    if (!node) {
        return 1;
    }
    return (_update_node_value(indirect, value, value_len) == 0) ? 0 : -2;
}

/*
 * Removes key and frees its node. Buckets are never merged and the directory
 * never shrinks.
 * Returns 0 on success, 1 when the key is absent, -1 on invalid arguments or
 * an empty directory slot.
 */
int kvs_hash_del_bin(kvs_hash_t *hash, const void *key, uint32_t key_len) {
    uint32_t idx;
    hashbucket_t *bucket;
    hashnode_t **indirect = NULL;
    hashnode_t *node;

    if (!hash || !hash->directory || !key || key_len == 0) {
        return -1;
    }

    idx = _dir_index(_hash_u32(key, key_len), hash->global_depth);
    bucket = hash->directory[idx];
    if (!bucket) {
        return -1;
    }

    node = _find_node(bucket, key, key_len, &indirect);
    if (!node) {
        return 1;
    }

    *indirect = node->next;
    kvs_free(node);
    bucket->item_count--;
    hash->count--;
    return 0;
}

/*
 * Tests for the presence of key.
 * Returns 0 when present, 1 when absent (including an empty directory slot),
 * -1 on invalid arguments.
 */
int kvs_hash_exist_bin(kvs_hash_t *hash, const void *key, uint32_t key_len) {
    uint32_t idx;
    hashbucket_t *bucket;

    if (!hash || !hash->directory || !key || key_len == 0) {
        return -1;
    }

    idx = _dir_index(_hash_u32(key, key_len), hash->global_depth);
    bucket = hash->directory[idx];
    if (!bucket) {
        return 1;
    }
    return _find_node(bucket, key, key_len, NULL) ? 0 : 1;
}

/* Number of entries recorded in hash->count, or 0 when hash is NULL. */
int kvs_hash_count(kvs_hash_t *hash) {
    return hash ? hash->count : 0;
}

/*
 * Writes every entry to filename (created or truncated, mode 0644) through
 * the io_uring context. Each record is laid out as:
 *   key length   (uint32, network byte order)
 *   value length (uint32, network byte order)
 *   key bytes
 *   value bytes  (omitted when the value is empty)
 * Records are submitted at consecutive file offsets; the function waits for
 * the ring to report completion before closing the file.
 * Returns 0 on success, -1 on NULL arguments, -2 when the file cannot be
 * opened, -3 when submit_write returns NULL.
 */
int kvs_hash_save(iouring_ctx_t *uring, kvs_hash_t *inst, const char *filename) {
    int fd;
    off_t current_off = 0;
    int rc = 0;
    hashbucket_t *bucket;

    if (!uring || !inst || !filename) {
        return -1;
    }

    fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (fd < 0) {
        return -2;
    }

    for (bucket = inst->bucket_list; bucket != NULL; bucket = bucket->next_all) {
        hashnode_t *n = bucket->head;

        while (n != NULL) {
            uint32_t klen = htonl(n->key_len);
            uint32_t vlen = htonl(n->value_len);
            uint8_t *key_ptr = _get_node_key(n);
            uint8_t *val_ptr = (n->value_len > 0) ? _get_node_value(n) : NULL;
            void *bufs[4];
            size_t lens[4];
            int count = 0;
            size_t total = 0;
            task_t *t;

            bufs[count] = &klen;
            lens[count++] = sizeof(klen);
            bufs[count] = &vlen;
            lens[count++] = sizeof(vlen);
            if (n->key_len > 0) {
                bufs[count] = key_ptr;
                lens[count++] = n->key_len;
            }
            if (n->value_len > 0) {
                bufs[count] = val_ptr;
                lens[count++] = n->value_len;
            }

            for (int j = 0; j < count; j++) {
                total += lens[j];
            }

            /* NOTE(review): klen, vlen, bufs and lens live on this iteration's
             * stack frame. This is only safe if submit_write copies the data
             * (or otherwise finishes with these pointers) before returning;
             * its implementation is not visible here. TODO confirm. */
            t = submit_write(uring, fd, bufs, lens, count, current_off);
            if (!t) {
                rc = -3;
                goto done;
            }

            cleanup_finished_iouring_tasks(uring);
            current_off += (off_t)total;
            n = n->next;
        }
    }

done:
    /* Poll until the ring reports that outstanding tasks have completed. */
    while (!uring_task_complete(uring)) {
        usleep(1000);
        cleanup_finished_iouring_tasks(uring);
    }
    close(fd);
    return rc;
}

/*
 * Reads records in the layout written by kvs_hash_save from filename and
 * inserts each one into inst with kvs_hash_set_bin (so a key already present
 * has its value replaced). inst must already be initialised. Entries inserted
 * before an error are kept.
 * Returns 0 when read_full reports 0 at the start of a record (treated as end
 * of file), -1 on invalid or uninitialised arguments, -2 when the file cannot
 * be opened, -3 on a read failure, truncated record or zero key length, -4 on
 * allocation failure, -5 when an insert fails.
 */
int kvs_hash_load(kvs_hash_t *inst, const char *filename) {
    int fd;
    int rc = 0;

    if (!inst || !filename) {
        return -1;
    }
    if (!inst->directory || inst->dir_size == 0) {
        return -1;
    }

    fd = open(filename, O_RDONLY);
    if (fd < 0) {
        return -2;
    }

    while (1) {
        uint32_t klen_n = 0;
        uint32_t vlen_n = 0;
        uint32_t klen;
        uint32_t vlen;
        uint8_t *keybuf = NULL;
        uint8_t *valbuf = NULL;
        int rr;

        rr = read_full(fd, &klen_n, sizeof(klen_n));
        if (rr == 0) {
            rc = 0;
            break;
        }
        if (rr < 0) {
            rc = -3;
            break;
        }

        /* From here on a zero return means the record was cut short. */
        rr = read_full(fd, &vlen_n, sizeof(vlen_n));
        if (rr <= 0) {
            rc = -3;
            break;
        }

        klen = ntohl(klen_n);
        vlen = ntohl(vlen_n);
        if (klen == 0) {
            rc = -3;
            break;
        }

        keybuf = (uint8_t *)kvs_malloc((size_t)klen);
        if (!keybuf) {
            rc = -4;
            break;
        }
        rr = read_full(fd, keybuf, (size_t)klen);
        if (rr <= 0) {
            kvs_free(keybuf);
            rc = -3;
            break;
        }

        if (vlen > 0) {
            valbuf = (uint8_t *)kvs_malloc((size_t)vlen);
            if (!valbuf) {
                kvs_free(keybuf);
                rc = -4;
                break;
            }
            rr = read_full(fd, valbuf, (size_t)vlen);
            if (rr <= 0) {
                kvs_free(valbuf);
                kvs_free(keybuf);
                rc = -3;
                break;
            }
        }

        /* The table stores its own copies, so the buffers are released here. */
        rr = kvs_hash_set_bin(inst, keybuf, klen, valbuf, vlen);
        kvs_free(keybuf);
        if (valbuf) {
            kvs_free(valbuf);
        }
        if (rr < 0) {
            rc = -5;
            break;
        }
    }

    close(fd);
    return rc;
}