Latency optimization + README updates

This commit is contained in:
1iaan
2026-03-20 21:10:22 +08:00
parent ac2150e0ed
commit 224d813499
20 changed files with 1536 additions and 785 deletions

540
README.md
View File

@@ -1,381 +1,245 @@
# ZVFS
ZVFS is a user-space file system prototype built on SPDK Blobstore. Its goal is to redirect common POSIX file I/O onto a high-performance user-space storage path without changing application code.
The core idea is to reuse Linux's file-management machinery (namespaces/directories/metadata) and move the file data plane into ZVFS.
> A transparent user-space POSIX file system built on SPDK Blobstore
- Hook mechanism: `LD_PRELOAD`
- Mount prefix: `/zvfs`
- Architecture: multi-process clients + standalone daemon + SPDK
- Semantics: synchronous blocking (request-response)
ZVFS is a **transparent user-space file system prototype** that hijacks POSIX I/O via `LD_PRELOAD`
and redirects the application's file data path from the Linux kernel I/O stack onto an **SPDK user-space NVMe storage path**.
The goal is to give databases and vector-search systems lower-latency storage access with **zero application code changes**.
Functional validation has been completed for the **PostgreSQL + pgvector** scenario.
---
## 1. Project Positioning
# Design Approach
The point of this project is not just "getting I/O to run" but tying together the following engineering problems:
Most user-space file systems (such as FUSE) require modifying the application or mounting a file system, and a user-space file system that goes through the VFS pays one or two extra user/kernel transitions. ZVFS aims to be completely transparent to the application: it keeps calling the POSIX API as usual while the underlying storage path is quietly swapped out.
The core decision is to separate the control plane from the data plane:
1. Transparently take over I/O in multi-threaded/multi-process applications (RocksDB / PostgreSQL).
2. Preserve POSIX semantics (open/close/dup/fork/append/sync, etc.).
3. Centralize SPDK resources in the daemon so each process does not re-initialize them.
4. Under synchronous blocking semantics, make the protocol, concurrency, and error handling complete.
The control plane reuses the Linux VFS: the directory tree, permissions, and inode lifecycle are all managed by Linux, and the file-to-blob mapping is persisted via an xattr (`user.zvfs.blob_id`), so no separate mapping database is needed.
The data plane goes through SPDK: the `read`/`write` data path bypasses the kernel, travels over IPC to the ZVFS daemon, and then accesses NVMe directly through SPDK Blobstore.
```
Application (PostgreSQL / RocksDB)
│ POSIX API
LD_PRELOAD Hook Layer
│ Unix Domain Socket
ZVFS Daemon
┌────┴────┐
│ │
Metadata IO Workers
Thread (SPDK pollers)
│ │
└────┬────┘
SPDK Blobstore
NVMe SSD
```
SPDK runs in polling mode (ideally on a dedicated CPU core), and blobstore metadata is best managed by a single SPDK thread, which makes it unsuitable to embed in arbitrary application processes. The daemon therefore owns all SPDK resources; multiple client processes share the same daemon and talk to it over a Unix domain socket.
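A minimal sketch of the client side of that synchronous request/response pattern, assuming a plain `SOCK_STREAM` Unix domain socket and omitting the actual ZVFS wire format; the helper names are illustrative, not the real hook symbols:
```c
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

/* Connect once to the daemon's socket (e.g. /tmp/zvfs.sock). */
static int zvfs_client_connect(const char *path)
{
    int fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (fd < 0)
        return -1;
    struct sockaddr_un addr = { .sun_family = AF_UNIX };
    strncpy(addr.sun_path, path, sizeof(addr.sun_path) - 1);
    if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        close(fd);
        return -1;
    }
    return fd;
}

/* One blocking round trip: send the whole request, then wait for the whole reply. */
static int zvfs_client_call(int fd, const void *req, size_t req_len,
                            void *resp, size_t resp_len)
{
    if (write(fd, req, req_len) != (ssize_t)req_len)
        return -1;
    size_t got = 0;
    while (got < resp_len) {
        ssize_t n = read(fd, (char *)resp + got, resp_len - got);
        if (n <= 0)
            return -1;
        got += (size_t)n;
    }
    return 0;
}
```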
---
## 2. Architecture
# 🧠 System Architecture
![](zvfs架构图.excalidraw.svg)
```text
App (PostgreSQL / RocksDB / db_bench / pgbench)
-> LD_PRELOAD libzvfs.so
-> Hook Client (POSIX 拦截 + 本地状态)
-> Unix Domain Socket IPC (sync/blocking)
-> zvfs_daemon
-> 协议反序列化 + 分发
-> metadata thread + io threads
-> SPDK Blobstore / bdev
```
Key points of the architecture:
### 2.1 Passthrough Strategy
- **Synchronous blocking semantics**
- **Zero-intrusion takeover of application I/O**
  - POSIX APIs are intercepted via `LD_PRELOAD`.
  - No application code changes are needed.
- **Control plane reuses Linux**
  - ZVFS does not reimplement the directory tree; it reuses the Linux VFS. Directories / permissions / inode lifecycle are managed by the Linux VFS.
  - The file-to-blob mapping is stored in the xattr `user.zvfs.blob_id`.
- **Centralized SPDK resource management**
  - File contents are stored in SPDK Blobstore, which accesses NVMe directly.
  - SPDK has a **single-thread requirement** for metadata operations, so the daemon is split into:
    - metadata operations (create / resize / delete)
    - data I/O (read / write)
**Control plane on Linux, data plane on ZVFS**
- Control plane (owned by Linux)
  - Directory/namespace management.
  - File node lifecycle and permission semantics (create/open/close/stat/rename/unlink, etc.).
  - These operations still issue real system calls under `/zvfs`; ZVFS does not duplicate directory-tree management.
- Data plane (owned by ZVFS)
  - File contents are backed by blobs.
  - The real `read`/`write` data path does not go through the Linux file data plane; it goes through ZVFS IPC + SPDK.
- Key binding mechanism (see the sketch after this list)
  - `create`: really create the Linux file + create a blob in ZVFS + write the `blob_id` into the file's xattr.
  - `open`: really `open` the Linux file + read the xattr to get the `blob_id` + open the blob in ZVFS.
  - `write`: after the blob write succeeds, use `ftruncate` to keep the Linux-visible `st_size` in sync.
- Engineering payoff
  - Cuts the implementation effort by roughly 50%.
  - Better compatibility; databases can keep their existing file layout as-is.
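A minimal sketch of the `create`/`open` binding through the xattr, assuming a simple decimal encoding of the blob id; the helper names are illustrative, and the real hook in `src/hook` may encode the value differently:
```c
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/xattr.h>

#define ZVFS_XATTR_BLOB_ID "user.zvfs.blob_id"

/* create path: the real Linux file exists and the blob has been created in the
 * daemon; persist the mapping on the file itself. */
static int zvfs_bind_blob(int real_fd, uint64_t blob_id)
{
    char buf[32];
    int n = snprintf(buf, sizeof(buf), "%llu", (unsigned long long)blob_id);
    return fsetxattr(real_fd, ZVFS_XATTR_BLOB_ID, buf, (size_t)n, 0);
}

/* open path: read the mapping back; a missing xattr means "not a ZVFS file,
 * fall back to plain kernel I/O on real_fd". */
static int zvfs_lookup_blob(int real_fd, uint64_t *blob_id)
{
    char buf[32];
    ssize_t n = fgetxattr(real_fd, ZVFS_XATTR_BLOB_ID, buf, sizeof(buf) - 1);
    if (n < 0)
        return -1;                    /* passthrough */
    buf[n] = '\0';
    *blob_id = strtoull(buf, NULL, 10);
    return 0;
}
```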
### 2.2 Layered Responsibilities
- Client (`src/hook` + `src/spdk_engine/io_engine.c`)
  - Decides whether a path belongs under `/zvfs`.
  - Intercepts POSIX APIs and issues synchronous IPC.
  - Maintains minimal local state (`fd_table/path_cache/inode_table`).
- Daemon (`src/daemon`)
  - Exclusively owns the SPDK environment and threads.
  - Executes all blob create/open/read/write/resize/sync/delete operations.
  - Centrally manages handles and ref_counts.
- Protocol layer (`src/proto/ipc_proto.*`), sketched below
  - Common header + per-op body.
  - Request header: `opcode + payload_len`
  - Response header: `opcode + status + payload_len`
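A struct-level sketch of the two wire headers, consistent with the sizes used in `src/proto/ipc_proto.*` (two `uint32_t` for requests; `uint32_t + int32_t + uint32_t` for responses); field names here are illustrative:
```c
#include <stdint.h>

struct zvfs_req_header_wire {
    uint32_t opcode;       /* which operation (CREATE/OPEN/READ/WRITE/...) */
    uint32_t payload_len;  /* length of the per-op body that follows */
};

struct zvfs_resp_header_wire {
    uint32_t opcode;       /* echoed opcode */
    int32_t  status;       /* 0 on success, negative errno otherwise */
    uint32_t payload_len;  /* length of the per-op response body */
};
```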
### 2.3 Why Synchronous Blocking IPC
- Lowest compatibility cost on the application side; easiest to line up with POSIX semantics.
- The debugging path is direct (one request maps to one response).
- Get correctness and semantic completeness right first, then consider going asynchronous.
- **POSIX semantics compatibility**
  - A user-space file system must correctly emulate Linux fd semantics: `dup dup2 dup3 fork close_range`
  - Semantics must stay consistent when multiple fds point at the same file handle.
---
## 3. Functional Coverage (Current)
### 3.1 Core Interfaces Taken Over
- Control-plane coordination: `open/openat/creat/rename/unlink/...` (real syscalls + ZVFS metadata coordination)
- Data-plane takeover: `read/write/pread/pwrite/readv/writev/pwritev`
- Metadata: `fstat/lseek/ftruncate/fallocate`
- Sync: `fsync/fdatasync/sync_file_range`
- FD semantics: `dup/dup2/dup3/fork/close_range`
### 3.2 Semantic Highlights
- `write` uses `AUTO_GROW` by default.
- Without `AUTO_GROW`, writing past the end returns `ENOSPC`.
- `O_APPEND` semantics are guaranteed by the inode's `logical_size`.
- After a successful `write`, the Linux file size is updated in step (`ftruncate`) so the `stat` view stays consistent (see the sketch after this list).
- `mmap` currently returns `ENOTSUP` for zvfs fds (non-zvfs fds are passed through).
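A minimal sketch of that size-sync step; the function name is illustrative, and the real hook also tracks the logical size in its inode table:
```c
#include <stdint.h>
#include <sys/stat.h>
#include <unistd.h>

/* Grow the real Linux file so st_size matches the logical size after a blob write. */
static int zvfs_sync_linux_size(int real_fd, uint64_t write_end /* offset + length */)
{
    struct stat st;
    if (fstat(real_fd, &st) < 0)
        return -1;
    if ((uint64_t)st.st_size >= write_end)
        return 0;                        /* already large enough */
    return ftruncate(real_fd, (off_t)write_end);
}
```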
### 3.3 Mapping
- File data lives in SPDK blobs.
- The file-to-blob mapping is the xattr `user.zvfs.blob_id`.
---
## 4. Build and Run
### 4.1 Build
# 📦 Build
```bash
git clone ...
cd zvfs
git submodule update --init --recursive
cd spdk
./scripts/pkgdep.sh
./configure --with-shared
make -j"$(nproc)"
cd ..
make -j"$(nproc)"
mkdir -p tests/bin
make test -j"$(nproc)"
```
---
Artifacts:
- `src/libzvfs.so`
- `src/daemon/zvfs_daemon`
- `tests/bin/*`
# ▶️ Run
### 4.2 Start the Daemon
```bash
cd zvfs
./src/daemon/zvfs_daemon
```
Optional environment variables:
- `SPDK_BDEV_NAME`
- `SPDK_JSON_CONFIG`
- `ZVFS_SOCKET_PATH` / `ZVFS_IPC_SOCKET_PATH`
### 4.3 Quick Verification
Run the tests:
```bash
mkdir -p /zvfs
LD_PRELOAD=./src/libzvfs.so ZVFS_TEST_ROOT=/zvfs ./tests/bin/hook_api_test
./tests/bin/ipc_zvfs_test
```
---
## 5. Performance Testing
### 5.1 Goals
- Target scenario: blocking I/O performance at low queue depth.
- Baselines: `spdk_nvme_perf` and the kernel path (`O_DIRECT`).
### 5.2 Tools and Scripts
- RocksDB: `scripts/run_db_bench_zvfs.sh`
- PostgreSQL: `codex/run_pgbench_no_mmap.sh`
Recommendation:
- When benchmarking PostgreSQL, disable the mmap path (set shared memory to sysv) to avoid mmap interference.
### 5.3 Historical Results
- At QD=1, ZVFS reaches roughly `90%~95%` of `spdk_nvme_perf`.
- Compared with `O_DIRECT` on the same machine, sequential write throughput improves by roughly `2.2x~2.3x`.
- Unaligned writes lose significant throughput due to read-modify-write overhead.
### 5.4 fio
```shell
root@ubuntu20-129:/home/lian/share/zvfs# LD_PRELOAD=/home/lian/share/zvfs/src/libzvfs.so fio ./fio_script/zvfs.fio
test: (g=0): rw=randwrite, bs=(R) 16.0KiB-16.0KiB, (W) 16.0KiB-16.0KiB, (T) 16.0KiB-16.0KiB, ioengine=psync, iodepth=64
fio-3.16
Starting 1 thread
test: Laying out IO file (1 file / 0MiB)
Jobs: 1 (f=1): [w(1)][100.0%][w=13.4MiB/s][w=857 IOPS][eta 00m:00s]
test: (groupid=0, jobs=1): err= 0: pid=16519: Sat Mar 14 14:11:27 2026
Description : ["variable bs"]
write: IOPS=829, BW=12.0MiB/s (13.6MB/s)(130MiB/10001msec); 0 zone resets
clat (usec): min=778, max=4000, avg=1199.89, stdev=377.74
lat (usec): min=779, max=4001, avg=1200.38, stdev=377.78
clat percentiles (usec):
| 1.00th=[ 848], 5.00th=[ 898], 10.00th=[ 922], 20.00th=[ 955],
| 30.00th=[ 979], 40.00th=[ 1004], 50.00th=[ 1029], 60.00th=[ 1074],
| 70.00th=[ 1221], 80.00th=[ 1500], 90.00th=[ 1614], 95.00th=[ 1975],
| 99.00th=[ 2606], 99.50th=[ 2966], 99.90th=[ 3359], 99.95th=[ 3425],
| 99.99th=[ 4015]
bw ( KiB/s): min=10048, max=15520, per=99.91%, avg=13258.32, stdev=1465.96, samples=19
iops : min= 628, max= 970, avg=828.63, stdev=91.62, samples=19
lat (usec) : 1000=38.46%
lat (msec) : 2=56.79%, 4=4.74%, 10=0.01%
cpu : usr=5.27%, sys=0.00%, ctx=8499, majf=0, minf=7
IO depths : 1=100.0%, 2=0.0%, 4=0.0%, 8=0.0%, 16=0.0%, 32=0.0%, >=64=0.0%
submit : 0=0.0%, 4=100.0%, 8=0.0%, 16=0.0%, 32=0.0%, 64=0.0%, >=64=0.0%
complete : 0=0.0%, 4=100.0%, 8=0.0%, 16=0.0%, 32=0.0%, 64=0.0%, >=64=0.0%
issued rwts: total=0,8295,0,0 short=0,0,0,0 dropped=0,0,0,0
latency : target=0, window=0, percentile=100.00%, depth=64
Run status group 0 (all jobs):
WRITE: bw=12.0MiB/s (13.6MB/s), 12.0MiB/s-12.0MiB/s (13.6MB/s-13.6MB/s), io=130MiB (136MB), run=10001-10001msec
Disk stats (read/write):
sda: ios=0/118, merge=0/104, ticks=0/66, in_queue=67, util=0.24%
```
### 5.5 pgbench
```shell
root@ubuntu20-129:/home/lian/share/zvfs# ./scripts/run_pgbench_no_mmap.sh
Current configuration:
host=127.0.0.1 port=5432 db=benchdb
scale=1 clients=1 threads=1 time=15s preload=1
init_jobs=1 init_steps=dtg skip_init=0
[1/2] pgbench initialization (-i)
dropping old tables...
NOTICE: table "pgbench_accounts" does not exist, skipping
NOTICE: table "pgbench_branches" does not exist, skipping
NOTICE: table "pgbench_history" does not exist, skipping
NOTICE: table "pgbench_tellers" does not exist, skipping
creating tables...
generating data...
100000 of 100000 tuples (100%) done (elapsed 15.06 s, remaining 0.00 s)
done.
[2/2] pgbench benchmark run (-T)
starting vacuum...end.
transaction type: <builtin: TPC-B (sort of)>
scaling factor: 1
query mode: simple
number of clients: 1
number of threads: 1
duration: 15 s
number of transactions actually processed: 564
latency average = 26.614 ms
tps = 37.573586 (including connections establishing)
tps = 38.176262 (excluding connections establishing)
```
```shell
root@ubuntu20-129:/home/lian/share/zvfs# ./scripts/run_pgbench.sh
Current configuration:
host=127.0.0.1 port=5432 db=postgres
scale=1 clients=1 threads=1 time=15s preload=0
init_jobs=1 init_steps=dtg skip_init=0
[1/2] pgbench initialization (-i)
dropping old tables...
NOTICE: table "pgbench_accounts" does not exist, skipping
NOTICE: table "pgbench_branches" does not exist, skipping
NOTICE: table "pgbench_history" does not exist, skipping
NOTICE: table "pgbench_tellers" does not exist, skipping
creating tables...
generating data...
100000 of 100000 tuples (100%) done (elapsed 1.08 s, remaining 0.00 s)
done.
[2/2] pgbench benchmark run (-T)
starting vacuum...end.
transaction type: <builtin: TPC-B (sort of)>
scaling factor: 1
query mode: simple
number of clients: 1
number of threads: 1
duration: 15 s
number of transactions actually processed: 586
latency average = 25.602 ms
tps = 39.059387 (including connections establishing)
tps = 39.102273 (excluding connections establishing)
```
---
## 6. Key Engineering Challenges and Post-Mortems (Highlights)
### SPDK Metadata Callback Threading Model
Problem: dispatching metadata operations to arbitrary threads easily hangs, or the callback never fires. Metadata callbacks are delivered by default to the thread that initialized the blobstore.
Blobstore metadata operations are bound to the thread/channel that created them.
The metadata thread and the I/O threads need a clearly defined division of labor.
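A sketch of that dispatch rule, using `spdk_thread_send_msg()` to funnel a metadata operation (here `spdk_blob_resize`) onto the md thread so its completion callback also runs there; the context struct and symbol names are illustrative, not the actual daemon code:
```c
#include <spdk/blob.h>
#include <spdk/thread.h>

struct md_resize_ctx {
    struct spdk_blob *blob;
    uint64_t new_num_clusters;
    spdk_blob_op_complete cb;   /* completion will be invoked on the md thread */
    void *cb_arg;
};

/* Runs on the md thread, i.e. the thread that initialized the blobstore. */
static void md_do_resize(void *arg)
{
    struct md_resize_ctx *ctx = arg;
    spdk_blob_resize(ctx->blob, ctx->new_num_clusters, ctx->cb, ctx->cb_arg);
}

/* Never call metadata ops from an io thread; hand them to the md thread. */
static int md_submit_resize(struct spdk_thread *md_thread, struct md_resize_ctx *ctx)
{
    return spdk_thread_send_msg(md_thread, md_do_resize, ctx);
}
```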
### resize Deadlocks the Process
- `resize/delete/unload` internally run an `spdk_for_each_channel()` barrier, presumably so all other SPDK threads synchronize on the post-resize state before the call returns; resize should therefore be done as rarely as possible.
- If another SPDK thread holds an io_channel and is not polling continuously, the barrier deadlocks.
- Make sure every thread holding a channel keeps polling.
- Release channels strictly when a thread exits, so the barrier never waits forever.
### PostgreSQL Tablespaces Miss the Hook
Symptom: after creating a tablespace, file operations use `pg_tblspc/...` paths and the daemon logs no requests.
Root cause:
- PostgreSQL accesses tablespaces through symbolic links.
- Matching only on the string prefix `/zvfs` misses these paths.
Fix (sketched below):
- Resolve the path with `realpath()` before checking it.
- For `O_CREAT` when the file does not exist yet, decide based on `realpath(parent) + basename`.
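A sketch of the resolved-path check, assuming the managed prefix is the literal `/zvfs`; for `O_CREAT` on a not-yet-existing file the parent directory is resolved instead, which is already enough for the prefix decision:
```c
#include <libgen.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>

#define ZVFS_PREFIX "/zvfs"

static int zvfs_path_is_managed(const char *path)
{
    char resolved[PATH_MAX];

    /* Resolve symlinks (pg_tblspc/... -> /zvfs/...) before checking the prefix. */
    if (realpath(path, resolved) != NULL)
        return strncmp(resolved, ZVFS_PREFIX, strlen(ZVFS_PREFIX)) == 0;

    /* File may not exist yet (O_CREAT): resolve the parent directory instead. */
    char tmp[PATH_MAX], parent_res[PATH_MAX];
    strncpy(tmp, path, sizeof(tmp) - 1);
    tmp[sizeof(tmp) - 1] = '\0';
    char *parent = dirname(tmp);          /* note: dirname() may modify tmp */
    if (realpath(parent, parent_res) == NULL)
        return 0;
    return strncmp(parent_res, ZVFS_PREFIX, strlen(ZVFS_PREFIX)) == 0;
}
```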
### PostgreSQL Reports `Permission denied` (Cross-User Connection to the Daemon)
Symptom: `CREATE DATABASE ... TABLESPACE ...` fails with a permission error.
Root cause:
- The daemon is started as root, and the UDS file mode is subject to umask.
- The postgres user cannot `connect(/tmp/zvfs.sock)`.
Fix (sketched below):
- After `bind`, the daemon explicitly calls `chmod(socket, 0666)`.
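A sketch of the daemon-side fix, with illustrative naming; the key line is the `chmod(path, 0666)` right after `bind()`, so the socket mode no longer depends on the starting user's umask:
```c
#include <string.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/un.h>
#include <unistd.h>

static int zvfs_listen_socket(const char *path, int backlog)
{
    int fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (fd < 0)
        return -1;
    struct sockaddr_un addr = { .sun_family = AF_UNIX };
    strncpy(addr.sun_path, path, sizeof(addr.sun_path) - 1);
    unlink(path);                        /* drop a stale socket file if present */
    if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0 ||
        chmod(path, 0666) < 0 ||         /* the actual fix: open up the mode */
        listen(fd, backlog) < 0) {
        close(fd);
        return -1;
    }
    return fd;
}
```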
### PostgreSQL Reports `Message too long`
Symptom: some SQL (especially the `CREATE DATABASE` path) fails with `Message too long`.
Root cause:
- Not a daemon parsing failure: the client exceeds `ZVFS_IPC_BUF_SIZE` while serializing the request.
- The current hook aggregates `writev` into one large write request, which easily hits the limit.
Current handling:
- Raise `ZVFS_IPC_BUF_SIZE` to `16MB` (`src/common/config.h`).
Follow-up optimization (sketched below):
- Do transparent chunked sends in the client's `blob_write_ex` (keeping the synchronous blocking semantics).
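A sketch of that follow-up, assuming a hypothetical single-message helper `zvfs_ipc_write_once()`; each chunk is still a blocking request/response, so the POSIX-visible semantics stay synchronous:
```c
#include <stddef.h>
#include <stdint.h>
#include <sys/types.h>

#define ZVFS_WRITE_CHUNK (4u * 1024 * 1024)   /* must stay under ZVFS_IPC_BUF_SIZE */

/* Placeholder for the existing single-message write path. */
ssize_t zvfs_ipc_write_once(uint64_t handle, uint64_t off, const void *buf, size_t len);

static ssize_t zvfs_write_chunked(uint64_t handle, uint64_t off,
                                  const void *buf, size_t len)
{
    size_t done = 0;
    while (done < len) {
        size_t n = len - done;
        if (n > ZVFS_WRITE_CHUNK)
            n = ZVFS_WRITE_CHUNK;
        ssize_t rc = zvfs_ipc_write_once(handle, off + done,
                                         (const char *)buf + done, n);
        if (rc < 0)
            return done > 0 ? (ssize_t)done : rc;   /* POSIX-style partial write */
        done += (size_t)rc;
        if ((size_t)rc < n)
            break;                                   /* short write from the daemon */
    }
    return (ssize_t)done;
}
```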
### dup/dup2/fork Semantic Consistency
Problem: when multiple fds refer to the same open file description, how do we keep the handle reference count consistent?
Approach (sketched below):
- Add `ADD_REF` / `ADD_REF_BATCH` to the protocol.
- The hook explicitly adds references on `dup/dup2/dup3/fork`.
- `close_range` gets bounds protection (to avoid an infinite loop in the `UINT_MAX` case).
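A sketch of the `dup` hook under these rules; `openfile_lookup`, `openfile_bind`, and `zvfs_ipc_add_ref` are placeholders for the client-side fd table and the `ADD_REF` IPC call, not the exact ZVFS symbols:
```c
#define _GNU_SOURCE
#include <dlfcn.h>
#include <unistd.h>

struct zvfs_open_file;                               /* per-fd client state */
struct zvfs_open_file *openfile_lookup(int fd);
int openfile_bind(int newfd, struct zvfs_open_file *of);
int zvfs_ipc_add_ref(struct zvfs_open_file *of);

int dup(int oldfd)
{
    static int (*real_dup)(int);
    if (!real_dup)
        real_dup = (int (*)(int))dlsym(RTLD_NEXT, "dup");

    int newfd = real_dup(oldfd);
    if (newfd < 0)
        return newfd;

    struct zvfs_open_file *of = openfile_lookup(oldfd);
    if (of) {
        zvfs_ipc_add_ref(of);            /* tell the daemon: one more reference */
        openfile_bind(newfd, of);        /* both fds now share the same handle */
    }
    return newfd;
}
```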
### The pg \c Call Chain
1. psql \c ...
2. fork backend
3. InitPostgres -> ValidatePgVersion ...
4. libc fopen
5. libc internals -> __IO_file_fopen -> _IO_file_open -> __open64
6. kernel openat
7. fscanf
8. libc __isoc99_fscanf -> \_IO\_* -> __read
9. kernel read
glibc-internal calls do not always go through dynamic symbol resolution (they may use hidden aliases), so they can bypass the hook; a large number of call variants has to be intercepted.
# 🔬 Implemented Functionality
Open / close / delete
```
open open64 openat openat64 fopen fopen64
creat creat64
fclose close close_range
dup dup2 dup3 fork
unlink unlinkat remove rename renameat
```
Read/write layer
```
read pread pread64 readv preadv preadv64 preadv2
write pwrite pwrite64 writev pwritev pwritev64 pwritev2
fread_unlocked fread fscanf
```
Offset / space management layer
```
lseek lseek64
truncate truncate64 ftruncate ftruncate64 fallocate posix_fallocate
```
Metadata layer
```
stat stat64 fstat fstat64 lstat lstat64 fstatat fstatat64 statx
```
Sync / control layer
```
fsync fdatasync sync_file_range
fcntl fcntl64 ioctl
```
---
## 7. Current Limitations and Next Steps
# 🚀 Performance
### 7.1 Current Limitations
Test environment: VMware VM + emulated NVMe, single-threaded blocking I/O.
- A single request is still bounded by `ZVFS_IPC_BUF_SIZE`.
- `mmap` is not yet supported for zvfs fds.
- `ADD_REF_BATCH` currently prioritizes functionality and does not guarantee atomicity.
> A VMware-emulated NVMe device cannot show the latency advantage of SPDK's polling mode over interrupt-driven I/O,
> so the numbers below evaluate the extra overhead of the hook layer and IPC, not performance on real hardware.
### 7.2 Next Steps
1. Implement transparent client-side chunking for `WRITE` to eliminate the single-message size limit for good.
2. Keep hardening the PostgreSQL scenario (tablespace + pgbench + crash/restart).
3. Fill in more systematic performance re-runs (fixed hardware, fixed parameters, full report).
### Sequential Write Throughput
| Block Size | spdk_nvme_perf | ZVFS |
|---|---|---|
| 4K | 100 MiB/s | 94 MiB/s |
| 128K | 1843 MiB/s | 1662 MiB/s |
ZVFS reaches roughly **90% of native SPDK performance**.
---
### fio Random Write (16K, psync)
| | kernel (psync) | ZVFS |
|---|---|---|
| IOPS | 1855 | 1353 |
| Throughput | 28.0 MiB/s | 21.2 MiB/s |
| avg clat | 492 µs | 692 µs |
| sys% | 28.6% | 8.4% |
> In this single-threaded `psync` random-write scenario, ZVFS currently reaches about 73% of the kernel `psync` IOPS. The daemon-internal `SPDK + reply_q` portion has converged to a fairly stable range; the remaining overhead is concentrated in the `client -> daemon` request-ingress stage.
---
### WRITE Request End-to-End Latency Breakdown (µs)
Averages over 12 `WRITE` trace samples, expanded below by call-stack level. Because of rounding, the sum of child items may differ from the parent by `±1 µs`.
```text
total 748
├─ c2s 317
│ ├─ send 39
│ └─ server_rx_wait 278
├─ server 336
│ ├─ rx_dispatch 12
│ ├─ dispatch_spdk 25
│ ├─ spdk 194
│ └─ reply_q 103
│ ├─ spdk_post 11
│ └─ cq_wait 91
│ ├─ kick 13
│ ├─ wake_sched 65
│ └─ wake_to_tx 12
└─ s2c 95
├─ resp_wait 83
└─ parse 12
```
The main extra overhead of `WRITE` is now fairly clear: one part is `c2s / server_rx_wait`, the other is `spdk` and `reply_q` inside `server`. Within `reply_q`, `wake_sched` is already noticeably larger than `kick` and `wake_to_tx`, which means the main loss on the reply path is not the `eventfd` write itself but the scheduling wait after the reactor is woken up.
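The server-side stages above can be recomputed from the timestamps carried in the `WRITE` response. The sketch below assumes the field layout of `struct zvfs_resp_trace` from `src/proto/ipc_proto.*` and a particular mapping of timestamp pairs to stage names, which is an interpretation rather than the exact tooling used:
```c
#include <stdint.h>
#include <stdio.h>

struct zvfs_resp_trace {
    uint32_t flags;
    uint64_t req_rx_ns, dispatch_ns, spdk_start_ns, phase1_done_ns,
             spdk_done_ns, cq_push_ns, wake_write_ns, reactor_wake_ns, resp_tx_ns;
};

/* Assumes every timestamp in the trace has been filled in (flags valid). */
static void print_write_breakdown(const struct zvfs_resp_trace *t)
{
    printf("rx_dispatch   %6.1f us\n", (t->dispatch_ns     - t->req_rx_ns)       / 1e3);
    printf("dispatch_spdk %6.1f us\n", (t->spdk_start_ns   - t->dispatch_ns)     / 1e3);
    printf("spdk          %6.1f us\n", (t->spdk_done_ns    - t->spdk_start_ns)   / 1e3);
    printf("kick          %6.1f us\n", (t->wake_write_ns   - t->cq_push_ns)      / 1e3);
    printf("wake_sched    %6.1f us\n", (t->reactor_wake_ns - t->wake_write_ns)   / 1e3);
    printf("wake_to_tx    %6.1f us\n", (t->resp_tx_ns      - t->reactor_wake_ns) / 1e3);
}
```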
---
### pgbench (PostgreSQL TPC-B, Single Client)
| | kernel | ZVFS |
|---|---|---|
| TPS | 39.1 | 38.2 |
| avg latency | 25.6 ms | 26.6 ms |
Under an end-to-end database workload the IPC overhead is diluted: ZVFS is essentially on par with the kernel path (a gap of about 4%).
---
# ⚠️ Current Limitations
- mmap is not supported.
- Unaligned writes incur read-modify-write overhead.
- IPC request size is capped: large I/Os must be chunked in the hook layer; switching to shared-memory scatter-gather would remove this limit.
---
## Future Work
- Support mmap (a possible direction is /dev/shm + userfaultfd).
- Mitigate unaligned-write overhead and provide `!O_DIRECT` semantics: implement a page-cache-like buffer pool.
- Change the IPC mechanism to faster shared memory.
- Reduce communication and copy overhead: move I/O operations into the application process while keeping metadata operations in the daemon.
---
# 🧩 Problems Encountered
## SPDK Metadata Threading Model
SPDK Blobstore metadata callbacks must run on the initializing thread,
so the following must be strictly separated:
- metadata thread
- io thread
Otherwise callbacks never return and the resize barrier deadlocks.
## spdk_for_each_channel() Triggers a Barrier in resize / delete
If some threads are not polling, the whole system hangs.
Solution:
- keep every IO thread polling continuously
- release the io_channel when a thread exits
## PostgreSQL Tablespace Hook Misses
PostgreSQL tablespaces are accessed through a symbolic link: pg_tblspc/xxx
A simple string-prefix match on /zvfs misses these paths.
Fix: resolve with realpath() before checking the path.
## write Latency Much Higher Than Expected
In this fio latency investigation, `WRITE` latency was initially far above expectations. Adding lightweight instrumentation along the end-to-end path showed the problem was not SPDK itself but a combination of unconditional read-modify-write, poller scheduling jitter inside the VM, unpinned threads, and, as later traces revealed, core-switch jitter after the reactor wakes up. The fixes were: skip the read phase for fully aligned whole-block writes, pin the reactor/md/io threads to designated CPUs, and fold the io-thread count and pinning targets into configuration. After the fixes, `dispatch_spdk` dropped from milliseconds to tens of microseconds and average `WRITE` latency came back down to about 700 µs, but the remaining tail latency still shows up mainly as scheduling waits in the request-ingress and reply stages.
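A sketch of the aligned-write fast-path test described above (variable names mirror the daemon's io_ctx fields, but the function itself is illustrative):
```c
#include <stdbool.h>
#include <stdint.h>

/* If the write starts on an io_unit boundary and covers whole io_units,
 * it overwrites every touched unit, so the read phase of the
 * read-modify-write can be skipped entirely. */
static bool write_can_skip_read_phase(uint64_t offset, uint64_t length,
                                       uint32_t io_unit_size)
{
    uint64_t buf_off = offset % io_unit_size;       /* offset inside the first io_unit */
    uint64_t lba_len = (buf_off + length + io_unit_size - 1) / io_unit_size;
    return buf_off == 0 && length == lba_len * io_unit_size;
}
```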

View File

@@ -7,10 +7,10 @@ verify=0
time_based=1
runtime=10
bs=16K
size=16384
size=524288
iodepth=64
rw=randwrite
filename=kingfs
filename=/tmp/kingfs
ioengine=psync
[test]

View File

@@ -7,7 +7,7 @@ verify=0
time_based=1
runtime=10
bs=16K
size=16384
size=524288
iodepth=64
rw=randwrite
filename=/zvfs/fio/zvfsfio

View File

@@ -12,7 +12,7 @@ set -euo pipefail
PG_HOST="127.0.0.1"
PG_PORT="5432"
PG_DB="postgres"
PG_SCALE="10"
PG_SCALE="1"
PG_TIME="15"
PG_CLIENTS="32"
PG_THREADS="8"

View File

@@ -12,7 +12,7 @@ set -euo pipefail
PG_HOST="127.0.0.1"
PG_PORT="5432"
PG_DB="benchdb"
PG_SCALE="10"
PG_SCALE="1"
PG_TIME="15"
PG_CLIENTS="32"
PG_THREADS="8"

View File

@@ -19,4 +19,14 @@
// #define ZVFS_IPC_BUF_SIZE 4096
#define ZVFS_IPC_BUF_SIZE (16 * 1024 * 1024)
/*
 * Thread layout:
 * io threads are pinned contiguously starting at ZVFS_IO_CPU_START;
 * reactor / md default to the CPUs after the io threads, avoiding CPU0/1,
 * which tend to be noisier in a VM.
 */
#define ZVFS_IO_CPU_START 2
#define ZVFS_IO_THREAD_COUNT 3
#define ZVFS_REACTOR_CPU (ZVFS_IO_CPU_START + ZVFS_IO_THREAD_COUNT)
#define ZVFS_MD_CPU (ZVFS_REACTOR_CPU + 1)
#endif // __ZVFS_CONFIG_H__

View File

@@ -4,6 +4,8 @@
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <sys/eventfd.h>
struct cq *g_cq;
@@ -11,11 +13,19 @@ struct cq *CQ_Create(void) {
struct cq *q = (struct cq*)malloc(sizeof(*q));
if (!q) return NULL;
q->head = q->tail = NULL;
q->wake_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (q->wake_fd < 0) {
free(q);
return NULL;
}
pthread_mutex_init(&q->lock, NULL);
return q;
}
void CQ_Destroy(struct cq *q) {
if (!q) {
return;
}
while (q->head) {
struct cq_item *tmp = q->head;
q->head = tmp->next;
@@ -23,6 +33,9 @@ void CQ_Destroy(struct cq *q) {
free(tmp->resp);
free(tmp);
}
if (q->wake_fd >= 0) {
close(q->wake_fd);
}
pthread_mutex_destroy(&q->lock);
free(q);
}
@@ -41,6 +54,15 @@ void CQ_Push(struct cq *q, struct zvfs_resp *resp) {
q->head = q->tail = item;
}
pthread_mutex_unlock(&q->lock);
if (q->wake_fd >= 0) {
uint64_t one = 1;
ssize_t rc;
do {
rc = write(q->wake_fd, &one, sizeof(one));
} while (rc < 0 && errno == EINTR);
}
}
/* Pop a response */
@@ -59,3 +81,10 @@ struct zvfs_resp *CQ_Pop(struct cq *q) {
free(item);
return resp;
}
int CQ_GetWakeFd(const struct cq *q) {
if (!q) {
return -1;
}
return q->wake_fd;
}

View File

@@ -13,6 +13,7 @@ struct cq_item {
struct cq {
struct cq_item *head;
struct cq_item *tail;
int wake_fd;
pthread_mutex_t lock;
};
@@ -20,6 +21,7 @@ struct cq *CQ_Create(void);
void CQ_Destroy(struct cq *q);
void CQ_Push(struct cq *q, struct zvfs_resp *resp);
struct zvfs_resp *CQ_Pop(struct cq *q);
int CQ_GetWakeFd(const struct cq *q);
extern struct cq *g_cq;

View File

@@ -1,6 +1,6 @@
#include "ipc_reactor.h"
#include "ipc_cq.h"
#include "common/config.h"
#include "proto/ipc_proto.h"
#include <stdlib.h>
#include <string.h>
@@ -11,88 +11,185 @@
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/stat.h>
#include <stdint.h>
#include <time.h>
static int send_all(int fd, const uint8_t *buf, size_t len) {
size_t off = 0;
static char g_reactor_wake_token;
static uint64_t now_mono_ns(void) {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (uint64_t)ts.tv_sec * 1000000000ULL + (uint64_t)ts.tv_nsec;
}
static void epoll_mod(struct zvfs_reactor *r, int fd, void *ptr, uint32_t events);
struct zvfs_pending_tx {
struct zvfs_pending_tx *next;
struct zvfs_conn *conn;
uint8_t *buf;
size_t len;
size_t off;
size_t trace_wake_write_patch_off;
size_t trace_reactor_wake_patch_off;
size_t trace_tx_patch_off;
int trace_wake_write_patched;
int trace_reactor_wake_patched;
int trace_tx_patched;
};
#define ZVFS_TX_TRACE_OFFSET_NONE ((size_t)-1)
enum trace_patch_field {
TRACE_PATCH_WAKE_WRITE = 0,
TRACE_PATCH_REACTOR_WAKE,
TRACE_PATCH_RESP_TX,
};
static void free_pending_tx(struct zvfs_pending_tx *tx) {
if (!tx) {
return;
}
if (tx->conn) {
zvfs_conn_put(tx->conn);
}
free(tx->buf);
free(tx);
}
static void free_conn_tx_queue_locked(struct zvfs_conn *conn) {
struct zvfs_pending_tx *tx;
struct zvfs_pending_tx *next;
for (tx = conn->tx_head; tx != NULL; tx = next) {
next = tx->next;
free_pending_tx(tx);
}
conn->tx_head = NULL;
conn->tx_tail = NULL;
}
static void conn_queue_tx_locked(struct zvfs_conn *conn, struct zvfs_pending_tx *tx) {
tx->next = NULL;
if (!conn->tx_tail) {
conn->tx_head = tx;
conn->tx_tail = tx;
return;
}
conn->tx_tail->next = tx;
conn->tx_tail = tx;
}
static size_t resp_trace_patch_offset(const struct zvfs_resp *resp, enum trace_patch_field field) {
size_t base = ZVFS_RESP_HEADER_WIRE_SIZE;
size_t trace_base;
size_t index;
if (!resp || (resp->trace.flags & ZVFS_RESP_TRACE_F_VALID) == 0) {
return ZVFS_TX_TRACE_OFFSET_NONE;
}
switch (resp->opcode) {
case ZVFS_OP_WRITE:
trace_base = base + sizeof(uint64_t);
break;
case ZVFS_OP_SYNC_MD:
trace_base = base;
break;
default:
return ZVFS_TX_TRACE_OFFSET_NONE;
}
switch (field) {
case TRACE_PATCH_WAKE_WRITE:
index = 6;
break;
case TRACE_PATCH_REACTOR_WAKE:
index = 7;
break;
case TRACE_PATCH_RESP_TX:
index = 8;
break;
default:
return ZVFS_TX_TRACE_OFFSET_NONE;
}
return trace_base + sizeof(uint32_t) + sizeof(uint64_t) * index;
}
static void patch_pending_tx_field(struct zvfs_pending_tx *tx, size_t off, int *patched) {
uint64_t ts;
if (!tx || !patched || *patched || off == ZVFS_TX_TRACE_OFFSET_NONE) {
return;
}
if (off + sizeof(ts) > tx->len) {
*patched = 1;
return;
}
ts = now_mono_ns();
memcpy(tx->buf + off, &ts, sizeof(ts));
*patched = 1;
}
static void patch_pending_tx_trace(struct zvfs_pending_tx *tx) {
patch_pending_tx_field(tx, tx->trace_tx_patch_off, &tx->trace_tx_patched);
}
static void patch_conn_head_reactor_wake_locked(struct zvfs_conn *conn) {
if (!conn || !conn->tx_head) {
return;
}
patch_pending_tx_field(conn->tx_head,
conn->tx_head->trace_reactor_wake_patch_off,
&conn->tx_head->trace_reactor_wake_patched);
}
static int conn_flush_tx_locked(struct zvfs_conn *conn) {
while (conn->tx_head) {
struct zvfs_pending_tx *tx = conn->tx_head;
patch_pending_tx_trace(tx);
while (tx->off < tx->len) {
ssize_t sent = send(conn->fd, tx->buf + tx->off, tx->len - tx->off, 0);
while (off < len) {
ssize_t sent = send(fd, buf + off, len - off, 0);
if (sent > 0) {
off += (size_t)sent;
tx->off += (size_t)sent;
continue;
}
if (sent < 0 && errno == EINTR) {
continue;
}
if (sent < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
/* The current implementation favors functionality: wait for the peer to become writable, then retry. */
usleep(100);
continue;
if (!conn->want_write) {
conn->want_write = 1;
epoll_mod(conn->reactor, conn->fd, conn, EPOLLIN | EPOLLOUT | EPOLLET);
}
return 0;
}
return -1;
}
conn->tx_head = tx->next;
if (!conn->tx_head) {
conn->tx_tail = NULL;
}
free_pending_tx(tx);
}
if (conn->want_write) {
conn->want_write = 0;
epoll_mod(conn->reactor, conn->fd, conn, EPOLLIN | EPOLLET);
}
return 0;
}
/** ====================================================== */
/* CQ OP */
/** ====================================================== */
static void cq_consume_send(struct cq *q) {
struct zvfs_resp *resp;
while ((resp = CQ_Pop(q)) != NULL) {
struct zvfs_conn *conn = resp->conn;
size_t cap = ZVFS_IPC_BUF_SIZE;
uint8_t *buf = NULL;
// printf("[resp][%s]\n",cast_opcode2string(resp->opcode));
buf = malloc(cap);
if (!buf) {
fprintf(stderr, "serialize resp failed: alloc %zu bytes\n", cap);
free(resp->data);
free(resp);
continue;
}
size_t n = zvfs_serialize_resp(resp, buf, cap);
if (n == 0 && resp->status == 0 && resp->opcode == ZVFS_OP_READ) {
if (resp->length <= SIZE_MAX - 64) {
size_t need = (size_t)resp->length + 64;
uint8_t *bigger = realloc(buf, need);
if (bigger) {
buf = bigger;
cap = need;
n = zvfs_serialize_resp(resp, buf, cap);
}
}
}
if (n == 0) {
fprintf(stderr, "serialize resp failed: op=%u status=%d len=%lu cap=%zu\n",
resp->opcode, resp->status, resp->length, cap);
free(buf);
free(resp->data);
free(resp);
continue;
}
if (send_all(conn->fd, buf, n) != 0) {
perror("send");
free(buf);
free(resp->data);
free(resp);
continue;
}
free(buf);
// cleanup
if(resp->data) free(resp->data);
free(resp);
}
}
static int set_nonblock(int fd){
int flags = fcntl(fd, F_GETFL, 0);
if (flags < 0)
@@ -122,11 +219,94 @@ static void epoll_mod(struct zvfs_reactor *r, int fd, void *ptr, uint32_t events
epoll_ctl(r->epfd, EPOLL_CTL_MOD, fd, &ev);
}
static void reactor_wake(struct zvfs_reactor *r) {
uint64_t one = 1;
ssize_t n;
if (!r || r->wake_fd < 0) {
return;
}
do {
n = write(r->wake_fd, &one, sizeof(one));
} while (n < 0 && errno == EINTR);
}
static void reactor_drain_wakefd(struct zvfs_reactor *r) {
for (;;) {
uint64_t value;
ssize_t n;
n = read(r->wake_fd, &value, sizeof(value));
if (n == (ssize_t)sizeof(value)) {
continue;
}
if (n < 0 && errno == EINTR) {
continue;
}
if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
return;
}
return;
}
}
static void reactor_queue_ready_conn(struct zvfs_conn *conn) {
struct zvfs_reactor *r;
if (!conn || !conn->reactor) {
return;
}
r = conn->reactor;
zvfs_conn_get(conn);
pthread_mutex_lock(&r->ready_lock);
conn->ready_next = NULL;
if (!r->ready_tail) {
r->ready_head = conn;
r->ready_tail = conn;
} else {
r->ready_tail->ready_next = conn;
r->ready_tail = conn;
}
pthread_mutex_unlock(&r->ready_lock);
reactor_wake(r);
}
static struct zvfs_conn *reactor_pop_ready_conn(struct zvfs_reactor *r) {
struct zvfs_conn *conn;
pthread_mutex_lock(&r->ready_lock);
conn = r->ready_head;
if (conn) {
r->ready_head = conn->ready_next;
if (!r->ready_head) {
r->ready_tail = NULL;
}
conn->ready_next = NULL;
}
pthread_mutex_unlock(&r->ready_lock);
return conn;
}
static void conn_destroy(struct zvfs_conn *c){
close(c->fd);
pthread_mutex_destroy(&c->tx_lock);
free(c);
}
void zvfs_conn_get(struct zvfs_conn *conn){
atomic_fetch_add_explicit(&conn->refcnt, 1, memory_order_relaxed);
}
void zvfs_conn_put(struct zvfs_conn *conn){
if (atomic_fetch_sub_explicit(&conn->refcnt, 1, memory_order_acq_rel) == 1) {
conn_destroy(conn);
}
}
int zvfs_conn_get_fd(struct zvfs_conn *conn){
return conn->fd;
}
@@ -140,38 +320,146 @@ void *zvfs_conn_get_ctx(struct zvfs_conn *conn){
}
void zvfs_conn_enable_write(struct zvfs_conn *conn){
if (conn->want_write)
return;
pthread_mutex_lock(&conn->tx_lock);
if (!conn->closed && !conn->want_write) {
conn->want_write = 1;
struct zvfs_reactor *r = conn->reactor;
epoll_mod(r, conn->fd, conn,
EPOLLIN | EPOLLOUT | EPOLLET);
epoll_mod(conn->reactor, conn->fd, conn, EPOLLIN | EPOLLOUT | EPOLLET);
}
pthread_mutex_unlock(&conn->tx_lock);
}
void zvfs_conn_disable_write(struct zvfs_conn *conn){
if (!conn->want_write)
return;
pthread_mutex_lock(&conn->tx_lock);
if (!conn->closed && conn->want_write) {
conn->want_write = 0;
struct zvfs_reactor *r = conn->reactor;
epoll_mod(r, conn->fd, conn,
EPOLLIN | EPOLLET);
epoll_mod(conn->reactor, conn->fd, conn, EPOLLIN | EPOLLET);
}
pthread_mutex_unlock(&conn->tx_lock);
}
void zvfs_conn_close(struct zvfs_conn *conn){
struct zvfs_reactor *r = conn->reactor;
int fd = -1;
if (r->opts.on_close)
pthread_mutex_lock(&conn->tx_lock);
if (conn->closed) {
pthread_mutex_unlock(&conn->tx_lock);
return;
}
conn->closed = 1;
fd = conn->fd;
conn->fd = -1;
free_conn_tx_queue_locked(conn);
pthread_mutex_unlock(&conn->tx_lock);
if (r->opts.on_close) {
r->opts.on_close(conn, r->opts.cb_ctx);
}
epoll_ctl(r->epfd, EPOLL_CTL_DEL, conn->fd, NULL);
if (fd >= 0) {
epoll_ctl(r->epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
}
conn_destroy(conn);
zvfs_conn_put(conn);
}
int zvfs_conn_submit_resp(struct zvfs_resp *resp) {
struct zvfs_conn *conn;
struct zvfs_pending_tx *tx = NULL;
uint8_t *buf = NULL;
size_t cap = ZVFS_IPC_BUF_SIZE;
size_t n;
int need_schedule = 0;
int tx_conn_ref = 0;
if (!resp || !resp->conn) {
return -1;
}
conn = resp->conn;
buf = malloc(cap);
if (!buf) {
fprintf(stderr, "serialize resp failed: alloc %zu bytes\n", cap);
goto err;
}
n = zvfs_serialize_resp(resp, buf, cap);
if (n == 0 && resp->status == 0 && resp->opcode == ZVFS_OP_READ) {
if (resp->length <= SIZE_MAX - 64) {
size_t need = (size_t)resp->length + 64;
uint8_t *bigger = realloc(buf, need);
if (bigger) {
buf = bigger;
cap = need;
n = zvfs_serialize_resp(resp, buf, cap);
}
}
}
if (n == 0) {
fprintf(stderr, "serialize resp failed: op=%u status=%d len=%lu cap=%zu\n",
resp->opcode, resp->status, resp->length, cap);
goto err;
}
tx = calloc(1, sizeof(*tx));
if (!tx) {
fprintf(stderr, "alloc pending tx failed: len=%zu\n", n);
goto err;
}
tx->conn = conn;
tx->buf = buf;
tx->len = n;
tx->trace_wake_write_patch_off = resp_trace_patch_offset(resp, TRACE_PATCH_WAKE_WRITE);
tx->trace_reactor_wake_patch_off = resp_trace_patch_offset(resp, TRACE_PATCH_REACTOR_WAKE);
tx->trace_tx_patch_off = resp_trace_patch_offset(resp, TRACE_PATCH_RESP_TX);
tx->trace_wake_write_patched = 0;
tx->trace_reactor_wake_patched = 0;
tx->trace_tx_patched = 0;
zvfs_conn_get(conn);
tx_conn_ref = 1;
pthread_mutex_lock(&conn->tx_lock);
if (conn->closed) {
pthread_mutex_unlock(&conn->tx_lock);
goto err;
}
conn_queue_tx_locked(conn, tx);
if (!conn->want_write && !conn->queued_for_flush) {
conn->queued_for_flush = 1;
need_schedule = 1;
}
pthread_mutex_unlock(&conn->tx_lock);
if (need_schedule) {
patch_pending_tx_field(tx, tx->trace_wake_write_patch_off, &tx->trace_wake_write_patched);
reactor_queue_ready_conn(conn);
}
if (resp->data) {
free(resp->data);
}
free(resp);
zvfs_conn_put(conn);
return 0;
err:
if (tx) {
tx->conn = tx_conn_ref ? conn : NULL;
free_pending_tx(tx);
buf = NULL;
}
free(buf);
if (resp->data) {
free(resp->data);
}
free(resp);
zvfs_conn_put(conn);
return -1;
}
/**
@@ -214,14 +502,17 @@ struct zvfs_reactor *zvfs_reactor_create(const struct zvfs_reactor_opts *opts){
struct zvfs_reactor *r = calloc(1, sizeof(*r));
r->opts = *opts;
pthread_mutex_init(&r->ready_lock, NULL);
r->epfd = epoll_create1(0);
r->wake_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
r->listen_fd = create_listen_socket(
opts->socket_path,
opts->backlog);
epoll_add(r, r->listen_fd, NULL, EPOLLIN);
epoll_add(r, r->wake_fd, &g_reactor_wake_token, EPOLLIN);
return r;
}
@@ -245,6 +536,8 @@ static void handle_accept(struct zvfs_reactor *r){
conn->fd = fd;
conn->reactor = r;
atomic_init(&conn->refcnt, 1);
pthread_mutex_init(&conn->tx_lock, NULL);
epoll_add(r, fd, conn, EPOLLIN | EPOLLET);
@@ -260,39 +553,70 @@ zvfs_reactor_run(struct zvfs_reactor *r){
r->running = 1;
while (r->running) {
int n = epoll_wait(r->epfd, events, 64, 0);
if (n < 0) {
if (errno == EINTR) {
continue;
}
return -1;
}
for (int i = 0; i < n; i++) {
if (events[i].data.ptr == NULL) {
handle_accept(r);
continue;
}
if (events[i].data.ptr == &g_reactor_wake_token) {
struct zvfs_conn *ready_conn;
reactor_drain_wakefd(r);
while ((ready_conn = reactor_pop_ready_conn(r)) != NULL) {
pthread_mutex_lock(&ready_conn->tx_lock);
ready_conn->queued_for_flush = 0;
patch_conn_head_reactor_wake_locked(ready_conn);
if (!ready_conn->closed && conn_flush_tx_locked(ready_conn) != 0) {
pthread_mutex_unlock(&ready_conn->tx_lock);
zvfs_conn_close(ready_conn);
zvfs_conn_put(ready_conn);
continue;
}
pthread_mutex_unlock(&ready_conn->tx_lock);
zvfs_conn_put(ready_conn);
}
continue;
}
struct zvfs_conn *conn = events[i].data.ptr;
if (events[i].events & (EPOLLHUP | EPOLLERR)) {
zvfs_conn_close(conn);
continue;
}
if ((events[i].events & EPOLLIN) &&
r->opts.on_read) {
if ((events[i].events & EPOLLIN) && r->opts.on_read) {
r->opts.on_read(conn, r->opts.cb_ctx);
}
if ((events[i].events & EPOLLOUT) &&
r->opts.on_write) {
if (events[i].events & EPOLLOUT) {
int rc;
pthread_mutex_lock(&conn->tx_lock);
rc = conn->closed ? 0 : conn_flush_tx_locked(conn);
pthread_mutex_unlock(&conn->tx_lock);
if (rc != 0) {
zvfs_conn_close(conn);
continue;
}
}
if ((events[i].events & EPOLLOUT) && r->opts.on_write) {
r->opts.on_write(conn, r->opts.cb_ctx);
}
}
cq_consume_send(g_cq);
}
return 0;
}
@@ -303,7 +627,8 @@ void zvfs_reactor_stop(struct zvfs_reactor *r){
void zvfs_reactor_destroy(struct zvfs_reactor *r){
close(r->listen_fd);
close(r->wake_fd);
close(r->epfd);
pthread_mutex_destroy(&r->ready_lock);
free(r);
}

View File

@@ -3,6 +3,8 @@
#include <stdint.h>
#include <stddef.h>
#include <pthread.h>
#include <stdatomic.h>
#ifdef __cplusplus
extern "C" {
@@ -11,6 +13,8 @@ extern "C" {
struct zvfs_reactor_opts;
struct zvfs_conn;
struct zvfs_reactor;
struct zvfs_pending_tx;
struct zvfs_resp;
/* callbacks */
@@ -57,10 +61,19 @@ struct zvfs_conn {
int fd;
int want_write;
int closed;
int queued_for_flush;
void *user_ctx;
struct zvfs_reactor *reactor;
pthread_mutex_t tx_lock;
atomic_int refcnt;
struct zvfs_pending_tx *tx_head;
struct zvfs_pending_tx *tx_tail;
struct zvfs_conn *ready_next;
};
struct zvfs_reactor {
@@ -69,8 +82,14 @@ struct zvfs_reactor {
int listen_fd;
int wake_fd;
int running;
pthread_mutex_t ready_lock;
struct zvfs_conn *ready_head;
struct zvfs_conn *ready_tail;
struct zvfs_reactor_opts opts;
};
@@ -110,6 +129,15 @@ zvfs_conn_set_ctx(struct zvfs_conn *conn, void *ctx);
void *
zvfs_conn_get_ctx(struct zvfs_conn *conn);
void
zvfs_conn_get(struct zvfs_conn *conn);
void
zvfs_conn_put(struct zvfs_conn *conn);
int
zvfs_conn_submit_resp(struct zvfs_resp *resp);
#ifdef __cplusplus
}

View File

@@ -1,8 +1,9 @@
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include "common/config.h"
#include "proto/ipc_proto.h"
#include "ipc_reactor.h"
#include "ipc_cq.h"
#include "spdk_engine_wrapper.h"
#include <stdio.h>
@@ -13,6 +14,8 @@
#include <sys/types.h>
#include <errno.h>
#include <stdlib.h>
#include <time.h>
#include <sched.h>
// #define IPC_REACTOR_ECHO
@@ -20,6 +23,192 @@
extern struct zvfs_spdk_io_engine g_engine;
enum conn_rx_stage {
CONN_RX_STAGE_HEADER = 0,
CONN_RX_STAGE_BODY,
CONN_RX_STAGE_WRITE_META,
CONN_RX_STAGE_WRITE_DATA,
};
struct conn_rx_ctx {
enum conn_rx_stage stage;
uint8_t header_buf[ZVFS_REQ_HEADER_WIRE_SIZE];
size_t header_bytes;
struct zvfs_req_header header;
struct zvfs_req *req;
uint8_t *body_buf;
size_t body_len;
size_t body_bytes;
uint8_t write_meta_buf[ZVFS_REQ_WRITE_FIXED_WIRE_SIZE];
size_t write_meta_bytes;
};
static uint64_t now_mono_ns(void)
{
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (uint64_t)ts.tv_sec * 1000000000ULL + (uint64_t)ts.tv_nsec;
}
static void bind_current_thread_cpu(const char *name, int cpu_id)
{
cpu_set_t set;
CPU_ZERO(&set);
CPU_SET(cpu_id, &set);
if (sched_setaffinity(0, sizeof(set), &set) != 0) {
fprintf(stderr, "[affinity] bind %s to cpu %d failed: %s\n",
name, cpu_id, strerror(errno));
}
}
static void conn_rx_reset(struct conn_rx_ctx *rctx)
{
if (!rctx) {
return;
}
free(rctx->body_buf);
rctx->body_buf = NULL;
rctx->body_len = 0;
rctx->body_bytes = 0;
rctx->write_meta_bytes = 0;
rctx->header_bytes = 0;
memset(&rctx->header, 0, sizeof(rctx->header));
rctx->stage = CONN_RX_STAGE_HEADER;
}
static void conn_rx_discard_req(struct conn_rx_ctx *rctx)
{
if (!rctx) {
return;
}
if (rctx->req) {
free(rctx->req->data);
free(rctx->req->add_ref_items);
free(rctx->req);
rctx->req = NULL;
}
conn_rx_reset(rctx);
}
static int conn_rx_req_ready(struct zvfs_conn *c, struct conn_rx_ctx *rctx)
{
struct zvfs_req *req = rctx->req;
req->trace_req_rx_ns = now_mono_ns();
zvfs_conn_get(c);
req->conn = c;
req->trace_dispatch_ns = now_mono_ns();
rctx->req = NULL;
conn_rx_reset(rctx);
if (dispatch_to_worker(req) < 0) {
fprintf(stderr, "[dispatcher] [fd:%d] dispatch error\n", c->fd);
}
return 0;
}
static int conn_rx_handle_write_meta(struct zvfs_conn *c, struct conn_rx_ctx *rctx)
{
struct zvfs_req_write_body body;
if (zvfs_deserialize_req_write_fixed(rctx->write_meta_buf, sizeof(rctx->write_meta_buf), &body) !=
ZVFS_REQ_WRITE_FIXED_WIRE_SIZE) {
fprintf(stderr, "[read] malformed write meta fd=%d\n", c->fd);
return -1;
}
rctx->req = calloc(1, sizeof(*rctx->req));
if (!rctx->req) {
fprintf(stderr, "[read] alloc write req failed fd=%d\n", c->fd);
return -1;
}
rctx->req->opcode = ZVFS_OP_WRITE;
rctx->req->handle_id = body.handle_id;
rctx->req->offset = body.offset;
rctx->req->length = body.length;
rctx->req->write_flags = body.flags;
if (body.length > 0) {
rctx->req->data = malloc((size_t)body.length);
if (!rctx->req->data) {
fprintf(stderr, "[read] alloc write payload failed fd=%d len=%lu\n",
c->fd, (unsigned long)body.length);
return -1;
}
}
rctx->body_len = (size_t)body.length;
rctx->body_bytes = 0;
if (rctx->body_len == 0) {
return conn_rx_req_ready(c, rctx);
}
rctx->stage = CONN_RX_STAGE_WRITE_DATA;
return 0;
}
static int conn_rx_prepare_next(struct zvfs_conn *c, struct conn_rx_ctx *rctx)
{
struct zvfs_req *req;
if (rctx->header.opcode < ZVFS_OP_CREATE || rctx->header.opcode > ZVFS_OP_RESET_BLOBSTORE) {
fprintf(stderr, "[read] invalid opcode fd=%d op=%u\n", c->fd, rctx->header.opcode);
return -1;
}
if (rctx->header.opcode == ZVFS_OP_WRITE) {
if (rctx->header.payload_len < ZVFS_REQ_WRITE_FIXED_WIRE_SIZE) {
fprintf(stderr, "[read] short write payload fd=%d payload_len=%u\n",
c->fd, rctx->header.payload_len);
return -1;
}
rctx->stage = CONN_RX_STAGE_WRITE_META;
rctx->write_meta_bytes = 0;
return 0;
}
req = calloc(1, sizeof(*req));
if (!req) {
fprintf(stderr, "[read] alloc req failed fd=%d\n", c->fd);
return -1;
}
rctx->req = req;
rctx->body_len = ZVFS_REQ_HEADER_WIRE_SIZE + rctx->header.payload_len;
rctx->body_bytes = 0;
rctx->body_buf = malloc(rctx->body_len);
if (!rctx->body_buf) {
fprintf(stderr, "[read] alloc body failed fd=%d len=%zu\n", c->fd, rctx->body_len);
return -1;
}
memcpy(rctx->body_buf, rctx->header_buf, ZVFS_REQ_HEADER_WIRE_SIZE);
rctx->body_bytes = ZVFS_REQ_HEADER_WIRE_SIZE;
rctx->stage = CONN_RX_STAGE_BODY;
if (rctx->body_len == ZVFS_REQ_HEADER_WIRE_SIZE) {
if (zvfs_deserialize_req(rctx->body_buf, rctx->body_len, rctx->req) == 0) {
fprintf(stderr, "[read] deserialize req failed fd=%d op=%u\n", c->fd, rctx->header.opcode);
return -1;
}
return conn_rx_req_ready(c, rctx);
}
return 0;
}
#ifdef IPC_REACTOR_ECHO
static void on_accept(struct zvfs_conn *conn, void *ctx)
@@ -103,26 +292,14 @@ int main()
#else
static void on_accept(struct zvfs_conn *conn, void *ctx)
{
struct {
uint8_t *buf;
size_t len;
size_t cap;
} *rctx = calloc(1, sizeof(*rctx));
struct conn_rx_ctx *rctx = calloc(1, sizeof(*rctx));
if (!rctx) {
fprintf(stderr, "[accept] alloc conn ctx failed\n");
zvfs_conn_close(conn);
return;
}
rctx->cap = ZVFS_IPC_BUF_SIZE;
rctx->buf = calloc(1, rctx->cap);
if (!rctx->buf) {
fprintf(stderr, "[accept] alloc conn rx buffer failed\n");
free(rctx);
zvfs_conn_close(conn);
return;
}
conn_rx_reset(rctx);
zvfs_conn_set_ctx(conn, rctx);
printf("client connected fd=%d\n",
@@ -132,29 +309,47 @@ static void on_accept(struct zvfs_conn *conn, void *ctx)
static void on_read(struct zvfs_conn *c, void *ctx)
{
int fd = zvfs_conn_get_fd(c);
struct {
uint8_t *buf;
size_t len;
size_t cap;
} *rctx = zvfs_conn_get_ctx(c);
struct conn_rx_ctx *rctx = zvfs_conn_get_ctx(c);
if (!rctx || !rctx->buf || rctx->cap == 0) {
if (!rctx) {
fprintf(stderr, "[read] invalid conn ctx fd=%d\n", fd);
zvfs_conn_close(c);
return;
}
for (;;) {
if (rctx->len >= rctx->cap) {
fprintf(stderr, "[read] rx buffer overflow fd=%d len=%zu cap=%zu\n",
fd, rctx->len, rctx->cap);
ssize_t n;
size_t need;
void *dst;
switch (rctx->stage) {
case CONN_RX_STAGE_HEADER:
need = sizeof(rctx->header_buf) - rctx->header_bytes;
dst = rctx->header_buf + rctx->header_bytes;
break;
case CONN_RX_STAGE_BODY:
need = rctx->body_len - rctx->body_bytes;
dst = rctx->body_buf + rctx->body_bytes;
break;
case CONN_RX_STAGE_WRITE_META:
need = sizeof(rctx->write_meta_buf) - rctx->write_meta_bytes;
dst = rctx->write_meta_buf + rctx->write_meta_bytes;
break;
case CONN_RX_STAGE_WRITE_DATA:
need = rctx->body_len - rctx->body_bytes;
dst = (uint8_t *)rctx->req->data + rctx->body_bytes;
break;
default:
fprintf(stderr, "[read] invalid stage fd=%d stage=%d\n", fd, (int)rctx->stage);
conn_rx_discard_req(rctx);
zvfs_conn_close(c);
return;
}
ssize_t n = read(fd, rctx->buf + rctx->len, rctx->cap - rctx->len);
n = read(fd, dst, need);
if (n == 0) {
fprintf(stderr, "[read] fd=%d closed\n", fd);
conn_rx_discard_req(rctx);
zvfs_conn_close(c);
return;
}
@@ -162,63 +357,73 @@ static void on_read(struct zvfs_conn *c, void *ctx)
if (n < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
perror("[read]");
conn_rx_discard_req(rctx);
zvfs_conn_close(c);
return;
}
break;
return;
}
rctx->len += (size_t)n;
}
size_t offset = 0;
while (offset < rctx->len) {
struct zvfs_req *req = calloc(1, sizeof(*req));
if (!req) {
fprintf(stderr, "malloc failed\n");
break;
}
size_t consumed = zvfs_deserialize_req(rctx->buf + offset, rctx->len - offset, req);
if (consumed == 0) {
free(req);
break; /* wait for more data */
}
// printf("[req][%s]\n", cast_opcode2string(req->opcode));
req->conn = c;
offset += consumed;
if (dispatch_to_worker(req) < 0) {
fprintf(stderr, "[dispatcher] [fd:%d] dispatch error\n", c->fd);
}
}
if (offset > 0) {
size_t remain = rctx->len - offset;
if (remain > 0) {
memmove(rctx->buf, rctx->buf + offset, remain);
}
rctx->len = remain;
}
if (rctx->len == rctx->cap) {
fprintf(stderr, "[read] request too large or malformed fd=%d cap=%zu\n",
fd, rctx->cap);
switch (rctx->stage) {
case CONN_RX_STAGE_HEADER:
rctx->header_bytes += (size_t)n;
if (rctx->header_bytes == sizeof(rctx->header_buf)) {
if (zvfs_deserialize_req_header(rctx->header_buf, sizeof(rctx->header_buf), &rctx->header) == 0 ||
conn_rx_prepare_next(c, rctx) != 0) {
conn_rx_discard_req(rctx);
zvfs_conn_close(c);
return;
}
}
break;
case CONN_RX_STAGE_BODY:
rctx->body_bytes += (size_t)n;
if (rctx->body_bytes == rctx->body_len) {
if (zvfs_deserialize_req(rctx->body_buf, rctx->body_len, rctx->req) == 0) {
fprintf(stderr, "[read] deserialize req failed fd=%d op=%u\n", fd, rctx->header.opcode);
conn_rx_discard_req(rctx);
zvfs_conn_close(c);
return;
}
if (conn_rx_req_ready(c, rctx) != 0) {
conn_rx_discard_req(rctx);
zvfs_conn_close(c);
return;
}
}
break;
case CONN_RX_STAGE_WRITE_META:
rctx->write_meta_bytes += (size_t)n;
if (rctx->write_meta_bytes == sizeof(rctx->write_meta_buf)) {
if (conn_rx_handle_write_meta(c, rctx) != 0) {
conn_rx_discard_req(rctx);
zvfs_conn_close(c);
return;
}
}
break;
case CONN_RX_STAGE_WRITE_DATA:
rctx->body_bytes += (size_t)n;
if (rctx->body_bytes == rctx->body_len) {
if (conn_rx_req_ready(c, rctx) != 0) {
conn_rx_discard_req(rctx);
zvfs_conn_close(c);
return;
}
}
break;
default:
break;
}
}
}
static void on_close(struct zvfs_conn *conn, void *ctx)
{
struct {
uint8_t *buf;
size_t len;
size_t cap;
} *rctx = zvfs_conn_get_ctx(conn);
struct conn_rx_ctx *rctx = zvfs_conn_get_ctx(conn);
if (rctx) {
free(rctx->buf);
conn_rx_discard_req(rctx);
free(rctx);
zvfs_conn_set_ctx(conn, NULL);
}
@@ -229,14 +434,10 @@ static void on_close(struct zvfs_conn *conn, void *ctx)
int main(void){
const char *bdev_name = getenv("SPDK_BDEV_NAME") ? getenv("SPDK_BDEV_NAME") : ZVFS_BDEV;
const char *json_file = getenv("SPDK_JSON_CONFIG") ? getenv("SPDK_JSON_CONFIG") : SPDK_JSON_PATH;
g_cq = CQ_Create();
zvfs_engine_init(bdev_name, json_file, 4);
zvfs_engine_init(bdev_name, json_file, ZVFS_IO_THREAD_COUNT + 1);
bind_current_thread_cpu("reactor", ZVFS_REACTOR_CPU);
struct zvfs_reactor_opts opts = {
.socket_path = ZVFS_IPC_DEFAULT_SOCKET_PATH,
@@ -251,9 +452,5 @@ int main(void){
struct zvfs_reactor *r = zvfs_reactor_create(&opts);
zvfs_reactor_run(r);
if(g_cq) CQ_Destroy(g_cq);
}
#endif

View File

@@ -1,7 +1,10 @@
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include "common/utils.h"
#include "common/config.h"
#include "spdk_engine.h"
#include "ipc_cq.h"
#include "ipc_reactor.h"
#include "dma_buf_pool.h"
#include <spdk/event.h>
@@ -15,6 +18,8 @@
#include <string.h>
#include <time.h>
#include <stdatomic.h>
#include <stdlib.h>
#include <sched.h>
/** ===========================================================
* Global engine state
@@ -25,10 +30,73 @@ static _Atomic bool g_bs_resetting = false;
/** ===========================================================
* Internal helpers: clocks
* ============================================================ */
static uint64_t now_mono_ms(void) {
static uint64_t now_mono_ns(void) {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (uint64_t)ts.tv_sec * 1000ULL + (uint64_t)ts.tv_nsec / 1000000ULL;
return (uint64_t)ts.tv_sec * 1000000000ULL + (uint64_t)ts.tv_nsec;
}
static uint64_t now_mono_ms(void) {
return now_mono_ns() / 1000000ULL;
}
static void resp_trace_init_from_req(struct zvfs_resp *resp, const struct zvfs_req *req) {
if (!resp || !req) {
return;
}
resp->trace.flags = ZVFS_RESP_TRACE_F_VALID;
resp->trace.req_rx_ns = req->trace_req_rx_ns;
resp->trace.dispatch_ns = req->trace_dispatch_ns;
}
static void bind_current_thread_cpu(const char *name, int cpu_id) {
cpu_set_t set;
CPU_ZERO(&set);
CPU_SET(cpu_id, &set);
if (sched_setaffinity(0, sizeof(set), &set) != 0) {
SPDK_ERRLOG("bind %s to cpu %d failed: %s\n",
name, cpu_id, strerror(errno));
return;
}
SPDK_NOTICELOG("bound %s to cpu %d\n", name, cpu_id);
}
static unsigned int poll_idle_us(void) {
static int inited = 0;
static unsigned int idle_us = 0;
const char *v;
char *end = NULL;
unsigned long parsed;
if (!inited) {
v = getenv("ZVFS_SPDK_POLL_US");
if (v && v[0] != '\0') {
parsed = strtoul(v, &end, 10);
if (end && *end == '\0' && parsed <= 1000000UL) {
idle_us = (unsigned int)parsed;
}
}
inited = 1;
}
return idle_us;
}
static void poll_thread_forever(struct spdk_thread *thread) {
const unsigned int idle_us = poll_idle_us();
while (true) {
int rc = spdk_thread_poll(thread, 0, 0);
if (rc == 0 && idle_us > 0) {
usleep(idle_us);
}
}
}
/*
@@ -50,6 +118,7 @@ static void push_err_resp(struct zvfs_req *req, int status) {
struct zvfs_resp *resp = calloc(1, sizeof(*resp));
if (!resp) {
SPDK_ERRLOG("push_err_resp: calloc failed, op_code=%u\n", req->opcode);
zvfs_conn_put(req->conn);
if (req->data) free(req->data);
if (req->add_ref_items) free(req->add_ref_items);
free(req);
@@ -61,7 +130,7 @@ static void push_err_resp(struct zvfs_req *req, int status) {
if (req->data) free(req->data);
if (req->add_ref_items) free(req->add_ref_items);
free(req);
CQ_Push(g_cq, resp);
zvfs_conn_submit_resp(resp);
}
/** ===========================================================
@@ -188,6 +257,7 @@ static void *md_poller_fn(void *arg) {
struct thread_bootstrap_ctx *boot = arg;
struct zvfs_io_thread *slot = &g_engine.thread_pool[0];
bind_current_thread_cpu("md_thread", ZVFS_MD_CPU);
spdk_set_thread(slot->thread);
int rc = load_json_config(slot->thread, boot->json_file);
@@ -206,10 +276,7 @@ notify:
slot->ready = true;
/* Poll forever, handling all md operations dispatched via spdk_thread_send_msg */
while (true) {
spdk_thread_poll(slot->thread, 0, 0);
usleep(100);
}
poll_thread_forever(slot->thread);
return NULL;
}
@@ -217,8 +284,10 @@ notify:
static void *io_poller_fn(void *arg) {
struct thread_bootstrap_ctx *boot = arg;
int idx = boot->idx;
int cpu_id = ZVFS_IO_CPU_START + (idx - 1);
struct zvfs_io_thread *slot = &g_engine.thread_pool[idx];
bind_current_thread_cpu("io_thread", cpu_id);
spdk_set_thread(slot->thread);
/* Wait for the md thread to finish blobstore initialization */
@@ -252,10 +321,7 @@ static void *io_poller_fn(void *arg) {
slot->ready = true;
/* Poll forever, handling IO operations dispatched via spdk_thread_send_msg */
while (true) {
spdk_thread_poll(slot->thread, 0, 0);
usleep(100);
}
poll_thread_forever(slot->thread);
return NULL;
}
@@ -447,6 +513,7 @@ struct resize_ctx {
struct sync_ctx {
struct zvfs_req *req;
struct zvfs_blob_handle *handle;
uint64_t spdk_start_ns;
};
struct close_ctx {
@@ -471,6 +538,8 @@ struct io_ctx {
uint64_t lba_len;
uint32_t buf_off;
void *dma_buf;
uint64_t spdk_start_ns;
uint64_t phase1_done_ns;
};
struct write_autogrow_ctx {
@@ -589,7 +658,7 @@ static void create_sync_cb(void *arg, int bserrno) {
free(cctx->req);
free(cctx);
CQ_Push(g_cq, resp);
zvfs_conn_submit_resp(resp);
}
int blob_create(struct zvfs_req *req) {
@@ -646,7 +715,7 @@ static void blob_open_done_cb(void *arg, struct spdk_blob *blob, int bserrno) {
free(octx->req);
free(octx);
CQ_Push(g_cq, resp);
zvfs_conn_submit_resp(resp);
}
static void do_blob_open(void *arg) {
@@ -678,7 +747,7 @@ static void blob_resize_done_cb(void *arg, int bserrno) {
free(rctx->req);
free(rctx);
CQ_Push(g_cq, resp);
zvfs_conn_submit_resp(resp);
}
static void do_blob_resize(void *arg) {
@@ -712,14 +781,19 @@ static void blob_sync_md_done_cb(void *arg, int bserrno) {
resp->opcode = sctx->req->opcode;
resp->status = bserrno;
resp->conn = sctx->req->conn;
resp_trace_init_from_req(resp, sctx->req);
resp->trace.spdk_start_ns = sctx->spdk_start_ns;
resp->trace.spdk_done_ns = now_mono_ns();
resp->trace.cq_push_ns = now_mono_ns();
free(sctx->req);
free(sctx);
CQ_Push(g_cq, resp);
zvfs_conn_submit_resp(resp);
}
static void do_blob_sync_md(void *arg) {
struct sync_ctx *sctx = arg;
sctx->spdk_start_ns = now_mono_ns();
spdk_blob_sync_md(sctx->handle->blob, blob_sync_md_done_cb, sctx);
}
@@ -753,7 +827,7 @@ static void blob_close_done_cb(void *arg, int bserrno) {
free(cctx->req);
free(cctx);
CQ_Push(g_cq, resp);
zvfs_conn_submit_resp(resp);
}
static void do_blob_close(void *arg) {
@@ -782,7 +856,7 @@ int blob_close(struct zvfs_req *req) {
resp->status = 0;
resp->conn = req->conn;
free(req);
CQ_Push(g_cq, resp);
zvfs_conn_submit_resp(resp);
return 0;
}
break;
@@ -810,7 +884,7 @@ static void blob_delete_done_cb(void *arg, int bserrno) {
free(dctx->req);
free(dctx);
CQ_Push(g_cq, resp);
zvfs_conn_submit_resp(resp);
}
static void do_blob_delete(void *arg) {
@@ -848,7 +922,7 @@ static void blobstore_reset_finish(struct reset_ctx *rctx, int status) {
free(rctx->req);
free(rctx);
CQ_Push(g_cq, resp);
zvfs_conn_submit_resp(resp);
atomic_store(&g_bs_resetting, false);
}
@@ -951,7 +1025,7 @@ static void blob_read_done_cb(void *arg, int bserrno) {
free(ioctx->req);
free(ioctx);
CQ_Push(g_cq, resp);
zvfs_conn_submit_resp(resp);
}
static void do_blob_read(void *arg) {
@@ -1036,10 +1110,18 @@ static void blob_write_writephase_cb(void *arg, int bserrno) {
resp->status = 0;
resp->conn = ioctx->req->conn;
resp->bytes_written = ioctx->req->length;
resp_trace_init_from_req(resp, ioctx->req);
resp->trace.spdk_start_ns = ioctx->spdk_start_ns;
resp->trace.phase1_done_ns = ioctx->phase1_done_ns;
resp->trace.spdk_done_ns = now_mono_ns();
if (ioctx->phase1_done_ns != 0) {
resp->trace.flags |= ZVFS_RESP_TRACE_F_PHASE1_VALID;
}
resp->trace.cq_push_ns = now_mono_ns();
free(ioctx->req);
free(ioctx);
CQ_Push(g_cq, resp);
zvfs_conn_submit_resp(resp);
}
static void blob_write_readphase_cb(void *arg, int bserrno) {
@@ -1052,6 +1134,8 @@ static void blob_write_readphase_cb(void *arg, int bserrno) {
return;
}
ioctx->phase1_done_ns = now_mono_ns();
/* read-modify: overlay the user data onto the corresponding region of dma_buf */
memcpy((uint8_t *)ioctx->dma_buf + ioctx->buf_off,
ioctx->req->data, ioctx->req->length);
@@ -1060,10 +1144,18 @@ static void blob_write_readphase_cb(void *arg, int bserrno) {
ioctx->lba_off, ioctx->lba_len, blob_write_writephase_cb, ioctx);
}
static void blob_write_direct_submit(struct io_ctx *ioctx) {
memcpy(ioctx->dma_buf, ioctx->req->data, (size_t)ioctx->req->length);
spdk_blob_io_write(ioctx->handle->blob, ioctx->channel, ioctx->dma_buf,
ioctx->lba_off, ioctx->lba_len, blob_write_writephase_cb, ioctx);
}
static void do_blob_write(void *arg) {
struct io_ctx *ioctx = arg;
struct zvfs_blob_handle *handle = ioctx->handle;
ioctx->spdk_start_ns = now_mono_ns();
uint64_t end = 0;
if (__builtin_add_overflow(ioctx->req->offset, ioctx->req->length, &end)) {
push_err_resp(ioctx->req, -EOVERFLOW);
@@ -1105,6 +1197,12 @@ static void do_blob_write(void *arg) {
ioctx->buf_off = buf_off;
ioctx->dma_buf = dma_buf;
if (buf_off == 0 &&
ioctx->req->length == lba_len * g_engine.io_unit_size) {
blob_write_direct_submit(ioctx);
return;
}
spdk_blob_io_read(handle->blob, ioctx->channel, ioctx->dma_buf,
lba_off, lba_len, blob_write_readphase_cb, ioctx);
}

View File

@@ -1,6 +1,6 @@
#include "spdk_engine_wrapper.h"
#include "spdk_engine.h"
#include "ipc_cq.h"
#include "ipc_reactor.h"
#include <spdk/log.h>
#include <errno.h>
@@ -11,6 +11,7 @@ static void push_err_resp(struct zvfs_req *req, int status) {
struct zvfs_resp *resp = calloc(1, sizeof(*resp));
if (!resp) {
SPDK_ERRLOG("push_err_resp: calloc failed, op_code=%u\n", req->opcode);
zvfs_conn_put(req->conn);
if (req->data) free(req->data);
if (req->add_ref_items) free(req->add_ref_items);
free(req);
@@ -22,13 +23,14 @@ static void push_err_resp(struct zvfs_req *req, int status) {
if (req->data) free(req->data);
if (req->add_ref_items) free(req->add_ref_items);
free(req);
CQ_Push(g_cq, resp);
zvfs_conn_submit_resp(resp);
}
static void push_ok_resp(struct zvfs_req *req) {
struct zvfs_resp *resp = calloc(1, sizeof(*resp));
if (!resp) {
SPDK_ERRLOG("push_ok_resp: calloc failed, op_code=%u\n", req->opcode);
zvfs_conn_put(req->conn);
if (req->data) free(req->data);
if (req->add_ref_items) free(req->add_ref_items);
free(req);
@@ -40,7 +42,7 @@ static void push_ok_resp(struct zvfs_req *req) {
if (req->data) free(req->data);
if (req->add_ref_items) free(req->add_ref_items);
free(req);
CQ_Push(g_cq, resp);
zvfs_conn_submit_resp(resp);
}
/** hash map op */

Binary file not shown.

View File

@@ -19,96 +19,7 @@
#include <pthread.h>
#include <stdio.h>
/* ------------------------------------------------------------------ */
/* Internal: open/openat debug logging */
/* ------------------------------------------------------------------ */
static inline const char *
zvfs_dbg_str(const char *s)
{
return s ? s : "(null)";
}
static int
zvfs_debug_open_enabled(void)
{
static int inited = 0;
static int enabled = 0;
const char *v;
if (!inited) {
v = getenv("ZVFS_DEBUG_OPEN");
enabled = (v && v[0] != '\0' && strcmp(v, "0") != 0);
inited = 1;
}
return enabled;
}
static const char *
zvfs_debug_open_match(void)
{
static int inited = 0;
static const char *match = NULL;
if (!inited) {
match = getenv("ZVFS_DEBUG_OPEN_MATCH");
if (match && match[0] == '\0') {
match = NULL;
}
inited = 1;
}
return match;
}
static int
zvfs_debug_open_should_log(const char *path1, const char *path2)
{
const char *match;
if (!zvfs_debug_open_enabled()) {
return 0;
}
match = zvfs_debug_open_match();
if (!match) {
return 1;
}
if (path1 && strstr(path1, match)) {
return 1;
}
if (path2 && strstr(path2, match)) {
return 1;
}
return 0;
}
static void
zvfs_debug_open_log(const char *path1, const char *path2, const char *fmt, ...)
{
va_list ap;
if (!zvfs_debug_open_should_log(path1, path2)) {
return;
}
fprintf(stderr, "[zvfs][open-dbg][pid=%d][tid=%lu] ",
getpid(), (unsigned long)pthread_self());
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
va_end(ap);
fputc('\n', stderr);
}
static int
zvfs_debug_has_fd_mapping(int fd)
{
int found = 0;
pthread_mutex_lock(&g_fs.fd_mu);
found = (openfile_lookup(fd) != NULL);
pthread_mutex_unlock(&g_fs.fd_mu);
return found;
}
#define zvfs_debug_open_log(...) ((void)0)
/* close-path helper: implemented in the second half of this file. */
static int zvfs_detach_fd_mapping(int fd, int do_sync_md);
@@ -393,10 +304,6 @@ zvfs_open_impl(int real_fd, const char *abspath, int flags, mode_t mode)
/* Miss: read the blob_id from the xattr (possibly this process's first open of the file) */
if (zvfs_xattr_read_blob_id(real_fd, &blob_id) < 0) {
/* No xattr: not a ZVFS-managed file, fall back to passthrough */
int saved = errno;
zvfs_debug_open_log(abspath, NULL,
"open existing xattr_read miss errno=%d(%s), passthrough real_fd=%d",
saved, strerror(saved), real_fd);
return real_fd; /* return directly, without any wrapping */
}
zvfs_debug_open_log(abspath, NULL,

View File

@@ -17,106 +17,10 @@
#include <stdarg.h>
#include <pthread.h>
#include <stdint.h>
#include <limits.h>
#include <unistd.h>
#include <bits/types/struct_FILE.h>
/* ------------------------------------------------------------------ */
/* Internal: read/pread debug logging */
/* ------------------------------------------------------------------ */
static int
zvfs_debug_rw_enabled(void)
{
static int inited = 0;
static int enabled = 0;
const char *v;
if (!inited) {
v = getenv("ZVFS_DEBUG_RW");
enabled = (v && v[0] != '\0' && strcmp(v, "0") != 0);
inited = 1;
}
return enabled;
}
static int
zvfs_debug_rw_fd_match(int fd)
{
static int inited = 0;
static int has_filter = 0;
static int filter_fd = -1;
const char *v;
char *end = NULL;
long parsed;
if (!inited) {
v = getenv("ZVFS_DEBUG_RW_FD");
if (v && v[0] != '\0') {
parsed = strtol(v, &end, 10);
if (end && *end == '\0' && parsed >= 0 && parsed <= INT_MAX) {
has_filter = 1;
filter_fd = (int)parsed;
}
}
inited = 1;
}
if (!has_filter) {
return 1;
}
return fd == filter_fd;
}
static void
zvfs_debug_rw_log(int fd, const char *fmt, ...)
{
va_list ap;
if (!zvfs_debug_rw_enabled() || !zvfs_debug_rw_fd_match(fd)) {
return;
}
fprintf(stderr, "[zvfs][rw-dbg][pid=%d][tid=%lu][fd=%d] ",
getpid(), (unsigned long)pthread_self(), fd);
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
va_end(ap);
fputc('\n', stderr);
}
static const char *
zvfs_debug_fd_target(int fd)
{
static __thread char target[PATH_MAX];
char linkpath[64];
ssize_t n;
if (fd < 0) {
return "(invalid-fd)";
}
snprintf(linkpath, sizeof(linkpath), "/proc/self/fd/%d", fd);
n = readlink(linkpath, target, sizeof(target) - 1);
if (n < 0) {
return "(unknown)";
}
target[n] = '\0';
return target;
}
static uint64_t
zvfs_debug_logical_size(struct zvfs_open_file *of)
{
uint64_t size;
if (!of || !of->inode) {
return 0;
}
pthread_mutex_lock(&of->inode->mu);
size = of->inode->logical_size;
pthread_mutex_unlock(&of->inode->mu);
return size;
}
#define zvfs_debug_rw_log(...) ((void)0)
/* ------------------------------------------------------------------ */
/* Internal: single-segment pread / pwrite (does not modify of->offset) */
@@ -420,7 +324,11 @@ zvfs_vfscanf_impl(FILE *stream, const char *format, va_list ap, int use_isoc99)
}
uint64_t cur_off = of->offset;
uint64_t logical_size = zvfs_debug_logical_size(of);
uint64_t logical_size;
pthread_mutex_lock(&of->inode->mu);
logical_size = of->inode->logical_size;
pthread_mutex_unlock(&of->inode->mu);
uint64_t remain64 = (cur_off < logical_size) ? (logical_size - cur_off) : 0;
if (remain64 == 0) {
stream->_flags |= _IO_EOF_SEEN;
@@ -516,7 +424,6 @@ read(int fd, void *buf, size_t count)
{
ZVFS_HOOK_ENTER();
int in_hook = ZVFS_IN_HOOK();
struct zvfs_open_file *of = get_of(fd);
if (!of) {
@@ -573,7 +480,6 @@ pread(int fd, void *buf, size_t count, off_t offset)
{
ZVFS_HOOK_ENTER();
int in_hook = ZVFS_IN_HOOK();
struct zvfs_open_file *of = get_of(fd);
if (!of) {

View File

@@ -3,9 +3,6 @@
#include <stdlib.h>
#include <string.h>
#define ZVFS_REQ_HEADER_WIRE_SIZE (sizeof(uint32_t) + sizeof(uint32_t))
#define ZVFS_RESP_HEADER_WIRE_SIZE (sizeof(uint32_t) + sizeof(int32_t) + sizeof(uint32_t))
static int write_bytes(uint8_t **p, size_t *remaining, const void *src, size_t n) {
if (*remaining < n) {
return -1;
@@ -50,6 +47,50 @@ static int read_u64(const uint8_t **p, size_t *remaining, uint64_t *v) {
return read_bytes(p, remaining, v, sizeof(*v));
}
static int write_resp_trace(uint8_t **p, size_t *remaining, const struct zvfs_resp_trace *trace) {
struct zvfs_resp_trace zero = {0};
if (!trace) {
trace = &zero;
}
if (write_u32(p, remaining, trace->flags) != 0 ||
write_u64(p, remaining, trace->req_rx_ns) != 0 ||
write_u64(p, remaining, trace->dispatch_ns) != 0 ||
write_u64(p, remaining, trace->spdk_start_ns) != 0 ||
write_u64(p, remaining, trace->phase1_done_ns) != 0 ||
write_u64(p, remaining, trace->spdk_done_ns) != 0 ||
write_u64(p, remaining, trace->cq_push_ns) != 0 ||
write_u64(p, remaining, trace->wake_write_ns) != 0 ||
write_u64(p, remaining, trace->reactor_wake_ns) != 0 ||
write_u64(p, remaining, trace->resp_tx_ns) != 0) {
return -1;
}
return 0;
}
static int read_resp_trace(const uint8_t **p, size_t *remaining, struct zvfs_resp_trace *trace) {
if (!trace) {
return -1;
}
if (read_u32(p, remaining, &trace->flags) != 0 ||
read_u64(p, remaining, &trace->req_rx_ns) != 0 ||
read_u64(p, remaining, &trace->dispatch_ns) != 0 ||
read_u64(p, remaining, &trace->spdk_start_ns) != 0 ||
read_u64(p, remaining, &trace->phase1_done_ns) != 0 ||
read_u64(p, remaining, &trace->spdk_done_ns) != 0 ||
read_u64(p, remaining, &trace->cq_push_ns) != 0 ||
read_u64(p, remaining, &trace->wake_write_ns) != 0 ||
read_u64(p, remaining, &trace->reactor_wake_ns) != 0 ||
read_u64(p, remaining, &trace->resp_tx_ns) != 0) {
return -1;
}
return 0;
}
static int valid_opcode(uint32_t opcode) {
return opcode >= ZVFS_OP_CREATE && opcode <= ZVFS_OP_RESET_BLOBSTORE;
}
@@ -220,13 +261,45 @@ size_t zvfs_serialize_req_write(const struct zvfs_req_write_body *body, uint8_t
if (body->length > 0 && !body->data) {
return 0;
}
if (zvfs_serialize_req_write_fixed(body, buf, buf_len) != ZVFS_REQ_WRITE_FIXED_WIRE_SIZE) {
return 0;
}
p += ZVFS_REQ_WRITE_FIXED_WIRE_SIZE;
remaining -= ZVFS_REQ_WRITE_FIXED_WIRE_SIZE;
if (body->length > 0 && write_bytes(&p, &remaining, body->data, (size_t)body->length) != 0) {
return 0;
}
return (size_t)(p - buf);
}
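/* Splitting the WRITE request into a fixed part plus raw payload lets the client send header, fixed body and user buffer with a single writev() instead of first copying the payload into its tx buffer (see ipc_do_req). */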
size_t zvfs_serialize_req_write_fixed(const struct zvfs_req_write_body *body, uint8_t *buf, size_t buf_len) {
uint8_t *p = buf;
size_t remaining = buf_len;
if (!body || !buf) {
return 0;
}
if (write_u64(&p, &remaining, body->handle_id) != 0 ||
write_u64(&p, &remaining, body->offset) != 0 ||
write_u64(&p, &remaining, body->length) != 0 ||
write_u32(&p, &remaining, body->flags) != 0) {
return 0;
}
return (size_t)(p - buf);
}
size_t zvfs_deserialize_req_write_fixed(const uint8_t *buf, size_t buf_len, struct zvfs_req_write_body *body) {
const uint8_t *p = buf;
size_t remaining = buf_len;
if (!body || !buf) {
return 0;
}
memset(body, 0, sizeof(*body));
if (read_u64(&p, &remaining, &body->handle_id) != 0 ||
read_u64(&p, &remaining, &body->offset) != 0 ||
read_u64(&p, &remaining, &body->length) != 0 ||
read_u32(&p, &remaining, &body->flags) != 0) {
return 0;
}
return (size_t)(p - buf);
@@ -239,14 +312,11 @@ size_t zvfs_deserialize_req_write(const uint8_t *buf, size_t buf_len, struct zvf
if (!body || !buf) {
return 0;
}
body->data = NULL;
body->flags = 0;
if (zvfs_deserialize_req_write_fixed(buf, buf_len, body) != ZVFS_REQ_WRITE_FIXED_WIRE_SIZE) {
return 0;
}
p += ZVFS_REQ_WRITE_FIXED_WIRE_SIZE;
remaining -= ZVFS_REQ_WRITE_FIXED_WIRE_SIZE;
if (body->length > remaining) {
return 0;
@@ -570,7 +640,8 @@ size_t zvfs_serialize_resp_write(const struct zvfs_resp_write_body *body, uint8_
if (!body || !buf) {
return 0;
}
if (write_u64(&p, &remaining, body->bytes_written) != 0 ||
write_resp_trace(&p, &remaining, &body->trace) != 0) {
return 0;
}
return (size_t)(p - buf);
@@ -583,7 +654,10 @@ size_t zvfs_deserialize_resp_write(const uint8_t *buf, size_t buf_len, struct zv
if (!body || !buf) {
return 0;
}
memset(body, 0, sizeof(*body));
if (read_u64(&p, &remaining, &body->bytes_written) != 0 ||
read_resp_trace(&p, &remaining, &body->trace) != 0) {
return 0;
}
return (size_t)(p - buf);
@@ -601,16 +675,31 @@ size_t zvfs_deserialize_resp_resize(const uint8_t *buf, size_t buf_len) {
return 0;
}
size_t zvfs_serialize_resp_sync_md(const struct zvfs_resp_sync_md_body *body, uint8_t *buf, size_t buf_len) {
uint8_t *p = buf;
size_t remaining = buf_len;
if (!body || !buf) {
return 0;
}
if (write_resp_trace(&p, &remaining, &body->trace) != 0) {
return 0;
}
return (size_t)(p - buf);
}
size_t zvfs_deserialize_resp_sync_md(const uint8_t *buf, size_t buf_len, struct zvfs_resp_sync_md_body *body) {
const uint8_t *p = buf;
size_t remaining = buf_len;
if (!body || !buf) {
return 0;
}
memset(body, 0, sizeof(*body));
if (read_resp_trace(&p, &remaining, &body->trace) != 0) {
return 0;
}
return (size_t)(p - buf);
}
size_t zvfs_serialize_resp_close(uint8_t *buf, size_t buf_len) {
@@ -935,13 +1024,26 @@ size_t zvfs_serialize_resp(struct zvfs_resp *resp, uint8_t *buf, size_t buf_len)
break;
}
case ZVFS_OP_WRITE: {
struct zvfs_resp_write_body body = {
.bytes_written = resp->bytes_written,
.trace = resp->trace,
};
body_len = zvfs_serialize_resp_write(&body, buf + ZVFS_RESP_HEADER_WIRE_SIZE,
buf_len - ZVFS_RESP_HEADER_WIRE_SIZE);
break;
}
case ZVFS_OP_RESIZE:
body_len = 0;
break;
case ZVFS_OP_SYNC_MD:
{
struct zvfs_resp_sync_md_body body = {
.trace = resp->trace,
};
body_len = zvfs_serialize_resp_sync_md(&body, buf + ZVFS_RESP_HEADER_WIRE_SIZE,
buf_len - ZVFS_RESP_HEADER_WIRE_SIZE);
}
break;
case ZVFS_OP_CLOSE:
case ZVFS_OP_DELETE:
case ZVFS_OP_ADD_REF:
@@ -1036,11 +1138,24 @@ size_t zvfs_deserialize_resp(uint8_t *buf, size_t buf_len, struct zvfs_resp *res
consumed = zvfs_deserialize_resp_write(payload, header.payload_len, &body);
if (consumed == header.payload_len) {
resp->bytes_written = body.bytes_written;
resp->trace = body.trace;
}
break;
}
case ZVFS_OP_RESIZE:
if (header.payload_len != 0) {
return 0;
}
consumed = 0;
break;
case ZVFS_OP_SYNC_MD: {
struct zvfs_resp_sync_md_body body;
consumed = zvfs_deserialize_resp_sync_md(payload, header.payload_len, &body);
if (consumed == header.payload_len) {
resp->trace = body.trace;
}
break;
}
case ZVFS_OP_CLOSE:
case ZVFS_OP_DELETE:
case ZVFS_OP_ADD_REF:
View File
@@ -69,6 +69,13 @@ inline const char* cast_opcode2string(uint32_t op){
#define ZVFS_WRITE_F_AUTO_GROW (1u << 0)
#define ZVFS_RESP_TRACE_F_VALID (1u << 0)
#define ZVFS_RESP_TRACE_F_PHASE1_VALID (1u << 1)
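/* Wire sizes: request/response headers, and the fixed (pre-payload) part of a WRITE request body (handle_id + offset + length as u64, flags as u32). */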
#define ZVFS_REQ_HEADER_WIRE_SIZE (sizeof(uint32_t) + sizeof(uint32_t))
#define ZVFS_RESP_HEADER_WIRE_SIZE (sizeof(uint32_t) + sizeof(int32_t) + sizeof(uint32_t))
#define ZVFS_REQ_WRITE_FIXED_WIRE_SIZE (sizeof(uint64_t) * 3 + sizeof(uint32_t))
/* Minimal fixed header (synchronous blocking mode, no request_id) */
struct zvfs_req_header {
uint32_t opcode;
@@ -154,15 +161,33 @@ struct zvfs_resp_read_body {
void *data;
};
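/*
 * Latency trace returned with WRITE / SYNC_MD responses.
 * Nanosecond timestamps stamped by the daemon along the request path:
 * request received -> dispatched -> SPDK submit -> optional phase1 done ->
 * SPDK done -> completion queued -> wakeup written -> reactor woken ->
 * response sent. flags marks which fields are valid
 * (ZVFS_RESP_TRACE_F_VALID, ZVFS_RESP_TRACE_F_PHASE1_VALID).
 */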
struct zvfs_resp_trace {
uint32_t flags;
uint64_t req_rx_ns;
uint64_t dispatch_ns;
uint64_t spdk_start_ns;
uint64_t phase1_done_ns;
uint64_t spdk_done_ns;
uint64_t cq_push_ns;
uint64_t wake_write_ns;
uint64_t reactor_wake_ns;
uint64_t resp_tx_ns;
};
struct zvfs_resp_write_body {
uint64_t bytes_written;
struct zvfs_resp_trace trace;
};
struct zvfs_resp_sync_md_body {
struct zvfs_resp_trace trace;
};
/* resize/close/delete: body is empty on success; sync_md responses now carry a trace body */
size_t zvfs_serialize_resp_resize(uint8_t *buf, size_t buf_len);
size_t zvfs_deserialize_resp_resize(const uint8_t *buf, size_t buf_len);
size_t zvfs_serialize_resp_sync_md(const struct zvfs_resp_sync_md_body *body, uint8_t *buf, size_t buf_len);
size_t zvfs_deserialize_resp_sync_md(const uint8_t *buf, size_t buf_len, struct zvfs_resp_sync_md_body *body);
size_t zvfs_serialize_resp_close(uint8_t *buf, size_t buf_len);
size_t zvfs_deserialize_resp_close(const uint8_t *buf, size_t buf_len);
size_t zvfs_serialize_resp_delete(uint8_t *buf, size_t buf_len);
@@ -188,6 +213,9 @@ struct zvfs_req {
struct zvfs_conn *conn;
struct zvfs_blob_handle *handle;
uint64_t trace_req_rx_ns;
uint64_t trace_dispatch_ns;
};
struct zvfs_resp {
@@ -202,6 +230,7 @@ struct zvfs_resp {
void *data;
uint64_t bytes_written;
struct zvfs_resp_trace trace;
struct zvfs_conn *conn;
};
@@ -226,6 +255,8 @@ size_t zvfs_serialize_req_read(const struct zvfs_req_read_body *body, uint8_t *b
size_t zvfs_deserialize_req_read(const uint8_t *buf, size_t buf_len, struct zvfs_req_read_body *body);
size_t zvfs_serialize_req_write(const struct zvfs_req_write_body *body, uint8_t *buf, size_t buf_len);
size_t zvfs_serialize_req_write_fixed(const struct zvfs_req_write_body *body, uint8_t *buf, size_t buf_len);
size_t zvfs_deserialize_req_write_fixed(const uint8_t *buf, size_t buf_len, struct zvfs_req_write_body *body);
size_t zvfs_deserialize_req_write(const uint8_t *buf, size_t buf_len, struct zvfs_req_write_body *body);
size_t zvfs_serialize_req_resize(const struct zvfs_req_resize_body *body, uint8_t *buf, size_t buf_len);
View File
@@ -8,8 +8,11 @@
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <time.h>
#include <unistd.h>
@@ -28,6 +31,154 @@ static __thread struct ipc_client_ctx g_ipc_tls = {
.rx_len = 0,
};
static uint64_t now_mono_ns(void) {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (uint64_t)ts.tv_sec * 1000000000ULL + (uint64_t)ts.tv_nsec;
}
static int latency_trace_enabled(void) {
static int inited = 0;
static int enabled = 0;
const char *v;
if (!inited) {
v = getenv("ZVFS_TRACE_LATENCY");
enabled = (v && v[0] != '\0' && strcmp(v, "0") != 0);
inited = 1;
}
return enabled;
}
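/* Opt-in via the client environment: set ZVFS_TRACE_LATENCY to any non-empty value other than "0", e.g. ZVFS_TRACE_LATENCY=1. */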
static uint64_t ns_to_us(uint64_t ns) {
return ns / 1000ULL;
}
static uint64_t ns_diff(uint64_t end, uint64_t start) {
return (end >= start) ? (end - start) : 0;
}
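/* Break the end-to-end latency of a successful WRITE or SYNC_MD request into client, wire and daemon segments using the daemon-provided trace, and print one summary line per request to stderr. */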
static void maybe_log_latency_trace(const struct zvfs_req *req, const struct zvfs_resp *resp,
uint64_t client_start_ns, uint64_t client_send_done_ns,
uint64_t client_recv_ns,
uint64_t client_parse_done_ns) {
uint64_t total_ns;
uint64_t server_ns;
uint64_t residual_ns;
uint64_t client_to_server_ns;
uint64_t client_send_ns;
uint64_t server_rx_wait_ns;
uint64_t server_to_client_ns;
uint64_t resp_wait_ns;
uint64_t client_parse_ns;
uint64_t rx_to_dispatch_ns;
uint64_t dispatch_to_spdk_ns;
uint64_t spdk_ns;
uint64_t spdk_post_ns;
uint64_t wake_write_ns;
uint64_t wake_sched_ns;
uint64_t wake_to_tx_ns;
uint64_t reply_q_ns;
uint64_t cq_wait_ns;
if (!latency_trace_enabled() || !req || !resp) {
return;
}
if (resp->status != 0 || (resp->trace.flags & ZVFS_RESP_TRACE_F_VALID) == 0) {
return;
}
if (req->opcode != ZVFS_OP_WRITE && req->opcode != ZVFS_OP_SYNC_MD) {
return;
}
total_ns = ns_diff(client_parse_done_ns, client_start_ns);
server_ns = ns_diff(resp->trace.resp_tx_ns, resp->trace.req_rx_ns);
residual_ns = (total_ns >= server_ns) ? (total_ns - server_ns) : 0;
client_to_server_ns = ns_diff(resp->trace.req_rx_ns, client_start_ns);
client_send_ns = ns_diff(client_send_done_ns, client_start_ns);
server_rx_wait_ns = ns_diff(resp->trace.req_rx_ns, client_send_done_ns);
server_to_client_ns = ns_diff(client_parse_done_ns, resp->trace.resp_tx_ns);
resp_wait_ns = ns_diff(client_recv_ns, resp->trace.resp_tx_ns);
client_parse_ns = ns_diff(client_parse_done_ns, client_recv_ns);
rx_to_dispatch_ns = ns_diff(resp->trace.dispatch_ns, resp->trace.req_rx_ns);
dispatch_to_spdk_ns = ns_diff(resp->trace.spdk_start_ns, resp->trace.dispatch_ns);
spdk_ns = ns_diff(resp->trace.spdk_done_ns, resp->trace.spdk_start_ns);
spdk_post_ns = ns_diff(resp->trace.cq_push_ns, resp->trace.spdk_done_ns);
wake_write_ns = ns_diff(resp->trace.wake_write_ns, resp->trace.cq_push_ns);
wake_sched_ns = ns_diff(resp->trace.reactor_wake_ns, resp->trace.wake_write_ns);
wake_to_tx_ns = ns_diff(resp->trace.resp_tx_ns, resp->trace.reactor_wake_ns);
reply_q_ns = ns_diff(resp->trace.resp_tx_ns, resp->trace.spdk_done_ns);
cq_wait_ns = ns_diff(resp->trace.resp_tx_ns, resp->trace.cq_push_ns);
if (req->opcode == ZVFS_OP_WRITE) {
uint64_t phase1_ns = 0;
uint64_t phase2_ns = spdk_ns;
if ((resp->trace.flags & ZVFS_RESP_TRACE_F_PHASE1_VALID) != 0) {
phase1_ns = ns_diff(resp->trace.phase1_done_ns, resp->trace.spdk_start_ns);
phase2_ns = ns_diff(resp->trace.spdk_done_ns, resp->trace.phase1_done_ns);
}
fprintf(stderr,
"[zvfs][trace][WRITE] total=%luus server=%luus residual=%luus "
"c2s=%luus send=%luus server_rx_wait=%luus "
"s2c=%luus resp_wait=%luus parse=%luus "
"rx_dispatch=%luus dispatch_spdk=%luus spdk=%luus "
"phase1=%luus phase2=%luus spdk_post=%luus "
"kick=%luus wake_sched=%luus wake_to_tx=%luus "
"reply_q=%luus cq_wait=%luus\n",
(unsigned long)ns_to_us(total_ns),
(unsigned long)ns_to_us(server_ns),
(unsigned long)ns_to_us(residual_ns),
(unsigned long)ns_to_us(client_to_server_ns),
(unsigned long)ns_to_us(client_send_ns),
(unsigned long)ns_to_us(server_rx_wait_ns),
(unsigned long)ns_to_us(server_to_client_ns),
(unsigned long)ns_to_us(resp_wait_ns),
(unsigned long)ns_to_us(client_parse_ns),
(unsigned long)ns_to_us(rx_to_dispatch_ns),
(unsigned long)ns_to_us(dispatch_to_spdk_ns),
(unsigned long)ns_to_us(spdk_ns),
(unsigned long)ns_to_us(phase1_ns),
(unsigned long)ns_to_us(phase2_ns),
(unsigned long)ns_to_us(spdk_post_ns),
(unsigned long)ns_to_us(wake_write_ns),
(unsigned long)ns_to_us(wake_sched_ns),
(unsigned long)ns_to_us(wake_to_tx_ns),
(unsigned long)ns_to_us(reply_q_ns),
(unsigned long)ns_to_us(cq_wait_ns));
return;
}
fprintf(stderr,
"[zvfs][trace][SYNC_MD] total=%luus server=%luus residual=%luus "
"c2s=%luus send=%luus server_rx_wait=%luus "
"s2c=%luus resp_wait=%luus parse=%luus "
"rx_dispatch=%luus dispatch_spdk=%luus spdk=%luus "
"spdk_post=%luus kick=%luus wake_sched=%luus wake_to_tx=%luus "
"reply_q=%luus cq_wait=%luus\n",
(unsigned long)ns_to_us(total_ns),
(unsigned long)ns_to_us(server_ns),
(unsigned long)ns_to_us(residual_ns),
(unsigned long)ns_to_us(client_to_server_ns),
(unsigned long)ns_to_us(client_send_ns),
(unsigned long)ns_to_us(server_rx_wait_ns),
(unsigned long)ns_to_us(server_to_client_ns),
(unsigned long)ns_to_us(resp_wait_ns),
(unsigned long)ns_to_us(client_parse_ns),
(unsigned long)ns_to_us(rx_to_dispatch_ns),
(unsigned long)ns_to_us(dispatch_to_spdk_ns),
(unsigned long)ns_to_us(spdk_ns),
(unsigned long)ns_to_us(spdk_post_ns),
(unsigned long)ns_to_us(wake_write_ns),
(unsigned long)ns_to_us(wake_sched_ns),
(unsigned long)ns_to_us(wake_to_tx_ns),
(unsigned long)ns_to_us(reply_q_ns),
(unsigned long)ns_to_us(cq_wait_ns));
}
static const char *zvfs_ipc_socket_path(void) {
const char *path = getenv("ZVFS_SOCKET_PATH");
if (path && path[0] != '\0') {
@@ -119,6 +270,40 @@ static int write_all(int fd, const uint8_t *buf, size_t len) {
return 0;
}
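/* Like write_all() but for scattered buffers: retries short writes and EINTR, advancing the iovec array in place until everything is sent. */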
static int writev_all(int fd, struct iovec *iov, int iovcnt) {
int idx = 0;
while (idx < iovcnt) {
ssize_t n = writev(fd, &iov[idx], iovcnt - idx);
if (n > 0) {
size_t remaining = (size_t)n;
while (idx < iovcnt && remaining >= iov[idx].iov_len) {
remaining -= iov[idx].iov_len;
idx++;
}
if (idx < iovcnt && remaining > 0) {
iov[idx].iov_base = (uint8_t *)iov[idx].iov_base + remaining;
iov[idx].iov_len -= remaining;
}
continue;
}
if (n < 0 && errno == EINTR) {
continue;
}
if (n == 0) {
errno = EPIPE;
}
return -1;
}
return 0;
}
static int try_pop_resp(struct ipc_client_ctx *ctx, struct zvfs_resp *resp) {
size_t consumed = zvfs_deserialize_resp(ctx->rx_buf, ctx->rx_len, resp);
if (consumed == 0) {
@@ -206,6 +391,10 @@ static int set_errno_by_status(int status) {
static int ipc_do_req(struct zvfs_req *req, struct zvfs_resp *resp_out) {
struct ipc_client_ctx *ctx = &g_ipc_tls;
uint64_t client_start_ns;
uint64_t client_send_done_ns;
uint64_t client_recv_ns;
uint64_t client_done_ns;
if (ipc_ensure_buffers(ctx) != 0) {
return -1;
@@ -215,6 +404,46 @@ static int ipc_do_req(struct zvfs_req *req, struct zvfs_resp *resp_out) {
return -1;
}
client_start_ns = now_mono_ns();
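/* WRITE fast path: serialize only the request header and the fixed WRITE body into small stack buffers and hand the user payload to writev() directly, avoiding a copy of the data into tx_buf. */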
if (req->opcode == ZVFS_OP_WRITE) {
struct zvfs_req_header header = {
.opcode = req->opcode,
.payload_len = (uint32_t)(ZVFS_REQ_WRITE_FIXED_WIRE_SIZE + req->length),
};
struct zvfs_req_write_body body = {
.handle_id = req->handle_id,
.offset = req->offset,
.length = req->length,
.flags = req->write_flags,
.data = req->data,
};
uint8_t hdr_buf[ZVFS_REQ_HEADER_WIRE_SIZE];
uint8_t meta_buf[ZVFS_REQ_WRITE_FIXED_WIRE_SIZE];
struct iovec iov[3];
if (req->length > UINT32_MAX) {
errno = EMSGSIZE;
return -1;
}
if (zvfs_serialize_req_header(&header, hdr_buf, sizeof(hdr_buf)) != sizeof(hdr_buf) ||
zvfs_serialize_req_write_fixed(&body, meta_buf, sizeof(meta_buf)) != sizeof(meta_buf)) {
errno = EMSGSIZE;
return -1;
}
iov[0].iov_base = hdr_buf;
iov[0].iov_len = sizeof(hdr_buf);
iov[1].iov_base = meta_buf;
iov[1].iov_len = sizeof(meta_buf);
iov[2].iov_base = req->data;
iov[2].iov_len = (size_t)req->length;
if (writev_all(ctx->fd, iov, 3) != 0) {
ipc_close_conn(ctx);
return -1;
}
} else {
size_t tx_len = zvfs_serialize_req(req, ctx->tx_buf, ZVFS_IPC_BUF_SIZE);
if (tx_len == 0) {
errno = EMSGSIZE;
@@ -225,12 +454,20 @@ static int ipc_do_req(struct zvfs_req *req, struct zvfs_resp *resp_out) {
ipc_close_conn(ctx);
return -1;
}
}
client_send_done_ns = now_mono_ns();
if (recv_one_resp(ctx, resp_out) != 0) {
ipc_close_conn(ctx);
return -1;
}
client_recv_ns = now_mono_ns();
client_done_ns = now_mono_ns();
maybe_log_latency_trace(req, resp_out, client_start_ns, client_send_done_ns,
client_recv_ns, client_done_ns);
return set_errno_by_status(resp_out->status);
}

File diff suppressed because one or more lines are too long (image asset updated: 95 KiB before, 94 KiB after)