rebuild
This commit is contained in:
812
src/spdk_engine/io_engine.c
Normal file
812
src/spdk_engine/io_engine.c
Normal file
@@ -0,0 +1,812 @@
|
||||
#include "spdk_engine/io_engine.h"
|
||||
#include "config.h"
|
||||
#include "common/utils.h"
|
||||
|
||||
#include <spdk/event.h>
|
||||
#include <spdk/log.h>
|
||||
#include <spdk/bdev.h>
|
||||
#include <spdk/blob.h>
|
||||
#include <spdk/blob_bdev.h>
|
||||
#include <spdk/thread.h>
|
||||
#include <semaphore.h>
|
||||
#include <errno.h>
|
||||
#include <pthread.h>
|
||||
#include <string.h>
|
||||
|
||||
struct zvfs_spdk_io_engine g_engine = {0};
|
||||
static int g_engine_init_rc = -EAGAIN;
|
||||
static pthread_mutex_t g_super_blob_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||
static spdk_blob_id g_super_blob_id_cache = SPDK_BLOBID_INVALID;
|
||||
|
||||
static __thread struct zvfs_tls_ctx tls = {0};
|
||||
|
||||
// 初始化操作上下文
|
||||
struct json_load_ctx {
|
||||
bool done;
|
||||
int rc;
|
||||
};
|
||||
|
||||
struct bs_init_ctx {
|
||||
bool done;
|
||||
int rc;
|
||||
struct spdk_blob_store *bs;
|
||||
};
|
||||
|
||||
// metadata 操作通用上下文
|
||||
struct md_op_ctx {
|
||||
void (*fn)(struct md_op_ctx *ctx);
|
||||
volatile bool done;
|
||||
int rc;
|
||||
// op-specific fields
|
||||
union {
|
||||
struct { // for create
|
||||
uint64_t size_hint;
|
||||
spdk_blob_id blob_id;
|
||||
} create;
|
||||
struct { // for open
|
||||
spdk_blob_id blob_id;
|
||||
struct spdk_blob *blob;
|
||||
} open;
|
||||
struct { // for resize/sync/close
|
||||
struct zvfs_blob_handle *handle;
|
||||
uint64_t new_size; // for resize
|
||||
} handle_op;
|
||||
struct { // for delete
|
||||
spdk_blob_id blob_id;
|
||||
} delete;
|
||||
struct { // for get/set super
|
||||
spdk_blob_id blob_id;
|
||||
} super;
|
||||
};
|
||||
char *op_name;
|
||||
};
|
||||
|
||||
// IO completion 上下文
|
||||
struct io_completion_ctx {
|
||||
bool done;
|
||||
int rc;
|
||||
};
|
||||
|
||||
// metadata poller 线程函数
|
||||
static void *md_poller_fn(void *arg) {
|
||||
spdk_set_thread(g_engine.md_thread);
|
||||
while (true) {
|
||||
spdk_thread_poll(g_engine.md_thread, 0, 0);
|
||||
usleep(1000);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// 前向声明
|
||||
static struct spdk_io_channel *get_current_channel(void);
|
||||
static int dispatch_md_op(struct md_op_ctx *ctx);
|
||||
static int dispatch_md_op_quiet(struct md_op_ctx *ctx);
|
||||
static void md_op_cb(void *arg);
|
||||
static int open_bdev_and_init_bs(const char *bdev_name);
|
||||
static int load_json_config(void);
|
||||
static int ensure_engine_ready(const char *op);
|
||||
|
||||
// callbacks
|
||||
static void json_app_load_done(int rc, void *arg);
|
||||
static void zvfs_spdk_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx);
|
||||
static void bs_init_cb(void *arg, struct spdk_blob_store *bs, int bserrno);
|
||||
static void blob_create_cb(void *arg, spdk_blob_id blobid, int rc);
|
||||
static void blob_open_cb(void *arg, struct spdk_blob *blob, int rc);
|
||||
static void blob_resize_cb(void *arg, int rc);
|
||||
static void blob_sync_md_cb(void *arg, int rc);
|
||||
static void blob_close_cb(void *arg, int rc);
|
||||
static void blob_delete_cb(void *arg, int rc);
|
||||
static void io_completion_cb(void *arg, int rc);
|
||||
static void blob_get_super_cb(void *arg, spdk_blob_id blobid, int rc);
|
||||
static void blob_set_super_cb(void *arg, int rc);
|
||||
|
||||
// op functions on matadata
|
||||
static void blob_create_on_md(struct md_op_ctx *ctx);
|
||||
static void blob_open_on_md(struct md_op_ctx *ctx);
|
||||
static void blob_resize_on_md(struct md_op_ctx *ctx);
|
||||
static void blob_sync_md_on_md(struct md_op_ctx *ctx);
|
||||
static void blob_close_on_md(struct md_op_ctx *ctx);
|
||||
static void blob_delete_on_md(struct md_op_ctx *ctx);
|
||||
static void blob_get_super_on_md(struct md_op_ctx *ctx);
|
||||
static void blob_set_super_on_md(struct md_op_ctx *ctx);
|
||||
|
||||
__attribute__((constructor)) static void preload_init(void) {
|
||||
const char *auto_init = getenv("ZVFS_AUTO_INIT");
|
||||
if (!auto_init || strcmp(auto_init, "1") != 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
printf("\n\n auto init \n\n");
|
||||
const char *bdev_name = getenv("SPDK_BDEV_NAME") ? getenv("SPDK_BDEV_NAME") : ZVFS_BDEV;
|
||||
g_engine_init_rc = io_engine_init(bdev_name);
|
||||
if (g_engine_init_rc != 0) {
|
||||
SPDK_ERRLOG("io_engine_init failed in constructor: %d\n", g_engine_init_rc);
|
||||
}
|
||||
}
|
||||
|
||||
static int wait_done(bool *done_ptr, int *rc_ptr, const char *op) {
|
||||
int iter = 0;
|
||||
while (!*done_ptr) {
|
||||
if (tls.thread) {
|
||||
spdk_thread_poll(tls.thread, 0, 0);
|
||||
}else{
|
||||
SPDK_ERRLOG("not init tls.thread\n");
|
||||
return -EBADE;
|
||||
}
|
||||
if (++iter > WAITER_MAX_TIME) {
|
||||
SPDK_ERRLOG("%s timeout\n", op);
|
||||
return -ETIMEDOUT;
|
||||
}
|
||||
}
|
||||
|
||||
if (*rc_ptr != 0) {
|
||||
SPDK_ERRLOG("%s failed in callback: %d\n", op, *rc_ptr);
|
||||
return *rc_ptr;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int wait_done_volatile(volatile bool *done_ptr, int *rc_ptr, const char *op) {
|
||||
int iter = 0;
|
||||
while (!*done_ptr) {
|
||||
if (tls.thread) {
|
||||
spdk_thread_poll(tls.thread, 0, 0);
|
||||
}else{
|
||||
SPDK_ERRLOG("not init tls.thread\n");
|
||||
return -EBADE;
|
||||
}
|
||||
if (++iter > WAITER_MAX_TIME) {
|
||||
SPDK_ERRLOG("%s timeout\n", op);
|
||||
return -ETIMEDOUT;
|
||||
}
|
||||
}
|
||||
|
||||
if (*rc_ptr != 0) {
|
||||
SPDK_ERRLOG("%s failed in callback: %d\n", op, *rc_ptr);
|
||||
return *rc_ptr;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
// no rc error
|
||||
static int wait_done_volatile_quiet(volatile bool *done_ptr, int *rc_ptr, const char *op) {
|
||||
int iter = 0;
|
||||
while (!*done_ptr) {
|
||||
if (tls.thread) {
|
||||
spdk_thread_poll(tls.thread, 0, 0);
|
||||
} else {
|
||||
SPDK_ERRLOG("not init tls.thread\n");
|
||||
return -EBADE;
|
||||
}
|
||||
if (++iter > WAITER_MAX_TIME) {
|
||||
SPDK_ERRLOG("%s timeout\n", op);
|
||||
return -ETIMEDOUT;
|
||||
}
|
||||
}
|
||||
|
||||
return *rc_ptr;
|
||||
}
|
||||
|
||||
int io_engine_init(const char *bdev_name) {
|
||||
if (g_engine_init_rc == 0 && g_engine.bs != NULL && g_engine.md_thread != NULL) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
struct spdk_env_opts env_opts;
|
||||
spdk_env_opts_init(&env_opts);
|
||||
env_opts.name = "zvfs";
|
||||
|
||||
|
||||
if (spdk_env_init(&env_opts) != 0) {
|
||||
SPDK_ERRLOG("spdk_env_init failed\n");
|
||||
g_engine_init_rc = -1;
|
||||
return g_engine_init_rc;
|
||||
}
|
||||
|
||||
spdk_log_set_print_level(SPDK_LOG_NOTICE);
|
||||
spdk_log_set_level(SPDK_LOG_NOTICE);
|
||||
spdk_log_open(NULL);
|
||||
|
||||
if (spdk_thread_lib_init(NULL, 0) != 0) {
|
||||
SPDK_ERRLOG("spdk_thread_lib_init failed\n");
|
||||
g_engine_init_rc = -1;
|
||||
return g_engine_init_rc;
|
||||
}
|
||||
|
||||
// 为主线程 lazy init(constructor 在主线程跑)
|
||||
tls.thread = spdk_thread_create("main_thread", NULL);
|
||||
if (!tls.thread) {
|
||||
SPDK_ERRLOG("create main_thread failed\n");
|
||||
g_engine_init_rc = -1;
|
||||
return g_engine_init_rc;
|
||||
}
|
||||
spdk_set_thread(tls.thread);
|
||||
|
||||
if (load_json_config() != 0) {
|
||||
SPDK_ERRLOG("Failed to load SPDK config\n");
|
||||
g_engine_init_rc = -1;
|
||||
return g_engine_init_rc;
|
||||
}
|
||||
|
||||
/**
|
||||
* 这里是因为要让一个线程专门负责poll
|
||||
*/
|
||||
// 创建 md_thread
|
||||
g_engine.md_thread = spdk_thread_create("md_thread", NULL);
|
||||
if (!g_engine.md_thread) {
|
||||
SPDK_ERRLOG("create md_thread failed\n");
|
||||
g_engine_init_rc = -1;
|
||||
return g_engine_init_rc;
|
||||
}
|
||||
|
||||
// 起专用 poller pthread for md_thread
|
||||
pthread_t md_poller_tid;
|
||||
if (pthread_create(&md_poller_tid, NULL, md_poller_fn, NULL) != 0) {
|
||||
SPDK_ERRLOG("pthread_create for md_poller failed\n");
|
||||
g_engine_init_rc = -1;
|
||||
return g_engine_init_rc;
|
||||
}
|
||||
if (pthread_detach(md_poller_tid) != 0) {
|
||||
SPDK_ERRLOG("pthread_detach for md_poller failed\n");
|
||||
g_engine_init_rc = -1;
|
||||
return g_engine_init_rc;
|
||||
}
|
||||
|
||||
// init bdev/bs
|
||||
g_super_blob_id_cache = SPDK_BLOBID_INVALID;
|
||||
int rc = open_bdev_and_init_bs(bdev_name);
|
||||
if (rc != 0) {
|
||||
g_engine_init_rc = rc;
|
||||
return rc;
|
||||
}
|
||||
g_engine_init_rc = 0;
|
||||
return g_engine_init_rc;
|
||||
}
|
||||
|
||||
static int load_json_config(void) {
|
||||
const char *path = getenv("SPDK_JSON_CONFIG");
|
||||
if(!path) path = SPDK_JSON_PATH;
|
||||
|
||||
|
||||
struct json_load_ctx ctx = {
|
||||
.done = false,
|
||||
.rc = 0
|
||||
};
|
||||
spdk_subsystem_init_from_json_config(path, SPDK_DEFAULT_RPC_ADDR, json_app_load_done,
|
||||
&ctx, true);
|
||||
return wait_done(&ctx.done, &ctx.rc, "load_json_config");
|
||||
}
|
||||
|
||||
// lazy get channel
|
||||
static struct spdk_io_channel *get_current_channel(void) {
|
||||
if (ensure_engine_ready("get_current_channel") != 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (tls.thread) {
|
||||
spdk_thread_poll(tls.thread, 0, 0);
|
||||
}
|
||||
|
||||
if (!tls.thread) {
|
||||
char name[32];
|
||||
snprintf(name, sizeof(name), "worker_%lu", pthread_self());
|
||||
tls.thread = spdk_thread_create(name, NULL);
|
||||
if (!tls.thread) {
|
||||
SPDK_ERRLOG("spdk_thread_create failed\n");
|
||||
return NULL;
|
||||
}
|
||||
spdk_set_thread(tls.thread);
|
||||
}
|
||||
|
||||
if (!tls.channel) {
|
||||
tls.channel = spdk_bs_alloc_io_channel(g_engine.bs);
|
||||
if (!tls.channel) {
|
||||
SPDK_ERRLOG("alloc io_channel failed\n");
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
return tls.channel;
|
||||
}
|
||||
|
||||
// 通用 dispatch md op
|
||||
static int dispatch_md_op(struct md_op_ctx *ctx) {
|
||||
int rc = ensure_engine_ready(ctx->op_name ? ctx->op_name : "dispatch_md_op");
|
||||
if (rc != 0) {
|
||||
return rc;
|
||||
}
|
||||
|
||||
ctx->done = false;
|
||||
ctx->rc = 0;
|
||||
|
||||
spdk_thread_send_msg(g_engine.md_thread, md_op_cb, ctx);
|
||||
|
||||
return wait_done_volatile(&ctx->done, &ctx->rc, ctx->op_name);
|
||||
}
|
||||
|
||||
static int dispatch_md_op_quiet(struct md_op_ctx *ctx) {
|
||||
int rc = ensure_engine_ready(ctx->op_name ? ctx->op_name : "dispatch_md_op_quiet");
|
||||
if (rc != 0) {
|
||||
return rc;
|
||||
}
|
||||
|
||||
ctx->done = false;
|
||||
ctx->rc = 0;
|
||||
|
||||
spdk_thread_send_msg(g_engine.md_thread, md_op_cb, ctx);
|
||||
|
||||
return wait_done_volatile_quiet(&ctx->done, &ctx->rc, ctx->op_name);
|
||||
}
|
||||
|
||||
static int ensure_engine_ready(const char *op) {
|
||||
if (g_engine_init_rc != 0) {
|
||||
SPDK_ERRLOG("%s: io engine init failed, rc=%d\n", op, g_engine_init_rc);
|
||||
return g_engine_init_rc;
|
||||
}
|
||||
|
||||
if (!g_engine.bs || !g_engine.md_thread) {
|
||||
SPDK_ERRLOG("%s: io engine not ready (bs=%p, md_thread=%p)\n",
|
||||
op, (void *)g_engine.bs, (void *)g_engine.md_thread);
|
||||
return -EIO;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void md_op_cb(void *arg) {
|
||||
struct md_op_ctx *ctx = arg;
|
||||
ctx->fn(ctx);
|
||||
}
|
||||
|
||||
void json_app_load_done(int rc, void *arg) {
|
||||
struct json_load_ctx* ctx = (struct json_load_ctx*)arg;
|
||||
ctx->done = true;
|
||||
ctx->rc = rc;
|
||||
}
|
||||
|
||||
// bdev open + bs init
|
||||
static void zvfs_spdk_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
|
||||
void *event_ctx) {
|
||||
// 后续加日志或处理
|
||||
switch (type) {
|
||||
case SPDK_BDEV_EVENT_REMOVE:
|
||||
SPDK_NOTICELOG("bdev removed: %s\n", spdk_bdev_get_name(bdev));
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static void bs_init_cb(void *arg, struct spdk_blob_store *bs, int bserrno) {
|
||||
struct bs_init_ctx *ctx = (struct bs_init_ctx *)arg;
|
||||
ctx->rc = bserrno;
|
||||
ctx->bs = bs;
|
||||
ctx->done = true;
|
||||
}
|
||||
|
||||
static int open_bdev_and_init_bs(const char *bdev_name) {
|
||||
SPDK_NOTICELOG("open_bdev_and_init_bs\n");
|
||||
struct spdk_bs_dev *bs_dev = NULL;
|
||||
int rc = spdk_bdev_create_bs_dev_ext(bdev_name, zvfs_spdk_bdev_event_cb, NULL, &bs_dev);
|
||||
if (rc != 0) {
|
||||
SPDK_ERRLOG("spdk_bdev_create_bs_dev_ext failed: %d\n", rc);
|
||||
return rc;
|
||||
}
|
||||
g_engine.bs_dev = bs_dev;
|
||||
|
||||
struct bs_init_ctx ctx = {
|
||||
.done = false,
|
||||
.rc = 0,
|
||||
.bs = NULL
|
||||
};
|
||||
|
||||
/* 优先加载已有 blobstore;失败时回退到 init。 */
|
||||
spdk_bs_load(bs_dev, NULL, bs_init_cb, &ctx);
|
||||
rc = wait_done(&ctx.done, &ctx.rc, "bs_load");
|
||||
if (rc != 0) {
|
||||
SPDK_NOTICELOG("spdk_bs_load failed (%d), fallback to spdk_bs_init\n", rc);
|
||||
|
||||
/*
|
||||
* 注意:spdk_bs_load 失败路径会销毁传入的 dev。
|
||||
* 这里必须重新 create 一个新的 bs_dev,不能复用旧指针。
|
||||
*/
|
||||
bs_dev = NULL;
|
||||
rc = spdk_bdev_create_bs_dev_ext(bdev_name, zvfs_spdk_bdev_event_cb, NULL, &bs_dev);
|
||||
if (rc != 0) {
|
||||
SPDK_ERRLOG("spdk_bdev_create_bs_dev_ext(for init fallback) failed: %d\n", rc);
|
||||
g_engine.bs_dev = NULL;
|
||||
return rc;
|
||||
}
|
||||
g_engine.bs_dev = bs_dev;
|
||||
|
||||
ctx.done = false;
|
||||
ctx.rc = 0;
|
||||
ctx.bs = NULL;
|
||||
|
||||
spdk_bs_init(bs_dev, NULL, bs_init_cb, &ctx);
|
||||
rc = wait_done(&ctx.done, &ctx.rc, "bs_init");
|
||||
if (rc != 0) {
|
||||
g_engine.bs_dev = NULL;
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
|
||||
g_engine.bs = ctx.bs;
|
||||
g_engine.io_unit_size = spdk_bs_get_io_unit_size(ctx.bs);
|
||||
g_engine.cluster_size = spdk_bs_get_cluster_size(ctx.bs);
|
||||
|
||||
SPDK_NOTICELOG("Blobstore initialized successfully on bdev: %s\n", bdev_name);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void blob_get_super_cb(void *arg, spdk_blob_id blobid, int rc) {
|
||||
struct md_op_ctx *ctx = arg;
|
||||
ctx->rc = rc;
|
||||
ctx->super.blob_id = blobid;
|
||||
ctx->done = true;
|
||||
}
|
||||
|
||||
static void blob_set_super_cb(void *arg, int rc) {
|
||||
struct md_op_ctx *ctx = arg;
|
||||
ctx->rc = rc;
|
||||
ctx->done = true;
|
||||
}
|
||||
|
||||
static void blob_get_super_on_md(struct md_op_ctx *ctx) {
|
||||
spdk_bs_get_super(g_engine.bs, blob_get_super_cb, ctx);
|
||||
}
|
||||
|
||||
static void blob_set_super_on_md(struct md_op_ctx *ctx) {
|
||||
spdk_bs_set_super(g_engine.bs, ctx->super.blob_id, blob_set_super_cb, ctx);
|
||||
}
|
||||
|
||||
static int bs_get_super_id(spdk_blob_id *blob_id) {
|
||||
struct md_op_ctx ctx = {
|
||||
.fn = blob_get_super_on_md,
|
||||
.op_name = "blob get super",
|
||||
};
|
||||
ctx.super.blob_id = SPDK_BLOBID_INVALID;
|
||||
|
||||
int rc = dispatch_md_op_quiet(&ctx);
|
||||
if (rc != 0) {
|
||||
return rc;
|
||||
}
|
||||
*blob_id = ctx.super.blob_id;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int bs_set_super_id(spdk_blob_id blob_id) {
|
||||
struct md_op_ctx ctx = {
|
||||
.fn = blob_set_super_on_md,
|
||||
.op_name = "blob set super",
|
||||
};
|
||||
ctx.super.blob_id = blob_id;
|
||||
return dispatch_md_op(&ctx);
|
||||
}
|
||||
|
||||
struct zvfs_blob_handle *blob_get_super(void) {
|
||||
pthread_mutex_lock(&g_super_blob_mutex);
|
||||
|
||||
if (g_super_blob_id_cache != SPDK_BLOBID_INVALID) {
|
||||
struct zvfs_blob_handle *cached = blob_open(g_super_blob_id_cache);
|
||||
if (cached) {
|
||||
pthread_mutex_unlock(&g_super_blob_mutex);
|
||||
return cached;
|
||||
}
|
||||
g_super_blob_id_cache = SPDK_BLOBID_INVALID;
|
||||
}
|
||||
|
||||
spdk_blob_id super_id = SPDK_BLOBID_INVALID;
|
||||
int rc = bs_get_super_id(&super_id);
|
||||
if (rc == 0 && super_id != SPDK_BLOBID_INVALID) {
|
||||
g_super_blob_id_cache = super_id;
|
||||
struct zvfs_blob_handle *existing = blob_open(super_id);
|
||||
if (!existing) {
|
||||
g_super_blob_id_cache = SPDK_BLOBID_INVALID;
|
||||
}
|
||||
pthread_mutex_unlock(&g_super_blob_mutex);
|
||||
return existing;
|
||||
}
|
||||
if (rc == 0 && super_id == SPDK_BLOBID_INVALID) {
|
||||
rc = -ENOENT;
|
||||
}
|
||||
|
||||
if (rc != -ENOENT) {
|
||||
SPDK_ERRLOG("spdk_bs_get_super failed: %d\n", rc);
|
||||
pthread_mutex_unlock(&g_super_blob_mutex);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
struct zvfs_blob_handle *created = blob_create(0);
|
||||
if (!created) {
|
||||
pthread_mutex_unlock(&g_super_blob_mutex);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
rc = bs_set_super_id(created->id);
|
||||
if (rc != 0) {
|
||||
spdk_blob_id created_id = created->id;
|
||||
SPDK_ERRLOG("spdk_bs_set_super failed: %d\n", rc);
|
||||
blob_close(created);
|
||||
blob_delete(created_id);
|
||||
pthread_mutex_unlock(&g_super_blob_mutex);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
g_super_blob_id_cache = created->id;
|
||||
pthread_mutex_unlock(&g_super_blob_mutex);
|
||||
return created;
|
||||
}
|
||||
|
||||
// blob_create
|
||||
static void blob_create_cb(void *arg, spdk_blob_id blobid, int rc) {
|
||||
struct md_op_ctx *ctx = arg;
|
||||
ctx->rc = rc;
|
||||
ctx->create.blob_id = blobid;
|
||||
ctx->done = true;
|
||||
}
|
||||
|
||||
static void blob_create_on_md(struct md_op_ctx *ctx) {
|
||||
struct spdk_blob_opts opts;
|
||||
spdk_blob_opts_init(&opts, sizeof(opts));
|
||||
// size_hint 如果需,但 create 不直接 set size,用 resize 后
|
||||
spdk_bs_create_blob_ext(g_engine.bs, &opts, blob_create_cb, ctx);
|
||||
}
|
||||
|
||||
struct zvfs_blob_handle *blob_create(uint64_t size_hint) {
|
||||
if(size_hint == 0) size_hint = g_engine.cluster_size;
|
||||
struct md_op_ctx ctx = {.fn = blob_create_on_md, .create.size_hint = size_hint, .op_name = "blob create"};
|
||||
int rc = dispatch_md_op(&ctx);
|
||||
if (rc) return NULL;
|
||||
|
||||
struct zvfs_blob_handle *handle = blob_open(ctx.create.blob_id);
|
||||
if (handle && size_hint > 0) {
|
||||
rc = blob_resize(handle, size_hint); // 初始 resize
|
||||
if (rc != 0) {
|
||||
SPDK_ERRLOG("blob_resize failed after create: %d\n", rc);
|
||||
blob_close(handle);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
rc = blob_sync_md(handle);
|
||||
if (rc != 0) {
|
||||
SPDK_ERRLOG("blob_sync_md failed after resize: %d\n", rc);
|
||||
blob_close(handle);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
return handle;
|
||||
}
|
||||
|
||||
// blob_open
|
||||
static void blob_open_cb(void *arg, struct spdk_blob *blob, int rc) {
|
||||
struct md_op_ctx *ctx = arg;
|
||||
ctx->rc = rc;
|
||||
ctx->open.blob = blob;
|
||||
ctx->done = true;
|
||||
}
|
||||
|
||||
static void blob_open_on_md(struct md_op_ctx *ctx) {
|
||||
struct spdk_blob_open_opts opts;
|
||||
spdk_blob_open_opts_init(&opts, sizeof(opts));
|
||||
spdk_bs_open_blob_ext(g_engine.bs, ctx->open.blob_id, &opts, blob_open_cb, ctx);
|
||||
}
|
||||
|
||||
struct zvfs_blob_handle *blob_open(uint64_t blob_id) {
|
||||
struct md_op_ctx ctx = {.fn = blob_open_on_md, .open.blob_id = blob_id, .op_name = "blob open"};
|
||||
int rc = dispatch_md_op(&ctx);
|
||||
if (rc) return NULL;
|
||||
|
||||
struct zvfs_blob_handle *handle = malloc(sizeof(*handle));
|
||||
if (!handle) return NULL;
|
||||
|
||||
handle->id = blob_id;
|
||||
handle->blob = ctx.open.blob;
|
||||
handle->size = spdk_blob_get_num_clusters(handle->blob) * g_engine.cluster_size;
|
||||
|
||||
// 预分配固定大小的 DMA buf,后续所有 IO 都经过这块缓存,避免每次 IO 动态申请
|
||||
// 必须用 spdk_dma_malloc 保证地址对齐到 io_unit_size
|
||||
handle->dma_buf_size = ZVFS_DMA_BUF_SIZE;
|
||||
handle->dma_buf = spdk_dma_malloc(ZVFS_DMA_BUF_SIZE, g_engine.io_unit_size, NULL);
|
||||
if (!handle->dma_buf) {
|
||||
SPDK_ERRLOG("spdk_dma_malloc failed for blob %lu\n", blob_id);
|
||||
free(handle);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return handle;
|
||||
}
|
||||
|
||||
// blob_write
|
||||
static void io_completion_cb(void *arg, int rc) {
|
||||
struct io_completion_ctx *ctx = arg;
|
||||
ctx->rc = rc;
|
||||
ctx->done = true;
|
||||
}
|
||||
|
||||
int blob_write(struct zvfs_blob_handle *handle, uint64_t offset, const void *buf, size_t len) {
|
||||
if (tls.thread) {
|
||||
spdk_thread_poll(tls.thread, 0, 0);
|
||||
}
|
||||
|
||||
struct spdk_io_channel *ch = get_current_channel();
|
||||
if (!ch) return -1;
|
||||
if (len == 0) return 0;
|
||||
|
||||
// 越界检查
|
||||
if (offset + len > handle->size) {
|
||||
SPDK_ERRLOG("blob_write out of range: offset=%lu len=%zu blob_size=%lu\n",
|
||||
offset, len, handle->size);
|
||||
return -ERANGE;
|
||||
}
|
||||
|
||||
// 计算对齐后的 IO 范围和 dma_buf 内偏移
|
||||
uint64_t lba_off = 0;
|
||||
uint64_t lba_len = 0;
|
||||
uint32_t buf_off = 0;
|
||||
int rc = zvfs_calc_io_units(offset, len, g_engine.io_unit_size, &lba_off, &lba_len, &buf_off);
|
||||
if (rc != 0) {
|
||||
SPDK_ERRLOG("blob_write calc_io_units failed: %d\n", rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
size_t aligned_bytes = lba_len * g_engine.io_unit_size;
|
||||
if (aligned_bytes > ZVFS_DMA_BUF_SIZE) {
|
||||
SPDK_ERRLOG("blob_write aligned_bytes=%zu exceeds ZVFS_DMA_BUF_SIZE\n", aligned_bytes);
|
||||
return -ENOSPC;
|
||||
}
|
||||
|
||||
struct io_completion_ctx io_ctx = {.done = false, .rc = 0};
|
||||
|
||||
spdk_blob_io_read(handle->blob, ch, handle->dma_buf, lba_off, lba_len,
|
||||
io_completion_cb, &io_ctx);
|
||||
|
||||
|
||||
rc = wait_done(&io_ctx.done, &io_ctx.rc, "io_write(read phase)");
|
||||
if (rc != 0) return rc;
|
||||
|
||||
memcpy((uint8_t *)handle->dma_buf + buf_off, buf, len);
|
||||
io_ctx.done = false;
|
||||
io_ctx.rc = 0;
|
||||
|
||||
spdk_blob_io_write(handle->blob, ch, handle->dma_buf, lba_off, lba_len,
|
||||
io_completion_cb, &io_ctx);
|
||||
rc = wait_done(&io_ctx.done, &io_ctx.rc, "io_write(write phase)");
|
||||
if (rc != 0) return rc;
|
||||
|
||||
return io_ctx.rc;
|
||||
}
|
||||
|
||||
// blob_read 类似
|
||||
int blob_read(struct zvfs_blob_handle *handle, uint64_t offset, void *buf, size_t len) {
|
||||
if (tls.thread) {
|
||||
spdk_thread_poll(tls.thread, 0, 0);
|
||||
}
|
||||
|
||||
struct spdk_io_channel *ch = get_current_channel();
|
||||
if (!ch) return -1;
|
||||
if (len == 0) return 0;
|
||||
|
||||
// 越界检查
|
||||
if (offset + len > handle->size) {
|
||||
SPDK_ERRLOG("blob_read out of range: offset=%lu len=%zu blob_size=%lu\n",
|
||||
offset, len, handle->size);
|
||||
return -ERANGE;
|
||||
}
|
||||
|
||||
|
||||
// 计算对齐后的 IO 范围和 dma_buf 内偏移
|
||||
uint64_t lba_off = 0;
|
||||
uint64_t lba_len = 0;
|
||||
uint32_t buf_off = 0;
|
||||
int rc = zvfs_calc_io_units(offset, len, g_engine.io_unit_size, &lba_off, &lba_len, &buf_off);
|
||||
if (rc != 0) {
|
||||
SPDK_ERRLOG("io_read offset/len not aligned to io_unit_size=%lu\n", g_engine.io_unit_size);
|
||||
return rc;
|
||||
}
|
||||
|
||||
// 读入对齐范围到 dma_buf,再从正确偏移处截取到用户 buf
|
||||
size_t aligned_bytes = lba_len * g_engine.io_unit_size;
|
||||
if (aligned_bytes > ZVFS_DMA_BUF_SIZE) {
|
||||
SPDK_ERRLOG("blob_read aligned_bytes=%zu exceeds ZVFS_DMA_BUF_SIZE\n", aligned_bytes);
|
||||
return -ENOSPC;
|
||||
}
|
||||
|
||||
struct io_completion_ctx io_ctx = {.done = false, .rc = 0};
|
||||
|
||||
spdk_blob_io_read(handle->blob, ch, handle->dma_buf, lba_off, lba_len,
|
||||
io_completion_cb, &io_ctx);
|
||||
|
||||
rc = wait_done(&io_ctx.done, &io_ctx.rc, "io_read");
|
||||
if (rc != 0) return rc;
|
||||
|
||||
memcpy(buf, (uint8_t *)handle->dma_buf + buf_off, len);
|
||||
return io_ctx.rc;
|
||||
}
|
||||
|
||||
// blob_resize
|
||||
static void blob_resize_cb(void *arg, int rc) {
|
||||
struct md_op_ctx *ctx = arg;
|
||||
ctx->rc = rc;
|
||||
ctx->done = true;
|
||||
}
|
||||
|
||||
static void blob_resize_on_md(struct md_op_ctx *ctx) {
|
||||
uint64_t new_clusters = 0;
|
||||
uint64_t cluster_size = g_engine.cluster_size;
|
||||
int rc = zvfs_calc_ceil_units(ctx->handle_op.new_size, cluster_size, &new_clusters);
|
||||
if (rc != 0) {
|
||||
ctx->rc = rc;
|
||||
ctx->done = true;
|
||||
return;
|
||||
}
|
||||
spdk_blob_resize(ctx->handle_op.handle->blob, new_clusters, blob_resize_cb, ctx);
|
||||
}
|
||||
|
||||
int blob_resize(struct zvfs_blob_handle *handle, uint64_t new_size) {
|
||||
struct md_op_ctx ctx = {.fn = blob_resize_on_md, .op_name = "blob resize"};
|
||||
ctx.handle_op.handle = handle;
|
||||
ctx.handle_op.new_size = new_size;
|
||||
int rc = dispatch_md_op(&ctx);
|
||||
if (rc == 0) {
|
||||
uint64_t new_clusters = 0;
|
||||
zvfs_calc_ceil_units(new_size, g_engine.cluster_size, &new_clusters);
|
||||
handle->size = new_clusters * g_engine.cluster_size;
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
// blob_sync_md
|
||||
static void blob_sync_md_cb(void *arg, int rc) {
|
||||
struct md_op_ctx *ctx = arg;
|
||||
ctx->rc = rc;
|
||||
ctx->done = true;
|
||||
}
|
||||
|
||||
static void blob_sync_md_on_md(struct md_op_ctx *ctx) {
|
||||
spdk_blob_sync_md(ctx->handle_op.handle->blob, blob_sync_md_cb, ctx);
|
||||
}
|
||||
|
||||
int blob_sync_md(struct zvfs_blob_handle *handle) {
|
||||
struct md_op_ctx ctx = {.fn = blob_sync_md_on_md, .op_name = "blob sync"};
|
||||
ctx.handle_op.handle = handle;
|
||||
return dispatch_md_op(&ctx);
|
||||
}
|
||||
|
||||
// blob_close
|
||||
static void blob_close_cb(void *arg, int rc) {
|
||||
struct md_op_ctx *ctx = arg;
|
||||
ctx->rc = rc;
|
||||
ctx->done = true;
|
||||
}
|
||||
|
||||
static void blob_close_on_md(struct md_op_ctx *ctx) {
|
||||
spdk_blob_close(ctx->handle_op.handle->blob, blob_close_cb, ctx);
|
||||
}
|
||||
|
||||
int blob_close(struct zvfs_blob_handle *handle) {
|
||||
struct md_op_ctx ctx = {.fn = blob_close_on_md, .op_name = "blob close"};
|
||||
ctx.handle_op.handle = handle;
|
||||
int rc = dispatch_md_op(&ctx);
|
||||
if (rc == 0) {
|
||||
spdk_dma_free(handle->dma_buf);
|
||||
free(handle);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
// blob_delete
|
||||
static void blob_delete_cb(void *arg, int rc) {
|
||||
struct md_op_ctx *ctx = arg;
|
||||
ctx->rc = rc;
|
||||
ctx->done = true;
|
||||
}
|
||||
|
||||
static void blob_delete_on_md(struct md_op_ctx *ctx) {
|
||||
spdk_bs_delete_blob(g_engine.bs, ctx->delete.blob_id, blob_delete_cb, ctx);
|
||||
}
|
||||
|
||||
int blob_delete(uint64_t blob_id) {
|
||||
struct md_op_ctx ctx = {.fn = blob_delete_on_md, .op_name = "blob delete"};
|
||||
ctx.delete.blob_id = blob_id;
|
||||
return dispatch_md_op(&ctx);
|
||||
}
|
||||
Reference in New Issue
Block a user