/*
 * ldb/diskuring/diskuring.c
 *
 * Asynchronous vectored-write queue: callers enqueue write tasks and a
 * dedicated worker thread submits them through io_uring.
 */
#include "diskuring.h"
#include "memory/alloc_dispatch.h"
#include <errno.h>
#include <poll.h>
#include <stdio.h>
#include <string.h>
#include <sys/eventfd.h>
/**
 * Initialise a task's synchronisation primitives and bookkeeping fields.
 *
 * Creates the mutex and condition variable with default attributes, marks the
 * task as not done with a result of 0, and detaches it from any list. The
 * iovec array pointer and count are cleared as well, so task_destroy() never
 * reads indeterminate values when no buffers were attached.
 *
 * @param t Task to initialise; must point to writable storage.
 */
void task_init(task_t *t)
{
    pthread_mutex_init(&t->m, NULL);
    pthread_cond_init(&t->cv, NULL);
    t->done = 0;
    t->res = 0;
    t->next = NULL;
    t->iovs = NULL;
    t->iovcnt = 0;
}
/**
 * Record a task's result and wake every thread blocked on its condition
 * variable.
 *
 * Both fields are written while holding the task mutex, and the broadcast is
 * issued before the mutex is released.
 *
 * @param t   Task being completed.
 * @param res Result code to store in the task.
 */
void task_finish(task_t *t, int res)
{
    pthread_mutex_t *lock = &t->m;

    pthread_mutex_lock(lock);
    t->done = 1;
    t->res = res;
    pthread_cond_broadcast(&t->cv);
    pthread_mutex_unlock(lock);
}
/**
 * Block until the task has been marked done, then return its result.
 *
 * The done flag is re-checked after every wake-up, so spurious wake-ups of
 * the condition variable are harmless.
 *
 * @param t Task to wait on.
 * @return The value stored in t->res at the time the task was seen done.
 */
int task_wait(task_t *t)
{
    int result;

    pthread_mutex_lock(&t->m);
    for (;;) {
        if (t->done)
            break;
        pthread_cond_wait(&t->cv, &t->m);
    }
    result = t->res;
    pthread_mutex_unlock(&t->m);

    return result;
}
/**
 * Release a task together with every buffer it owns.
 *
 * Destroys the task's mutex and condition variable, frees each non-NULL
 * iov_base in the iovec array, frees the array itself, and finally frees the
 * task structure. A NULL task is ignored, mirroring free(NULL).
 *
 * @param t Task to destroy, or NULL. The pointer is invalid after the call.
 */
void task_destroy(task_t *t)
{
    if (!t)
        return;

    pthread_mutex_destroy(&t->m);
    pthread_cond_destroy(&t->cv);

    if (t->iovs) {
        for (int i = 0; i < t->iovcnt; i++) {
            /* Guard kept: whether kvs_free() accepts NULL is not visible here. */
            if (t->iovs[i].iov_base)
                kvs_free(t->iovs[i].iov_base);
        }
        kvs_free(t->iovs);
    }
    kvs_free(t);
}
/**
 * Append a task to the tail of the context's pending queue and signal the
 * queue condition variable.
 *
 * @param ctx Context owning the queue.
 * @param t   Task to enqueue.
 */
static void queue_push(iouring_ctx_t *ctx, task_t *t)
{
    task_t **link;

    pthread_mutex_lock(&ctx->q_m);

    /* Pick the slot that must point at the new task: the old tail's next
     * pointer, or the head pointer when the queue is empty. */
    link = ctx->q_tail ? &ctx->q_tail->next : &ctx->q_head;
    *link = t;
    ctx->q_tail = t;

    pthread_cond_signal(&ctx->q_cv);
    pthread_mutex_unlock(&ctx->q_m);
}
/**
 * Detach and return the whole pending queue, sleeping while it is empty.
 *
 * The caller blocks on the queue condition variable until either a task is
 * queued or ctx->stop becomes non-zero.
 *
 * @param ctx Context owning the queue.
 * @return Head of the detached singly linked list, or NULL when the wait
 *         ended because stop was set while the queue was empty.
 */
static task_t *queue_pop_all(iouring_ctx_t *ctx)
{
    task_t *batch;

    pthread_mutex_lock(&ctx->q_m);
    while (!(ctx->stop || ctx->q_head != NULL)) {
        pthread_cond_wait(&ctx->q_cv, &ctx->q_m);
    }
    batch = ctx->q_head;
    ctx->q_head = NULL;
    ctx->q_tail = NULL;
    pthread_mutex_unlock(&ctx->q_m);

    return batch;
}
/* Complete every task in an unsubmitted list with the given error and free it. */
static void worker_fail_list(task_t *list, int err)
{
    while (list) {
        task_t *next = list->next;
        task_finish(list, err);
        task_destroy(list);
        list = next;
    }
}

/*
 * Wait for one completion, publish its result to the owning task and free
 * the task. Returns 0 on success or the negative error reported by
 * io_uring_wait_cqe().
 */
static int worker_reap_one(iouring_ctx_t *ctx)
{
    struct io_uring_cqe *cqe = NULL;
    int ret;

    do {
        ret = io_uring_wait_cqe(&ctx->ring, &cqe);
    } while (ret == -EINTR);
    if (ret < 0)
        return ret;

    /* Copy what we need before handing the CQE slot back to the ring. */
    task_t *done = (task_t *)(uintptr_t)cqe->user_data;
    int res = cqe->res;
    io_uring_cqe_seen(&ctx->ring, cqe);

    /* TODO: failures should be pushed onto a list for the main thread, which
     * would process them from a timer task and then destroy the tasks.
     * For now a failure is only reported by printing. */
    if (res < 0) {
        printf("uring failed: fd:%d, offset:%lld\n", done->fd, (long long)done->off);
    }

    /* A non-negative res is forwarded unchanged; short writes are not
     * detected or retried here. */
    task_finish(done, res);
    /* NOTE(review): the task is freed right after waiters are woken, so a
     * caller blocked in task_wait() on this task may touch freed memory. */
    task_destroy(done);
    return 0;
}

/**
 * Worker thread entry point: drains the pending queue and performs each
 * task as a vectored write through io_uring.
 *
 * Each popped list is processed in rounds: fill the submission queue with
 * as many tasks as fit, submit, then reap completions. Tasks that do not
 * fit in one round are carried into the next round instead of being dropped.
 * The loop ends when queue_pop_all() returns NULL, which only happens once
 * stop is set and the queue is empty; ctx->stop is therefore never read
 * here without the queue lock.
 *
 * @param arg Pointer to the iouring_ctx_t this worker serves.
 * @return Always NULL.
 */
static void *worker_main(void *arg)
{
    iouring_ctx_t *ctx = (iouring_ctx_t *)arg;
    /* SQEs accepted by the kernel whose completions have not been reaped. */
    unsigned inflight = 0;

    for (;;) {
        /* 1. Block until tasks are available (the thread sleeps here). */
        task_t *pending = queue_pop_all(ctx);
        if (!pending)
            break;

        while (pending) {
            int progressed = 0;

            /* 2. Prepare as many tasks as the submission queue can hold. */
            while (pending) {
                struct io_uring_sqe *sqe = io_uring_get_sqe(&ctx->ring);
                if (!sqe)
                    break;

                task_t *t = pending;
                pending = t->next;
                t->next = NULL;

                io_uring_prep_writev(sqe, t->fd, t->iovs, t->iovcnt, t->off);
                sqe->user_data = (uint64_t)(uintptr_t)t;
                progressed = 1;
            }

            /* Count only what the kernel actually accepted, so the reap loop
             * never waits for completions that were never submitted. */
            int ret;
            do {
                ret = io_uring_submit(&ctx->ring);
            } while (ret == -EINTR);
            if (ret > 0) {
                inflight += (unsigned)ret;
                progressed = 1;
            } else if (ret < 0) {
                fprintf(stderr, "uring submit failed: %d\n", ret);
            }

            /* 3. Wait for everything in flight to complete. */
            while (inflight > 0) {
                int rc = worker_reap_one(ctx);
                if (rc < 0) {
                    fprintf(stderr, "uring wait failed: %d\n", rc);
                    break;
                }
                inflight--;
                progressed = 1;
            }

            /* Nothing could be prepared, submitted or reaped: fail the tasks
             * that never reached the ring rather than spinning forever. */
            if (!progressed) {
                worker_fail_list(pending, ret < 0 ? ret : -EIO);
                pending = NULL;
            }
        }
    }
    return NULL;
}
/**
 * Register a single file descriptor with the context's ring.
 *
 * @param ctx Context whose ring receives the registration.
 * @param fd  File descriptor to register.
 * @return Whatever io_uring_register_files() returns for a one-entry table.
 */
int iouring_register_fd(iouring_ctx_t *ctx, int fd)
{
    const int table[1] = { fd };

    return io_uring_register_files(&ctx->ring, table, 1);
}
/**
 * Prepare a context: zero it, create the queue lock and condition variable,
 * set up the ring and start the worker thread.
 *
 * Everything created before a failure is torn down again, so a failed call
 * leaves nothing to clean up.
 *
 * @param ctx     Context to initialise; its previous contents are overwritten.
 * @param entries Queue depth passed to io_uring_queue_init_params().
 * @return 0 on success, or a negative error code: -EINVAL for a NULL ctx,
 *         the negated pthread error, or the value returned by
 *         io_uring_queue_init_params().
 */
int iouring_init(iouring_ctx_t *ctx, unsigned entries)
{
    struct io_uring_params params;
    int ret;

    if (!ctx)
        return -EINVAL;

    memset(ctx, 0, sizeof(*ctx));
    memset(&params, 0, sizeof(params));

    /* pthread functions return a positive error number; negate it to match
     * the negative-errno convention of the ring calls. */
    ret = pthread_mutex_init(&ctx->q_m, NULL);
    if (ret != 0)
        return -ret;

    ret = pthread_cond_init(&ctx->q_cv, NULL);
    if (ret != 0) {
        ret = -ret;
        goto err_mutex;
    }

    ret = io_uring_queue_init_params(entries, &ctx->ring, &params);
    if (ret < 0)
        goto err_cond;

    ret = pthread_create(&ctx->th, NULL, worker_main, ctx);
    if (ret != 0) {
        ret = -ret;
        goto err_ring;
    }
    return 0;

err_ring:
    io_uring_queue_exit(&ctx->ring);
err_cond:
    pthread_cond_destroy(&ctx->q_cv);
err_mutex:
    pthread_mutex_destroy(&ctx->q_m);
    return ret;
}
/**
 * Stop the worker thread and release everything owned by the context.
 *
 * Sets the stop flag under the queue lock, wakes the worker and joins it.
 * Tasks still sitting in the queue after the worker has exited are completed
 * with -ECANCELED and destroyed, so they are neither leaked nor left with
 * waiters that would block forever. The ring, mutex and condition variable
 * are torn down last.
 *
 * @param ctx Context previously set up by iouring_init(). No other thread
 *            may enqueue tasks once this call has started.
 */
void iouring_shutdown(iouring_ctx_t *ctx)
{
    pthread_mutex_lock(&ctx->q_m);
    ctx->stop = 1;
    pthread_cond_broadcast(&ctx->q_cv);
    pthread_mutex_unlock(&ctx->q_m);

    pthread_join(ctx->th, NULL);

    /* Detach anything the worker never picked up. */
    pthread_mutex_lock(&ctx->q_m);
    task_t *leftover = ctx->q_head;
    ctx->q_head = NULL;
    ctx->q_tail = NULL;
    pthread_mutex_unlock(&ctx->q_m);

    while (leftover) {
        task_t *next = leftover->next;
        task_finish(leftover, -ECANCELED);
        task_destroy(leftover);
        leftover = next;
    }

    io_uring_queue_exit(&ctx->ring);
    pthread_mutex_destroy(&ctx->q_m);
    pthread_cond_destroy(&ctx->q_cv);
}
/**
 * Queue an asynchronous vectored write.
 *
 * Every source buffer is copied into freshly allocated memory owned by the
 * task, so the caller may reuse or free its buffers as soon as this returns.
 * The task is then appended to the context's queue for the worker thread.
 *
 * @param ctx   Context whose worker will perform the write.
 * @param fd    Destination file descriptor.
 * @param bufs  Array of `count` source buffers.
 * @param lens  Array of `count` buffer lengths in bytes; a zero length yields
 *              an empty iovec entry and its buffer pointer is not read.
 * @param count Number of buffers; must be positive.
 * @param off   File offset passed to the write.
 * @return The queued task, or NULL on invalid arguments (NULL ctx/bufs/lens,
 *         non-positive count, NULL buffer with non-zero length) or on
 *         allocation failure. On failure nothing is queued or leaked.
 *
 * NOTE(review): the worker thread destroys the task once its completion has
 * been processed, so the returned pointer may already be dangling; waiting
 * on it is only safe if the ownership rules are changed.
 */
task_t* submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int count, off_t off){
    if (!ctx || !bufs || !lens || count <= 0)
        return NULL;

    task_t *t = (task_t *)kvs_malloc(sizeof(*t));
    if (!t)
        return NULL;

    task_init(t);
    t->op = TASK_WRITE;
    t->fd = fd;
    t->off = off;
    /* Start with no buffers attached so the failure path can always hand
     * the task to task_destroy(). */
    t->iovs = NULL;
    t->iovcnt = 0;

    t->iovs = (struct iovec *)kvs_malloc(sizeof(struct iovec) * (size_t)count);
    if (!t->iovs)
        goto fail;

    /* Zero the array first: task_destroy() frees only non-NULL bases, so a
     * partially filled array is cleaned up correctly. */
    memset(t->iovs, 0, sizeof(struct iovec) * (size_t)count);
    t->iovcnt = count;

    for (int i = 0; i < count; ++i) {
        size_t len = lens[i];

        if (len == 0)
            continue; /* entry stays {NULL, 0} */
        if (!bufs[i])
            goto fail;

        void *copy = kvs_malloc(len);
        if (!copy)
            goto fail;

        memcpy(copy, bufs[i], len);
        t->iovs[i].iov_base = copy;
        t->iovs[i].iov_len = len;
    }

    queue_push(ctx, t);
    return t;

fail:
    /* Releases the copies made so far, the iovec array, the task's mutex
     * and condition variable, and the task itself. */
    task_destroy(t);
    return NULL;
}