Files
ldb/diskuring/diskuring.c

340 lines
9.2 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#include "diskuring.h"
#include "memory/alloc_dispatch.h"
#include <poll.h>
#include <sys/eventfd.h>
/* Tasks the worker thread has finished with, waiting for
 * cleanup_finished_iouring_tasks() to destroy them. LIFO singly-linked list
 * threaded through task_t::next, guarded by .lock. */
static destroy_queue_t g_destroy_queue = {NULL, PTHREAD_MUTEX_INITIALIZER};
/* Diagnostic counters; in the visible code they are only read by a
 * commented-out printf in cleanup_finished_iouring_tasks().
 * NOTE(review): these are plain (non-atomic) long longs updated from
 * different threads -- push_to_queue by whoever calls task_init() (via
 * submit_write), push_to_sqe/get_from_cqe by the worker thread, release_cnt
 * by the caller of cleanup_finished_iouring_tasks(). Concurrent submitters
 * can race on push_to_queue. */
static long long push_to_queue = 0;   /* tasks initialised by task_init() */
static long long push_to_sqe = 0;     /* SQEs accepted by io_uring_submit() */
static long long get_from_cqe = 0;    /* CQEs reaped by the worker */
static long long release_cnt = 0;     /* tasks destroyed by cleanup */
/*
 * Prepare a freshly allocated task for use.
 *
 * Counts it in push_to_queue, initialises its mutex/condvar, clears the
 * completion state and queue link, and marks the iovec array as empty so
 * task_destroy() is safe even if the task is destroyed before any buffers
 * are attached. op/fd/off are left for the caller to fill in.
 */
void task_init(task_t *t)
{
    push_to_queue++;
    pthread_mutex_init(&t->m, NULL);
    pthread_cond_init(&t->cv, NULL);
    t->done = 0;
    t->res = 0;
    t->next = NULL;
    /* task_destroy() inspects these; never leave them indeterminate. */
    t->iovs = NULL;
    t->iovcnt = 0;
}
/*
 * Publish a task's result: store res, mark it done, and wake every thread
 * blocked in task_wait() on it. Everything happens under the task's mutex.
 */
void task_finish(task_t *t, int res)
{
    pthread_mutex_t *lock = &t->m;

    pthread_mutex_lock(lock);
    t->done = 1;
    t->res = res;
    pthread_cond_broadcast(&t->cv);
    pthread_mutex_unlock(lock);
}
/*
 * Block until task_finish() has been called on t, then return the result it
 * stored. The worker passes the CQE result, which is negative on failure.
 * The predicate is re-checked in a loop, so spurious wakeups are harmless.
 */
int task_wait(task_t *t)
{
    pthread_mutex_lock(&t->m);
    for (;;) {
        if (t->done)
            break;
        pthread_cond_wait(&t->cv, &t->m);
    }
    int result = t->res;
    pthread_mutex_unlock(&t->m);
    return result;
}
/*
 * Release everything a task owns: its mutex and condvar, each non-NULL
 * buffer among the first iovcnt iovec entries, the iovec array itself, and
 * finally the task.
 * NOTE(review): must not run while another thread may still be inside
 * task_wait() on this task -- the mutex/condvar are destroyed here.
 */
void task_destroy(task_t *t)
{
    pthread_mutex_destroy(&t->m);
    pthread_cond_destroy(&t->cv);

    struct iovec *vec = t->iovs;
    if (vec != NULL) {
        int n = t->iovcnt;
        for (int k = 0; k < n; ++k) {
            void *base = vec[k].iov_base;
            if (base != NULL)
                kvs_free(base);
        }
        kvs_free(vec);
    }
    kvs_free(t);
}
/*
 * Append t to the tail of ctx's FIFO under q_m, then signal q_cv.
 * The worker parks on q_cv when it has nothing to do.
 */
static void queue_push(iouring_ctx_t *ctx, task_t *t)
{
    pthread_mutex_lock(&ctx->q_m);

    task_t *tail = ctx->q_tail;
    if (tail == NULL)
        ctx->q_head = t;      /* queue was empty: t is also the head */
    else
        tail->next = t;
    ctx->q_tail = t;

    pthread_cond_signal(&ctx->q_cv);
    pthread_mutex_unlock(&ctx->q_m);
}
/*
 * Detach and return the head of ctx's FIFO, or NULL if it is empty.
 * The returned task's next link is cleared.
 */
static task_t *queue_pop(iouring_ctx_t *ctx)
{
    pthread_mutex_lock(&ctx->q_m);

    task_t *front = ctx->q_head;
    if (front == NULL) {
        pthread_mutex_unlock(&ctx->q_m);
        return NULL;
    }

    ctx->q_head = front->next;
    if (ctx->q_head == NULL)
        ctx->q_tail = NULL;   /* removed the last element */
    front->next = NULL;

    pthread_mutex_unlock(&ctx->q_m);
    return front;
}
/*
 * Put t back at the head of ctx's FIFO. The worker uses this to return a
 * task when no SQE is available. Unlike queue_push(), q_cv is not signalled.
 */
static void queue_push_front(iouring_ctx_t *ctx, task_t *t)
{
    pthread_mutex_lock(&ctx->q_m);
    if (ctx->q_tail == NULL)
        ctx->q_tail = t;      /* empty queue: t becomes head and tail */
    t->next = ctx->q_head;
    ctx->q_head = t;
    pthread_mutex_unlock(&ctx->q_m);
}
/* Defined elsewhere in the project; the worker calls it after every batch of
 * reaped completions. */
extern void sync_wakeup(void);

/* Push a retired task onto g_destroy_queue. The worker never frees tasks
 * itself; cleanup_finished_iouring_tasks() detaches the list and destroys
 * them. */
static void worker_defer_destroy(task_t *t)
{
    pthread_mutex_lock(&g_destroy_queue.lock);
    t->next = g_destroy_queue.head;
    g_destroy_queue.head = t;
    pthread_mutex_unlock(&g_destroy_queue.lock);
}

/*
 * Consume every CQE currently visible in the CQ ring. For each one:
 *   - log a failed write,
 *   - publish the result to the task's waiter,
 *   - retire the task to g_destroy_queue,
 *   - only then drop it from ctx->in_flight.
 * The last step comes last so that uring_task_complete() cannot report
 * "all done" while a completion is still being processed.
 * Returns the number of CQEs consumed.
 */
static int worker_reap_cqes(iouring_ctx_t *ctx)
{
    struct io_uring_cqe *cqe;
    unsigned head;
    int reaped = 0;

    io_uring_for_each_cqe(&ctx->ring, head, cqe) {
        task_t *done = (task_t *)(uintptr_t)cqe->user_data;
        int res = cqe->res;

        /* Read fd/off before task_finish() releases the waiter. off_t width
         * varies by platform, hence the explicit cast for %lld. */
        if (res < 0) {
            fprintf(stderr, "write fail: fd=%d res=%d, offset=%lld\n",
                    done->fd, res, (long long)done->off);
        }
        task_finish(done, res);
        worker_defer_destroy(done);
        atomic_fetch_sub(&ctx->in_flight, 1);
        get_from_cqe++;
        reaped++;
    }
    if (reaped > 0)
        io_uring_cq_advance(&ctx->ring, reaped);
    return reaped;
}

/*
 * Turn queued tasks into writev SQEs without submitting them; returns how
 * many were prepared.
 *
 * A task is added to ctx->in_flight *before* it leaves the queue, so
 * in_flight counts prepared-but-unsubmitted SQEs too. This has two effects:
 *   - Outstanding completions can never exceed max_in_flight, even within a
 *     single batch.
 *   - A task is never invisible to uring_task_complete() while moving from
 *     the queue to the ring.
 */
static int worker_fill_sqes(iouring_ctx_t *ctx)
{
    /* Per-round cap, so a single round cannot fill the whole SQ. */
    enum { WORKER_BATCH_SIZE = 256 };
    int prepared = 0;

    while (prepared < WORKER_BATCH_SIZE) {
        int current_in_flight = atomic_load(&ctx->in_flight);
        if (current_in_flight >= ctx->max_in_flight)
            break; /* backpressure: keep CQ headroom */

        atomic_fetch_add(&ctx->in_flight, 1);
        task_t *t = queue_pop(ctx);
        if (!t) {
            atomic_fetch_sub(&ctx->in_flight, 1);
            break;
        }

        struct io_uring_sqe *sqe = io_uring_get_sqe(&ctx->ring);
        if (!sqe) {
            /* SQ full: requeue at the front (keeps FIFO order), then uncount. */
            queue_push_front(ctx, t);
            atomic_fetch_sub(&ctx->in_flight, 1);
            break;
        }
        io_uring_prep_writev(sqe, t->fd, t->iovs, t->iovcnt, t->off);
        sqe->user_data = (uint64_t)(uintptr_t)t;
        prepared++;
    }
    return prepared;
}

/*
 * Shutdown path: reap until every task counted in ctx->in_flight has
 * completed. io_uring_submit_and_wait() also flushes any SQEs left over from
 * a short or failed submit before blocking for a completion.
 */
static void worker_drain(iouring_ctx_t *ctx)
{
    printf("Shutdown: draining remaining CQEs...\n");
    for (;;) {
        worker_reap_cqes(ctx);
        if (atomic_load(&ctx->in_flight) <= 0)
            break;
        int submitted = io_uring_submit_and_wait(&ctx->ring, 1);
        if (submitted > 0)
            push_to_sqe += submitted;
    }
}

/*
 * Worker thread body, started by iouring_init(). Each round it:
 *   1. reaps all available CQEs, aborting if the kernel reports CQ overflow;
 *   2. turns queued tasks into SQEs, bounded by max_in_flight;
 *   3. submits pending SQEs, or
 *   4. blocks when there is nothing to submit: in io_uring_submit_and_wait()
 *      for one CQE when I/O is in flight, otherwise on q_cv for new tasks.
 * While blocked waiting for a CQE, newly queued tasks are picked up only
 * when that completion arrives.
 * NOTE(review): ctx->stop is read here without q_m; confirm it is declared
 * atomic/volatile in diskuring.h.
 * NOTE(review): tasks still queued when stop is observed are never
 * submitted or finished, so their task_wait() callers would block.
 */
static void *worker_main(void *arg)
{
    iouring_ctx_t *ctx = (iouring_ctx_t *)arg;
    int unsubmitted = 0; /* SQEs prepared but not yet accepted by the kernel */

    while (!ctx->stop) {
        /* 1. Reap. An overflow means backpressure failed. */
        if (*ctx->ring.sq.kflags & IORING_SQ_CQ_OVERFLOW) {
            fprintf(stderr, "FATAL: CQ overflow detected! Backpressure broken!\n");
            abort();
        }
        while (worker_reap_cqes(ctx) > 0)
            sync_wakeup();

        /* 2. Prepare. */
        unsubmitted += worker_fill_sqes(ctx);

        /* 3. Submit, including leftovers from an earlier short or failed submit. */
        if (unsubmitted > 0) {
            int submitted = io_uring_submit(&ctx->ring);
            if (submitted > 0) {
                push_to_sqe += submitted;
                unsubmitted -= submitted;
                if (unsubmitted < 0)
                    unsubmitted = 0;
            } else if (submitted < 0) {
                /* The SQEs stay in the ring and are retried next round;
                 * never fold an error code into the in-flight count. */
                fprintf(stderr, "io_uring_submit failed: %d (%s)\n",
                        submitted, strerror(-submitted));
            }
            continue;
        }

        /* 4. Nothing to submit: block instead of spinning. */
        if (atomic_load(&ctx->in_flight) > 0) {
            /* unsubmitted == 0, so every counted task is with the kernel and a
             * CQE will arrive. Errors such as -EINTR just start a new round. */
            int flushed = io_uring_submit_and_wait(&ctx->ring, 1);
            if (flushed > 0)
                push_to_sqe += flushed;
        } else {
            pthread_mutex_lock(&ctx->q_m);
            while (ctx->q_head == NULL && !ctx->stop)
                pthread_cond_wait(&ctx->q_cv, &ctx->q_m);
            pthread_mutex_unlock(&ctx->q_m);
        }
    }

    worker_drain(ctx);
    printf("exit uring, stop: %d, inflight: %d\n", ctx->stop,
           atomic_load(&ctx->in_flight));
    return NULL;
}
/*
 * Initialise ctx and start the worker thread.
 *
 * Sets up the task queue lock/condvar, an io_uring created with `entries`
 * SQ entries, and the worker thread. The in-flight cap is 80% of the CQ size
 * the kernel actually granted. That leaves CQ headroom, since the worker
 * aborts on CQ overflow.
 *
 * Returns 0 on success or a negative errno-style code. On failure,
 * everything this function initialised is torn down again.
 */
int iouring_init(iouring_ctx_t *ctx, unsigned entries)
{
    memset(ctx, 0, sizeof(*ctx));
    pthread_mutex_init(&ctx->q_m, NULL);
    pthread_cond_init(&ctx->q_cv, NULL);
    ctx->stop = 0;

    struct io_uring_params params;
    memset(&params, 0, sizeof(params));
    /* A larger CQ can be requested here via IORING_SETUP_CQSIZE and
     * params.cq_entries; max_in_flight below scales with whatever the kernel
     * grants. */
    int ret = io_uring_queue_init_params(entries, &ctx->ring, &params);
    if (ret < 0) {
        fprintf(stderr, "io_uring_queue_init_params failed: %d (%s)\n",
                ret, strerror(-ret));
        pthread_cond_destroy(&ctx->q_cv);
        pthread_mutex_destroy(&ctx->q_m);
        return ret;
    }

    unsigned cq_size = *ctx->ring.cq.kring_entries;
    printf("Kernel CQ size: %u\n", cq_size);
    ctx->max_in_flight = (cq_size * 8) / 10;
    atomic_init(&ctx->in_flight, 0);

    ret = pthread_create(&ctx->th, NULL, worker_main, ctx);
    if (ret != 0) {
        /* pthread_create returns a positive errno; report it negated. */
        io_uring_queue_exit(&ctx->ring);
        pthread_cond_destroy(&ctx->q_cv);
        pthread_mutex_destroy(&ctx->q_m);
        return -ret;
    }
    return 0;
}
/*
 * Stop the worker and release the ring.
 *
 * Raises ctx->stop under q_m and wakes the worker if it is parked on q_cv.
 * Then joins the worker thread and tears down the io_uring and the queue
 * lock/condvar.
 * NOTE(review): tasks still queued (never submitted) when the worker sees
 * stop are not completed here; confirm callers drain first, e.g. by polling
 * uring_task_complete().
 */
void iouring_shutdown(iouring_ctx_t *ctx)
{
    pthread_mutex_t *qlock = &ctx->q_m;

    pthread_mutex_lock(qlock);
    ctx->stop = 1;
    pthread_cond_broadcast(&ctx->q_cv);
    pthread_mutex_unlock(qlock);

    pthread_join(ctx->th, NULL);

    io_uring_queue_exit(&ctx->ring);
    pthread_mutex_destroy(qlock);
    pthread_cond_destroy(&ctx->q_cv);
}
/*
 * Queue an asynchronous vectored write of `count` buffers to `fd` at `off`.
 *
 * Each bufs[i] (lens[i] bytes) is copied, so the caller may reuse its
 * buffers as soon as this returns. Zero-length entries are queued as empty
 * iovecs without allocating.
 *
 * Returns the queued task, or NULL on bad arguments or allocation failure;
 * on NULL nothing was queued. The returned task belongs to this module: the
 * worker retires it after completion and cleanup_finished_iouring_tasks()
 * frees it, after which the pointer is no longer valid.
 */
task_t* submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int count, off_t off){
    if (!bufs || !lens || count <= 0)
        return NULL;
    /* Guard the iovec-array size computation against size_t overflow. */
    if ((size_t)count > ((size_t)-1) / sizeof(struct iovec))
        return NULL;

    task_t *t = (task_t *)kvs_malloc(sizeof *t);
    if (!t)
        return NULL;
    task_init(t);
    t->op = TASK_WRITE;
    t->fd = fd;
    t->off = off;
    /* iovcnt tracks how many entries are filled so far; task_destroy() then
     * frees exactly those on any failure path below. */
    t->iovcnt = 0;
    t->iovs = (struct iovec *)kvs_malloc(sizeof(struct iovec) * (size_t)count);
    if (!t->iovs) {
        task_destroy(t);  /* also destroys the mutex/condvar task_init created */
        return NULL;
    }

    for (int i = 0; i < count; ++i) {
        size_t len = lens[i];
        void *copy = NULL;
        if (len > 0) {
            copy = kvs_malloc(len);
            if (!copy) {
                task_destroy(t);
                return NULL;
            }
            memcpy(copy, bufs[i], len);
        }
        t->iovs[i].iov_base = copy;
        t->iovs[i].iov_len = len;
        t->iovcnt = i + 1;
    }

    queue_push(ctx, t);
    return t;
}
/*
 * Return 1 when the task queue is empty and ctx->in_flight is zero,
 * otherwise 0.
 * Queue emptiness is sampled under q_m first; the in-flight count is read
 * afterwards.
 */
int uring_task_complete(iouring_ctx_t *ctx){
    pthread_mutex_lock(&ctx->q_m);
    const int queue_empty = (ctx->q_head == NULL);
    pthread_mutex_unlock(&ctx->q_m);

    const int flying = atomic_load(&ctx->in_flight);
    return queue_empty && flying == 0;
}
/*
 * Destroy every task the worker has retired so far. Intended to be called
 * periodically from the main thread.
 *
 * The whole list is detached under the lock and then destroyed without
 * holding it. Adds the number destroyed to release_cnt. ctx is unused.
 */
void cleanup_finished_iouring_tasks(iouring_ctx_t *ctx) {
    (void)ctx;

    pthread_mutex_lock(&g_destroy_queue.lock);
    task_t *pending = g_destroy_queue.head;
    g_destroy_queue.head = NULL;
    pthread_mutex_unlock(&g_destroy_queue.lock);

    long long destroyed = 0;
    for (task_t *cur = pending, *nxt; cur != NULL; cur = nxt) {
        nxt = cur->next;      /* grab the link before the node is freed */
        task_destroy(cur);
        ++destroyed;
    }
    release_cnt += destroyed;
    // printf("push:%lld, sqe:%lld, cqe:%lld, rls:%lld\n", push_to_queue, push_to_sqe, get_from_cqe, release_cnt);
}