From 2ec61bdf85b002e91121f1f800138300599a0b1f Mon Sep 17 00:00:00 2001 From: 1iaan Date: Tue, 3 Mar 2026 12:56:07 +0000 Subject: [PATCH] n*spsc uring_pool --- config/config.xml | 2 +- diskuring/diskuring.c | 825 +++++++++++++++++++++------------ diskuring/diskuring.c.nothread | 199 -------- diskuring/diskuring.h | 59 ++- diskuring/diskuring.h.nothread | 50 -- kvs_array_bin.c | 4 +- kvs_hash_bin.c | 6 +- kvs_rbtree_bin.c | 6 +- kvstore.c | 212 +++++++-- 9 files changed, 747 insertions(+), 616 deletions(-) delete mode 100644 diskuring/diskuring.c.nothread delete mode 100644 diskuring/diskuring.h.nothread diff --git a/config/config.xml b/config/config.xml index 4df1944..47db930 100644 --- a/config/config.xml +++ b/config/config.xml @@ -18,7 +18,7 @@ - none + incremental data kvs_oplog.db diff --git a/diskuring/diskuring.c b/diskuring/diskuring.c index b4ad9be..dacc969 100644 --- a/diskuring/diskuring.c +++ b/diskuring/diskuring.c @@ -1,37 +1,320 @@ #include "diskuring.h" #include "memory/alloc_dispatch.h" -#include + +#include +#include +#include +#include #include -static destroy_queue_t g_destroy_queue = {ATOMIC_VAR_INIT(NULL)}; -static destroy_queue_t g_submit_queue = {ATOMIC_VAR_INIT(NULL)}; +#define IOURING_MAX_WORKERS 16 +#define IOURING_MIN_ENTRIES_PER_WORKER 128u +#define IOURING_SPSC_MIN_CAP 1024u +#define IOURING_SUBMIT_BATCH 256 -static long long push_to_queue = 0; -static long long push_to_sqe = 0; -static long long get_from_cqe = 0; -static long long release_cnt = 0; +extern void sync_wakeup(); -void task_init(task_t *t) -{ - push_to_queue ++; +static int parse_env_int(const char *name, int defv, int minv, int maxv) { + const char *v = getenv(name); + char *end = NULL; + long n = 0; + if (!v || !*v) { + return defv; + } + errno = 0; + n = strtol(v, &end, 10); + if (errno != 0 || !end || *end != '\0') { + return defv; + } + if (n < minv) { + return minv; + } + if (n > maxv) { + return maxv; + } + return (int)n; +} + +static int default_worker_nr(void) { + int n = 3; + return parse_env_int("KVS_URING_WORKERS", n, 1, IOURING_MAX_WORKERS); +} + +static inline uint32_t spsc_next(const spsc_queue_t *q, uint32_t idx) { + idx++; + if (idx >= q->cap) { + idx = 0; + } + return idx; +} + +static int spsc_queue_init(spsc_queue_t *q, uint32_t cap) { + if (!q || cap < 2) { + return -1; + } + q->slots = (task_t **)calloc(cap, sizeof(task_t *)); + if (!q->slots) { + return -1; + } + q->cap = cap; + atomic_init(&q->head, 0); + atomic_init(&q->tail, 0); + atomic_init(&q->size, 0); + return 0; +} + +static void spsc_queue_destroy(spsc_queue_t *q) { + if (!q) { + return; + } + free(q->slots); + q->slots = NULL; + q->cap = 0; +} + +static int spsc_try_push(spsc_queue_t *q, task_t *t, int *need_notify) { + uint32_t tail; + uint32_t next; + uint32_t head; + uint32_t prev_size; + if (!q || !t) { + return -1; + } + if (need_notify) { + *need_notify = 0; + } + tail = atomic_load_explicit(&q->tail, memory_order_relaxed); + next = spsc_next(q, tail); + head = atomic_load_explicit(&q->head, memory_order_acquire); + if (next == head) { + return -1; + } + q->slots[tail] = t; + atomic_store_explicit(&q->tail, next, memory_order_release); + prev_size = atomic_fetch_add_explicit(&q->size, 1, memory_order_release); + if (need_notify && prev_size == 0) { + *need_notify = 1; + } + return 0; +} + +static task_t *spsc_try_pop(spsc_queue_t *q) { + uint32_t head; + uint32_t tail; + task_t *t; + if (!q) { + return NULL; + } + head = atomic_load_explicit(&q->head, memory_order_relaxed); + tail = atomic_load_explicit(&q->tail, memory_order_acquire); + if (head == tail) { + return NULL; + } + t = q->slots[head]; + q->slots[head] = NULL; + atomic_store_explicit(&q->head, spsc_next(q, head), memory_order_release); + atomic_fetch_sub_explicit(&q->size, 1, memory_order_release); + return t; +} + +static int spsc_empty(spsc_queue_t *q) { + return atomic_load_explicit(&q->size, memory_order_acquire) == 0; +} + +static void destroy_queue_push(iouring_ctx_t *ctx, task_t *t) { + task_t *old_head; + do { + old_head = atomic_load_explicit(&ctx->destroy_queue.head, memory_order_relaxed); + t->next = old_head; + } while (!atomic_compare_exchange_weak_explicit( + &ctx->destroy_queue.head, &old_head, t, memory_order_release, memory_order_relaxed)); +} + +static task_t *destroy_queue_steal_all(iouring_ctx_t *ctx) { + return atomic_exchange_explicit(&ctx->destroy_queue.head, NULL, memory_order_acquire); +} + +static void worker_notify(iouring_worker_t *w) { + uint64_t one = 1; + while (1) { + ssize_t n = write(w->event_fd, &one, sizeof(one)); + if (n == (ssize_t)sizeof(one)) { + return; + } + if (n < 0 && errno == EINTR) { + continue; + } + if (n < 0 && errno == EAGAIN) { + return; + } + return; + } +} + +static void worker_wait_event(iouring_worker_t *w) { + uint64_t v; + while (1) { + ssize_t n = read(w->event_fd, &v, sizeof(v)); + if (n == (ssize_t)sizeof(v)) { + return; + } + if (n < 0 && errno == EINTR) { + continue; + } + return; + } +} + +static void worker_collect_cq(iouring_worker_t *w, int *completed) { + while (1) { + struct io_uring_cqe *cqe = NULL; + int rc = io_uring_peek_cqe(&w->ring, &cqe); + if (rc < 0 || !cqe) { + break; + } + + if (cqe->user_data != 0) { + task_t *done = (task_t *)(uintptr_t)cqe->user_data; + atomic_fetch_sub_explicit(&w->in_flight, 1, memory_order_relaxed); + task_finish(done, cqe->res); + if (cqe->res < 0) { + fprintf(stderr, "write fail: wid=%d fd=%d res=%d off=%ld\n", + w->worker_id, done->fd, cqe->res, (long)done->off); + } + destroy_queue_push(w->parent, done); + (*completed)++; + } + + io_uring_cqe_seen(&w->ring, cqe); + } +} + +static void *worker_main(void *arg) { + iouring_worker_t *w = (iouring_worker_t *)arg; + iouring_ctx_t *ctx = w->parent; + task_t *local_head = NULL; + task_t *local_tail = NULL; + + while (1) { + int completed = 0; + int prepared = 0; + bool stop = atomic_load_explicit(&ctx->stop, memory_order_acquire) != 0; + + if ((*w->ring.sq.kflags & IORING_SQ_CQ_OVERFLOW) != 0) { + fprintf(stderr, "FATAL: CQ overflow on worker %d\n", w->worker_id); + abort(); + } + + worker_collect_cq(w, &completed); + if (completed > 0) { + sync_wakeup(); + } + + while (prepared < IOURING_SUBMIT_BATCH) { + task_t *t = NULL; + struct io_uring_sqe *sqe = NULL; + + if (atomic_load_explicit(&w->in_flight, memory_order_relaxed) >= w->max_in_flight) { + break; + } + + if (local_head) { + t = local_head; + local_head = local_head->next; + if (!local_head) { + local_tail = NULL; + } + t->next = NULL; + } else { + t = spsc_try_pop(&w->submit_q); + } + + if (!t) { + break; + } + + sqe = io_uring_get_sqe(&w->ring); + if (!sqe) { + if (local_tail) { + local_tail->next = t; + local_tail = t; + } else { + local_head = t; + local_tail = t; + } + break; + } + + io_uring_prep_writev(sqe, t->fd, t->iovs, t->iovcnt, t->off); + sqe->user_data = (uint64_t)(uintptr_t)t; + prepared++; + } + + if (prepared > 0) { + int submitted = io_uring_submit(&w->ring); + if (submitted < 0) { + if (submitted != -EINTR && submitted != -EAGAIN) { + fprintf(stderr, "io_uring_submit worker=%d ret=%d\n", w->worker_id, submitted); + } + } else if (submitted > 0) { + atomic_fetch_add_explicit(&w->in_flight, submitted, memory_order_relaxed); + continue; + } + } + + if (stop && + atomic_load_explicit(&w->in_flight, memory_order_relaxed) == 0 && + spsc_empty(&w->submit_q) && + local_head == NULL) { + break; + } + + if (atomic_load_explicit(&w->in_flight, memory_order_relaxed) > 0) { + io_uring_submit_and_wait(&w->ring, 1); + continue; + } + + if (!spsc_empty(&w->submit_q) || local_head) { + continue; + } + + worker_wait_event(w); + } + + return NULL; +} + +void task_init(task_t *t) { + if (!t) { + return; + } t->done = 0; t->res = 0; t->next = NULL; } -void task_finish(task_t *t, int res) -{ +void task_finish(task_t *t, int res) { + if (!t) { + return; + } t->res = res; - t->done = 1; + atomic_store_explicit(&t->done, 1, memory_order_release); } -int task_wait(task_t *t) -{ - +int task_wait(task_t *t) { + if (!t) { + return -EINVAL; + } + while (atomic_load_explicit(&t->done, memory_order_acquire) == 0) { + sched_yield(); + } + return t->res; } -void task_destroy(task_t *t) -{ +void task_destroy(task_t *t) { + if (!t) { + return; + } if (t->iovs) { for (int i = 0; i < t->iovcnt; i++) { @@ -45,321 +328,273 @@ void task_destroy(task_t *t) kvs_free(t); } -static void submit_queue_push(iouring_ctx_t *ctx, task_t *t) -{ - task_t *old_head; - do { - old_head = atomic_load_explicit(&g_submit_queue.head, memory_order_relaxed); - t->next = old_head; - } while (!atomic_compare_exchange_weak_explicit( - &g_submit_queue.head, &old_head, t, - memory_order_release, - memory_order_relaxed)); +static int init_worker(iouring_ctx_t *ctx, iouring_worker_t *w, int worker_id, unsigned entries) { + struct io_uring_params params; + unsigned cq_size = 0; + uint32_t spsc_cap = 0; + int ret = 0; - /* 若之前队列为空,通知 worker */ - if (old_head == NULL) { - uint64_t val = 1; - write(ctx->event_fd, &val, sizeof(val)); - } -} + memset(w, 0, sizeof(*w)); + w->worker_id = worker_id; + w->parent = ctx; + atomic_init(&w->in_flight, 0); -static task_t *submit_steal_all(iouring_ctx_t *ctx) -{ - return atomic_exchange_explicit(&g_submit_queue.head, NULL, - memory_order_acquire); -} - -static void submit_queue_putback(iouring_ctx_t *ctx, task_t *head) -{ - while (head) { - task_t *nxt = head->next; - submit_queue_push(ctx, head); - head = nxt; - } -} - -static int submit_queue_empty(iouring_ctx_t *ctx) -{ - return atomic_load_explicit(&g_submit_queue.head, - memory_order_acquire) == NULL; -} - -static void destroy_queue_push(task_t *t) -{ - task_t *old_head; - do { - old_head = atomic_load_explicit(&g_destroy_queue.head, memory_order_relaxed); - t->next = old_head; - } while (!atomic_compare_exchange_strong_explicit( - &g_destroy_queue.head, - &old_head, - t, - memory_order_release, - memory_order_relaxed)); -} - -static task_t *destroy_queue_steal_all(void) -{ - return atomic_exchange_explicit(&g_destroy_queue.head, NULL, memory_order_acquire); -} - -/* =============================================================================================== */ - -extern void sync_wakeup(); -static void *worker_main(void *arg) -{ - iouring_ctx_t *ctx = (iouring_ctx_t *)arg; - const int BATCH_SIZE = 512; // 每次最多准备这么多,防止一次占满 SQ - - while (!ctx->stop) - { - int cq_count = 0; - - // ========== 1. 收割 CQE ========== - // 检查溢出 - if (*ctx->ring.sq.kflags & IORING_SQ_CQ_OVERFLOW) { - fprintf(stderr, "FATAL: CQ overflow detected! Backpressure broken!\n"); - abort(); - } - - while (true) { - struct io_uring_cqe *cqe; - unsigned head; - - io_uring_for_each_cqe(&ctx->ring, head, cqe) { - if (cqe->user_data == 0) { - continue; - } - task_t *done = (task_t *)(uintptr_t)cqe->user_data; - atomic_fetch_sub(&ctx->in_flight, 1); - task_finish(done, cqe->res); - - if (cqe->res < 0) { - fprintf(stderr, "write fail: fd=%d res=%d, offset=%ld\n", done->fd, cqe->res, done->off); - } - - destroy_queue_push(done); - - get_from_cqe++; - cq_count++; - } - - if (cq_count > 0) { - io_uring_cq_advance(&ctx->ring, cq_count); - sync_wakeup(); - } - - if (cq_count == 0) break; - cq_count = 0; - } - - - - // ========== 2. 批量准备 SQE ========== - int batch_count = 0; - task_t *pending = submit_steal_all(ctx); - while (pending) { - if (atomic_load(&ctx->in_flight) >= ctx->max_in_flight) { - submit_queue_putback(ctx, pending); - pending = NULL; - break; // 满了,停止取任务 - } - - struct io_uring_sqe *sqe = io_uring_get_sqe(&ctx->ring); - if (!sqe) { - submit_queue_putback(ctx, pending); - pending = NULL; - break; - } - - task_t *t = pending; - pending = pending->next; - t->next = NULL; - - io_uring_prep_writev(sqe, t->fd, t->iovs, t->iovcnt, t->off); - sqe->user_data = (uint64_t)(uintptr_t)t; - batch_count++; - } - - // ========== 3. 提交 ========== - if (batch_count > 0) { - int submitted = io_uring_submit(&ctx->ring); - push_to_sqe += submitted; - atomic_fetch_add(&ctx->in_flight, submitted); - continue; - } - - - // ========== 4. 没事做就等待 ========== - int inflight = atomic_load(&ctx->in_flight); - if (inflight > 0) { - io_uring_submit_and_wait(&ctx->ring, 1); - // 有任务在飞,等一个CQE - continue; - } else { - // 真没事了,等新任务 - if (submit_queue_empty(ctx)) { - struct io_uring_sqe *sqe = io_uring_get_sqe(&ctx->ring); - if (sqe) { - uint64_t buf; - io_uring_prep_read(sqe, ctx->event_fd, &buf, sizeof(buf), 0); - sqe->user_data = 0; // Special for event - io_uring_submit_and_wait(&ctx->ring, 1); - // After wait, consume the cqe - struct io_uring_cqe *cqe; - io_uring_peek_cqe(&ctx->ring, &cqe); - if (cqe && cqe->user_data == 0) { - io_uring_cq_advance(&ctx->ring, 1); - } - } - } - } - } - - printf("Shutdown: draining remaining CQEs...\n"); - int final_cq = 0; - struct io_uring_cqe *cqe; - unsigned head; - - while (atomic_load(&ctx->in_flight) > 0) { - io_uring_for_each_cqe(&ctx->ring, head, cqe) { - task_t *done = (task_t *)(uintptr_t)cqe->user_data; - atomic_fetch_sub(&ctx->in_flight, 1); - task_finish(done, cqe->res); - - destroy_queue_push(done); - - get_from_cqe++; - final_cq++; - } - - if (final_cq > 0) { - io_uring_cq_advance(&ctx->ring, final_cq); - final_cq = 0; - } - - // 如果还有 inflight,等一下 - if (atomic_load(&ctx->in_flight) > 0) { - io_uring_submit_and_wait(&ctx->ring, 1); - } - } - - printf("exit uring, stop: %d, inflight: %d\n", ctx->stop, - atomic_load(&ctx->in_flight)); - return NULL; -} - -int iouring_init(iouring_ctx_t *ctx, unsigned entries) -{ - memset(ctx, 0, sizeof(*ctx)); - ctx->stop = 0; - - atomic_init(&g_submit_queue.head, NULL); - atomic_init(&g_destroy_queue.head, NULL); - - - ctx->event_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); - if (ctx->event_fd < 0) { - // Handle error + w->event_fd = eventfd(0, EFD_CLOEXEC); + if (w->event_fd < 0) { return -errno; } - struct io_uring_params params; memset(¶ms, 0, sizeof(params)); - - int ret = io_uring_queue_init_params(entries, &ctx->ring, ¶ms); + ret = io_uring_queue_init_params(entries, &w->ring, ¶ms); if (ret < 0) { - fprintf(stderr, "io_uring_queue_init_params failed: %d (%s)\n", - ret, strerror(-ret)); + close(w->event_fd); + w->event_fd = -1; return ret; } - - unsigned cq_size = *ctx->ring.cq.kring_entries; - printf("Kernel CQ size: %u\n", cq_size); - ctx->max_in_flight = (cq_size * 8) / 10; - atomic_init(&ctx->in_flight, 0); + cq_size = *w->ring.cq.kring_entries; + w->max_in_flight = (int)((cq_size * 8u) / 10u); + if (w->max_in_flight < 64) { + w->max_in_flight = 64; + } - ret = pthread_create(&ctx->th, NULL, worker_main, ctx); - if (ret != 0) - { - io_uring_queue_exit(&ctx->ring); + spsc_cap = (uint32_t)(w->max_in_flight * 2); + if (spsc_cap < IOURING_SPSC_MIN_CAP) { + spsc_cap = IOURING_SPSC_MIN_CAP; + } + spsc_cap += 1; + if (spsc_queue_init(&w->submit_q, spsc_cap) != 0) { + io_uring_queue_exit(&w->ring); + close(w->event_fd); + w->event_fd = -1; + return -ENOMEM; + } + + ret = pthread_create(&w->th, NULL, worker_main, w); + if (ret != 0) { + spsc_queue_destroy(&w->submit_q); + io_uring_queue_exit(&w->ring); + close(w->event_fd); + w->event_fd = -1; return -ret; } + + printf("io_uring worker[%d]: entries=%u cq=%u max_in_flight=%d queue_cap=%u\n", + worker_id, entries, cq_size, w->max_in_flight, spsc_cap - 1); return 0; } -void iouring_shutdown(iouring_ctx_t *ctx) -{ - ctx->stop = 1; - uint64_t val = 1; - write(ctx->event_fd, &val, sizeof(val)); +int iouring_init(iouring_ctx_t *ctx, unsigned entries) { + unsigned per_worker_entries; + int worker_nr; + int i; + if (!ctx) { + return -EINVAL; + } - pthread_join(ctx->th, NULL); - io_uring_queue_exit(&ctx->ring); + memset(ctx, 0, sizeof(*ctx)); + atomic_init(&ctx->stop, 0); + atomic_init(&ctx->rr_next, 0); + atomic_init(&ctx->destroy_queue.head, NULL); - close(ctx->event_fd); + worker_nr = default_worker_nr(); + if (worker_nr < 1) { + worker_nr = 1; + } + + if (entries < (unsigned)worker_nr * IOURING_MIN_ENTRIES_PER_WORKER) { + per_worker_entries = IOURING_MIN_ENTRIES_PER_WORKER; + } else { + per_worker_entries = entries / (unsigned)worker_nr; + } + if (per_worker_entries < IOURING_MIN_ENTRIES_PER_WORKER) { + per_worker_entries = IOURING_MIN_ENTRIES_PER_WORKER; + } + + ctx->workers = (iouring_worker_t *)calloc((size_t)worker_nr, sizeof(iouring_worker_t)); + if (!ctx->workers) { + return -ENOMEM; + } + ctx->worker_nr = worker_nr; + ctx->entries_per_worker = per_worker_entries; + + for (i = 0; i < worker_nr; i++) { + int rc = init_worker(ctx, &ctx->workers[i], i, per_worker_entries); + if (rc != 0) { + ctx->worker_nr = i; + iouring_shutdown(ctx); + return rc; + } + } + + printf("io_uring initialized with %d workers (n*SPSC)\n", worker_nr); + return 0; } -task_t* submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int count, off_t off){ - if (!bufs || !lens || count <= 0) return NULL; +static void wake_all_workers(iouring_ctx_t *ctx) { + if (!ctx || !ctx->workers) { + return; + } + for (int i = 0; i < ctx->worker_nr; i++) { + worker_notify(&ctx->workers[i]); + } +} - task_t *t = (task_t *)kvs_malloc(sizeof(task_t)); +void cleanup_finished_iouring_tasks(iouring_ctx_t *ctx) { + task_t *list; + if (!ctx) { + return; + } + + list = destroy_queue_steal_all(ctx); + while (list) { + task_t *next = list->next; + task_destroy(list); + list = next; + } +} + +void iouring_shutdown(iouring_ctx_t *ctx) { + int i; + if (!ctx || !ctx->workers) { + return; + } + + atomic_store_explicit(&ctx->stop, 1, memory_order_release); + wake_all_workers(ctx); + + for (i = 0; i < ctx->worker_nr; i++) { + iouring_worker_t *w = &ctx->workers[i]; + if (w->th) { + pthread_join(w->th, NULL); + } + if (w->event_fd >= 0) { + close(w->event_fd); + w->event_fd = -1; + } + io_uring_queue_exit(&w->ring); + spsc_queue_destroy(&w->submit_q); + } + + cleanup_finished_iouring_tasks(ctx); + free(ctx->workers); + ctx->workers = NULL; + ctx->worker_nr = 0; +} + +static int queue_task_with_backpressure(iouring_ctx_t *ctx, task_t *t) { + uint32_t rr; + int n; + if (!ctx || !ctx->workers || !t) { + return -1; + } + + n = ctx->worker_nr; + rr = atomic_fetch_add_explicit(&ctx->rr_next, 1, memory_order_relaxed); + + while (atomic_load_explicit(&ctx->stop, memory_order_acquire) == 0) { + for (int i = 0; i < n; i++) { + int idx = (int)((rr + (uint32_t)i) % (uint32_t)n); + iouring_worker_t *w = &ctx->workers[idx]; + int need_notify = 0; + if (spsc_try_push(&w->submit_q, t, &need_notify) == 0) { + if (need_notify) { + worker_notify(w); + } + return 0; + } + } + + /* 队列满时主动回收,避免主线程无界撑内存。 */ + cleanup_finished_iouring_tasks(ctx); + sched_yield(); + } + + return -1; +} + +task_t *submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int count, off_t off) { + task_t *t; + size_t total = 0; + uint8_t *packed = NULL; + size_t copied = 0; + + if (!ctx || !ctx->workers || !bufs || !lens || count <= 0) { + return NULL; + } + + t = (task_t *)kvs_malloc(sizeof(task_t)); + if (!t) { + return NULL; + } task_init(t); t->op = TASK_WRITE; t->fd = fd; t->off = off; - - t->iovs = (struct iovec *)kvs_malloc(sizeof(struct iovec) * count); - if(!t->iovs) { + t->iovcnt = 1; + t->iovs = (struct iovec *)kvs_malloc(sizeof(struct iovec)); + if (!t->iovs) { kvs_free(t); return NULL; } - for(int i = 0;i < count; ++ i){ - size_t len = lens[i]; - void *buf = kvs_malloc(len); - if(!buf){ - for(int j = 0; j < i; ++j){ - if(t->iovs[j].iov_base) kvs_free(t->iovs[j].iov_base); - } + for (int i = 0; i < count; ++i) { + if (lens[i] > SIZE_MAX - total) { kvs_free(t->iovs); kvs_free(t); return NULL; } - memcpy(buf, bufs[i], len); - t->iovs[i].iov_base = buf; - t->iovs[i].iov_len = len; - - // t->iovs[i].iov_base = bufs[i]; - // t->iovs[i].iov_len = lens[i]; - + total += lens[i]; + } + + if (total == 0) { + kvs_free(t->iovs); + kvs_free(t); + return NULL; + } + + packed = (uint8_t *)kvs_malloc(total); + if (!packed) { + kvs_free(t->iovs); + kvs_free(t); + return NULL; + } + + for (int i = 0; i < count; ++i) { + size_t len = lens[i]; + if (len == 0) { + continue; + } + memcpy(packed + copied, bufs[i], len); + copied += len; + } + + t->iovs[0].iov_base = packed; + t->iovs[0].iov_len = copied; + + if (queue_task_with_backpressure(ctx, t) != 0) { + task_destroy(t); + return NULL; } - t->iovcnt = count; - - submit_queue_push(ctx, t); return t; } -int uring_task_complete(iouring_ctx_t *ctx){ - return submit_queue_empty(ctx) && atomic_load(&ctx->in_flight) == 0; -} - -// 主线程定期调用此函数清理 -void cleanup_finished_iouring_tasks(iouring_ctx_t *ctx) { - task_t *list = destroy_queue_steal_all(); - - int cnt = 0; - task_t *cur = list; - while (cur) { - cnt++; - task_t *next = cur->next; - task_destroy(cur); - cur = next; +int uring_task_complete(iouring_ctx_t *ctx) { + if (!ctx || !ctx->workers) { + return 1; } - release_cnt += cnt; - // printf("push:%lld, sqe:%lld, cqe:%lld, rls:%lld\n", push_to_queue, push_to_sqe, get_from_cqe, release_cnt); -} \ No newline at end of file + for (int i = 0; i < ctx->worker_nr; i++) { + iouring_worker_t *w = &ctx->workers[i]; + if (!spsc_empty(&w->submit_q)) { + return 0; + } + if (atomic_load_explicit(&w->in_flight, memory_order_relaxed) > 0) { + return 0; + } + } + + return atomic_load_explicit(&ctx->destroy_queue.head, memory_order_acquire) == NULL; +} diff --git a/diskuring/diskuring.c.nothread b/diskuring/diskuring.c.nothread deleted file mode 100644 index 3fded18..0000000 --- a/diskuring/diskuring.c.nothread +++ /dev/null @@ -1,199 +0,0 @@ -#include "diskuring.h" -#include "memory/alloc_dispatch.h" -#include -#include - -void task_init(task_t *t) -{ - t->done = 0; - t->res = 0; - t->next = NULL; -} - -void task_finish(task_t *t, int res) -{ - t->res = res; - t->done = 1; -} - -void task_destroy(task_t *t) -{ - if (t->iovs) { - for (int i = 0; i < t->iovcnt; i++) { - if (t->iovs[i].iov_base) { - kvs_free(t->iovs[i].iov_base); - } - } - kvs_free(t->iovs); - } - - kvs_free(t); -} - -int iouring_init(iouring_ctx_t *ctx, unsigned entries) -{ - memset(ctx, 0, sizeof(*ctx)); - - struct io_uring_params params; - memset(¶ms, 0, sizeof(params)); - - // params.flags |= IORING_SETUP_CQSIZE; - // params.cq_entries = 256 * 1024; - // params.sq_entries = 128 * 1024; - int ret = io_uring_queue_init_params(entries, &ctx->ring, ¶ms); - if (ret < 0) { - fprintf(stderr, "io_uring_queue_init_params failed: %d (%s)\n", - ret, strerror(-ret)); - return ret; - } - - unsigned cq_size = *ctx->ring.cq.kring_entries; - printf("Kernel CQ size: %u\n", cq_size); - - if (ret != 0) - { - io_uring_queue_exit(&ctx->ring); - return -ret; - } - return 0; -} - -void iouring_shutdown(iouring_ctx_t *ctx) -{ - io_uring_queue_exit(&ctx->ring); -} - - -void harvest_cqes(iouring_ctx_t *ctx) -{ - struct io_uring_cqe *cqe; - unsigned head; - int cq_count = 0; - - // 使用 for_each_cqe 薅干净 CQ - io_uring_for_each_cqe(&ctx->ring, head, cqe) { - task_t *done = (task_t *)(uintptr_t)cqe->user_data; - task_finish(done, cqe->res); - - if (cqe->res < 0) { - fprintf(stderr, "write fail: fd=%d res=%d\n", done->fd, cqe->res); - } - // 直接 destroy(单线程,无需全局队列) - task_destroy(done); - - cq_count++; - } - - if (cq_count > 0) { - // printf("harvest cq:%d\n", cq_count); - io_uring_cq_advance(&ctx->ring, cq_count); - } - - // 检查 CQ overflow(保险) - if (*ctx->ring.sq.kflags & IORING_SQ_CQ_OVERFLOW) { - fprintf(stderr, "FATAL: CQ overflow detected!\n"); - abort(); - } -} - -task_t* submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int count, off_t off){ - if (!bufs || !lens || count <= 0) return NULL; - - task_t *t = (task_t *)kvs_malloc(sizeof(task_t)); - task_init(t); - t->op = TASK_WRITE; - t->fd = fd; - t->off = off; - - t->iovs = (struct iovec *)kvs_malloc(sizeof(struct iovec) * count); - if(!t->iovs) { - kvs_free(t); - return NULL; - } - - for(int i = 0;i < count; ++ i){ - size_t len = lens[i]; - void *buf = kvs_malloc(len); - if(!buf){ - for(int j = 0; j < i; ++j){ - if(t->iovs[j].iov_base) kvs_free(t->iovs[j].iov_base); - } - kvs_free(t->iovs); - kvs_free(t); - return NULL; - } - memcpy(buf, bufs[i], len); - t->iovs[i].iov_base = buf; - t->iovs[i].iov_len = len; - } - - t->iovcnt = count; - - harvest_cqes(ctx); - - if(!ctx->head){ - ctx->head = t; - ctx->tail = t; - }else{ - ctx->tail->next = t; - ctx->tail = t; - } - - int submitted = 0; - while(true){ - task_t *cur = ctx->head; - if(!cur){ - break; - } - ctx->head = cur->next; - if (!ctx->head) { - ctx->tail = NULL; - } - cur->next = NULL; - - struct io_uring_sqe *sqe = io_uring_get_sqe(&ctx->ring); - if (!sqe) { - break; - } - - io_uring_prep_writev(sqe, cur->fd, cur->iovs, cur->iovcnt, cur->off); - sqe->user_data = (uint64_t)(uintptr_t)cur; - - submitted++; - } - - if(submitted > 0){ - int ret = io_uring_submit(&ctx->ring); - } - - return t; -} - -void iouring_tick(iouring_ctx_t *ctx) { - - harvest_cqes(ctx); - - int submitted = 0; - while(ctx->head){ - struct io_uring_sqe *sqe = io_uring_get_sqe(&ctx->ring); - if (!sqe) { - break; - } - task_t *cur = ctx->head; - ctx->head = cur->next; - if (!ctx->head) { - ctx->tail = NULL; - } - cur->next = NULL; - - io_uring_prep_writev(sqe, cur->fd, cur->iovs, cur->iovcnt, cur->off); - sqe->user_data = (uint64_t)(uintptr_t)cur; - - submitted++; - } - - - if(submitted > 0){ - int ret = io_uring_submit(&ctx->ring); - } -} \ No newline at end of file diff --git a/diskuring/diskuring.h b/diskuring/diskuring.h index f5b5942..c3925af 100644 --- a/diskuring/diskuring.h +++ b/diskuring/diskuring.h @@ -3,13 +3,13 @@ #include #include -#include #include -#include -#include -#include -#include #include +#include +#include +#include +#include +#include typedef enum { TASK_READ, TASK_WRITE } task_op_t; @@ -18,49 +18,64 @@ typedef struct task { int fd; off_t off; - int res; // cqe->res - int done; // 0/1 + int res; + _Atomic int done; - struct iovec *iovs; // iovec 数组 - int iovcnt; // iovec 数量 + struct iovec *iovs; + int iovcnt; struct task *next; } task_t; typedef struct { _Atomic(task_t *) head; -} task_stack_t; +} destroy_queue_t; typedef struct { - _Atomic(task_t *) head; -} destroy_queue_t; + task_t **slots; + uint32_t cap; + _Atomic uint32_t head; + _Atomic uint32_t tail; + _Atomic uint32_t size; +} spsc_queue_t; + +struct iouring_ctx_s; +typedef struct iouring_ctx_s iouring_ctx_t; typedef struct { struct io_uring ring; pthread_t th; - int event_fd; - - int stop; _Atomic int in_flight; - int max_in_flight; -} iouring_ctx_t; + int max_in_flight; + int worker_id; + spsc_queue_t submit_q; + iouring_ctx_t *parent; +} iouring_worker_t; +struct iouring_ctx_s { + iouring_worker_t *workers; + int worker_nr; + unsigned entries_per_worker; + + _Atomic int stop; + _Atomic uint32_t rr_next; + + destroy_queue_t destroy_queue; +}; void task_init(task_t *t); void task_finish(task_t *t, int res); - int task_wait(task_t *t); void task_destroy(task_t *t); int iouring_init(iouring_ctx_t *ctx, unsigned entries); void iouring_shutdown(iouring_ctx_t *ctx); -task_t* submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int count, off_t off); +task_t *submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int count, off_t off); int uring_task_complete(iouring_ctx_t *ctx); - -void cleanup_finished_iouring_tasks(); +void cleanup_finished_iouring_tasks(iouring_ctx_t *ctx); extern iouring_ctx_t global_uring_ctx; -#endif \ No newline at end of file +#endif diff --git a/diskuring/diskuring.h.nothread b/diskuring/diskuring.h.nothread deleted file mode 100644 index 615e9b3..0000000 --- a/diskuring/diskuring.h.nothread +++ /dev/null @@ -1,50 +0,0 @@ -#ifndef __DISK_IOURING_H__ -#define __DISK_IOURING_H__ - -#include -#include -#include -#include -#include -#include -#include -#include - -#define BATCH_SIZE 256 -typedef enum { TASK_READ, TASK_WRITE } task_op_t; - -typedef struct task { - task_op_t op; - int fd; - off_t off; - - int res; // cqe->res - int done; // 0/1 - - struct iovec *iovs; // iovec 数组 - int iovcnt; // iovec 数量 - - struct task *next; -} task_t; - -typedef struct { - struct io_uring ring; - int pending_count; - - task_t *head; - task_t *tail; -} iouring_ctx_t; - -void task_init(task_t *t); -void task_finish(task_t *t, int res); -void task_destroy(task_t *t); - -int iouring_init(iouring_ctx_t *ctx, unsigned entries); -void iouring_shutdown(iouring_ctx_t *ctx); - -task_t* submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int count, off_t off); -void iouring_tick(iouring_ctx_t *ctx); - -extern iouring_ctx_t global_uring_ctx; - -#endif \ No newline at end of file diff --git a/kvs_array_bin.c b/kvs_array_bin.c index af7f563..58ae3df 100644 --- a/kvs_array_bin.c +++ b/kvs_array_bin.c @@ -235,7 +235,7 @@ int kvs_array_save(iouring_ctx_t *uring, kvs_array_t *inst, const char* filename for (int i = 0; i < count; i++) total += lens[i]; task_t *t = submit_write(uring, fd, bufs, lens, count, current_off); - cleanup_finished_iouring_tasks(); + cleanup_finished_iouring_tasks(uring); if (!t) { perror("task init failed"); @@ -249,7 +249,7 @@ int kvs_array_save(iouring_ctx_t *uring, kvs_array_t *inst, const char* filename clean: while (!uring_task_complete(uring)) { usleep(1000); - cleanup_finished_iouring_tasks(); + cleanup_finished_iouring_tasks(uring); } close(fd); return 0; diff --git a/kvs_hash_bin.c b/kvs_hash_bin.c index b529d61..5537e69 100755 --- a/kvs_hash_bin.c +++ b/kvs_hash_bin.c @@ -314,7 +314,7 @@ int kvs_hash_save(iouring_ctx_t *uring, kvs_hash_t *inst, const char* filename){ perror("task init failed"); goto clean; } - cleanup_finished_iouring_tasks(); + cleanup_finished_iouring_tasks(uring); current_off += (off_t) total; } @@ -323,7 +323,7 @@ int kvs_hash_save(iouring_ctx_t *uring, kvs_hash_t *inst, const char* filename){ clean: while (!uring_task_complete(uring)) { usleep(1000); - cleanup_finished_iouring_tasks(); + cleanup_finished_iouring_tasks(uring); } close(fd); return 0; @@ -381,4 +381,4 @@ int kvs_hash_load(kvs_hash_t *inst, const char* filename){ } fclose(fp); return 0; -} \ No newline at end of file +} diff --git a/kvs_rbtree_bin.c b/kvs_rbtree_bin.c index 27cf03d..c885f40 100644 --- a/kvs_rbtree_bin.c +++ b/kvs_rbtree_bin.c @@ -504,7 +504,7 @@ static int kvs_rbtree_save_node(iouring_ctx_t *uring, int fd, off_t *current_off for (int i = 0; i < count; i++) total += lens[i]; task_t *t = submit_write(uring, fd, bufs, lens, count, *current_off); - cleanup_finished_iouring_tasks(); + cleanup_finished_iouring_tasks(uring); if(!t) { perror("task init failed"); @@ -532,7 +532,7 @@ int kvs_rbtree_save(iouring_ctx_t *uring, kvs_rbtree_t *inst, const char* filena while (!uring_task_complete(uring)) { usleep(1000); - cleanup_finished_iouring_tasks(); + cleanup_finished_iouring_tasks(uring); } close(fd); return rc; @@ -589,4 +589,4 @@ int kvs_rbtree_load(kvs_rbtree_t *inst, const char* filename){ fclose(fp); return 0; -} \ No newline at end of file +} diff --git a/kvstore.c b/kvstore.c index 868bdac..3fca3f0 100644 --- a/kvstore.c +++ b/kvstore.c @@ -41,14 +41,120 @@ void __completed_cmd(const uint8_t *cmd, size_t len, unsigned long long seq){ } -#include -#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000) -#define TIME_SUB_US(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000000 + (tv1.tv_usec - tv2.tv_usec)) +#include +#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000) +#define TIME_SUB_US(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000000 + (tv1.tv_usec - tv2.tv_usec)) + +static int checked_size_add(size_t a, size_t b, size_t *out) { + if (!out || a > SIZE_MAX - b) { + return -1; + } + *out = a + b; + return 0; +} + +static int resp_value_encoded_len(const resp_value_t *v, size_t *out_len) { + size_t len = 0; + + if (!v || !out_len) { + return -1; + } + + switch (v->type) { + case RESP_T_SIMPLE_STR: + case RESP_T_ERROR: + if (checked_size_add(1, (size_t)v->bulk.len, &len) < 0 || + checked_size_add(len, 2, &len) < 0) { + return -1; + } + break; + + case RESP_T_INTEGER: { + char tmp[64]; + int n = snprintf(tmp, sizeof(tmp), "%lld", (long long)v->i64); + if (n <= 0) { + return -1; + } + if (checked_size_add(1, (size_t)n, &len) < 0 || + checked_size_add(len, 2, &len) < 0) { + return -1; + } + break; + } + + case RESP_T_NIL: + len = 5; /* "$-1\r\n" */ + break; + + case RESP_T_BULK_STR: { + char tmp[32]; + int n; + size_t t; + + if (v->bulk.len > 0 && !v->bulk.ptr) { + return -1; + } + + n = snprintf(tmp, sizeof(tmp), "%u", (unsigned)v->bulk.len); + if (n <= 0) { + return -1; + } + + if (checked_size_add(1, (size_t)n, &t) < 0 || /* '$' + len digits */ + checked_size_add(t, 2, &t) < 0 || /* \r\n */ + checked_size_add(t, (size_t)v->bulk.len, &t) < 0 || + checked_size_add(t, 2, &len) < 0) { /* trailing \r\n */ + return -1; + } + break; + } + + default: + return -1; + } + + *out_len = len; + return 0; +} + +static int flush_pending_response(struct conn *conn, uint8_t *buf, size_t *out_len) { + if (!conn || !buf || !out_len) { + return -1; + } + if (*out_len == 0) { + return 0; + } + if (chain_buffer_append(&conn->wbuf, buf, *out_len) < 0) { + return -1; + } + *out_len = 0; + return 0; +} + +static int is_update_cmd(const resp_cmd_t *cmd) { + const resp_slice_t *c0; + + if (!cmd || cmd->argc == 0 || !cmd->argv[0].ptr || cmd->argv[0].len == 0) { + return 0; + } + + c0 = &cmd->argv[0]; + return ascii_casecmp(c0->ptr, c0->len, "SET") == 0 || + ascii_casecmp(c0->ptr, c0->len, "DEL") == 0 || + ascii_casecmp(c0->ptr, c0->len, "MOD") == 0 || + ascii_casecmp(c0->ptr, c0->len, "RSET") == 0 || + ascii_casecmp(c0->ptr, c0->len, "RDEL") == 0 || + ascii_casecmp(c0->ptr, c0->len, "RMOD") == 0 || + ascii_casecmp(c0->ptr, c0->len, "HSET") == 0 || + ascii_casecmp(c0->ptr, c0->len, "HDEL") == 0 || + ascii_casecmp(c0->ptr, c0->len, "HMOD") == 0; +} + int kvs_protocol(struct conn* conn){ -#if TIME_COLLECT == 1 - struct timeval func_start; - gettimeofday(&func_start, NULL); - long total_oplog_us = 0; +#if TIME_COLLECT == 1 + struct timeval func_start; + gettimeofday(&func_start, NULL); + long total_oplog_us = 0; #endif if (!conn) return -1; @@ -62,11 +168,11 @@ int kvs_protocol(struct conn* conn){ uint8_t response[KVS_MAX_RESPONSE]; int consumed = 0; - int out_len = 0; - - while(consumed < request_length ){ - const uint8_t *p = request+consumed; - int remain = request_length - consumed; + size_t out_len = 0; + + while(consumed < request_length ){ + const uint8_t *p = request+consumed; + int remain = request_length - consumed; resp_cmd_t cmd; memset(&cmd, 0, sizeof(cmd)); @@ -155,13 +261,15 @@ int kvs_protocol(struct conn* conn){ // } // } - if(global_cfg.persistence == PERSIST_INCREMENTAL){ - kvs_oplog_append(p, len, global_oplog_fd); - } + int need_persist = is_update_cmd(&cmd); + + if(global_cfg.persistence == PERSIST_INCREMENTAL && need_persist){ + kvs_oplog_append(p, len, global_oplog_fd); + } // __completed_cmd(p, len, global_seq); // global_seq ++; - if (global_cfg.replica_mode == REPLICA_ENABLE) { + if (global_cfg.replica_mode == REPLICA_ENABLE && need_persist) { uint32_t off = 0; int ar = replica_shm_append(&g_rep_shm, global_seq, p, (uint32_t)len, &off); if (ar == 0) { @@ -178,38 +286,60 @@ int kvs_protocol(struct conn* conn){ gettimeofday(&oplog_end, NULL); total_oplog_us += (oplog_end.tv_sec - oplog_start.tv_sec) * 1000000 + (oplog_end.tv_usec - oplog_start.tv_usec); -#endif - - /* 构建响应 */ - int cap = KVS_MAX_RESPONSE - out_len; - if (cap <= 0) { - return consumed; +#endif + + /* 构建响应 */ + int resp_len = resp_build_value(&val, response + out_len, sizeof(response) - out_len); + if (resp_len < 0) { + /* 当前批次剩余空间不够,先把已拼好的刷到发送队列再重试 */ + if (flush_pending_response(conn, response, &out_len) < 0) { + return -1; + } + + resp_len = resp_build_value(&val, response, sizeof(response)); + if (resp_len < 0) { + size_t resp_need = 0; + uint8_t *resp_heap = NULL; + + if (resp_value_encoded_len(&val, &resp_need) < 0) { + return -1; + } + + resp_heap = (uint8_t *)malloc(resp_need); + if (!resp_heap) { + return -1; + } + + resp_len = resp_build_value(&val, resp_heap, resp_need); + if (resp_len < 0 || + chain_buffer_append(&conn->wbuf, resp_heap, (size_t)resp_len) < 0) { + free(resp_heap); + return -1; + } + + free(resp_heap); + resp_len = 0; + } } - int resp_len = resp_build_value(&val, response + out_len, (size_t)cap); - if (resp_len < 0) { - return consumed; - } + out_len += (size_t)resp_len; + + __completed_cmd(request, consumed, 0); + + consumed += len; + } - __completed_cmd(request, consumed, 0); - - out_len += resp_len; - consumed += len; - } - -#if TIME_COLLECT == 1 + #if TIME_COLLECT == 1 struct timeval func_end; gettimeofday(&func_end, NULL); long func_us = (func_end.tv_sec - func_start.tv_sec) * 1000000 + (func_end.tv_usec - func_start.tv_usec); - fprintf(stderr, "kvs_protocol: total %ld us, oplog %ld us\n", func_us, total_oplog_us); -#endif - - if (out_len > 0) { - if (chain_buffer_append(&conn->wbuf, response, (size_t)out_len) < 0) { - return -1; - } - } + fprintf(stderr, "kvs_protocol: total %ld us, oplog %ld us\n", func_us, total_oplog_us); +#endif + + if (flush_pending_response(conn, response, &out_len) < 0) { + return -1; + } return consumed; }