uring落盘的无锁队列修改 (lock-free queue rework for the uring persistence path)

This commit is contained in:
1iaan
2026-02-11 11:59:40 +00:00
parent c1458a6693
commit 68bb4b3f9c
16 changed files with 293 additions and 1135 deletions

View File

@@ -3,7 +3,8 @@
#include <poll.h>
#include <sys/eventfd.h>
static destroy_queue_t g_destroy_queue = {NULL, PTHREAD_MUTEX_INITIALIZER};
static destroy_queue_t g_destroy_queue = {ATOMIC_VAR_INIT(NULL)};
static destroy_queue_t g_submit_queue = {ATOMIC_VAR_INIT(NULL)};
static long long push_to_queue = 0;
static long long push_to_sqe = 0;
@@ -13,8 +14,6 @@ static long long release_cnt = 0;
void task_init(task_t *t)
{
push_to_queue ++;
pthread_mutex_init(&t->m, NULL);
pthread_cond_init(&t->cv, NULL);
t->done = 0;
t->res = 0;
t->next = NULL;
@@ -22,27 +21,18 @@ void task_init(task_t *t)
/* Publish a task's completion result and wake every thread blocked in
 * task_wait().
 * t:   task being completed (fields protected by t->m)
 * res: io_uring completion result stored into t->res
 * The order matters: res and done are written, and the broadcast issued,
 * all while holding t->m, so a waiter's predicate check cannot race with
 * the wakeup. */
void task_finish(task_t *t, int res)
{
pthread_mutex_lock(&t->m);
t->res = res;
/* done is the condition-variable predicate; set it before broadcasting */
t->done = 1;
pthread_cond_broadcast(&t->cv);
pthread_mutex_unlock(&t->m);
}
/* Block until task_finish() marks t done, then return its result code.
 * The while-loop (not if) re-checks t->done under t->m, which is the
 * standard guard against spurious condvar wakeups. */
int task_wait(task_t *t)
{
pthread_mutex_lock(&t->m);
while (!t->done)
pthread_cond_wait(&t->cv, &t->m);
/* read the result while still holding the mutex */
int r = t->res;
pthread_mutex_unlock(&t->m);
return r;
}
void task_destroy(task_t *t)
{
pthread_mutex_destroy(&t->m);
pthread_cond_destroy(&t->cv);
if (t->iovs) {
for (int i = 0; i < t->iovcnt; i++) {
if (t->iovs[i].iov_base) {
@@ -55,49 +45,71 @@ void task_destroy(task_t *t)
kvs_free(t);
}
static void queue_push(iouring_ctx_t *ctx, task_t *t)
static void submit_queue_push(iouring_ctx_t *ctx, task_t *t)
{
pthread_mutex_lock(&ctx->q_m);
if (ctx->q_tail)
ctx->q_tail->next = t;
else
ctx->q_head = t;
ctx->q_tail = t;
pthread_cond_signal(&ctx->q_cv);
pthread_mutex_unlock(&ctx->q_m);
task_t *old_head;
do {
old_head = atomic_load_explicit(&g_submit_queue.head, memory_order_relaxed);
t->next = old_head;
} while (!atomic_compare_exchange_weak_explicit(
&g_submit_queue.head, &old_head, t,
memory_order_release,
memory_order_relaxed));
/* 若之前队列为空,通知 worker */
if (old_head == NULL) {
uint64_t val = 1;
write(ctx->event_fd, &val, sizeof(val));
}
}
static task_t *queue_pop(iouring_ctx_t *ctx)
static task_t *submit_steal_all(iouring_ctx_t *ctx)
{
pthread_mutex_lock(&ctx->q_m);
task_t *t = ctx->q_head;
if (t) {
ctx->q_head = t->next;
if (!ctx->q_head) {
ctx->q_tail = NULL;
}
t->next = NULL;
}
pthread_mutex_unlock(&ctx->q_m);
return t;
return atomic_exchange_explicit(&g_submit_queue.head, NULL,
memory_order_acquire);
}
static void queue_push_front(iouring_ctx_t *ctx, task_t *t)
static void submit_queue_putback(iouring_ctx_t *ctx, task_t *head)
{
pthread_mutex_lock(&ctx->q_m);
t->next = ctx->q_head;
ctx->q_head = t;
if (!ctx->q_tail) {
ctx->q_tail = t;
while (head) {
task_t *nxt = head->next;
submit_queue_push(ctx, head);
head = nxt;
}
pthread_mutex_unlock(&ctx->q_m);
}
/* Return nonzero when no tasks are pending on the global submit stack.
 * The acquire load pairs with the release CAS in submit_queue_push(). */
static int submit_queue_empty(iouring_ctx_t *ctx)
{
    (void)ctx; /* queue is global; ctx kept for interface symmetry */
    task_t *top = atomic_load_explicit(&g_submit_queue.head,
                                       memory_order_acquire);
    return top == NULL;
}
/* Lock-free push of a finished task onto the global destroy stack
 * (Treiber stack linked through task_t::next).  Called from the worker
 * thread; the main thread later drains the whole stack with
 * destroy_queue_steal_all().
 * memory_order_release on CAS success publishes the task's fields to the
 * stealing thread, whose exchange uses memory_order_acquire. */
static void destroy_queue_push(task_t *t)
{
    task_t *old_head;
    do {
        old_head = atomic_load_explicit(&g_destroy_queue.head,
                                        memory_order_relaxed);
        t->next = old_head;
        /* weak CAS: a spurious failure just retries the loop, and it is
         * cheaper on LL/SC targets.  Also matches the idiom already used
         * by submit_queue_push() in this file. */
    } while (!atomic_compare_exchange_weak_explicit(
                 &g_destroy_queue.head,
                 &old_head,
                 t,
                 memory_order_release,
                 memory_order_relaxed));
}
/* Atomically detach the entire destroy stack and return its head
 * (NULL when empty).  The acquire exchange pairs with the release CAS
 * in destroy_queue_push(), so the caller observes each stolen task's
 * fields fully written. */
static task_t *destroy_queue_steal_all(void)
{
    task_t *stolen = atomic_exchange_explicit(&g_destroy_queue.head, NULL,
                                              memory_order_acquire);
    return stolen;
}
/* =============================================================================================== */
extern void sync_wakeup();
static void *worker_main(void *arg)
{
iouring_ctx_t *ctx = (iouring_ctx_t *)arg;
const int BATCH_SIZE = 256; // 每次最多准备这么多,防止一次占满 SQ
const int BATCH_SIZE = 512; // 每次最多准备这么多,防止一次占满 SQ
while (!ctx->stop)
{
@@ -115,6 +127,9 @@ static void *worker_main(void *arg)
unsigned head;
io_uring_for_each_cqe(&ctx->ring, head, cqe) {
if (cqe->user_data == 0) {
continue;
}
task_t *done = (task_t *)(uintptr_t)cqe->user_data;
atomic_fetch_sub(&ctx->in_flight, 1);
task_finish(done, cqe->res);
@@ -123,10 +138,7 @@ static void *worker_main(void *arg)
fprintf(stderr, "write fail: fd=%d res=%d, offset=%ld\n", done->fd, cqe->res, done->off);
}
pthread_mutex_lock(&g_destroy_queue.lock);
done->next = g_destroy_queue.head;
g_destroy_queue.head = done;
pthread_mutex_unlock(&g_destroy_queue.lock);
destroy_queue_push(done);
get_from_cqe++;
cq_count++;
@@ -145,19 +157,25 @@ static void *worker_main(void *arg)
// ========== 2. 批量准备 SQE ==========
int batch_count = 0;
while (true) {
int current_in_flight = atomic_load(&ctx->in_flight);
if (current_in_flight >= ctx->max_in_flight) {
task_t *pending = submit_steal_all(ctx);
while (pending) {
if (atomic_load(&ctx->in_flight) >= ctx->max_in_flight) {
submit_queue_putback(ctx, pending);
pending = NULL;
break; // 满了,停止取任务
}
task_t *t = queue_pop(ctx);
if (!t) break;
struct io_uring_sqe *sqe = io_uring_get_sqe(&ctx->ring);
if (!sqe) {
queue_push_front(ctx, t);
submit_queue_putback(ctx, pending);
pending = NULL;
break;
}
task_t *t = pending;
pending = pending->next;
t->next = NULL;
io_uring_prep_writev(sqe, t->fd, t->iovs, t->iovcnt, t->off);
sqe->user_data = (uint64_t)(uintptr_t)t;
batch_count++;
@@ -168,24 +186,32 @@ static void *worker_main(void *arg)
int submitted = io_uring_submit(&ctx->ring);
push_to_sqe += submitted;
atomic_fetch_add(&ctx->in_flight, submitted);
continue;
}
// ========== 4. 没事做就等待 ==========
if (batch_count == 0) {
int inflight = atomic_load(&ctx->in_flight);
if (inflight > 0) {
// 有任务在飞等一个CQE
continue;
} else {
// 真没事了,等新任务
pthread_mutex_lock(&ctx->q_m);
while (ctx->q_head == NULL && !ctx->stop) {
pthread_cond_wait(&ctx->q_cv, &ctx->q_m);
int inflight = atomic_load(&ctx->in_flight);
if (inflight > 0) {
io_uring_submit_and_wait(&ctx->ring, 1);
// 有任务在飞等一个CQE
continue;
} else {
// 真没事了,等新任务
if (submit_queue_empty(ctx)) {
struct io_uring_sqe *sqe = io_uring_get_sqe(&ctx->ring);
if (sqe) {
uint64_t buf;
io_uring_prep_read(sqe, ctx->event_fd, &buf, sizeof(buf), 0);
sqe->user_data = 0; // Special for event
io_uring_submit_and_wait(&ctx->ring, 1);
// After wait, consume the cqe
struct io_uring_cqe *cqe;
io_uring_peek_cqe(&ctx->ring, &cqe);
if (cqe && cqe->user_data == 0) {
io_uring_cq_advance(&ctx->ring, 1);
}
}
pthread_mutex_unlock(&ctx->q_m);
}
}
}
@@ -201,10 +227,7 @@ static void *worker_main(void *arg)
atomic_fetch_sub(&ctx->in_flight, 1);
task_finish(done, cqe->res);
pthread_mutex_lock(&g_destroy_queue.lock);
done->next = g_destroy_queue.head;
g_destroy_queue.head = done;
pthread_mutex_unlock(&g_destroy_queue.lock);
destroy_queue_push(done);
get_from_cqe++;
final_cq++;
@@ -229,16 +252,21 @@ static void *worker_main(void *arg)
int iouring_init(iouring_ctx_t *ctx, unsigned entries)
{
memset(ctx, 0, sizeof(*ctx));
pthread_mutex_init(&ctx->q_m, NULL);
pthread_cond_init(&ctx->q_cv, NULL);
ctx->stop = 0;
atomic_init(&g_submit_queue.head, NULL);
atomic_init(&g_destroy_queue.head, NULL);
ctx->event_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (ctx->event_fd < 0) {
// Handle error
return -errno;
}
struct io_uring_params params;
memset(&params, 0, sizeof(params));
// params.flags |= IORING_SETUP_CQSIZE;
// params.cq_entries = 256 * 1024;
// params.sq_entries = 128 * 1024;
int ret = io_uring_queue_init_params(entries, &ctx->ring, &params);
if (ret < 0) {
fprintf(stderr, "io_uring_queue_init_params failed: %d (%s)\n",
@@ -263,16 +291,15 @@ int iouring_init(iouring_ctx_t *ctx, unsigned entries)
void iouring_shutdown(iouring_ctx_t *ctx)
{
pthread_mutex_lock(&ctx->q_m);
ctx->stop = 1;
pthread_cond_broadcast(&ctx->q_cv);
pthread_mutex_unlock(&ctx->q_m);
uint64_t val = 1;
write(ctx->event_fd, &val, sizeof(val));
pthread_join(ctx->th, NULL);
io_uring_queue_exit(&ctx->ring);
pthread_mutex_destroy(&ctx->q_m);
pthread_cond_destroy(&ctx->q_cv);
close(ctx->event_fd);
}
task_t* submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int count, off_t off){
@@ -304,37 +331,35 @@ task_t* submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int
memcpy(buf, bufs[i], len);
t->iovs[i].iov_base = buf;
t->iovs[i].iov_len = len;
// t->iovs[i].iov_base = bufs[i];
// t->iovs[i].iov_len = lens[i];
}
t->iovcnt = count;
queue_push(ctx, t);
submit_queue_push(ctx, t);
return t;
}
int uring_task_complete(iouring_ctx_t *ctx){
pthread_mutex_lock(&ctx->q_m);
int notask = ctx->q_head == NULL;
pthread_mutex_unlock(&ctx->q_m);
int noflight = atomic_load(&ctx->in_flight);
// printf("%d\n", noflight);
return (noflight == 0) && notask;
return submit_queue_empty(ctx) && atomic_load(&ctx->in_flight) == 0;
}
// 主线程定期调用此函数清理
void cleanup_finished_iouring_tasks(iouring_ctx_t *ctx) {
pthread_mutex_lock(&g_destroy_queue.lock);
task_t *list = g_destroy_queue.head;
g_destroy_queue.head = NULL;
pthread_mutex_unlock(&g_destroy_queue.lock);
task_t *list = destroy_queue_steal_all();
int cnt = 0;
while (list) {
cnt ++;
task_t *next = list->next;
task_destroy(list); // 在主线程执行销毁
list = next;
task_t *cur = list;
while (cur) {
cnt++;
task_t *next = cur->next;
task_destroy(cur);
cur = next;
}
release_cnt += cnt;
// printf("push:%lld, sqe:%lld, cqe:%lld, rls:%lld\n", push_to_queue, push_to_sqe, get_from_cqe, release_cnt);
}

View File

@@ -9,7 +9,7 @@
#include <errno.h>
#include <string.h>
#include <stdio.h>
#include <stdatomic.h>
typedef enum { TASK_READ, TASK_WRITE } task_op_t;
@@ -24,30 +24,28 @@ typedef struct task {
struct iovec *iovs; // iovec 数组
int iovcnt; // iovec 数量
pthread_mutex_t m;
pthread_cond_t cv;
struct task *next;
} task_t;
/* Lock-free LIFO (Treiber stack) of pending tasks, linked through
 * task_t::next; producers CAS onto head, the consumer exchanges it out. */
typedef struct {
_Atomic(task_t *) head;
} task_stack_t;
/* Lock-free stack of completed tasks awaiting destruction; the worker
 * pushes finished tasks, the main thread steals and frees them. */
typedef struct {
_Atomic(task_t *) head;
} destroy_queue_t;
typedef struct {
struct io_uring ring;
pthread_t th;
pthread_mutex_t q_m;
pthread_cond_t q_cv;
task_t *q_head, *q_tail;
int event_fd;
int stop;
atomic_int in_flight;
_Atomic int in_flight;
int max_in_flight;
} iouring_ctx_t;
typedef struct {
task_t *head;
pthread_mutex_t lock;
} destroy_queue_t;
void task_init(task_t *t);
void task_finish(task_t *t, int res);