/*
 * ldb/diskuring/diskuring.c
 *
 * io_uring-backed asynchronous disk write path: a single worker thread
 * reaps completions, enforces an in-flight cap as backpressure, and
 * submits queued writev tasks. Finished tasks are handed back to the
 * main thread for destruction via a global destroy queue.
 */
#include "diskuring.h"
#include "memory/alloc_dispatch.h"
#include <poll.h>
#include <sys/eventfd.h>
/* Completed tasks are pushed here by the worker thread; the main thread
 * drains and frees them via cleanup_finished_iouring_tasks(). */
static destroy_queue_t g_destroy_queue = {NULL, PTHREAD_MUTEX_INITIALIZER};
/* Debug/statistics counters.
 * NOTE(review): these are plain long long updated from multiple threads
 * without atomics — values are approximate and for diagnostics only. */
static long long push_to_queue = 0;
static long long push_to_sqe = 0;
static long long get_from_cqe = 0;
static long long release_cnt = 0;
/*
 * Initialize a freshly allocated task: sync primitives, completion state,
 * and queue linkage. Caller owns `t` and attaches buffers afterwards.
 */
void task_init(task_t *t)
{
    push_to_queue++;  /* stats only; see counter note at top of file */
    pthread_mutex_init(&t->m, NULL);
    pthread_cond_init(&t->cv, NULL);
    t->done = 0;
    t->res = 0;
    t->next = NULL;
    /* BUG FIX: clear the iovec fields so task_destroy() is safe even when
     * the caller never attaches buffers (e.g. an allocation in submit_write
     * fails after task_init) — previously these were left indeterminate. */
    t->iovs = NULL;
    t->iovcnt = 0;
}
/*
 * Publish the operation result and mark the task complete, waking every
 * thread blocked in task_wait(). Called from the io_uring worker thread.
 */
void task_finish(task_t *t, int res)
{
    pthread_mutex_lock(&t->m);
    t->done = 1;
    t->res = res;
    /* broadcast, not signal: multiple waiters may be parked on this task */
    pthread_cond_broadcast(&t->cv);
    pthread_mutex_unlock(&t->m);
}
/*
 * Block until the task completes and return its result code
 * (the raw cqe->res set by task_finish()).
 */
int task_wait(task_t *t)
{
    int result;

    pthread_mutex_lock(&t->m);
    /* loop guards against spurious condition-variable wakeups */
    for (;;) {
        if (t->done)
            break;
        pthread_cond_wait(&t->cv, &t->m);
    }
    result = t->res;
    pthread_mutex_unlock(&t->m);

    return result;
}
/*
 * Tear down a task: destroy its sync primitives, free every attached
 * data buffer, the iovec array, and finally the task itself.
 */
void task_destroy(task_t *t)
{
    pthread_cond_destroy(&t->cv);
    pthread_mutex_destroy(&t->m);

    struct iovec *vecs = t->iovs;
    if (vecs != NULL) {
        for (int idx = 0; idx < t->iovcnt; ++idx) {
            void *base = vecs[idx].iov_base;
            if (base != NULL)
                kvs_free(base);
        }
        kvs_free(vecs);
    }
    kvs_free(t);
}
/* Append a single task to the tail of the pending queue and wake the worker. */
static void queue_push(iouring_ctx_t *ctx, task_t *t)
{
    pthread_mutex_lock(&ctx->q_m);
    if (ctx->q_tail == NULL)
        ctx->q_head = t;            /* queue was empty */
    else
        ctx->q_tail->next = t;
    ctx->q_tail = t;
    pthread_cond_signal(&ctx->q_cv);
    pthread_mutex_unlock(&ctx->q_m);
}
/*
 * Splice a pre-linked chain [list_head .. list_tail] onto the FRONT of the
 * pending queue, preserving submission order, and wake the worker. Used to
 * return tasks that did not fit into the current SQE batch.
 */
static void queue_push_front(iouring_ctx_t *ctx, task_t *list_head, task_t *list_tail)
{
    pthread_mutex_lock(&ctx->q_m);
    list_tail->next = ctx->q_head;
    ctx->q_head = list_head;
    if (ctx->q_tail == NULL)
        ctx->q_tail = list_tail;    /* queue was empty: chain tail is now queue tail */
    pthread_cond_signal(&ctx->q_cv);
    pthread_mutex_unlock(&ctx->q_m);
}
/* Detach and return the entire pending list; the queue becomes empty. */
static task_t *queue_pop_all(iouring_ctx_t *ctx)
{
    task_t *chain;

    pthread_mutex_lock(&ctx->q_m);
    chain = ctx->q_head;
    ctx->q_head = NULL;
    ctx->q_tail = NULL;
    pthread_mutex_unlock(&ctx->q_m);

    return chain;
}
/*
 * Detach and return up to `n` tasks from the head of the pending queue,
 * preserving order. Returns NULL when n <= 0 or the queue is empty.
 */
static task_t *queue_pop_n(iouring_ctx_t *ctx, int n)
{
    if (n <= 0)
        return NULL;

    pthread_mutex_lock(&ctx->q_m);
    task_t *batch = ctx->q_head;
    if (batch == NULL) {
        pthread_mutex_unlock(&ctx->q_m);
        return NULL;
    }

    /* walk forward to the last node of the batch (at most n nodes) */
    task_t *last = batch;
    int taken = 1;
    while (taken < n && last->next != NULL) {
        last = last->next;
        taken++;
    }

    ctx->q_head = last->next;
    if (ctx->q_head == NULL)
        ctx->q_tail = NULL;         /* queue fully drained */
    last->next = NULL;              /* terminate the detached batch */
    pthread_mutex_unlock(&ctx->q_m);

    return batch;
}
extern void sync_wakeup();
/*
 * Worker thread main loop.
 *
 * Each iteration: (1) drain the CQ completely to release in-flight budget,
 * (2) compute how many new submissions the cap allows, (3) pull that many
 * tasks off the queue, (4) prep SQEs, (5) submit and bump the counter,
 * (6) push any leftovers back to the queue head. Blocks on the queue
 * condition variable when idle, or on the ring when at capacity.
 */
static void *worker_main(void *arg)
{
iouring_ctx_t *ctx = (iouring_ctx_t *)arg;
const int BATCH_SIZE = 256; // cap per prep round so one batch cannot fill the SQ
// NOTE(review): BATCH_SIZE is currently unused — batching is bounded by available_slots below.
while (!ctx->stop)
{
int cq_count = 0;
// ========== 1. Reap CQEs aggressively (must run first: frees in_flight budget) ==========
// Use `while` rather than `if` so the CQ is drained completely.
while (true) {
struct io_uring_cqe *cqe;
unsigned head;
io_uring_for_each_cqe(&ctx->ring, head, cqe) {
task_t *done = (task_t *)(uintptr_t)cqe->user_data;
// Decrement first (before handling) so the in-flight cap is never exceeded momentarily.
atomic_fetch_sub(&ctx->in_flight, 1);
task_finish(done, cqe->res);
if (cqe->res < 0) {
fprintf(stderr, "write fail: fd=%d res=%d\n", done->fd, cqe->res);
}
// Hand the completed task to the destroy queue (freed later on the main thread).
pthread_mutex_lock(&g_destroy_queue.lock);
done->next = g_destroy_queue.head;
g_destroy_queue.head = done;
pthread_mutex_unlock(&g_destroy_queue.lock);
get_from_cqe++;
cq_count++;
}
if (cq_count > 0) {
io_uring_cq_advance(&ctx->ring, cq_count);
sync_wakeup();
}
// Nothing reaped this pass: CQ is empty, leave the reap loop.
if (cq_count == 0) break;
cq_count = 0; // reset and keep draining (new completions may have landed)
}
// Overflow check (belt-and-braces: with backpressure this should never fire).
if (*ctx->ring.sq.kflags & IORING_SQ_CQ_OVERFLOW) {
fprintf(stderr, "FATAL: CQ overflow detected! Backpressure broken!\n");
abort(); // crash outright — indicates a logic bug in the backpressure scheme
}
// ========== 2. Work out how many more submissions the cap allows ==========
int current_in_flight = atomic_load(&ctx->in_flight);
int available_slots = ctx->max_in_flight - current_in_flight;
if (available_slots <= 0) {
// At capacity: take no new tasks; block for a CQE instead of spinning.
struct io_uring_cqe *cqe;
int ret = io_uring_wait_cqe(&ctx->ring, &cqe);
if (ret == 0 && !ctx->stop) {
// Got a CQE — restart the loop so the reap phase above processes it.
continue;
}
continue;
}
// ========== 3. Take at most available_slots tasks off the queue ==========
task_t *task_list = queue_pop_n(ctx, available_slots);
if (!task_list) {
if (!ctx->stop && atomic_load(&ctx->in_flight) > 0) {
// NOTE(review): ret is intentionally unused here; we only want the wait side effect.
int ret = io_uring_submit_and_wait(&ctx->ring, 1);
continue;
}
// Queue empty and nothing in flight: sleep on the condition variable.
pthread_mutex_lock(&ctx->q_m);
while (ctx->q_head == NULL && !ctx->stop) {
pthread_cond_wait(&ctx->q_cv, &ctx->q_m);
}
pthread_mutex_unlock(&ctx->q_m);
continue;
}
// ========== 4. Prep SQEs (bounded by available_slots) ==========
int batch_count = 0;
task_t *curr = task_list;
task_t *prev = NULL;
task_t *submitted_head = task_list; // portion actually submitted this round
task_t *remaining_head = NULL; // portion that did not fit
while (curr && batch_count < available_slots) {
struct io_uring_sqe *sqe = io_uring_get_sqe(&ctx->ring);
if (!sqe) {
// SQ full (rare once in-flight is capped, but guard anyway).
break;
}
io_uring_prep_writev(sqe, curr->fd, curr->iovs, curr->iovcnt, curr->off);
sqe->user_data = (uint64_t)(uintptr_t)curr;
batch_count++;
prev = curr;
curr = curr->next;
}
// Split the list: prepped portion vs. leftover portion.
if (prev) {
prev->next = NULL; // terminate the submitted portion
}
remaining_head = curr; // leftovers, if any
// ========== 5. Submit and bump the in-flight counter ==========
if (batch_count > 0) {
int submitted = io_uring_submit(&ctx->ring);
if (submitted != batch_count) {
fprintf(stderr, "CRITICAL: prep %d but submit %d\n", batch_count, submitted);
// A prep/submit mismatch means the ring state is corrupted; abort.
abort();
}
atomic_fetch_add(&ctx->in_flight, submitted);
push_to_sqe += submitted;
}
// ========== 6. Push unsubmitted tasks back to the queue head (order preserved) ==========
if (remaining_head) {
task_t *tail = remaining_head;
while (tail->next) tail = tail->next;
queue_push_front(ctx, remaining_head, tail);
}
}
printf("exit uring, stop: %d, inflight: %d\n", ctx->stop,
atomic_load(&ctx->in_flight));
return NULL;
}
/*
 * Register a single file descriptor with the ring's fixed-file table.
 * Returns the io_uring_register_files() result (0 on success, -errno on failure).
 */
int iouring_register_fd(iouring_ctx_t *ctx, int fd)
{
    int fd_table[1];

    fd_table[0] = fd;
    return io_uring_register_files(&ctx->ring, fd_table, 1);
}
/*
 * Initialize the io_uring context: queue sync primitives, the ring itself,
 * the in-flight cap (80% of the kernel CQ size, leaving headroom against
 * overflow), and the worker thread.
 *
 * Returns 0 on success; a negative errno-style code on failure, with all
 * partially-acquired resources released.
 */
int iouring_init(iouring_ctx_t *ctx, unsigned entries)
{
    memset(ctx, 0, sizeof(*ctx));
    pthread_mutex_init(&ctx->q_m, NULL);
    pthread_cond_init(&ctx->q_cv, NULL);
    ctx->stop = 0;

    struct io_uring_params params;
    memset(&params, 0, sizeof(params));
    int ret = io_uring_queue_init_params(entries, &ctx->ring, &params);
    if (ret < 0) {
        fprintf(stderr, "io_uring_queue_init_params failed: %d (%s)\n",
                ret, strerror(-ret));
        /* BUG FIX: release the sync primitives created above on this path */
        pthread_mutex_destroy(&ctx->q_m);
        pthread_cond_destroy(&ctx->q_cv);
        return ret;
    }

    unsigned cq_size = *ctx->ring.cq.kring_entries;
    printf("Kernel CQ size: %u\n", cq_size);
    /* backpressure cap: keep 20% CQ headroom */
    ctx->max_in_flight = (cq_size * 8) / 10;
    atomic_init(&ctx->in_flight, 0);

    ret = pthread_create(&ctx->th, NULL, worker_main, ctx);
    if (ret != 0) {
        io_uring_queue_exit(&ctx->ring);
        /* BUG FIX: release the sync primitives on this path too */
        pthread_mutex_destroy(&ctx->q_m);
        pthread_cond_destroy(&ctx->q_cv);
        return -ret;
    }
    return 0;
}
/*
 * Stop the worker thread, join it, then tear down the ring and the
 * queue sync primitives. Must not be called concurrently with submit_write().
 */
void iouring_shutdown(iouring_ctx_t *ctx)
{
    /* Raise the stop flag under the queue lock and wake the worker. */
    pthread_mutex_lock(&ctx->q_m);
    ctx->stop = 1;
    pthread_cond_broadcast(&ctx->q_cv);
    pthread_mutex_unlock(&ctx->q_m);

    pthread_join(ctx->th, NULL);

    io_uring_queue_exit(&ctx->ring);
    pthread_mutex_destroy(&ctx->q_m);
    pthread_cond_destroy(&ctx->q_cv);
}
/*
 * Queue an asynchronous writev of `count` buffers to `fd` at offset `off`.
 * Each buffer is deep-copied, so the caller may reuse `bufs` immediately.
 *
 * Returns the task (caller may task_wait() on it; it is freed later via the
 * destroy queue), or NULL on invalid arguments / allocation failure, with
 * all partially-acquired resources released.
 */
task_t* submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int count, off_t off)
{
    if (!bufs || !lens || count <= 0) return NULL;

    task_t *t = (task_t *)kvs_malloc(sizeof(task_t));
    if (!t) return NULL;   /* BUG FIX: was dereferenced unchecked */
    task_init(t);
    t->op = TASK_WRITE;
    t->fd = fd;
    t->off = off;

    t->iovs = (struct iovec *)kvs_malloc(sizeof(struct iovec) * (size_t)count);
    if (!t->iovs) goto fail_task;

    /* track how many iovecs hold live buffers so cleanup frees exactly those */
    t->iovcnt = 0;
    for (int i = 0; i < count; ++i) {
        size_t len = lens[i];
        void *buf = kvs_malloc(len);
        if (!buf) goto fail_bufs;
        memcpy(buf, bufs[i], len);
        t->iovs[i].iov_base = buf;
        t->iovs[i].iov_len = len;
        t->iovcnt = i + 1;
    }

    queue_push(ctx, t);
    return t;

fail_bufs:
    for (int j = 0; j < t->iovcnt; ++j)
        kvs_free(t->iovs[j].iov_base);
    kvs_free(t->iovs);
fail_task:
    /* BUG FIX: release the mutex/cond created by task_init() — previously
     * leaked on every failure path */
    pthread_mutex_destroy(&t->m);
    pthread_cond_destroy(&t->cv);
    kvs_free(t);
    return NULL;
}
/*
 * Drain the global destroy queue and free every completed task.
 * Intended to be called periodically from the main thread, so destruction
 * (mutex/cond teardown, buffer frees) never runs on the io_uring worker.
 */
void cleanup_finished_iouring_tasks() {
    /* Detach the whole list under the lock, then free outside of it. */
    pthread_mutex_lock(&g_destroy_queue.lock);
    task_t *drained = g_destroy_queue.head;
    g_destroy_queue.head = NULL;
    pthread_mutex_unlock(&g_destroy_queue.lock);

    int freed = 0;
    task_t *node = drained;
    while (node != NULL) {
        task_t *follow = node->next;
        task_destroy(node);
        node = follow;
        freed++;
    }
    release_cnt += freed;   /* stats only; see counter note at top of file */
}