From 57720a313591bbc2229cd81e19ddabbdcc158a32 Mon Sep 17 00:00:00 2001 From: 1iaan Date: Tue, 3 Mar 2026 14:24:44 +0000 Subject: [PATCH] diskuring fix --- diskuring/diskuring.c | 201 ++++++++++++++++++++++++++++++++++++++++++ diskuring/diskuring.h | 1 + kvs_protocol_resp.c | 3 +- 3 files changed, 204 insertions(+), 1 deletion(-) diff --git a/diskuring/diskuring.c b/diskuring/diskuring.c index dacc969..513e102 100644 --- a/diskuring/diskuring.c +++ b/diskuring/diskuring.c @@ -6,6 +6,7 @@ #include #include #include +#include #define IOURING_MAX_WORKERS 16 #define IOURING_MIN_ENTRIES_PER_WORKER 128u @@ -14,6 +15,44 @@ extern void sync_wakeup(); +typedef struct { + _Atomic uint64_t submit_calls; + _Atomic uint64_t submit_pack_ns; + _Atomic uint64_t submit_alloc_ns; + _Atomic uint64_t submit_copy_ns; + _Atomic uint64_t submit_queue_ns; + _Atomic uint64_t submit_backpressure_ns; + _Atomic uint64_t submit_backpressure_loops; + _Atomic uint64_t cleanup_calls; + _Atomic uint64_t cleanup_ns; + _Atomic uint64_t cleanup_tasks; +} iouring_profile_stats_t; + +static iouring_profile_stats_t g_prof; +static _Atomic uint64_t g_prof_seq; +static int g_prof_enable = 0; +static uint64_t g_prof_sample_mask = 0; +static uint64_t g_prof_scale = 1; + +static inline uint64_t mono_ns(void) { + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return (uint64_t)ts.tv_sec * 1000000000ull + (uint64_t)ts.tv_nsec; +} + +static inline int prof_should_sample(void) { + uint64_t seq; + if (!g_prof_enable) { + return 0; + } + seq = atomic_fetch_add_explicit(&g_prof_seq, 1, memory_order_relaxed); + return (seq & g_prof_sample_mask) == 0; +} + +static inline uint64_t div_u64(uint64_t a, uint64_t b) { + return b ? (a / b) : 0; +} + static int parse_env_int(const char *name, int defv, int minv, int maxv) { const char *v = getenv(name); char *end = NULL; @@ -397,6 +436,20 @@ int iouring_init(iouring_ctx_t *ctx, unsigned entries) { atomic_init(&ctx->stop, 0); atomic_init(&ctx->rr_next, 0); atomic_init(&ctx->destroy_queue.head, NULL); + memset(&g_prof, 0, sizeof(g_prof)); + atomic_init(&g_prof_seq, 0); + + g_prof_enable = parse_env_int("KVS_IOURING_PROFILE", 0, 0, 1); + if (g_prof_enable) { + int shift = parse_env_int("KVS_IOURING_PROFILE_SHIFT", 6, 0, 12); + g_prof_sample_mask = ((uint64_t)1 << (uint64_t)shift) - 1; + g_prof_scale = (uint64_t)1 << (uint64_t)shift; + printf("io_uring profile enabled: sample=1/%llu\n", + (unsigned long long)g_prof_scale); + } else { + g_prof_sample_mask = 0; + g_prof_scale = 1; + } worker_nr = default_worker_nr(); if (worker_nr < 1) { @@ -443,16 +496,104 @@ static void wake_all_workers(iouring_ctx_t *ctx) { void cleanup_finished_iouring_tasks(iouring_ctx_t *ctx) { task_t *list; + uint64_t start_ns = 0; + uint64_t tasks = 0; if (!ctx) { return; } + if (g_prof_enable) { + start_ns = mono_ns(); + } + list = destroy_queue_steal_all(ctx); while (list) { task_t *next = list->next; task_destroy(list); + tasks++; list = next; } + + if (g_prof_enable) { + uint64_t ns = mono_ns() - start_ns; + atomic_fetch_add_explicit(&g_prof.cleanup_calls, 1, memory_order_relaxed); + atomic_fetch_add_explicit(&g_prof.cleanup_ns, ns, memory_order_relaxed); + atomic_fetch_add_explicit(&g_prof.cleanup_tasks, tasks, memory_order_relaxed); + } +} + +void iouring_profile_dump(iouring_ctx_t *ctx) { + uint64_t submit_calls; + uint64_t submit_pack_ns; + uint64_t submit_alloc_ns; + uint64_t submit_copy_ns; + uint64_t submit_queue_ns; + uint64_t submit_bp_ns; + uint64_t submit_bp_loops; + uint64_t cleanup_calls; + uint64_t cleanup_ns; + uint64_t cleanup_tasks; + uint64_t submit_total_ns; + uint64_t main_total_ns; + (void)ctx; + + if (!g_prof_enable) { + printf("[iouring-prof] disabled (set KVS_IOURING_PROFILE=1)\n"); + return; + } + + submit_calls = atomic_load_explicit(&g_prof.submit_calls, memory_order_relaxed); + submit_pack_ns = atomic_load_explicit(&g_prof.submit_pack_ns, memory_order_relaxed); + submit_alloc_ns = atomic_load_explicit(&g_prof.submit_alloc_ns, memory_order_relaxed); + submit_copy_ns = atomic_load_explicit(&g_prof.submit_copy_ns, memory_order_relaxed); + submit_queue_ns = atomic_load_explicit(&g_prof.submit_queue_ns, memory_order_relaxed); + submit_bp_ns = atomic_load_explicit(&g_prof.submit_backpressure_ns, memory_order_relaxed); + submit_bp_loops = atomic_load_explicit(&g_prof.submit_backpressure_loops, memory_order_relaxed); + cleanup_calls = atomic_load_explicit(&g_prof.cleanup_calls, memory_order_relaxed); + cleanup_ns = atomic_load_explicit(&g_prof.cleanup_ns, memory_order_relaxed); + cleanup_tasks = atomic_load_explicit(&g_prof.cleanup_tasks, memory_order_relaxed); + submit_total_ns = submit_pack_ns + submit_queue_ns; + main_total_ns = submit_total_ns + cleanup_ns; + + printf("[iouring-prof] submits=%llu cleanup_calls=%llu cleanup_tasks=%llu\n", + (unsigned long long)submit_calls, + (unsigned long long)cleanup_calls, + (unsigned long long)cleanup_tasks); + printf("[iouring-prof] submit_pack=%llums (alloc=%llums copy=%llums) submit_queue=%llums cleanup=%llums total_main=%llums\n", + (unsigned long long)(submit_pack_ns / 1000000ull), + (unsigned long long)(submit_alloc_ns / 1000000ull), + (unsigned long long)(submit_copy_ns / 1000000ull), + (unsigned long long)(submit_queue_ns / 1000000ull), + (unsigned long long)(cleanup_ns / 1000000ull), + (unsigned long long)(main_total_ns / 1000000ull)); + printf("[iouring-prof] per_submit(ns): pack=%llu alloc=%llu copy=%llu queue=%llu backpressure=%llu loops=%llu\n", + (unsigned long long)div_u64(submit_pack_ns, submit_calls), + (unsigned long long)div_u64(submit_alloc_ns, submit_calls), + (unsigned long long)div_u64(submit_copy_ns, submit_calls), + (unsigned long long)div_u64(submit_queue_ns, submit_calls), + (unsigned long long)div_u64(submit_bp_ns, submit_calls), + (unsigned long long)div_u64(submit_bp_loops, submit_calls)); + printf("[iouring-prof] per_cleanup(ns)=%llu per_task_free(ns)=%llu\n", + (unsigned long long)div_u64(cleanup_ns, cleanup_calls), + (unsigned long long)div_u64(cleanup_ns, cleanup_tasks)); + if (main_total_ns > 0) { + printf("[iouring-prof] main-share: pack=%.1f%% queue=%.1f%% cleanup=%.1f%%\n", + (double)submit_pack_ns * 100.0 / (double)main_total_ns, + (double)submit_queue_ns * 100.0 / (double)main_total_ns, + (double)cleanup_ns * 100.0 / (double)main_total_ns); + if (submit_pack_ns > 0) { + double other_pct = 100.0 - + ((double)submit_alloc_ns * 100.0 / (double)submit_pack_ns) - + ((double)submit_copy_ns * 100.0 / (double)submit_pack_ns); + if (other_pct < 0.0) { + other_pct = 0.0; + } + printf("[iouring-prof] pack-share: alloc=%.1f%% copy=%.1f%% other=%.1f%%\n", + (double)submit_alloc_ns * 100.0 / (double)submit_pack_ns, + (double)submit_copy_ns * 100.0 / (double)submit_pack_ns, + other_pct); + } + } } void iouring_shutdown(iouring_ctx_t *ctx) { @@ -486,14 +627,22 @@ void iouring_shutdown(iouring_ctx_t *ctx) { static int queue_task_with_backpressure(iouring_ctx_t *ctx, task_t *t) { uint32_t rr; int n; + uint64_t start_ns = 0; + uint64_t loops = 0; + int sampled = prof_should_sample(); if (!ctx || !ctx->workers || !t) { return -1; } + if (sampled) { + start_ns = mono_ns(); + } + n = ctx->worker_nr; rr = atomic_fetch_add_explicit(&ctx->rr_next, 1, memory_order_relaxed); while (atomic_load_explicit(&ctx->stop, memory_order_acquire) == 0) { + loops++; for (int i = 0; i < n; i++) { int idx = (int)((rr + (uint32_t)i) % (uint32_t)n); iouring_worker_t *w = &ctx->workers[idx]; @@ -502,6 +651,13 @@ static int queue_task_with_backpressure(iouring_ctx_t *ctx, task_t *t) { if (need_notify) { worker_notify(w); } + if (sampled) { + uint64_t ns = mono_ns() - start_ns; + atomic_fetch_add_explicit(&g_prof.submit_backpressure_ns, ns * g_prof_scale, + memory_order_relaxed); + atomic_fetch_add_explicit(&g_prof.submit_backpressure_loops, loops * g_prof_scale, + memory_order_relaxed); + } return 0; } } @@ -511,6 +667,14 @@ static int queue_task_with_backpressure(iouring_ctx_t *ctx, task_t *t) { sched_yield(); } + if (sampled) { + uint64_t ns = mono_ns() - start_ns; + atomic_fetch_add_explicit(&g_prof.submit_backpressure_ns, ns * g_prof_scale, + memory_order_relaxed); + atomic_fetch_add_explicit(&g_prof.submit_backpressure_loops, loops * g_prof_scale, + memory_order_relaxed); + } + return -1; } @@ -519,10 +683,21 @@ task_t *submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int size_t total = 0; uint8_t *packed = NULL; size_t copied = 0; + uint64_t pack_start = 0; + uint64_t alloc_start = 0; + uint64_t copy_start = 0; + uint64_t queue_start = 0; + int sampled = prof_should_sample(); if (!ctx || !ctx->workers || !bufs || !lens || count <= 0) { return NULL; } + atomic_fetch_add_explicit(&g_prof.submit_calls, 1, memory_order_relaxed); + + if (sampled) { + pack_start = mono_ns(); + alloc_start = pack_start; + } t = (task_t *)kvs_malloc(sizeof(task_t)); if (!t) { @@ -561,6 +736,13 @@ task_t *submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int return NULL; } + if (sampled) { + uint64_t alloc_ns = mono_ns() - alloc_start; + atomic_fetch_add_explicit(&g_prof.submit_alloc_ns, alloc_ns * g_prof_scale, + memory_order_relaxed); + copy_start = mono_ns(); + } + for (int i = 0; i < count; ++i) { size_t len = lens[i]; if (len == 0) { @@ -570,14 +752,33 @@ task_t *submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int copied += len; } + if (sampled) { + uint64_t copy_ns = mono_ns() - copy_start; + atomic_fetch_add_explicit(&g_prof.submit_copy_ns, copy_ns * g_prof_scale, + memory_order_relaxed); + } + t->iovs[0].iov_base = packed; t->iovs[0].iov_len = copied; + if (sampled) { + uint64_t pack_ns = mono_ns() - pack_start; + atomic_fetch_add_explicit(&g_prof.submit_pack_ns, pack_ns * g_prof_scale, + memory_order_relaxed); + queue_start = mono_ns(); + } + if (queue_task_with_backpressure(ctx, t) != 0) { task_destroy(t); return NULL; } + if (sampled) { + uint64_t queue_ns = mono_ns() - queue_start; + atomic_fetch_add_explicit(&g_prof.submit_queue_ns, queue_ns * g_prof_scale, + memory_order_relaxed); + } + return t; } diff --git a/diskuring/diskuring.h b/diskuring/diskuring.h index c3925af..0c8c9e7 100644 --- a/diskuring/diskuring.h +++ b/diskuring/diskuring.h @@ -75,6 +75,7 @@ void iouring_shutdown(iouring_ctx_t *ctx); task_t *submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int count, off_t off); int uring_task_complete(iouring_ctx_t *ctx); void cleanup_finished_iouring_tasks(iouring_ctx_t *ctx); +void iouring_profile_dump(iouring_ctx_t *ctx); extern iouring_ctx_t global_uring_ctx; diff --git a/kvs_protocol_resp.c b/kvs_protocol_resp.c index ee0225e..05d5d1c 100644 --- a/kvs_protocol_resp.c +++ b/kvs_protocol_resp.c @@ -555,6 +555,7 @@ int resp_dispatch(const resp_cmd_t *cmd, resp_value_t *out_value) { *out_value = resp_simple("OK"); return 0; case KVS_CMD_MEM_PRINT:{ + iouring_profile_dump(&global_uring_ctx); int ret = kvs_mem_printf(); *out_value = resp_int(ret); return 0; @@ -580,4 +581,4 @@ void __ssync(const uint8_t *ip, uint32_t ip_len, int port, unsigned long long se void __sready(){ -} \ No newline at end of file +}