/*
 * Async disk-write engine on top of io_uring.
 *
 * Design:
 *  - Producers call submit_write(), which deep-copies the caller's buffers
 *    into a task_t and pushes it onto a lock-free (Treiber) submit stack.
 *  - A single worker thread (worker_main) drains that stack, turns tasks
 *    into writev SQEs, reaps CQEs, and parks finished tasks on a lock-free
 *    destroy stack.
 *  - The producer side periodically calls cleanup_finished_iouring_tasks()
 *    to free finished tasks, keeping free() off the worker's hot path.
 *  - An eventfd wakes the idle worker when the submit stack goes non-empty.
 *
 * Backpressure: the worker never allows more than ctx->max_in_flight
 * outstanding writes, so the kernel completion queue cannot overflow.
 */
#include "diskuring.h"
#include "memory/alloc_dispatch.h"

/* NOTE(review): the original include list ended with two bare "#include"
 * tokens — the angle-bracket header names were stripped, likely by a markup
 * pass.  The headers below are reconstructed from the symbols used in this
 * file (fprintf/strerror/eventfd/write/abort/...); confirm against the
 * project's build.  liburing.h, <pthread.h> and <stdatomic.h> are assumed
 * to be pulled in by diskuring.h, which declares iouring_ctx_t/task_t. */
#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/eventfd.h>
#include <unistd.h>

/* Lock-free LIFO stacks shared between producer threads and the worker. */
static destroy_queue_t g_destroy_queue = {ATOMIC_VAR_INIT(NULL)}; /* done, awaiting free */
static destroy_queue_t g_submit_queue  = {ATOMIC_VAR_INIT(NULL)}; /* awaiting SQE prep   */

/* Debug flow counters.  Each is written by a single thread in practice
 * (producer or worker), so plain long long is tolerated here. */
static long long push_to_queue = 0; /* tasks created by submit_write()   */
static long long push_to_sqe   = 0; /* SQEs handed to the kernel         */
static long long get_from_cqe  = 0; /* CQEs reaped by the worker         */
static long long release_cnt   = 0; /* tasks freed by the cleanup pass   */

/* Initialize a freshly allocated task to the "not yet completed" state. */
void task_init(task_t *t)
{
    push_to_queue++; /* debug counter, producer-side only */
    t->done = 0;
    t->res  = 0;
    t->next = NULL;
}

/* Mark a task complete with the io result (bytes written or -errno).
 * The release store on `done` publishes `res` to task_wait(). */
void task_finish(task_t *t, int res)
{
    t->res = res;
    __atomic_store_n(&t->done, 1, __ATOMIC_RELEASE);
}

/* Busy-wait until the worker completes `t`; return the io result.
 * BUG FIX: the original body was empty — undefined behavior for a non-void
 * function, and callers could never actually wait.  The acquire load pairs
 * with the release store in task_finish(). */
int task_wait(task_t *t)
{
    while (!__atomic_load_n(&t->done, __ATOMIC_ACQUIRE)) {
        /* spin; completions are signalled promptly by the worker thread */
    }
    return t->res;
}

/* Free a task and the deep-copied iovec buffers it owns. */
void task_destroy(task_t *t)
{
    if (t->iovs) {
        for (int i = 0; i < t->iovcnt; i++) {
            if (t->iovs[i].iov_base) {
                kvs_free(t->iovs[i].iov_base);
            }
        }
        kvs_free(t->iovs);
    }
    kvs_free(t);
}

/* Push one task onto the submit stack (lock-free LIFO).  If the stack was
 * empty, ping the eventfd so an idle worker wakes up. */
static void submit_queue_push(iouring_ctx_t *ctx, task_t *t)
{
    task_t *old_head;
    do {
        old_head = atomic_load_explicit(&g_submit_queue.head, memory_order_relaxed);
        t->next  = old_head;
    } while (!atomic_compare_exchange_weak_explicit(
        &g_submit_queue.head, &old_head, t,
        memory_order_release, memory_order_relaxed));

    if (old_head == NULL) {
        uint64_t val = 1;
        /* Best effort: a lost wakeup only delays the worker until its next
         * pass; failure is not fatal. */
        (void)write(ctx->event_fd, &val, sizeof(val));
    }
}

/* Atomically detach the whole submit stack (newest-first list), or NULL. */
static task_t *submit_steal_all(iouring_ctx_t *ctx)
{
    (void)ctx;
    return atomic_exchange_explicit(&g_submit_queue.head, NULL,
                                    memory_order_acquire);
}

/* Return an unconsumed (stolen) task list to the submit stack. */
static void submit_queue_putback(iouring_ctx_t *ctx, task_t *head)
{
    while (head) {
        task_t *nxt = head->next;
        submit_queue_push(ctx, head);
        head = nxt;
    }
}

/* True when no task is waiting for SQE preparation. */
static int submit_queue_empty(iouring_ctx_t *ctx)
{
    (void)ctx;
    return atomic_load_explicit(&g_submit_queue.head, memory_order_acquire) == NULL;
}

/* Park a completed task for deferred freeing by the producer side. */
static void destroy_queue_push(task_t *t)
{
    task_t *old_head;
    do {
        old_head = atomic_load_explicit(&g_destroy_queue.head, memory_order_relaxed);
        t->next  = old_head;
    } while (!atomic_compare_exchange_strong_explicit(
        &g_destroy_queue.head, &old_head, t,
        memory_order_release, memory_order_relaxed));
}

/* Atomically detach the whole destroy stack, or NULL. */
static task_t *destroy_queue_steal_all(void)
{
    return atomic_exchange_explicit(&g_destroy_queue.head, NULL,
                                    memory_order_acquire);
}

/* =============================================================================================== */

extern void sync_wakeup(void);

/*
 * Worker loop: reap CQEs, prep SQEs from the submit stack, submit, and
 * sleep on the eventfd when idle.  Runs until ctx->stop is set, then
 * drains all in-flight completions before exiting.
 */
static void *worker_main(void *arg)
{
    iouring_ctx_t *ctx = (iouring_ctx_t *)arg;
    /* Target of the eventfd-read SQE.  Kept at function scope: a block-scope
     * local could go out of scope while the kernel read is still pending
     * (e.g. if the wait is interrupted), corrupting the stack. */
    uint64_t event_buf = 0;

    while (!ctx->stop) {
        /* ========== 1. Reap CQEs ==========
         * Backpressure should make overflow impossible; treat it as fatal. */
        if (*ctx->ring.sq.kflags & IORING_SQ_CQ_OVERFLOW) {
            fprintf(stderr, "FATAL: CQ overflow detected! Backpressure broken!\n");
            abort();
        }
        for (;;) {
            struct io_uring_cqe *cqe;
            unsigned head;
            unsigned seen   = 0; /* every CQE iterated, incl. eventfd markers */
            int      reaped = 0; /* real task completions                     */

            io_uring_for_each_cqe(&ctx->ring, head, cqe) {
                seen++;
                if (cqe->user_data == 0) {
                    continue; /* eventfd wakeup marker, not a task */
                }
                task_t *done = (task_t *)(uintptr_t)cqe->user_data;
                atomic_fetch_sub(&ctx->in_flight, 1);
                task_finish(done, cqe->res);
                if (cqe->res < 0) {
                    fprintf(stderr, "write fail: fd=%d res=%d, offset=%ld\n",
                            done->fd, cqe->res, (long)done->off);
                }
                destroy_queue_push(done);
                get_from_cqe++;
                reaped++;
            }
            if (seen == 0) {
                break;
            }
            /* BUG FIX: advance by everything we iterated, not just by the
             * task completions.  The original skipped user_data==0 entries
             * in the count, so cq_advance consumed the wrong entries and
             * eventfd CQEs clogged the ring. */
            io_uring_cq_advance(&ctx->ring, seen);
            if (reaped > 0) {
                sync_wakeup();
            }
        }

        /* ========== 2. Batch-prepare SQEs ========== */
        int     batch_count = 0;
        task_t *pending     = submit_steal_all(ctx);
        while (pending) {
            /* BUG FIX: include the SQEs already prepped in this batch —
             * in_flight is only bumped after submit, so the original could
             * overshoot max_in_flight within a single batch. */
            if (atomic_load(&ctx->in_flight) + batch_count >= ctx->max_in_flight) {
                submit_queue_putback(ctx, pending); /* full: stop taking tasks */
                pending = NULL;
                break;
            }
            struct io_uring_sqe *sqe = io_uring_get_sqe(&ctx->ring);
            if (!sqe) {
                submit_queue_putback(ctx, pending); /* SQ ring full */
                pending = NULL;
                break;
            }
            task_t *t = pending;
            pending   = pending->next;
            t->next   = NULL;
            io_uring_prep_writev(sqe, t->fd, t->iovs, t->iovcnt, t->off);
            sqe->user_data = (uint64_t)(uintptr_t)t;
            batch_count++;
        }

        /* ========== 3. Submit ========== */
        if (batch_count > 0) {
            int submitted = io_uring_submit(&ctx->ring);
            /* BUG FIX: the original added a possibly-negative error return
             * to in_flight, corrupting the backpressure counter. */
            if (submitted < 0) {
                fprintf(stderr, "io_uring_submit failed: %d (%s)\n",
                        submitted, strerror(-submitted));
                /* prepped SQEs stay in the SQ ring and will be flushed by a
                 * later submit call; retry on the next loop iteration */
            } else {
                push_to_sqe += submitted;
                atomic_fetch_add(&ctx->in_flight, submitted);
            }
            continue;
        }

        /* ========== 4. Nothing to do: wait ========== */
        int inflight = atomic_load(&ctx->in_flight);
        if (inflight > 0) {
            /* Writes are in flight — wait for one completion. */
            io_uring_submit_and_wait(&ctx->ring, 1);
        } else if (submit_queue_empty(ctx)) {
            /* Truly idle — arm a read on the eventfd and sleep on it.
             * user_data 0 marks it as a wakeup, not a task. */
            struct io_uring_sqe *sqe = io_uring_get_sqe(&ctx->ring);
            if (sqe) {
                io_uring_prep_read(sqe, ctx->event_fd, &event_buf,
                                   sizeof(event_buf), 0);
                sqe->user_data = 0;
                io_uring_submit_and_wait(&ctx->ring, 1);
                /* Consume the wakeup CQE if it is at the head. */
                struct io_uring_cqe *cqe = NULL;
                io_uring_peek_cqe(&ctx->ring, &cqe);
                if (cqe && cqe->user_data == 0) {
                    io_uring_cq_advance(&ctx->ring, 1);
                }
            }
        }
    }

    /* Shutdown: drain every outstanding completion so no task leaks its
     * in_flight slot.  NOTE(review): tasks still parked on the submit stack
     * at this point are never submitted or freed — confirm whether callers
     * guarantee quiescence (uring_task_complete()) before shutdown. */
    printf("Shutdown: draining remaining CQEs...\n");
    while (atomic_load(&ctx->in_flight) > 0) {
        struct io_uring_cqe *cqe;
        unsigned head;
        unsigned seen = 0;
        io_uring_for_each_cqe(&ctx->ring, head, cqe) {
            seen++;
            /* BUG FIX: the original cast every user_data to task_t* here —
             * a leftover eventfd CQE (user_data 0) became a NULL task and
             * task_finish(NULL) dereferenced it. */
            if (cqe->user_data == 0) {
                continue;
            }
            task_t *done = (task_t *)(uintptr_t)cqe->user_data;
            atomic_fetch_sub(&ctx->in_flight, 1);
            task_finish(done, cqe->res);
            destroy_queue_push(done);
            get_from_cqe++;
        }
        if (seen > 0) {
            io_uring_cq_advance(&ctx->ring, seen);
        }
        if (atomic_load(&ctx->in_flight) > 0) {
            io_uring_submit_and_wait(&ctx->ring, 1);
        }
    }
    printf("exit uring, stop: %d, inflight: %d\n",
           ctx->stop, atomic_load(&ctx->in_flight));
    return NULL;
}

/*
 * Initialize the context: eventfd, io_uring instance and worker thread.
 * max_in_flight is capped at 80% of the kernel CQ size so the CQ can
 * never overflow.  Returns 0 on success, a negative errno on failure.
 */
int iouring_init(iouring_ctx_t *ctx, unsigned entries)
{
    memset(ctx, 0, sizeof(*ctx));
    ctx->stop = 0;
    atomic_init(&g_submit_queue.head, NULL);
    atomic_init(&g_destroy_queue.head, NULL);

    ctx->event_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (ctx->event_fd < 0) {
        return -errno;
    }

    struct io_uring_params params;
    memset(&params, 0, sizeof(params));
    int ret = io_uring_queue_init_params(entries, &ctx->ring, &params);
    if (ret < 0) {
        fprintf(stderr, "io_uring_queue_init_params failed: %d (%s)\n",
                ret, strerror(-ret));
        close(ctx->event_fd); /* BUG FIX: fd leaked on this path */
        return ret;
    }

    unsigned cq_size = *ctx->ring.cq.kring_entries;
    printf("Kernel CQ size: %u\n", cq_size);
    ctx->max_in_flight = (cq_size * 8) / 10; /* leave 20% CQ headroom */
    atomic_init(&ctx->in_flight, 0);

    ret = pthread_create(&ctx->th, NULL, worker_main, ctx);
    if (ret != 0) {
        io_uring_queue_exit(&ctx->ring);
        close(ctx->event_fd); /* BUG FIX: fd leaked on this path */
        return -ret;
    }
    return 0;
}

/* Stop the worker (waking it via the eventfd), join it, and tear down the
 * ring and eventfd.  The worker drains all in-flight writes before exiting. */
void iouring_shutdown(iouring_ctx_t *ctx)
{
    ctx->stop = 1;
    uint64_t val = 1;
    (void)write(ctx->event_fd, &val, sizeof(val)); /* best-effort wakeup */
    pthread_join(ctx->th, NULL);
    io_uring_queue_exit(&ctx->ring);
    close(ctx->event_fd);
}

/*
 * Queue an asynchronous writev of `count` buffers to `fd` at offset `off`.
 * The buffers are deep-copied, so the caller may reuse them immediately.
 * Returns the task (owned by the engine; freed by the cleanup pass after
 * completion) or NULL on allocation failure / bad arguments.
 */
task_t *submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens,
                     int count, off_t off)
{
    if (!bufs || !lens || count <= 0) {
        return NULL;
    }
    task_t *t = (task_t *)kvs_malloc(sizeof(task_t));
    if (!t) { /* BUG FIX: allocation was unchecked before use */
        return NULL;
    }
    task_init(t);
    t->op  = TASK_WRITE;
    t->fd  = fd;
    t->off = off;
    t->iovs = (struct iovec *)kvs_malloc(sizeof(struct iovec) * count);
    if (!t->iovs) {
        kvs_free(t);
        return NULL;
    }
    for (int i = 0; i < count; ++i) {
        size_t len = lens[i];
        void  *buf = kvs_malloc(len);
        if (!buf) {
            /* unwind everything copied so far */
            for (int j = 0; j < i; ++j) {
                if (t->iovs[j].iov_base) {
                    kvs_free(t->iovs[j].iov_base);
                }
            }
            kvs_free(t->iovs);
            kvs_free(t);
            return NULL;
        }
        memcpy(buf, bufs[i], len);
        t->iovs[i].iov_base = buf;
        t->iovs[i].iov_len  = len;
    }
    t->iovcnt = count;
    submit_queue_push(ctx, t);
    return t;
}

/* True when no task is queued and none is in flight (engine quiescent). */
int uring_task_complete(iouring_ctx_t *ctx)
{
    return submit_queue_empty(ctx) && atomic_load(&ctx->in_flight) == 0;
}

/* Periodically called by the producer side: free every completed task
 * parked on the destroy stack. */
void cleanup_finished_iouring_tasks(iouring_ctx_t *ctx)
{
    (void)ctx;
    task_t *list = destroy_queue_steal_all();
    int     cnt  = 0;
    task_t *cur  = list;
    while (cur) {
        cnt++;
        task_t *next = cur->next;
        task_destroy(cur);
        cur = next;
    }
    release_cnt += cnt;
    // printf("push:%lld, sqe:%lld, cqe:%lld, rls:%lld\n", push_to_queue, push_to_sqe, get_from_cqe, release_cnt);
}