解决多线程dmabuffer竞态问题

This commit is contained in:
2026-03-20 20:50:24 +08:00
parent 1732163cbf
commit ac2150e0ed
18 changed files with 565 additions and 103 deletions

View File

@@ -123,6 +123,7 @@ make -j"$(nproc)"
cd ..
make -j"$(nproc)"
mkdir -p tests/bin
make test -j"$(nproc)"
```
@@ -173,8 +174,6 @@ LD_PRELOAD=./src/libzvfs.so ZVFS_TEST_ROOT=/zvfs ./tests/bin/hook_api_test
### 5.3 历史结果
> 以下是历史版本结论,用于说明设计方向。
- QD=1 下可达到 `spdk_nvme_perf` 的约 `90%~95%`
- 相对同机 `O_DIRECT`,顺序写吞吐可有约 `2.2x~2.3x` 提升。
- 非对齐写因 RMW 开销,吞吐明显下降。
@@ -216,53 +215,9 @@ Disk stats (read/write):
sda: ios=0/118, merge=0/104, ticks=0/66, in_queue=67, util=0.24%
```
#### psync
```shell
root@ubuntu20-129:/home/lian/share/zvfs# fio ./fio_script/psync.fio
test: (g=0): rw=randwrite, bs=(R) 16.0KiB-16.0KiB, (W) 16.0KiB-16.0KiB, (T) 16.0KiB-16.0KiB, ioengine=psync, iodepth=64
fio-3.16
Starting 1 thread
Jobs: 1 (f=1): [w(1)][100.0%][w=39.5MiB/s][w=2528 IOPS][eta 00m:00s]
test: (groupid=0, jobs=1): err= 0: pid=16831: Sat Mar 14 14:13:39 2026
Description : ["variable bs"]
write: IOPS=2468, BW=38.6MiB/s (40.4MB/s)(386MiB/10001msec); 0 zone resets
clat (usec): min=184, max=16636, avg=401.35, stdev=153.93
lat (usec): min=184, max=16636, avg=401.86, stdev=153.98
clat percentiles (usec):
| 1.00th=[ 247], 5.00th=[ 273], 10.00th=[ 289], 20.00th=[ 318],
| 30.00th=[ 338], 40.00th=[ 355], 50.00th=[ 375], 60.00th=[ 400],
| 70.00th=[ 429], 80.00th=[ 469], 90.00th=[ 545], 95.00th=[ 611],
| 99.00th=[ 783], 99.50th=[ 873], 99.90th=[ 1106], 99.95th=[ 1287],
| 99.99th=[ 1942]
bw ( KiB/s): min=31136, max=51008, per=100.00%, avg=39550.32, stdev=4617.82, samples=19
iops : min= 1946, max= 3188, avg=2471.79, stdev=288.55, samples=19
lat (usec) : 250=1.32%, 500=83.72%, 750=13.61%, 1000=1.11%
lat (msec) : 2=0.23%, 4=0.01%, 20=0.01%
cpu : usr=2.83%, sys=28.58%, ctx=24699, majf=0, minf=0
IO depths : 1=100.0%, 2=0.0%, 4=0.0%, 8=0.0%, 16=0.0%, 32=0.0%, >=64=0.0%
submit : 0=0.0%, 4=100.0%, 8=0.0%, 16=0.0%, 32=0.0%, 64=0.0%, >=64=0.0%
complete : 0=0.0%, 4=100.0%, 8=0.0%, 16=0.0%, 32=0.0%, 64=0.0%, >=64=0.0%
issued rwts: total=0,24690,0,0 short=0,0,0,0 dropped=0,0,0,0
latency : target=0, window=0, percentile=100.00%, depth=64
Run status group 0 (all jobs):
WRITE: bw=38.6MiB/s (40.4MB/s), 38.6MiB/s-38.6MiB/s (40.4MB/s-40.4MB/s), io=386MiB (405MB), run=10001-10001msec
Disk stats (read/write):
sda: ios=0/24384, merge=0/4, ticks=0/7764, in_queue=7764, util=99.10%
```
### 5.5 pgbench
```shell
root@ubuntu20:/home/lian/try/zvfs# ./scripts/run_pgbench_no_mmap.sh
当前配置:
host=127.0.0.1 port=5432 db=benchdb
scale=1 clients=1 threads=1 time=15s preload=1
init_jobs=1 init_steps=dtg skip_init=0
[1/2] pgbench 初始化(-i
some of the specified options cannot be used in initialization (-i) mode
root@ubuntu20:/home/lian/try/zvfs# ./scripts/run_pgbench_no_mmap.sh
root@ubuntu20-129:/home/lian/share/zvfs# ./scripts/run_pgbench_no_mmap.sh
当前配置:
host=127.0.0.1 port=5432 db=benchdb
scale=1 clients=1 threads=1 time=15s preload=1
@@ -276,7 +231,7 @@ NOTICE: table "pgbench_history" does not exist, skipping
NOTICE: table "pgbench_tellers" does not exist, skipping
creating tables...
generating data...
100000 of 100000 tuples (100%) done (elapsed 0.02 s, remaining 0.00 s)
100000 of 100000 tuples (100%) done (elapsed 15.06 s, remaining 0.00 s)
done.
[2/2] pgbench 压测(-T
starting vacuum...end.
@@ -286,10 +241,42 @@ query mode: simple
number of clients: 1
number of threads: 1
duration: 15 s
number of transactions actually processed: 1381
latency average = 10.869 ms
tps = 92.003503 (including connections establishing)
tps = 92.206743 (excluding connections establishing)
number of transactions actually processed: 564
latency average = 26.614 ms
tps = 37.573586 (including connections establishing)
tps = 38.176262 (excluding connections establishing)
```
```shell
root@ubuntu20-129:/home/lian/share/zvfs# ./scripts/run_pgbench.sh
当前配置:
host=127.0.0.1 port=5432 db=postgres
scale=1 clients=1 threads=1 time=15s preload=0
init_jobs=1 init_steps=dtg skip_init=0
[1/2] pgbench 初始化(-i
dropping old tables...
NOTICE: table "pgbench_accounts" does not exist, skipping
NOTICE: table "pgbench_branches" does not exist, skipping
NOTICE: table "pgbench_history" does not exist, skipping
NOTICE: table "pgbench_tellers" does not exist, skipping
creating tables...
generating data...
100000 of 100000 tuples (100%) done (elapsed 1.08 s, remaining 0.00 s)
done.
[2/2] pgbench 压测(-T
starting vacuum...end.
transaction type: <builtin: TPC-B (sort of)>
scaling factor: 1
query mode: simple
number of clients: 1
number of threads: 1
duration: 15 s
number of transactions actually processed: 586
latency average = 25.602 ms
tps = 39.059387 (including connections establishing)
tps = 39.102273 (excluding connections establishing)
```
---

View File

@@ -19,9 +19,9 @@ DB_PATH="/tmp/rocksdb_manual"
BENCHMARKS="fillrandom,readrandom"
# key数
NUM=1000000
NUM=50000
# 线程数
THREADS=1
THREADS=2
# 随机种子
SEED=1
@@ -52,7 +52,7 @@ DISABLE_WAL=1
SYNC=0
# direct I/O
USE_DIRECT_READS=0
USE_DIRECT_READS=1
USE_DIRECT_IO_FOR_FLUSH_AND_COMPACTION=0
# mmap I/O

View File

@@ -54,7 +54,7 @@ DISABLE_WAL=1
SYNC=0
# direct I/O
USE_DIRECT_READS=0
USE_DIRECT_READS=1
USE_DIRECT_IO_FOR_FLUSH_AND_COMPACTION=0
# mmap I/O

62
scripts/run_pgbench.sh Executable file
View File

@@ -0,0 +1,62 @@
#!/usr/bin/env bash
set -euo pipefail
# Simplified pgbench benchmark script:
#   1) every knob is configured at the top of this file;
#   2) connects straight to the configured database;
#   3) runs pgbench initialization, then the timed benchmark.
# ---------------------------
# Fixed configuration (edit here as needed)
# ---------------------------
PG_HOST="127.0.0.1"
PG_PORT="5432"
PG_DB="postgres"
PG_SCALE="10"
PG_TIME="15"
PG_CLIENTS="32"
PG_THREADS="8"
PG_INIT_JOBS="1"
PG_INIT_STEPS="dtg"
PG_SKIP_INIT="0"
PG_SUPERUSER="postgres"
USE_LD_PRELOAD="0"
# Shared library injected when USE_LD_PRELOAD=1. A default is REQUIRED:
# with `set -u`, referencing an unbound LD_PRELOAD_PATH in run_pg_cmd
# would abort the script. Override via the environment if needed.
LD_PRELOAD_PATH="${LD_PRELOAD_PATH:-./src/libzvfs.so}"
# Optional: prefer this directory; fall back to a PATH lookup when absent.
PG_BIN_DIR="/usr/lib/postgresql/12/bin"
if [[ ! -x "${PG_BIN_DIR}/pgbench" ]]; then
    PG_BIN_DIR="$(dirname "$(command -v pgbench 2>/dev/null || true)")"
fi
if [[ -z "${PG_BIN_DIR}" || ! -x "${PG_BIN_DIR}/pgbench" ]]; then
    echo "未找到 pgbench请先安装 PostgreSQL 客户端或修正 PG_BIN_DIR。" >&2
    exit 1
fi
# Run a PostgreSQL client command as PG_SUPERUSER, optionally with the
# zvfs shim preloaded.
run_pg_cmd() {
    if [[ "${USE_LD_PRELOAD}" == "1" ]]; then
        sudo -u "${PG_SUPERUSER}" env LD_PRELOAD="${LD_PRELOAD_PATH}" "$@"
    else
        sudo -u "${PG_SUPERUSER}" "$@"
    fi
}
echo "当前配置:"
echo " host=${PG_HOST} port=${PG_PORT} db=${PG_DB}"
echo " scale=${PG_SCALE} clients=${PG_CLIENTS} threads=${PG_THREADS} time=${PG_TIME}s preload=${USE_LD_PRELOAD}"
echo " init_jobs=${PG_INIT_JOBS} init_steps=${PG_INIT_STEPS} skip_init=${PG_SKIP_INIT}"
echo
if [[ "${PG_SKIP_INIT}" != "1" ]]; then
    echo "[1/2] pgbench 初始化(-i"
    run_pg_cmd "${PG_BIN_DIR}/pgbench" \
        -h "${PG_HOST}" -p "${PG_PORT}" -i \
        -s "${PG_SCALE}" -I "${PG_INIT_STEPS}" "${PG_DB}"
else
    echo "[1/2] 跳过初始化PG_SKIP_INIT=1"
fi
echo "[2/2] pgbench 压测(-T"
run_pg_cmd "${PG_BIN_DIR}/pgbench" \
    -h "${PG_HOST}" -p "${PG_PORT}" -c "${PG_CLIENTS}" -j "${PG_THREADS}" -T "${PG_TIME}" "${PG_DB}"

View File

@@ -12,10 +12,10 @@ set -euo pipefail
PG_HOST="127.0.0.1"
PG_PORT="5432"
PG_DB="benchdb"
PG_SCALE="1"
PG_SCALE="10"
PG_TIME="15"
PG_CLIENTS="1"
PG_THREADS="1"
PG_CLIENTS="32"
PG_THREADS="8"
PG_INIT_JOBS="1"
PG_INIT_STEPS="dtg"
PG_SKIP_INIT="0"

View File

@@ -13,7 +13,7 @@ APP = zvfs_daemon
CFLAGS += -I$(abspath $(CURDIR)/..)
C_SRCS := main.c ipc_cq.c ipc_reactor.c spdk_engine.c spdk_engine_wrapper.c $(PROTO_DIR)/ipc_proto.c $(COMMON_DIR)/utils.c
C_SRCS := main.c ipc_cq.c ipc_reactor.c spdk_engine.c spdk_engine_wrapper.c dma_buf_pool.c $(PROTO_DIR)/ipc_proto.c $(COMMON_DIR)/utils.c
SPDK_LIB_LIST = $(ALL_MODULES_LIST) event event_bdev

197
src/daemon/dma_buf_pool.c Normal file
View File

@@ -0,0 +1,197 @@
#include "dma_buf_pool.h"
#include <spdk/env.h>
#include <spdk/dma.h>
#include <spdk/log.h>
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
/**
 * Free-list node embedded at the head of each DMA block.
 *
 * Physical layout of one block:
 *
 *   [ struct dma_buf_node | padding ... | payload (buf_size bytes) ]
 *   ^ base (align-aligned)              ^ base + hdr_size
 *
 * hdr_size is sizeof(struct dma_buf_node) rounded UP to the pool's
 * alignment, so the payload stays DMA-aligned: spdk_dma_malloc() aligns
 * the base address, and offsetting by a multiple of `align` preserves
 * that. (Offsetting the payload by a bare sizeof(node) == 16 would break
 * the 512/4096-byte alignment the device requires.)
 *
 * dma_buf_acquire() hands out the payload pointer; dma_buf_release()
 * recovers the node at (payload - hdr_size).
 */
struct dma_buf_node {
    struct dma_buf_node *next;
    struct dma_buf_pool *pool; /* back reference, validated on release */
};
struct dma_buf_pool {
    struct dma_buf_node *free_list; /* head of the free list */
    pthread_mutex_t mu;
    size_t buf_size;   /* payload bytes (header not included) */
    size_t align;      /* spdk_dma_malloc alignment */
    size_t hdr_size;   /* sizeof(node) rounded up to `align` */
    int free_count;    /* blocks currently idle on the free list */
    int total;         /* blocks ever allocated (including lent-out) */
};
/* Payload <-> node translation helpers (header is hdr_size bytes). */
static inline void *node_to_payload(struct dma_buf_pool *pool,
                                    struct dma_buf_node *node) {
    return (uint8_t *)node + pool->hdr_size;
}
static inline struct dma_buf_node *payload_to_node(struct dma_buf_pool *pool,
                                                   void *buf) {
    return (struct dma_buf_node *)((uint8_t *)buf - pool->hdr_size);
}
/* ------------------------------------------------------------------ */
/* Internal: allocate one new physical DMA block and init its node.   */
/* ------------------------------------------------------------------ */
static struct dma_buf_node *alloc_one_block(struct dma_buf_pool *pool) {
    /* Total = aligned header + payload; the base address returned by
     * spdk_dma_malloc is aligned, so payload = base + hdr_size is too. */
    size_t alloc_size = pool->hdr_size + pool->buf_size;
    struct dma_buf_node *node = spdk_dma_malloc(alloc_size, pool->align, NULL);
    if (!node) {
        SPDK_ERRLOG("dma_buf_pool: spdk_dma_malloc(%zu, align=%zu) failed\n",
                    alloc_size, pool->align);
        return NULL;
    }
    node->next = NULL;
    node->pool = pool;
    return node;
}
/* ------------------------------------------------------------------ */
/* Public interface                                                   */
/* ------------------------------------------------------------------ */
struct dma_buf_pool *dma_buf_pool_create(int initial_count,
                                         size_t buf_size,
                                         size_t align) {
    if (initial_count < 1 || buf_size == 0) {
        SPDK_ERRLOG("dma_buf_pool_create: invalid args count=%d size=%zu\n",
                    initial_count, buf_size);
        return NULL;
    }
    struct dma_buf_pool *pool = calloc(1, sizeof(*pool));
    if (!pool) return NULL;
    pool->buf_size = buf_size;
    pool->align = (align > 0) ? align : 512;
    /* Round the header up to the alignment so the payload stays aligned. */
    pool->hdr_size = (sizeof(struct dma_buf_node) + pool->align - 1)
                     / pool->align * pool->align;
    pool->free_list = NULL;
    pool->free_count = 0;
    pool->total = 0;
    pthread_mutex_init(&pool->mu, NULL);
    /* Preallocate initial_count blocks. */
    for (int i = 0; i < initial_count; i++) {
        struct dma_buf_node *node = alloc_one_block(pool);
        if (!node) {
            /* A partially filled pool is still usable; warn and go on. */
            SPDK_WARNLOG("dma_buf_pool_create: only allocated %d / %d blocks\n",
                         i, initial_count);
            break;
        }
        node->next = pool->free_list;
        pool->free_list = node;
        pool->free_count++;
        pool->total++;
    }
    SPDK_NOTICELOG("dma_buf_pool created: %d blocks, buf_size=%zu, align=%zu\n",
                   pool->total, buf_size, pool->align);
    return pool;
}
void dma_buf_pool_destroy(struct dma_buf_pool *pool) {
    if (!pool) return;
    pthread_mutex_lock(&pool->mu);
    struct dma_buf_node *node = pool->free_list;
    int freed = 0;
    while (node) {
        struct dma_buf_node *next = node->next;
        spdk_dma_free(node);
        freed++;
        node = next;
    }
    pool->free_list = NULL;
    pool->free_count = 0;
    int total = pool->total; /* snapshot under the lock */
    pthread_mutex_unlock(&pool->mu);
    if (freed != total) {
        /* Blocks still lent out leak and keep a dangling pool pointer;
         * callers must release every buf before destroying the pool. */
        SPDK_WARNLOG("dma_buf_pool_destroy: %d blocks still in use (total=%d freed=%d)\n",
                     total - freed, total, freed);
    }
    pthread_mutex_destroy(&pool->mu);
    free(pool);
}
void *dma_buf_acquire(struct dma_buf_pool *pool) {
    if (!pool) return NULL;
    pthread_mutex_lock(&pool->mu);
    struct dma_buf_node *node = pool->free_list;
    if (node) {
        /* Fast path: pop from the free-list head. */
        pool->free_list = node->next;
        pool->free_count--;
        pthread_mutex_unlock(&pool->mu);
    } else {
        /* Slow path: grow the pool. Allocate outside the lock because
         * spdk_dma_malloc can be slow. */
        pthread_mutex_unlock(&pool->mu);
        node = alloc_one_block(pool);
        if (!node) {
            SPDK_ERRLOG("dma_buf_acquire: OOM, cannot grow pool\n");
            return NULL;
        }
        pthread_mutex_lock(&pool->mu);
        int total = ++pool->total; /* update and snapshot under the lock */
        pthread_mutex_unlock(&pool->mu);
        SPDK_NOTICELOG("dma_buf_pool grew to %d total blocks\n", total);
    }
    node->next = NULL;
    /* Return the payload (aligned region past the padded header). */
    return node_to_payload(pool, node);
}
void dma_buf_release(struct dma_buf_pool *pool, void *buf) {
    if (!pool || !buf) return;
    /* Recover the node address from the payload address. */
    struct dma_buf_node *node = payload_to_node(pool, buf);
    /* Sanity check: refuse bufs returned to the wrong pool. */
    if (node->pool != pool) {
        SPDK_ERRLOG("dma_buf_release: buf %p does not belong to pool %p\n",
                    buf, (void *)pool);
        return;
    }
    pthread_mutex_lock(&pool->mu);
    node->next = pool->free_list;
    pool->free_list = node;
    pool->free_count++;
    pthread_mutex_unlock(&pool->mu);
}
int dma_buf_pool_free_count(struct dma_buf_pool *pool) {
    if (!pool) return 0;
    pthread_mutex_lock(&pool->mu);
    int c = pool->free_count;
    pthread_mutex_unlock(&pool->mu);
    return c;
}
int dma_buf_pool_total_count(struct dma_buf_pool *pool) {
    if (!pool) return 0;
    pthread_mutex_lock(&pool->mu);
    int c = pool->total;
    pthread_mutex_unlock(&pool->mu);
    return c;
}

69
src/daemon/dma_buf_pool.h Normal file
View File

@@ -0,0 +1,69 @@
#ifndef __ZVFS_DMA_BUF_POOL_H__
#define __ZVFS_DMA_BUF_POOL_H__
#include <stddef.h>
#include <stdbool.h>
#ifdef __cplusplus
extern "C" {
#endif
/**
 * DMA buf pool — mutex + free-list + dynamic grow.
 *
 * Every buf is a fixed buf_size bytes, aligned to `align`.
 * When the pool runs dry it grows via spdk_dma_malloc (grow-only:
 * blocks are never returned to the system before destroy).
 * acquire() returns NULL only on OOM; the failure is also logged.
 * release() pushes the buf back onto the free-list head in O(1).
 *
 * Thread safety: every operation holds the pool mutex.
 */
struct dma_buf_pool;
/**
 * Create a DMA buf pool.
 *
 * @param initial_count Number of blocks to preallocate (>= 1).
 * @param buf_size      Bytes per block; 1MB = 1048576 is the suggested size.
 * @param align         DMA alignment, normally the device io_unit_size.
 * @return Pool pointer, or NULL on failure.
 */
struct dma_buf_pool *dma_buf_pool_create(int initial_count,
size_t buf_size,
size_t align);
/**
 * Destroy the pool and free all idle blocks.
 * Every borrowed buf must be released first, otherwise it leaks.
 */
void dma_buf_pool_destroy(struct dma_buf_pool *pool);
/**
 * Borrow one DMA buf from the pool.
 * If the free list is empty a new block is spdk_dma_malloc'd (dynamic grow).
 * Returns NULL only on OOM.
 */
void *dma_buf_acquire(struct dma_buf_pool *pool);
/**
 * Return one DMA buf to the pool.
 * The buf must have been acquired from this same pool.
 */
void dma_buf_release(struct dma_buf_pool *pool, void *buf);
/**
 * Number of idle blocks currently in the pool (monitoring/debug only).
 */
int dma_buf_pool_free_count(struct dma_buf_pool *pool);
/**
 * Total blocks allocated by the pool (including those currently borrowed).
 */
int dma_buf_pool_total_count(struct dma_buf_pool *pool);
#ifdef __cplusplus
}
#endif
#endif /* __ZVFS_DMA_BUF_POOL_H__ */

View File

@@ -92,7 +92,7 @@ int main()
struct zvfs_reactor *r = zvfs_reactor_create(&opts);
printf("echo server started: %s\n", opts.socket_path);
// printf("echo server started: %s\n", opts.socket_path);
zvfs_reactor_run(r);
@@ -185,7 +185,7 @@ static void on_read(struct zvfs_conn *c, void *ctx)
break; /* 等待更多数据 */
}
printf("[req][%s]\n", cast_opcode2string(req->opcode));
// printf("[req][%s]\n", cast_opcode2string(req->opcode));
req->conn = c;
offset += consumed;

View File

@@ -2,6 +2,7 @@
#include "common/config.h"
#include "spdk_engine.h"
#include "ipc_cq.h"
#include "dma_buf_pool.h"
#include <spdk/event.h>
#include <spdk/log.h>
@@ -377,6 +378,17 @@ int io_engine_init(const char *bdev_name, const char *json_file, int thread_num)
g_engine.handle_cache = NULL;
pthread_mutex_init(&g_engine.cache_mu, NULL);
/* 7. 创建全局 DMA buf pool */
int initial_bufs = g_engine.io_thread_count * 4;
if (initial_bufs < 8) initial_bufs = 8;
g_engine.dma_pool = dma_buf_pool_create(initial_bufs,
ZVFS_DMA_BUF_SIZE,
g_engine.io_unit_size);
if (!g_engine.dma_pool) {
SPDK_ERRLOG("dma_buf_pool_create failed\n");
return -ENOMEM;
}
SPDK_NOTICELOG("io_engine_init done: %d threads (%d io)\n",
thread_num, g_engine.io_thread_count);
return 0;
@@ -458,6 +470,7 @@ struct io_ctx {
uint64_t lba_off;
uint64_t lba_len;
uint32_t buf_off;
void *dma_buf;
};
struct write_autogrow_ctx {
@@ -548,21 +561,11 @@ static void create_sync_cb(void *arg, int bserrno) {
}
handle->blob_id = cctx->blob_id;
handle->blob = cctx->blob;
handle->dma_buf_size = ZVFS_DMA_BUF_SIZE;
atomic_init(&handle->ref_count, 1);
handle->dma_buf = spdk_dma_malloc(ZVFS_DMA_BUF_SIZE, g_engine.io_unit_size, NULL);
if (!handle->dma_buf) {
spdk_blob_close(cctx->blob, blob_close_noop_cb, NULL);
free(handle);
push_err_resp(cctx->req, -ENOMEM);
free(cctx);
return;
}
/* 构造响应 */
struct zvfs_resp *resp = calloc(1, sizeof(*resp));
if (!resp) {
spdk_dma_free(handle->dma_buf);
spdk_blob_close(cctx->blob, blob_close_noop_cb, NULL);
free(handle);
push_err_resp(cctx->req, -ENOMEM);
@@ -576,7 +579,6 @@ static void create_sync_cb(void *arg, int bserrno) {
zvfs_handle_id_t handle_id;
if (engine_cache_insert(handle, &handle_id) != 0) {
spdk_dma_free(handle->dma_buf);
spdk_blob_close(cctx->blob, blob_close_noop_cb, NULL);
free(handle);
push_err_resp(cctx->req, -ENOMEM);
@@ -616,20 +618,10 @@ static void blob_open_done_cb(void *arg, struct spdk_blob *blob, int bserrno) {
if (!handle) { push_err_resp(octx->req, -ENOMEM); free(octx); return; }
handle->blob_id = octx->req->blob_id;
handle->blob = blob;
handle->dma_buf_size = ZVFS_DMA_BUF_SIZE;
atomic_init(&handle->ref_count, 1);
handle->dma_buf = spdk_dma_malloc(ZVFS_DMA_BUF_SIZE, g_engine.io_unit_size, NULL);
if (!handle->dma_buf) {
spdk_blob_close(blob, blob_close_noop_cb, NULL);
free(handle);
push_err_resp(octx->req, -ENOMEM);
free(octx);
return;
}
struct zvfs_resp *resp = calloc(1, sizeof(*resp));
if (!resp) {
spdk_dma_free(handle->dma_buf);
spdk_blob_close(blob, blob_close_noop_cb, NULL);
free(handle);
push_err_resp(octx->req, -ENOMEM);
@@ -642,7 +634,6 @@ static void blob_open_done_cb(void *arg, struct spdk_blob *blob, int bserrno) {
resp->conn = octx->req->conn;
zvfs_handle_id_t handle_id;
if (engine_cache_insert(handle, &handle_id) != 0) {
spdk_dma_free(handle->dma_buf);
spdk_blob_close(blob, blob_close_noop_cb, NULL);
free(handle);
push_err_resp(octx->req, -ENOMEM);
@@ -751,7 +742,6 @@ static void blob_close_done_cb(void *arg, int bserrno) {
struct close_ctx *cctx = arg;
if (bserrno == 0) {
engine_cache_remove((zvfs_handle_id_t)(uintptr_t)cctx->handle);
spdk_dma_free(cctx->handle->dma_buf);
free(cctx->handle);
}
@@ -936,6 +926,8 @@ int blobstore_reset(struct zvfs_req *req) {
static void blob_read_done_cb(void *arg, int bserrno) {
struct io_ctx *ioctx = arg;
if (bserrno != 0) {
dma_buf_release(g_engine.dma_pool, ioctx->dma_buf);
ioctx->dma_buf = NULL;
push_err_resp(ioctx->req, bserrno);
free(ioctx);
return;
@@ -943,9 +935,12 @@ static void blob_read_done_cb(void *arg, int bserrno) {
/* 从 dma_buf 拷贝到用户 buf */
memcpy(ioctx->req->data,
(uint8_t *)ioctx->handle->dma_buf + ioctx->buf_off,
(uint8_t *)ioctx->dma_buf + ioctx->buf_off,
ioctx->req->length);
dma_buf_release(g_engine.dma_pool, ioctx->dma_buf);
ioctx->dma_buf = NULL;
struct zvfs_resp *resp = calloc(1, sizeof(*resp));
if (!resp) { push_err_resp(ioctx->req, -ENOMEM); free(ioctx); return; }
resp->opcode = ioctx->req->opcode;
@@ -985,11 +980,19 @@ static void do_blob_read(void *arg) {
return;
}
void *dma_buf = dma_buf_acquire(g_engine.dma_pool);
if (!dma_buf) {
push_err_resp(ioctx->req, -ENOMEM);
free(ioctx);
return;
}
ioctx->lba_off = lba_off;
ioctx->lba_len = lba_len;
ioctx->buf_off = buf_off;
ioctx->dma_buf = dma_buf;
spdk_blob_io_read(handle->blob, ioctx->channel, handle->dma_buf,
spdk_blob_io_read(handle->blob, ioctx->channel, ioctx->dma_buf,
lba_off, lba_len, blob_read_done_cb, ioctx);
}
@@ -1017,6 +1020,10 @@ int blob_read(struct zvfs_req *req) {
* ============================================================ */
static void blob_write_writephase_cb(void *arg, int bserrno) {
struct io_ctx *ioctx = arg;
dma_buf_release(g_engine.dma_pool, ioctx->dma_buf);
ioctx->dma_buf = NULL;
if (bserrno != 0) {
push_err_resp(ioctx->req, bserrno);
free(ioctx);
@@ -1038,16 +1045,18 @@ static void blob_write_writephase_cb(void *arg, int bserrno) {
static void blob_write_readphase_cb(void *arg, int bserrno) {
struct io_ctx *ioctx = arg;
if (bserrno != 0) {
dma_buf_release(g_engine.dma_pool, ioctx->dma_buf);
ioctx->dma_buf = NULL;
push_err_resp(ioctx->req, bserrno);
free(ioctx);
return;
}
/* read-modify: 将用户数据覆盖到 dma_buf 的对应区域 */
memcpy((uint8_t *)ioctx->handle->dma_buf + ioctx->buf_off,
memcpy((uint8_t *)ioctx->dma_buf + ioctx->buf_off,
ioctx->req->data, ioctx->req->length);
spdk_blob_io_write(ioctx->handle->blob, ioctx->channel, ioctx->handle->dma_buf,
spdk_blob_io_write(ioctx->handle->blob, ioctx->channel, ioctx->dma_buf,
ioctx->lba_off, ioctx->lba_len, blob_write_writephase_cb, ioctx);
}
@@ -1084,12 +1093,19 @@ static void do_blob_write(void *arg) {
return;
}
void *dma_buf = dma_buf_acquire(g_engine.dma_pool);
if (!dma_buf) {
push_err_resp(ioctx->req, -ENOMEM);
free(ioctx);
return;
}
ioctx->lba_off = lba_off;
ioctx->lba_len = lba_len;
ioctx->buf_off = buf_off;
ioctx->dma_buf = dma_buf;
/* 先读出完整的对齐块,再 modify再写回 */
spdk_blob_io_read(handle->blob, ioctx->channel, handle->dma_buf,
spdk_blob_io_read(handle->blob, ioctx->channel, ioctx->dma_buf,
lba_off, lba_len, blob_write_readphase_cb, ioctx);
}

View File

@@ -13,8 +13,6 @@
typedef struct zvfs_blob_handle {
spdk_blob_id blob_id;
struct spdk_blob *blob;
void *dma_buf;
uint64_t dma_buf_size;
atomic_uint ref_count;
} zvfs_blob_handle_t;
@@ -48,13 +46,27 @@ typedef struct zvfs_spdk_io_engine {
uint64_t io_unit_size;
uint64_t cluster_size;
/**
* 全局 DMA buf pool。
* 所有 IO 请求read / write从这里借用 buf完成后归还。
* buf 大小固定为 ZVFS_DMA_BUF_SIZE1MB对齐到 io_unit_size。
* 在 io_engine_init 完成、io_unit_size 确定后创建。
*/
struct dma_buf_pool *dma_pool;
} zvfs_spdk_io_engine_t;
/* ------------------------------------------------------------------ */
/* handle cache 操作(实现在 spdk_engine.c */
/* ------------------------------------------------------------------ */
int engine_cache_insert(struct zvfs_blob_handle *handle, zvfs_handle_id_t *out_id);
struct zvfs_blob_handle *engine_cache_lookup(zvfs_handle_id_t handle_id);
void engine_cache_remove(zvfs_handle_id_t handle_id);
/* ------------------------------------------------------------------ */
/* 引擎公开接口 */
/* ------------------------------------------------------------------ */
int io_engine_init(const char *bdev_name, const char *json_file, int thread_num);
int blob_create(struct zvfs_req *req);
int blob_open(struct zvfs_req *req);

Binary file not shown.

View File

@@ -159,6 +159,44 @@ unlinkat(int dirfd, const char *path, int flags)
return ret;
}
/* ------------------------------------------------------------------ */
/* remove */
/* ------------------------------------------------------------------ */
int
remove(const char *path)
{
    ZVFS_HOOK_ENTER();

    char resolved[PATH_MAX];
    int tracked = 0;

    /* Only classify the path when we are not re-entering the hook. */
    if (!ZVFS_IN_HOOK() &&
        zvfs_resolve_atpath(AT_FDCWD, path, resolved, sizeof(resolved)) == 0 &&
        zvfs_is_zvfs_path(resolved)) {
        tracked = 1;
    }

    /* Re-entrant call or non-zvfs path: plain passthrough to libc. */
    if (!tracked) {
        int rc = real_remove(path);
        ZVFS_HOOK_LEAVE();
        return rc;
    }

    zvfs_ensure_init();
    int rc = real_remove(path);
    if (rc == 0) {
        /*
         * remove() deletes files or empty directories. For a directory
         * the path cache simply misses and zvfs_unlink_path is a safe
         * no-op.
         */
        zvfs_unlink_path(resolved);
    }
    ZVFS_HOOK_LEAVE();
    return rc;
}
/* ------------------------------------------------------------------ */
/* 内部:执行 rename 的 zvfs 侧缓存更新 */
/* ------------------------------------------------------------------ */

View File

@@ -24,6 +24,7 @@
int unlink(const char *path);
int unlinkat(int dirfd, const char *path, int flags);
int remove(const char *path);
int rename(const char *oldpath, const char *newpath);
int renameat(int olddirfd, const char *oldpath,

View File

@@ -110,6 +110,9 @@ zvfs_debug_has_fd_mapping(int fd)
return found;
}
/* close 路径辅助:在文件后半段实现。 */
static int zvfs_detach_fd_mapping(int fd, int do_sync_md);
/* ------------------------------------------------------------------ */
/* 内部:路径判定辅助 */
/* ------------------------------------------------------------------ */
@@ -779,6 +782,74 @@ fopen64(const char *path, const char *mode)
return fp;
}
int
fclose(FILE *stream)
{
/*
 * Hooked fclose: close the stream through libc, then tear down any zvfs
 * bookkeeping attached to the underlying descriptor.
 *
 * errno discipline: the libc failure (if any) takes precedence over a
 * bookkeeping failure; each errno is captured immediately after the
 * call that produced it and restored before returning.
 */
ZVFS_HOOK_ENTER();
int ret;
int ret_errno = 0;
int bk_rc = 0;
int bk_errno = 0;
int fd = -1;
int need_bookkeeping = 0;
/* Defensive: reject NULL with EINVAL instead of crashing inside libc. */
if (!stream) {
errno = EINVAL;
ZVFS_HOOK_LEAVE();
return -1;
}
/* Capture the fd BEFORE closing — fileno() is invalid afterwards. */
if (!ZVFS_IN_HOOK()) {
fd = fileno(stream);
if (fd >= 0 && zvfs_is_zvfs_fd(fd)) {
need_bookkeeping = 1;
}
}
if (!real_fclose) {
errno = ENOSYS;
ZVFS_HOOK_LEAVE();
return -1;
}
/* Re-entrant call or non-zvfs stream: plain passthrough. */
if (ZVFS_IN_HOOK() || !need_bookkeeping) {
ret = real_fclose(stream);
ZVFS_HOOK_LEAVE();
return ret;
}
zvfs_ensure_init();
ret = real_fclose(stream);
if (ret < 0) {
ret_errno = errno;
}
/*
 * Attempt the zvfs bookkeeping teardown even when real_fclose reported
 * an error: some libc implementations may already have closed the
 * underlying fd despite returning EOF.
 */
if (zvfs_detach_fd_mapping(fd, 1) < 0) {
bk_rc = -1;
bk_errno = errno;
}
/* The libc failure wins; report it with its original errno. */
if (ret < 0) {
errno = ret_errno;
ZVFS_HOOK_LEAVE();
return -1;
}
if (bk_rc < 0) {
errno = bk_errno;
ZVFS_HOOK_LEAVE();
return -1;
}
ZVFS_HOOK_LEAVE();
return 0;
}
/* ------------------------------------------------------------------ */
/* creat */
/* ------------------------------------------------------------------ */
@@ -1109,6 +1180,7 @@ close(int fd)
int __close(int fd) { return close(fd); }
int __libc_close(int fd) { return close(fd); }
int __close_nocancel(int fd) { return close(fd); }
/* ------------------------------------------------------------------ */
/* dup helper */

View File

@@ -35,6 +35,7 @@ int creat(const char *path, mode_t mode);
int creat64(const char *path, mode_t mode);
FILE *fopen(const char *path, const char *mode);
FILE *fopen64(const char *path, const char *mode);
int fclose(FILE *stream);
/* close 族 */
int close(int fd);
@@ -68,5 +69,6 @@ int __openat_nocancel(int dirfd, const char *path, int flags, ...);
int __openat64_nocancel(int dirfd, const char *path, int flags, ...);
int __close(int fd);
int __libc_close(int fd);
int __close_nocancel(int fd);
#endif // __ZVFS_HOOK_FD_H__

View File

@@ -104,6 +104,7 @@ int (*real_ioctl)(int, unsigned long, ...) = NULL;
/* 目录 */
int (*real_unlink)(const char *) = NULL;
int (*real_unlinkat)(int, const char *, int) = NULL;
int (*real_remove)(const char *) = NULL;
int (*real_rename)(const char *, const char *) = NULL;
int (*real_renameat)(int, const char *, int, const char *) = NULL;
int (*real_renameat2)(int, const char *, int, const char *,
@@ -143,6 +144,7 @@ int (*real___isoc99_vfscanf)(FILE *, const char *, va_list) = NULL;
FILE *(*real_fopen)(const char *, const char *) = NULL;
FILE *(*real_fopen64)(const char *, const char *) = NULL;
FILE *(*real_fdopen)(int, const char *) = NULL;
int (*real_fclose)(FILE *) = NULL;
/* ------------------------------------------------------------------ */
/* dlsym 辅助宏 */
@@ -219,6 +221,7 @@ void zvfs_hook_init(void)
LOAD_SYM(real_unlink, "unlink");
LOAD_SYM(real_unlinkat, "unlinkat");
LOAD_SYM(real_remove, "remove");
LOAD_SYM(real_rename, "rename");
LOAD_SYM(real_renameat, "renameat");
LOAD_SYM(real_mmap, "mmap");
@@ -266,6 +269,7 @@ void zvfs_hook_init(void)
LOAD_SYM_OPTIONAL(real_fopen, "fopen");
LOAD_SYM_OPTIONAL(real_fopen64, "fopen64");
LOAD_SYM_OPTIONAL(real_fdopen, "fdopen");
LOAD_SYM(real_fclose, "fclose");
/* 初始化全局 fs 结构 */
zvfs_fs_init();

View File

@@ -100,6 +100,7 @@ extern int (*real_ioctl)(int fd, unsigned long request, ...);
/* 目录感知 */
extern int (*real_unlink)(const char *path);
extern int (*real_unlinkat)(int dirfd, const char *path, int flags);
extern int (*real_remove)(const char *path);
extern int (*real_rename)(const char *oldpath, const char *newpath);
extern int (*real_renameat)(int olddirfd, const char *oldpath,
int newdirfd, const char *newpath);
@@ -144,6 +145,7 @@ extern int (*real___isoc99_vfscanf)(FILE *stream, const char *format, va_lis
extern FILE *(*real_fopen)(const char *path, const char *mode);
extern FILE *(*real_fopen64)(const char *path, const char *mode);
extern FILE *(*real_fdopen)(int fd, const char *mode);
extern int (*real_fclose)(FILE *stream);
/* 初始化所有 real_* 指针,在 constructor 中调用 */
void zvfs_hook_init(void);