#include "diskuring.h"
#include "memory/alloc_dispatch.h"
/* NOTE(review): the original system-include list was corrupted (six empty
 * `#include` directives with the <header> names stripped). The headers below
 * cover every symbol this translation unit uses — confirm against the
 * project's build that none are already pulled in by diskuring.h. */
#include <liburing.h>
#include <errno.h>
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/eventfd.h>
#include <time.h>
#include <unistd.h>

#define IOURING_MAX_WORKERS 16
#define IOURING_MIN_ENTRIES_PER_WORKER 128u
#define IOURING_SPSC_MIN_CAP 1024u
#define IOURING_SUBMIT_BATCH 256

/* Provided by the sync layer: wakes threads blocked in task_wait()-style
 * polling after a batch of completions has been reaped. */
extern void sync_wakeup(void);

/*
 * Global profiling counters, all monotonically increasing.
 * Sampling is 1-in-2^shift; sampled values are multiplied by g_prof_scale
 * so the accumulated totals approximate the unsampled cost.
 */
typedef struct {
    _Atomic uint64_t submit_calls;
    _Atomic uint64_t submit_pack_ns;
    _Atomic uint64_t submit_alloc_ns;
    _Atomic uint64_t submit_copy_ns;
    _Atomic uint64_t submit_queue_ns;
    _Atomic uint64_t submit_backpressure_ns;
    _Atomic uint64_t submit_backpressure_loops;
    _Atomic uint64_t cleanup_calls;
    _Atomic uint64_t cleanup_ns;
    _Atomic uint64_t cleanup_tasks;
} iouring_profile_stats_t;

static iouring_profile_stats_t g_prof;
static _Atomic uint64_t g_prof_seq;
static int g_prof_enable = 0;
static uint64_t g_prof_sample_mask = 0; /* sample when (seq & mask) == 0 */
static uint64_t g_prof_scale = 1;       /* 2^shift: scales sampled values back up */

/* Monotonic wall-clock in nanoseconds (CLOCK_MONOTONIC). */
static inline uint64_t mono_ns(void)
{
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (uint64_t)ts.tv_sec * 1000000000ull + (uint64_t)ts.tv_nsec;
}

/* Returns 1 for roughly 1-in-2^shift calls when profiling is enabled. */
static inline int prof_should_sample(void)
{
    uint64_t seq;
    if (!g_prof_enable) {
        return 0;
    }
    seq = atomic_fetch_add_explicit(&g_prof_seq, 1, memory_order_relaxed);
    return (seq & g_prof_sample_mask) == 0;
}

/* Division that maps x/0 to 0, for averaging over possibly-zero counters. */
static inline uint64_t div_u64(uint64_t a, uint64_t b)
{
    return b ? (a / b) : 0;
}

/*
 * Parse an integer environment variable, clamped to [minv, maxv].
 * Returns defv when the variable is unset, empty, or not a valid integer.
 */
static int parse_env_int(const char *name, int defv, int minv, int maxv)
{
    const char *v = getenv(name);
    char *end = NULL;
    long n = 0;
    if (!v || !*v) {
        return defv;
    }
    errno = 0;
    n = strtol(v, &end, 10);
    if (errno != 0 || !end || *end != '\0') {
        return defv;
    }
    if (n < minv) {
        return minv;
    }
    if (n > maxv) {
        return maxv;
    }
    return (int)n;
}

/* Worker count: KVS_URING_WORKERS env override, default 3, clamped to [1, 16]. */
static int default_worker_nr(void)
{
    int n = 3;
    return parse_env_int("KVS_URING_WORKERS", n, 1, IOURING_MAX_WORKERS);
}

/* Next slot index in the circular buffer, wrapping at q->cap. */
static inline uint32_t spsc_next(const spsc_queue_t *q, uint32_t idx)
{
    idx++;
    if (idx >= q->cap) {
        idx = 0;
    }
    return idx;
}

/*
 * Initialize a single-producer/single-consumer ring of `cap` slots.
 * One slot is always left empty to distinguish full from empty, so the
 * usable capacity is cap - 1. Returns 0 on success, -1 on bad args or OOM.
 */
static int spsc_queue_init(spsc_queue_t *q, uint32_t cap)
{
    if (!q || cap < 2) {
        return -1;
    }
    q->slots = (task_t **)calloc(cap, sizeof(task_t *));
    if (!q->slots) {
        return -1;
    }
    q->cap = cap;
    atomic_init(&q->head, 0);
    atomic_init(&q->tail, 0);
    atomic_init(&q->size, 0);
    return 0;
}

/* Free the slot array. Does not free queued tasks — callers must drain first. */
static void spsc_queue_destroy(spsc_queue_t *q)
{
    if (!q) {
        return;
    }
    free(q->slots);
    q->slots = NULL;
    q->cap = 0;
}

/*
 * Producer-side push. Returns 0 on success, -1 when the ring is full.
 * On success, *need_notify is set to 1 iff the queue transitioned from
 * empty to non-empty — the consumer may be parked and needs an eventfd kick.
 */
static int spsc_try_push(spsc_queue_t *q, task_t *t, int *need_notify)
{
    uint32_t tail;
    uint32_t next;
    uint32_t head;
    uint32_t prev_size;
    if (!q || !t) {
        return -1;
    }
    if (need_notify) {
        *need_notify = 0;
    }
    tail = atomic_load_explicit(&q->tail, memory_order_relaxed);
    next = spsc_next(q, tail);
    head = atomic_load_explicit(&q->head, memory_order_acquire);
    if (next == head) {
        return -1; /* full: writing would collide with the consumer */
    }
    q->slots[tail] = t;
    /* Release: publish the slot write before advancing tail. */
    atomic_store_explicit(&q->tail, next, memory_order_release);
    prev_size = atomic_fetch_add_explicit(&q->size, 1, memory_order_release);
    if (need_notify && prev_size == 0) {
        *need_notify = 1;
    }
    return 0;
}

/* Consumer-side pop. Returns the oldest task, or NULL when empty. */
static task_t *spsc_try_pop(spsc_queue_t *q)
{
    uint32_t head;
    uint32_t tail;
    task_t *t;
    if (!q) {
        return NULL;
    }
    head = atomic_load_explicit(&q->head, memory_order_relaxed);
    tail = atomic_load_explicit(&q->tail, memory_order_acquire);
    if (head == tail) {
        return NULL;
    }
    t = q->slots[head];
    q->slots[head] = NULL;
    atomic_store_explicit(&q->head, spsc_next(q, head), memory_order_release);
    atomic_fetch_sub_explicit(&q->size, 1, memory_order_release);
    return t;
}

/* True when the approximate size counter reads zero. */
static int spsc_empty(spsc_queue_t *q)
{
    return atomic_load_explicit(&q->size, memory_order_acquire) == 0;
}

/*
 * Push a finished task onto the lock-free MPSC destroy stack.
 * Multiple workers push; cleanup_finished_iouring_tasks() steals the
 * whole list at once and frees it.
 */
static void destroy_queue_push(iouring_ctx_t *ctx, task_t *t)
{
    task_t *old_head;
    do {
        old_head = atomic_load_explicit(&ctx->destroy_queue.head,
                                        memory_order_relaxed);
        t->next = old_head;
    } while (!atomic_compare_exchange_weak_explicit(
        &ctx->destroy_queue.head, &old_head, t,
        memory_order_release, memory_order_relaxed));
}

/* Atomically detach and return the entire destroy list (possibly NULL). */
static task_t *destroy_queue_steal_all(iouring_ctx_t *ctx)
{
    return atomic_exchange_explicit(&ctx->destroy_queue.head, NULL,
                                    memory_order_acquire);
}

/*
 * Kick a parked worker via its eventfd. EAGAIN means the counter is already
 * saturated — the worker is guaranteed to wake anyway, so it is ignored.
 */
static void worker_notify(iouring_worker_t *w)
{
    uint64_t one = 1;
    while (1) {
        ssize_t n = write(w->event_fd, &one, sizeof(one));
        if (n == (ssize_t)sizeof(one)) {
            return;
        }
        if (n < 0 && errno == EINTR) {
            continue;
        }
        if (n < 0 && errno == EAGAIN) {
            return;
        }
        return;
    }
}

/* Block until the worker's eventfd is signalled (or an unrecoverable error). */
static void worker_wait_event(iouring_worker_t *w)
{
    uint64_t v;
    while (1) {
        ssize_t n = read(w->event_fd, &v, sizeof(v));
        if (n == (ssize_t)sizeof(v)) {
            return;
        }
        if (n < 0 && errno == EINTR) {
            continue;
        }
        return;
    }
}

/*
 * Drain all currently-available CQEs without blocking. For each completed
 * task: decrement in-flight, publish the result via task_finish(), and hand
 * ownership to the destroy queue. *completed counts reaped task CQEs.
 */
static void worker_collect_cq(iouring_worker_t *w, int *completed)
{
    while (1) {
        struct io_uring_cqe *cqe = NULL;
        int rc = io_uring_peek_cqe(&w->ring, &cqe);
        if (rc < 0 || !cqe) {
            break;
        }
        if (cqe->user_data != 0) {
            task_t *done = (task_t *)(uintptr_t)cqe->user_data;
            atomic_fetch_sub_explicit(&w->in_flight, 1, memory_order_relaxed);
            task_finish(done, cqe->res);
            if (cqe->res < 0) {
                fprintf(stderr, "write fail: wid=%d fd=%d res=%d off=%ld\n",
                        w->worker_id, done->fd, cqe->res, (long)done->off);
            }
            /* Ownership moves to the destroy queue; waiters only read done/res. */
            destroy_queue_push(w->parent, done);
            (*completed)++;
        }
        io_uring_cqe_seen(&w->ring, cqe);
    }
}

/*
 * Worker thread loop:
 *   1. reap completions (and wake waiters if any were reaped),
 *   2. prep up to IOURING_SUBMIT_BATCH writev SQEs from the local requeue
 *      list then the SPSC queue, bounded by max_in_flight,
 *   3. submit; on progress, loop again immediately,
 *   4. otherwise wait on in-flight I/O or the eventfd.
 * Exits when ctx->stop is set and all queues and in-flight I/O are drained.
 * The local_head/local_tail list holds tasks popped from the SPSC queue that
 * could not get an SQE yet — they must not be lost or reordered.
 */
static void *worker_main(void *arg)
{
    iouring_worker_t *w = (iouring_worker_t *)arg;
    iouring_ctx_t *ctx = w->parent;
    task_t *local_head = NULL;
    task_t *local_tail = NULL;
    while (1) {
        int completed = 0;
        int prepared = 0;
        bool stop = atomic_load_explicit(&ctx->stop, memory_order_acquire) != 0;
        /* CQ overflow would silently drop completions (and leak tasks):
         * treat it as fatal rather than limp along. */
        if ((*w->ring.sq.kflags & IORING_SQ_CQ_OVERFLOW) != 0) {
            fprintf(stderr, "FATAL: CQ overflow on worker %d\n", w->worker_id);
            abort();
        }
        worker_collect_cq(w, &completed);
        if (completed > 0) {
            sync_wakeup();
        }
        while (prepared < IOURING_SUBMIT_BATCH) {
            task_t *t = NULL;
            struct io_uring_sqe *sqe = NULL;
            if (atomic_load_explicit(&w->in_flight, memory_order_relaxed) >=
                w->max_in_flight) {
                break;
            }
            /* Requeued tasks take priority to preserve FIFO order. */
            if (local_head) {
                t = local_head;
                local_head = local_head->next;
                if (!local_head) {
                    local_tail = NULL;
                }
                t->next = NULL;
            } else {
                t = spsc_try_pop(&w->submit_q);
            }
            if (!t) {
                break;
            }
            sqe = io_uring_get_sqe(&w->ring);
            if (!sqe) {
                /* SQ full: park the task on the local list and submit. */
                if (local_tail) {
                    local_tail->next = t;
                    local_tail = t;
                } else {
                    local_head = t;
                    local_tail = t;
                }
                break;
            }
            io_uring_prep_writev(sqe, t->fd, t->iovs, t->iovcnt, t->off);
            sqe->user_data = (uint64_t)(uintptr_t)t;
            prepared++;
        }
        if (prepared > 0) {
            int submitted = io_uring_submit(&w->ring);
            if (submitted < 0) {
                if (submitted != -EINTR && submitted != -EAGAIN) {
                    fprintf(stderr, "io_uring_submit worker=%d ret=%d\n",
                            w->worker_id, submitted);
                }
            } else if (submitted > 0) {
                /* Any unsubmitted prepped SQEs stay in the SQ and are counted
                 * on a later submit, so in_flight stays consistent. */
                atomic_fetch_add_explicit(&w->in_flight, submitted,
                                          memory_order_relaxed);
                continue;
            }
        }
        if (stop &&
            atomic_load_explicit(&w->in_flight, memory_order_relaxed) == 0 &&
            spsc_empty(&w->submit_q) && local_head == NULL) {
            break; /* fully drained after stop — exit */
        }
        if (atomic_load_explicit(&w->in_flight, memory_order_relaxed) > 0) {
            io_uring_submit_and_wait(&w->ring, 1);
            continue;
        }
        if (!spsc_empty(&w->submit_q) || local_head) {
            continue;
        }
        worker_wait_event(w);
    }
    return NULL;
}

/* Reset a task's completion state before (re)submission. */
void task_init(task_t *t)
{
    if (!t) {
        return;
    }
    t->done = 0;
    t->res = 0;
    t->next = NULL;
}

/* Publish the I/O result; the release store pairs with task_wait's acquire. */
void task_finish(task_t *t, int res)
{
    if (!t) {
        return;
    }
    t->res = res;
    atomic_store_explicit(&t->done, 1, memory_order_release);
}

/*
 * Spin (with sched_yield) until the task completes; returns the io_uring
 * result (bytes written, or negative errno). -EINVAL for a NULL task.
 */
int task_wait(task_t *t)
{
    if (!t) {
        return -EINVAL;
    }
    while (atomic_load_explicit(&t->done, memory_order_acquire) == 0) {
        sched_yield();
    }
    return t->res;
}

/* Free a task, its iovec array, and every iov_base buffer it owns. */
void task_destroy(task_t *t)
{
    if (!t) {
        return;
    }
    if (t->iovs) {
        for (int i = 0; i < t->iovcnt; i++) {
            if (t->iovs[i].iov_base) {
                kvs_free(t->iovs[i].iov_base);
            }
        }
        kvs_free(t->iovs);
    }
    kvs_free(t);
}

/*
 * Set up one worker: eventfd, io_uring, SPSC queue, and thread.
 * max_in_flight is capped at 80% of the CQ size (min 64) so the CQ cannot
 * overflow; the SPSC queue holds twice that (+1 wasted full/empty slot).
 * Returns 0 or a negative errno; on failure all partial state is torn down.
 */
static int init_worker(iouring_ctx_t *ctx, iouring_worker_t *w, int worker_id,
                       unsigned entries)
{
    struct io_uring_params params;
    unsigned cq_size = 0;
    uint32_t spsc_cap = 0;
    int ret = 0;
    memset(w, 0, sizeof(*w));
    w->worker_id = worker_id;
    w->parent = ctx;
    atomic_init(&w->in_flight, 0);
    w->event_fd = eventfd(0, EFD_CLOEXEC);
    if (w->event_fd < 0) {
        return -errno;
    }
    /* BUGFIX: original text had mojibake `¶ms` (corrupted `&params`). */
    memset(&params, 0, sizeof(params));
    ret = io_uring_queue_init_params(entries, &w->ring, &params);
    if (ret < 0) {
        close(w->event_fd);
        w->event_fd = -1;
        return ret;
    }
    cq_size = *w->ring.cq.kring_entries;
    w->max_in_flight = (int)((cq_size * 8u) / 10u);
    if (w->max_in_flight < 64) {
        w->max_in_flight = 64;
    }
    spsc_cap = (uint32_t)(w->max_in_flight * 2);
    if (spsc_cap < IOURING_SPSC_MIN_CAP) {
        spsc_cap = IOURING_SPSC_MIN_CAP;
    }
    spsc_cap += 1; /* one reserved slot: ring distinguishes full from empty */
    if (spsc_queue_init(&w->submit_q, spsc_cap) != 0) {
        io_uring_queue_exit(&w->ring);
        close(w->event_fd);
        w->event_fd = -1;
        return -ENOMEM;
    }
    ret = pthread_create(&w->th, NULL, worker_main, w);
    if (ret != 0) {
        spsc_queue_destroy(&w->submit_q);
        io_uring_queue_exit(&w->ring);
        close(w->event_fd);
        w->event_fd = -1;
        return -ret; /* pthread_create returns a positive errno */
    }
    printf("io_uring worker[%d]: entries=%u cq=%u max_in_flight=%d queue_cap=%u\n",
           worker_id, entries, cq_size, w->max_in_flight, spsc_cap - 1);
    return 0;
}

/*
 * Initialize the io_uring context: profiling config from the environment,
 * then one worker (ring + queue + thread) per configured worker. `entries`
 * is the total SQ budget, split across workers with a per-worker floor.
 * Returns 0 or a negative errno; on partial failure, already-started
 * workers are shut down.
 */
int iouring_init(iouring_ctx_t *ctx, unsigned entries)
{
    unsigned per_worker_entries;
    int worker_nr;
    int i;
    if (!ctx) {
        return -EINVAL;
    }
    memset(ctx, 0, sizeof(*ctx));
    atomic_init(&ctx->stop, 0);
    atomic_init(&ctx->rr_next, 0);
    atomic_init(&ctx->destroy_queue.head, NULL);
    memset(&g_prof, 0, sizeof(g_prof));
    atomic_init(&g_prof_seq, 0);
    g_prof_enable = parse_env_int("KVS_IOURING_PROFILE", 0, 0, 1);
    if (g_prof_enable) {
        int shift = parse_env_int("KVS_IOURING_PROFILE_SHIFT", 6, 0, 12);
        g_prof_sample_mask = ((uint64_t)1 << (uint64_t)shift) - 1;
        g_prof_scale = (uint64_t)1 << (uint64_t)shift;
        printf("io_uring profile enabled: sample=1/%llu\n",
               (unsigned long long)g_prof_scale);
    } else {
        g_prof_sample_mask = 0;
        g_prof_scale = 1;
    }
    worker_nr = default_worker_nr();
    if (worker_nr < 1) {
        worker_nr = 1;
    }
    if (entries < (unsigned)worker_nr * IOURING_MIN_ENTRIES_PER_WORKER) {
        per_worker_entries = IOURING_MIN_ENTRIES_PER_WORKER;
    } else {
        per_worker_entries = entries / (unsigned)worker_nr;
    }
    if (per_worker_entries < IOURING_MIN_ENTRIES_PER_WORKER) {
        per_worker_entries = IOURING_MIN_ENTRIES_PER_WORKER;
    }
    ctx->workers =
        (iouring_worker_t *)calloc((size_t)worker_nr, sizeof(iouring_worker_t));
    if (!ctx->workers) {
        return -ENOMEM;
    }
    ctx->worker_nr = worker_nr;
    ctx->entries_per_worker = per_worker_entries;
    for (i = 0; i < worker_nr; i++) {
        int rc = init_worker(ctx, &ctx->workers[i], i, per_worker_entries);
        if (rc != 0) {
            ctx->worker_nr = i; /* only tear down workers that started */
            iouring_shutdown(ctx);
            return rc;
        }
    }
    printf("io_uring initialized with %d workers (n*SPSC)\n", worker_nr);
    return 0;
}

/* Kick every worker's eventfd (used on shutdown so parked workers re-check stop). */
static void wake_all_workers(iouring_ctx_t *ctx)
{
    if (!ctx || !ctx->workers) {
        return;
    }
    for (int i = 0; i < ctx->worker_nr; i++) {
        worker_notify(&ctx->workers[i]);
    }
}

/*
 * Free every task the workers have retired since the last call.
 * Safe to call from any thread; steal is a single atomic exchange.
 */
void cleanup_finished_iouring_tasks(iouring_ctx_t *ctx)
{
    task_t *list;
    uint64_t start_ns = 0;
    uint64_t tasks = 0;
    if (!ctx) {
        return;
    }
    if (g_prof_enable) {
        start_ns = mono_ns();
    }
    list = destroy_queue_steal_all(ctx);
    while (list) {
        task_t *next = list->next;
        task_destroy(list);
        tasks++;
        list = next;
    }
    if (g_prof_enable) {
        uint64_t ns = mono_ns() - start_ns;
        atomic_fetch_add_explicit(&g_prof.cleanup_calls, 1, memory_order_relaxed);
        atomic_fetch_add_explicit(&g_prof.cleanup_ns, ns, memory_order_relaxed);
        atomic_fetch_add_explicit(&g_prof.cleanup_tasks, tasks,
                                  memory_order_relaxed);
    }
}

/* Print accumulated profiling stats. No-op message when profiling is off. */
void iouring_profile_dump(iouring_ctx_t *ctx)
{
    uint64_t submit_calls;
    uint64_t submit_pack_ns;
    uint64_t submit_alloc_ns;
    uint64_t submit_copy_ns;
    uint64_t submit_queue_ns;
    uint64_t submit_bp_ns;
    uint64_t submit_bp_loops;
    uint64_t cleanup_calls;
    uint64_t cleanup_ns;
    uint64_t cleanup_tasks;
    uint64_t submit_total_ns;
    uint64_t main_total_ns;
    (void)ctx;
    if (!g_prof_enable) {
        printf("[iouring-prof] disabled (set KVS_IOURING_PROFILE=1)\n");
        return;
    }
    submit_calls = atomic_load_explicit(&g_prof.submit_calls, memory_order_relaxed);
    submit_pack_ns =
        atomic_load_explicit(&g_prof.submit_pack_ns, memory_order_relaxed);
    submit_alloc_ns =
        atomic_load_explicit(&g_prof.submit_alloc_ns, memory_order_relaxed);
    submit_copy_ns =
        atomic_load_explicit(&g_prof.submit_copy_ns, memory_order_relaxed);
    submit_queue_ns =
        atomic_load_explicit(&g_prof.submit_queue_ns, memory_order_relaxed);
    submit_bp_ns = atomic_load_explicit(&g_prof.submit_backpressure_ns,
                                        memory_order_relaxed);
    submit_bp_loops = atomic_load_explicit(&g_prof.submit_backpressure_loops,
                                           memory_order_relaxed);
    cleanup_calls =
        atomic_load_explicit(&g_prof.cleanup_calls, memory_order_relaxed);
    cleanup_ns = atomic_load_explicit(&g_prof.cleanup_ns, memory_order_relaxed);
    cleanup_tasks =
        atomic_load_explicit(&g_prof.cleanup_tasks, memory_order_relaxed);
    submit_total_ns = submit_pack_ns + submit_queue_ns;
    main_total_ns = submit_total_ns + cleanup_ns;
    printf("[iouring-prof] submits=%llu cleanup_calls=%llu cleanup_tasks=%llu\n",
           (unsigned long long)submit_calls, (unsigned long long)cleanup_calls,
           (unsigned long long)cleanup_tasks);
    printf("[iouring-prof] submit_pack=%llums (alloc=%llums copy=%llums) submit_queue=%llums cleanup=%llums total_main=%llums\n",
           (unsigned long long)(submit_pack_ns / 1000000ull),
           (unsigned long long)(submit_alloc_ns / 1000000ull),
           (unsigned long long)(submit_copy_ns / 1000000ull),
           (unsigned long long)(submit_queue_ns / 1000000ull),
           (unsigned long long)(cleanup_ns / 1000000ull),
           (unsigned long long)(main_total_ns / 1000000ull));
    printf("[iouring-prof] per_submit(ns): pack=%llu alloc=%llu copy=%llu queue=%llu backpressure=%llu loops=%llu\n",
           (unsigned long long)div_u64(submit_pack_ns, submit_calls),
           (unsigned long long)div_u64(submit_alloc_ns, submit_calls),
           (unsigned long long)div_u64(submit_copy_ns, submit_calls),
           (unsigned long long)div_u64(submit_queue_ns, submit_calls),
           (unsigned long long)div_u64(submit_bp_ns, submit_calls),
           (unsigned long long)div_u64(submit_bp_loops, submit_calls));
    printf("[iouring-prof] per_cleanup(ns)=%llu per_task_free(ns)=%llu\n",
           (unsigned long long)div_u64(cleanup_ns, cleanup_calls),
           (unsigned long long)div_u64(cleanup_ns, cleanup_tasks));
    if (main_total_ns > 0) {
        printf("[iouring-prof] main-share: pack=%.1f%% queue=%.1f%% cleanup=%.1f%%\n",
               (double)submit_pack_ns * 100.0 / (double)main_total_ns,
               (double)submit_queue_ns * 100.0 / (double)main_total_ns,
               (double)cleanup_ns * 100.0 / (double)main_total_ns);
        if (submit_pack_ns > 0) {
            double other_pct =
                100.0 -
                ((double)submit_alloc_ns * 100.0 / (double)submit_pack_ns) -
                ((double)submit_copy_ns * 100.0 / (double)submit_pack_ns);
            if (other_pct < 0.0) {
                other_pct = 0.0;
            }
            printf("[iouring-prof] pack-share: alloc=%.1f%% copy=%.1f%% other=%.1f%%\n",
                   (double)submit_alloc_ns * 100.0 / (double)submit_pack_ns,
                   (double)submit_copy_ns * 100.0 / (double)submit_pack_ns,
                   other_pct);
        }
    }
}

/*
 * Stop all workers, join their threads, release rings/queues/fds, and free
 * any remaining retired tasks. Safe to call on a partially-initialized
 * context (iouring_init failure path sets worker_nr to the started count).
 */
void iouring_shutdown(iouring_ctx_t *ctx)
{
    int i;
    if (!ctx || !ctx->workers) {
        return;
    }
    atomic_store_explicit(&ctx->stop, 1, memory_order_release);
    wake_all_workers(ctx);
    for (i = 0; i < ctx->worker_nr; i++) {
        iouring_worker_t *w = &ctx->workers[i];
        /* NOTE(review): truthiness test on pthread_t is non-portable; it
         * relies on init_worker's memset leaving an all-zero (never-started)
         * handle. Consider a separate `started` flag. */
        if (w->th) {
            pthread_join(w->th, NULL);
        }
        if (w->event_fd >= 0) {
            close(w->event_fd);
            w->event_fd = -1;
        }
        io_uring_queue_exit(&w->ring);
        spsc_queue_destroy(&w->submit_q);
    }
    cleanup_finished_iouring_tasks(ctx);
    free(ctx->workers);
    ctx->workers = NULL;
    ctx->worker_nr = 0;
}

/*
 * Round-robin the task onto the first worker queue with space, spinning
 * (with cleanup + yield) while all queues are full. Returns 0 once queued,
 * -1 if the context is stopping. Caller retains ownership on failure.
 */
static int queue_task_with_backpressure(iouring_ctx_t *ctx, task_t *t)
{
    uint32_t rr;
    int n;
    uint64_t start_ns = 0;
    uint64_t loops = 0;
    int sampled = prof_should_sample();
    if (!ctx || !ctx->workers || !t) {
        return -1;
    }
    if (sampled) {
        start_ns = mono_ns();
    }
    n = ctx->worker_nr;
    rr = atomic_fetch_add_explicit(&ctx->rr_next, 1, memory_order_relaxed);
    while (atomic_load_explicit(&ctx->stop, memory_order_acquire) == 0) {
        loops++;
        for (int i = 0; i < n; i++) {
            int idx = (int)((rr + (uint32_t)i) % (uint32_t)n);
            iouring_worker_t *w = &ctx->workers[idx];
            int need_notify = 0;
            if (spsc_try_push(&w->submit_q, t, &need_notify) == 0) {
                if (need_notify) {
                    worker_notify(w);
                }
                if (sampled) {
                    uint64_t ns = mono_ns() - start_ns;
                    atomic_fetch_add_explicit(&g_prof.submit_backpressure_ns,
                                              ns * g_prof_scale,
                                              memory_order_relaxed);
                    atomic_fetch_add_explicit(&g_prof.submit_backpressure_loops,
                                              loops * g_prof_scale,
                                              memory_order_relaxed);
                }
                return 0;
            }
        }
        /* All queues full: reclaim finished tasks now so the submitting
         * thread's memory footprint stays bounded. */
        cleanup_finished_iouring_tasks(ctx);
        sched_yield();
    }
    if (sampled) {
        uint64_t ns = mono_ns() - start_ns;
        atomic_fetch_add_explicit(&g_prof.submit_backpressure_ns,
                                  ns * g_prof_scale, memory_order_relaxed);
        atomic_fetch_add_explicit(&g_prof.submit_backpressure_loops,
                                  loops * g_prof_scale, memory_order_relaxed);
    }
    return -1;
}

/*
 * Asynchronously write `count` buffers to `fd` at offset `off`.
 * The buffers are packed into one freshly-allocated contiguous region
 * (caller keeps ownership of bufs[] — they are copied, not borrowed) and
 * submitted as a single-iovec writev task. Returns the queued task (owned
 * by the framework; wait with task_wait, freed via the destroy queue) or
 * NULL on bad args, size overflow, zero total length, OOM, or shutdown.
 */
task_t *submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens,
                     int count, off_t off)
{
    task_t *t;
    size_t total = 0;
    uint8_t *packed = NULL;
    size_t copied = 0;
    uint64_t pack_start = 0;
    uint64_t alloc_start = 0;
    uint64_t copy_start = 0;
    uint64_t queue_start = 0;
    int sampled = prof_should_sample();
    if (!ctx || !ctx->workers || !bufs || !lens || count <= 0) {
        return NULL;
    }
    atomic_fetch_add_explicit(&g_prof.submit_calls, 1, memory_order_relaxed);
    if (sampled) {
        pack_start = mono_ns();
        alloc_start = pack_start;
    }
    t = (task_t *)kvs_malloc(sizeof(task_t));
    if (!t) {
        return NULL;
    }
    task_init(t);
    t->op = TASK_WRITE;
    t->fd = fd;
    t->off = off;
    t->iovcnt = 1;
    t->iovs = (struct iovec *)kvs_malloc(sizeof(struct iovec));
    if (!t->iovs) {
        kvs_free(t);
        return NULL;
    }
    for (int i = 0; i < count; ++i) {
        if (lens[i] > SIZE_MAX - total) {
            kvs_free(t->iovs);
            kvs_free(t);
            return NULL; /* total length would overflow size_t */
        }
        total += lens[i];
    }
    if (total == 0) {
        kvs_free(t->iovs);
        kvs_free(t);
        return NULL;
    }
    packed = (uint8_t *)kvs_malloc(total);
    if (!packed) {
        kvs_free(t->iovs);
        kvs_free(t);
        return NULL;
    }
    if (sampled) {
        uint64_t alloc_ns = mono_ns() - alloc_start;
        atomic_fetch_add_explicit(&g_prof.submit_alloc_ns,
                                  alloc_ns * g_prof_scale, memory_order_relaxed);
        copy_start = mono_ns();
    }
    for (int i = 0; i < count; ++i) {
        size_t len = lens[i];
        if (len == 0) {
            continue;
        }
        memcpy(packed + copied, bufs[i], len);
        copied += len;
    }
    if (sampled) {
        uint64_t copy_ns = mono_ns() - copy_start;
        atomic_fetch_add_explicit(&g_prof.submit_copy_ns,
                                  copy_ns * g_prof_scale, memory_order_relaxed);
    }
    t->iovs[0].iov_base = packed;
    t->iovs[0].iov_len = copied;
    if (sampled) {
        uint64_t pack_ns = mono_ns() - pack_start;
        atomic_fetch_add_explicit(&g_prof.submit_pack_ns,
                                  pack_ns * g_prof_scale, memory_order_relaxed);
        queue_start = mono_ns();
    }
    if (queue_task_with_backpressure(ctx, t) != 0) {
        task_destroy(t);
        return NULL;
    }
    if (sampled) {
        uint64_t queue_ns = mono_ns() - queue_start;
        atomic_fetch_add_explicit(&g_prof.submit_queue_ns,
                                  queue_ns * g_prof_scale, memory_order_relaxed);
    }
    return t;
}

/*
 * Returns 1 when no work remains anywhere: every worker's submit queue is
 * empty, nothing is in flight, and the destroy queue has been drained.
 * A NULL/uninitialized context is trivially "complete".
 */
int uring_task_complete(iouring_ctx_t *ctx)
{
    if (!ctx || !ctx->workers) {
        return 1;
    }
    for (int i = 0; i < ctx->worker_nr; i++) {
        iouring_worker_t *w = &ctx->workers[i];
        if (!spsc_empty(&w->submit_q)) {
            return 0;
        }
        if (atomic_load_explicit(&w->in_flight, memory_order_relaxed) > 0) {
            return 0;
        }
    }
    return atomic_load_explicit(&ctx->destroy_queue.head,
                                memory_order_acquire) == NULL;
}