Files
ldb/diskuring/diskuring.c.nothread

199 lines
4.4 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#include "diskuring.h"
#include "memory/alloc_dispatch.h"
#include <poll.h>
#include <sys/eventfd.h>
/*
 * Reset the bookkeeping fields of a task: it is not linked to a following
 * task, carries no result, and is not marked as completed.
 * Only `next`, `res` and `done` are written; every other field of *t is
 * left exactly as the caller provided it.
 */
void task_init(task_t *t)
{
    t->next = NULL;
    t->res = 0;
    t->done = 0;
}
/*
 * Record the outcome of a task.
 *
 * t   - task to update
 * res - result code to store (harvest_cqes() passes the CQE result here)
 */
void task_finish(task_t *t, int res)
{
    /* Store the result first, then raise the completion flag. */
    t->res = res;
    t->done = 1;
}
/*
 * Release a task and everything it owns: every non-NULL iov_base buffer,
 * the iovec array itself, and finally the task structure.
 *
 * t is dereferenced unconditionally, so it must not be NULL. The pointer is
 * invalid once this function returns.
 */
void task_destroy(task_t *t)
{
    struct iovec *vec = t->iovs;

    if (vec != NULL) {
        for (int idx = 0; idx < t->iovcnt; idx++) {
            void *payload = vec[idx].iov_base;

            if (payload != NULL) {
                kvs_free(payload);
            }
        }
        kvs_free(vec);
    }

    kvs_free(t);
}
/*
 * Initialise an io_uring context.
 *
 * ctx     - context to set up; it is zeroed first, so any previous contents
 *           (including the pending-task list) are discarded
 * entries - number of entries passed to io_uring_queue_init_params()
 *
 * Returns 0 on success, or the negative error code reported by
 * io_uring_queue_init_params() on failure (a diagnostic is written to
 * stderr in that case). On success the kernel's completion-queue size is
 * printed to stdout.
 */
int iouring_init(iouring_ctx_t *ctx, unsigned entries)
{
    memset(ctx, 0, sizeof(*ctx));

    /*
     * No setup flags are requested. To ask for a larger completion queue,
     * IORING_SETUP_CQSIZE could be set in params.flags together with
     * params.cq_entries.
     */
    struct io_uring_params params;
    memset(&params, 0, sizeof(params));

    int ret = io_uring_queue_init_params(entries, &ctx->ring, &params);
    if (ret < 0) {
        fprintf(stderr, "io_uring_queue_init_params failed: %d (%s)\n",
                ret, strerror(-ret));
        return ret;
    }

    /*
     * io_uring_queue_init_params() returns 0 on success and a negative
     * errno otherwise, so reaching this point means the ring is ready.
     */
    unsigned cq_size = *ctx->ring.cq.kring_entries;
    printf("Kernel CQ size: %u\n", cq_size);

    return 0;
}
/*
 * Tear down the io_uring instance held by ctx.
 *
 * NOTE(review): only the ring is released here. Tasks still linked from
 * ctx->head are not freed by this function -- confirm that callers drain
 * the queue before shutting down.
 */
void iouring_shutdown(iouring_ctx_t *ctx)
{
    struct io_uring *ring = &ctx->ring;

    io_uring_queue_exit(ring);
}
/*
 * Reap every completion currently available in the completion queue.
 *
 * For each CQE the task stored in user_data is marked finished with the CQE
 * result, a diagnostic is printed to stderr when that result is negative,
 * and the task is destroyed on the spot. All reaped entries are then handed
 * back to the ring in a single advance.
 *
 * If the ring reports a completion-queue overflow the process is aborted.
 */
void harvest_cqes(iouring_ctx_t *ctx)
{
    struct io_uring *ring = &ctx->ring;
    struct io_uring_cqe *cqe;
    unsigned head;
    int reaped = 0;

    /* Iterate with for_each_cqe so the CQ is drained completely. */
    io_uring_for_each_cqe(ring, head, cqe) {
        task_t *task = (task_t *)(uintptr_t)cqe->user_data;

        task_finish(task, cqe->res);
        if (cqe->res < 0) {
            fprintf(stderr, "write fail: fd=%d res=%d\n", task->fd, cqe->res);
        }

        /* Destroy right away: single-threaded, so no global queue is needed. */
        task_destroy(task);
        reaped++;
    }

    /* Return the consumed entries to the ring in one step. */
    if (reaped > 0) {
        io_uring_cq_advance(ring, reaped);
    }

    /* Safety net: a CQ overflow is treated as fatal. */
    if (*ring->sq.kflags & IORING_SQ_CQ_OVERFLOW) {
        fprintf(stderr, "FATAL: CQ overflow detected!\n");
        abort();
    }
}
/*
 * Queue a vectored write of `count` buffers to `fd` at offset `off`.
 *
 * Each source buffer is copied into memory owned by the new task, so the
 * caller may reuse bufs[i] as soon as this function returns. The task is
 * appended to ctx's pending list, after which as many pending tasks as the
 * submission queue accepts are prepared and submitted. Tasks that do not
 * fit stay queued for a later submit_write() or iouring_tick() call.
 *
 * ctx   - ring context
 * fd    - destination file descriptor
 * bufs  - array of `count` source buffers
 * lens  - array of `count` buffer lengths in bytes; a zero length yields an
 *         empty iovec entry and bufs[i] is not read
 * count - number of buffers, must be > 0
 * off   - file offset for the write
 *
 * Returns the new task, or NULL on invalid arguments or allocation failure.
 * Ownership stays with the ring: harvest_cqes() destroys the task when its
 * completion is reaped. The caller must therefore not free the returned
 * pointer, nor dereference it after any later call that harvests
 * completions.
 */
task_t *submit_write(iouring_ctx_t *ctx, int fd, void **bufs, size_t *lens, int count, off_t off)
{
    if (!ctx || !bufs || !lens || count <= 0) {
        return NULL;
    }

    task_t *t = (task_t *)kvs_malloc(sizeof(*t));
    if (!t) {
        return NULL;
    }
    task_init(t);
    t->op = TASK_WRITE;
    t->fd = fd;
    t->off = off;
    t->iovcnt = 0;

    t->iovs = (struct iovec *)kvs_malloc(sizeof(*t->iovs) * (size_t)count);
    if (!t->iovs) {
        kvs_free(t);
        return NULL;
    }

    for (int i = 0; i < count; ++i) {
        size_t len = lens[i];
        void *buf = NULL;

        if (len > 0) {
            /* A NULL source with a non-zero length cannot be copied. */
            buf = bufs[i] ? kvs_malloc(len) : NULL;
            if (!buf) {
                /* Undo the copies made so far. */
                for (int j = 0; j < i; ++j) {
                    if (t->iovs[j].iov_base) {
                        kvs_free(t->iovs[j].iov_base);
                    }
                }
                kvs_free(t->iovs);
                kvs_free(t);
                return NULL;
            }
            memcpy(buf, bufs[i], len);
        }

        t->iovs[i].iov_base = buf;
        t->iovs[i].iov_len = len;
    }
    t->iovcnt = count;

    /* Reap finished writes first so their CQ slots are free again. */
    harvest_cqes(ctx);

    /* Append the new task to the pending list. */
    if (!ctx->head) {
        ctx->head = t;
        ctx->tail = t;
    } else {
        ctx->tail->next = t;
        ctx->tail = t;
    }

    int submitted = 0;
    while (ctx->head) {
        /*
         * Obtain the SQE before unlinking the task: if the submission queue
         * is full, the task must stay on the pending list rather than be
         * dropped.
         */
        struct io_uring_sqe *sqe = io_uring_get_sqe(&ctx->ring);
        if (!sqe) {
            break;
        }

        task_t *cur = ctx->head;
        ctx->head = cur->next;
        if (!ctx->head) {
            ctx->tail = NULL;
        }
        cur->next = NULL;

        io_uring_prep_writev(sqe, cur->fd, cur->iovs, cur->iovcnt, cur->off);
        sqe->user_data = (uint64_t)(uintptr_t)cur;
        submitted++;
    }

    if (submitted > 0) {
        int ret = io_uring_submit(&ctx->ring);
        if (ret < 0) {
            fprintf(stderr, "io_uring_submit failed: %d (%s)\n",
                    ret, strerror(-ret));
        }
    }

    return t;
}
/*
 * Make progress on the ring without queuing new work.
 *
 * Reaps finished completions, then moves as many tasks from ctx's pending
 * list into the submission queue as it will accept and submits them.
 * Tasks that do not fit remain on the pending list. A failing
 * io_uring_submit() is reported on stderr.
 */
void iouring_tick(iouring_ctx_t *ctx)
{
    harvest_cqes(ctx);

    int submitted = 0;
    while (ctx->head) {
        /* Obtain the SQE first so a full queue leaves the task linked. */
        struct io_uring_sqe *sqe = io_uring_get_sqe(&ctx->ring);
        if (!sqe) {
            break;
        }

        task_t *cur = ctx->head;
        ctx->head = cur->next;
        if (!ctx->head) {
            ctx->tail = NULL;
        }
        cur->next = NULL;

        io_uring_prep_writev(sqe, cur->fd, cur->iovs, cur->iovcnt, cur->off);
        sqe->user_data = (uint64_t)(uintptr_t)cur;
        submitted++;
    }

    if (submitted > 0) {
        int ret = io_uring_submit(&ctx->ring);
        if (ret < 0) {
            fprintf(stderr, "io_uring_submit failed: %d (%s)\n",
                    ret, strerror(-ret));
        }
    }
}