/*
 * IPC client side of the blob I/O engine: every blob_* call is turned into a
 * zvfs request, sent over a per-thread Unix stream socket, and answered by
 * exactly one zvfs response.
 */
#include "spdk_engine/io_engine.h"

#include "common/config.h"
#include "proto/ipc_proto.h"

/*
 * NOTE(review): the system header names were missing from the reviewed copy
 * of this file.  The list below is reconstructed from the symbols used in
 * this translation unit (plus <limits.h> for INT_MIN) -- confirm against the
 * original include list.
 */
#include <errno.h>
#include <limits.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <time.h>
#include <unistd.h>

/* Per-thread connection state. */
struct ipc_client_ctx {
    int fd;          /* connected socket, or -1 when disconnected */
    uint8_t *rx_buf; /* receive buffer of ZVFS_IPC_BUF_SIZE bytes, lazily allocated */
    uint8_t *tx_buf; /* transmit buffer of ZVFS_IPC_BUF_SIZE bytes, lazily allocated */
    size_t rx_len;   /* number of valid bytes currently held in rx_buf */
};

static __thread struct ipc_client_ctx g_ipc_tls = {
    .fd = -1,
    .rx_buf = NULL,
    .tx_buf = NULL,
    .rx_len = 0,
};

/* Current CLOCK_MONOTONIC time in nanoseconds (0 if the clock read fails). */
static uint64_t now_mono_ns(void)
{
    struct timespec ts = { 0 };

    /* ts stays zeroed if the call fails, so we never read garbage. */
    (void)clock_gettime(CLOCK_MONOTONIC, &ts);
    return (uint64_t)ts.tv_sec * 1000000000ULL + (uint64_t)ts.tv_nsec;
}

/*
 * Returns non-zero when ZVFS_TRACE_LATENCY is set to anything other than the
 * empty string or "0".  The environment is read once and the answer cached in
 * function-local statics; the cache is not synchronized between threads.
 */
static int latency_trace_enabled(void)
{
    static int inited = 0;
    static int enabled = 0;

    if (!inited) {
        const char *v = getenv("ZVFS_TRACE_LATENCY");

        enabled = (v && v[0] != '\0' && strcmp(v, "0") != 0);
        inited = 1;
    }
    return enabled;
}

/* Nanoseconds to whole microseconds (truncating). */
static uint64_t ns_to_us(uint64_t ns)
{
    return ns / 1000ULL;
}

/* end - start, clamped to 0 when end precedes start. */
static uint64_t ns_diff(uint64_t end, uint64_t start)
{
    return (end >= start) ? (end - start) : 0;
}

/* ns_to_us() narrowed to the type expected by the "%lu" trace format. */
static unsigned long trace_us(uint64_t ns)
{
    return (unsigned long)ns_to_us(ns);
}

/*
 * Print a one-line latency breakdown to stderr for successful WRITE and
 * SYNC_MD requests whose response carries a valid trace.
 *
 * Does nothing when tracing is disabled, when req/resp is NULL, when the
 * response status is non-zero, when the trace is not flagged valid, or for any
 * other opcode.
 *
 * Client timestamps are subtracted directly from the timestamps in
 * resp->trace; NOTE(review): this presumes the server samples the same
 * monotonic clock -- confirm.  Negative intervals are clamped to 0.
 */
static void maybe_log_latency_trace(const struct zvfs_req *req,
                                    const struct zvfs_resp *resp,
                                    uint64_t client_start_ns,
                                    uint64_t client_send_done_ns,
                                    uint64_t client_recv_ns,
                                    uint64_t client_parse_done_ns)
{
    uint64_t total_ns;
    uint64_t server_ns;
    uint64_t residual_ns;
    uint64_t client_to_server_ns;
    uint64_t client_send_ns;
    uint64_t server_rx_wait_ns;
    uint64_t server_to_client_ns;
    uint64_t resp_wait_ns;
    uint64_t client_parse_ns;
    uint64_t rx_to_dispatch_ns;
    uint64_t dispatch_to_spdk_ns;
    uint64_t spdk_ns;
    uint64_t spdk_post_ns;
    uint64_t wake_write_ns;
    uint64_t wake_sched_ns;
    uint64_t wake_to_tx_ns;
    uint64_t reply_q_ns;
    uint64_t cq_wait_ns;

    if (!latency_trace_enabled() || !req || !resp) {
        return;
    }
    if (resp->status != 0 ||
        (resp->trace.flags & ZVFS_RESP_TRACE_F_VALID) == 0) {
        return;
    }
    if (req->opcode != ZVFS_OP_WRITE && req->opcode != ZVFS_OP_SYNC_MD) {
        return;
    }

    /* End-to-end view. */
    total_ns = ns_diff(client_parse_done_ns, client_start_ns);
    server_ns = ns_diff(resp->trace.resp_tx_ns, resp->trace.req_rx_ns);
    residual_ns = ns_diff(total_ns, server_ns);

    /* Client -> server leg. */
    client_to_server_ns = ns_diff(resp->trace.req_rx_ns, client_start_ns);
    client_send_ns = ns_diff(client_send_done_ns, client_start_ns);
    server_rx_wait_ns = ns_diff(resp->trace.req_rx_ns, client_send_done_ns);

    /* Server -> client leg. */
    server_to_client_ns = ns_diff(client_parse_done_ns, resp->trace.resp_tx_ns);
    resp_wait_ns = ns_diff(client_recv_ns, resp->trace.resp_tx_ns);
    client_parse_ns = ns_diff(client_parse_done_ns, client_recv_ns);

    /* Server-internal stages, in the order the trace fields are consumed. */
    rx_to_dispatch_ns = ns_diff(resp->trace.dispatch_ns, resp->trace.req_rx_ns);
    dispatch_to_spdk_ns = ns_diff(resp->trace.spdk_start_ns, resp->trace.dispatch_ns);
    spdk_ns = ns_diff(resp->trace.spdk_done_ns, resp->trace.spdk_start_ns);
    spdk_post_ns = ns_diff(resp->trace.cq_push_ns, resp->trace.spdk_done_ns);
    wake_write_ns = ns_diff(resp->trace.wake_write_ns, resp->trace.cq_push_ns);
    wake_sched_ns = ns_diff(resp->trace.reactor_wake_ns, resp->trace.wake_write_ns);
    wake_to_tx_ns = ns_diff(resp->trace.resp_tx_ns, resp->trace.reactor_wake_ns);
    reply_q_ns = ns_diff(resp->trace.resp_tx_ns, resp->trace.spdk_done_ns);
    cq_wait_ns = ns_diff(resp->trace.resp_tx_ns, resp->trace.cq_push_ns);

    if (req->opcode == ZVFS_OP_WRITE) {
        /* Without a phase-1 timestamp the whole SPDK time is reported as phase 2. */
        uint64_t phase1_ns = 0;
        uint64_t phase2_ns = spdk_ns;

        if ((resp->trace.flags & ZVFS_RESP_TRACE_F_PHASE1_VALID) != 0) {
            phase1_ns = ns_diff(resp->trace.phase1_done_ns, resp->trace.spdk_start_ns);
            phase2_ns = ns_diff(resp->trace.spdk_done_ns, resp->trace.phase1_done_ns);
        }

        fprintf(stderr,
                "[zvfs][trace][WRITE] total=%luus server=%luus residual=%luus "
                "c2s=%luus send=%luus server_rx_wait=%luus "
                "s2c=%luus resp_wait=%luus parse=%luus "
                "rx_dispatch=%luus dispatch_spdk=%luus spdk=%luus "
                "phase1=%luus phase2=%luus spdk_post=%luus "
                "kick=%luus wake_sched=%luus wake_to_tx=%luus "
                "reply_q=%luus cq_wait=%luus\n",
                trace_us(total_ns),
                trace_us(server_ns),
                trace_us(residual_ns),
                trace_us(client_to_server_ns),
                trace_us(client_send_ns),
                trace_us(server_rx_wait_ns),
                trace_us(server_to_client_ns),
                trace_us(resp_wait_ns),
                trace_us(client_parse_ns),
                trace_us(rx_to_dispatch_ns),
                trace_us(dispatch_to_spdk_ns),
                trace_us(spdk_ns),
                trace_us(phase1_ns),
                trace_us(phase2_ns),
                trace_us(spdk_post_ns),
                trace_us(wake_write_ns),
                trace_us(wake_sched_ns),
                trace_us(wake_to_tx_ns),
                trace_us(reply_q_ns),
                trace_us(cq_wait_ns));
        return;
    }

    fprintf(stderr,
            "[zvfs][trace][SYNC_MD] total=%luus server=%luus residual=%luus "
            "c2s=%luus send=%luus server_rx_wait=%luus "
            "s2c=%luus resp_wait=%luus parse=%luus "
            "rx_dispatch=%luus dispatch_spdk=%luus spdk=%luus "
            "spdk_post=%luus kick=%luus wake_sched=%luus wake_to_tx=%luus "
            "reply_q=%luus cq_wait=%luus\n",
            trace_us(total_ns),
            trace_us(server_ns),
            trace_us(residual_ns),
            trace_us(client_to_server_ns),
            trace_us(client_send_ns),
            trace_us(server_rx_wait_ns),
            trace_us(server_to_client_ns),
            trace_us(resp_wait_ns),
            trace_us(client_parse_ns),
            trace_us(rx_to_dispatch_ns),
            trace_us(dispatch_to_spdk_ns),
            trace_us(spdk_ns),
            trace_us(spdk_post_ns),
            trace_us(wake_write_ns),
            trace_us(wake_sched_ns),
            trace_us(wake_to_tx_ns),
            trace_us(reply_q_ns),
            trace_us(cq_wait_ns));
}

/*
 * Socket path to connect to: ZVFS_SOCKET_PATH if set and non-empty, else
 * ZVFS_IPC_SOCKET_PATH if set and non-empty, else the compiled-in default.
 */
static const char *zvfs_ipc_socket_path(void)
{
    const char *path = getenv("ZVFS_SOCKET_PATH");

    if (path && path[0] != '\0') {
        return path;
    }

    path = getenv("ZVFS_IPC_SOCKET_PATH");
    if (path && path[0] != '\0') {
        return path;
    }

    return ZVFS_IPC_DEFAULT_SOCKET_PATH;
}

/* Close the socket (if open) and drop any buffered receive data. */
static void ipc_close_conn(struct ipc_client_ctx *ctx)
{
    if (ctx->fd >= 0) {
        close(ctx->fd);
    }
    ctx->fd = -1;
    ctx->rx_len = 0;
}

/*
 * Allocate the rx/tx buffers on first use.
 * Returns 0 on success, -1 with errno = ENOMEM on allocation failure.
 */
static int ipc_ensure_buffers(struct ipc_client_ctx *ctx)
{
    if (!ctx->rx_buf) {
        ctx->rx_buf = malloc(ZVFS_IPC_BUF_SIZE);
        if (!ctx->rx_buf) {
            errno = ENOMEM;
            return -1;
        }
    }

    if (!ctx->tx_buf) {
        ctx->tx_buf = malloc(ZVFS_IPC_BUF_SIZE);
        if (!ctx->tx_buf) {
            errno = ENOMEM;
            return -1;
        }
    }

    return 0;
}

/*
 * Open a new AF_UNIX stream connection to zvfs_ipc_socket_path() and store it
 * in ctx.
 *
 * Returns 0 on success.  Returns -1 with errno set on failure:
 * ENAMETOOLONG when the path does not fit in sun_path (instead of silently
 * connecting to a truncated path), otherwise the errno of socket()/connect().
 */
static int ipc_connect(struct ipc_client_ctx *ctx)
{
    const char *path = zvfs_ipc_socket_path();
    size_t path_len = strlen(path);
    struct sockaddr_un addr;
    int fd;

    /* Leave room for the terminating NUL. */
    if (path_len >= sizeof(addr.sun_path)) {
        errno = ENAMETOOLONG;
        return -1;
    }

    fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (fd < 0) {
        return -1;
    }

    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    memcpy(addr.sun_path, path, path_len); /* NUL comes from the memset above */

    if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        int saved = errno; /* close() may overwrite errno */

        close(fd);
        errno = saved;
        return -1;
    }

    ctx->fd = fd;
    ctx->rx_len = 0;
    return 0;
}

/* Connect if there is no open socket yet.  Returns 0 or -1 with errno set. */
static int ipc_ensure_connected(struct ipc_client_ctx *ctx)
{
    if (ctx->fd >= 0) {
        return 0;
    }
    return ipc_connect(ctx);
}

/*
 * Write exactly len bytes to fd, retrying on short writes and EINTR.
 * Returns 0 on success, -1 with errno set on failure; a write() that returns
 * 0 is reported as EPIPE.
 */
static int write_all(int fd, const uint8_t *buf, size_t len)
{
    size_t off = 0;

    while (off < len) {
        ssize_t n = write(fd, buf + off, len - off);

        if (n > 0) {
            off += (size_t)n;
            continue;
        }
        if (n < 0 && errno == EINTR) {
            continue;
        }
        if (n == 0) {
            errno = EPIPE;
        }
        return -1;
    }

    return 0;
}

/*
 * Write every byte described by iov[0..iovcnt) to fd, retrying on short
 * writes and EINTR.  The iov array is modified in place to track progress, so
 * the caller must not reuse it afterwards.
 * Returns 0 on success, -1 with errno set on failure; a writev() that returns
 * 0 is reported as EPIPE.
 */
static int writev_all(int fd, struct iovec *iov, int iovcnt)
{
    int idx = 0;

    while (idx < iovcnt) {
        ssize_t n = writev(fd, &iov[idx], iovcnt - idx);

        if (n > 0) {
            size_t remaining = (size_t)n;

            /* Skip the entries that were written completely. */
            while (idx < iovcnt && remaining >= iov[idx].iov_len) {
                remaining -= iov[idx].iov_len;
                idx++;
            }
            /* Advance into a partially written entry. */
            if (idx < iovcnt && remaining > 0) {
                iov[idx].iov_base = (uint8_t *)iov[idx].iov_base + remaining;
                iov[idx].iov_len -= remaining;
            }
            continue;
        }
        if (n < 0 && errno == EINTR) {
            continue;
        }
        if (n == 0) {
            errno = EPIPE;
        }
        return -1;
    }

    return 0;
}

/*
 * Try to decode one response from the front of rx_buf.
 * Returns 1 and removes the consumed bytes from the buffer when
 * zvfs_deserialize_resp() reports a non-zero length; returns 0 (buffer
 * untouched) when it reports 0.
 */
static int try_pop_resp(struct ipc_client_ctx *ctx, struct zvfs_resp *resp)
{
    size_t consumed = zvfs_deserialize_resp(ctx->rx_buf, ctx->rx_len, resp);

    if (consumed == 0) {
        return 0;
    }

    if (consumed < ctx->rx_len) {
        memmove(ctx->rx_buf, ctx->rx_buf + consumed, ctx->rx_len - consumed);
        ctx->rx_len -= consumed;
    } else {
        /* Also covers a deserializer that over-reports: never underflow. */
        ctx->rx_len = 0;
    }
    return 1;
}

/*
 * Append the result of one successful read() to rx_buf, retrying on EINTR.
 * Returns 0 when at least one byte was added.  Returns -1 with errno set on
 * failure: EOVERFLOW if the buffer is already full, ECONNRESET on end of
 * stream, otherwise the errno of read().
 */
static int read_into_rx(struct ipc_client_ctx *ctx)
{
    while (1) {
        ssize_t n;

        if (ctx->rx_len >= ZVFS_IPC_BUF_SIZE) {
            errno = EOVERFLOW;
            return -1;
        }

        n = read(ctx->fd, ctx->rx_buf + ctx->rx_len,
                 ZVFS_IPC_BUF_SIZE - ctx->rx_len);
        if (n > 0) {
            ctx->rx_len += (size_t)n;
            return 0;
        }
        if (n == 0) {
            errno = ECONNRESET;
            return -1;
        }
        if (errno == EINTR) {
            continue;
        }
        return -1;
    }
}

/*
 * Block until one complete response has been received and decoded into
 * *resp_out.
 *
 * Returns 0 on success.  Returns -1 with errno set on failure: EPROTO when
 * the receive buffer is full and still does not contain a decodable response,
 * otherwise the errno from read_into_rx().  *resp_out is only written on
 * success.
 */
static int recv_one_resp(struct ipc_client_ctx *ctx, struct zvfs_resp *resp_out)
{
    while (1) {
        struct zvfs_resp resp;

        memset(&resp, 0, sizeof(resp));
        if (try_pop_resp(ctx, &resp) == 1) {
            *resp_out = resp;
            return 0;
        }

        /* A full buffer that still cannot be decoded can never make progress. */
        if (ctx->rx_len >= ZVFS_IPC_BUF_SIZE) {
            errno = EPROTO;
            return -1;
        }

        if (read_into_rx(ctx) != 0) {
            return -1;
        }
    }
}

/*
 * Map a response status to the return/errno convention of this file.
 * Returns 0 for status 0; otherwise sets errno to the absolute value of
 * status (EIO when that value is not representable) and returns -1.
 */
static int set_errno_by_status(int status)
{
    if (status == 0) {
        return 0;
    }

    if (status == INT_MIN) {
        /* -INT_MIN overflows; report a generic I/O error instead. */
        errno = EIO;
    } else {
        errno = (status < 0) ? -status : status;
    }
    return -1;
}

/*
 * Send one request on the calling thread's connection and wait for its
 * response.
 *
 * WRITE requests are sent with writev() straight from req->data so the
 * payload is not copied into tx_buf; every other opcode is serialized into
 * tx_buf first.
 *
 * Returns 0 when a response with status 0 was received; the caller then owns
 * resp_out->data and must free() it.  Returns -1 with errno set otherwise:
 *   - EMSGSIZE when the request cannot be serialized or its WRITE payload
 *     does not fit the 32-bit payload length,
 *   - a transport errno, in which case the connection is closed,
 *   - the error derived from a non-zero response status, in which case any
 *     response payload has already been freed and resp_out->data is NULL.
 */
static int ipc_do_req(struct zvfs_req *req, struct zvfs_resp *resp_out)
{
    struct ipc_client_ctx *ctx = &g_ipc_tls;
    uint64_t client_start_ns;
    uint64_t client_send_done_ns;
    uint64_t client_recv_ns;
    uint64_t client_done_ns;
    int rc;

    if (ipc_ensure_buffers(ctx) != 0) {
        return -1;
    }
    if (ipc_ensure_connected(ctx) != 0) {
        return -1;
    }

    client_start_ns = now_mono_ns();

    if (req->opcode == ZVFS_OP_WRITE) {
        uint8_t hdr_buf[ZVFS_REQ_HEADER_WIRE_SIZE];
        uint8_t meta_buf[ZVFS_REQ_WRITE_FIXED_WIRE_SIZE];
        struct iovec iov[3];

        /*
         * Validate before narrowing: fixed part + payload must fit in the
         * 32-bit payload_len, otherwise the frame length would wrap.
         */
        if (req->length >
            (uint64_t)UINT32_MAX - (uint64_t)ZVFS_REQ_WRITE_FIXED_WIRE_SIZE) {
            errno = EMSGSIZE;
            return -1;
        }

        struct zvfs_req_header header = {
            .opcode = req->opcode,
            .payload_len = (uint32_t)(ZVFS_REQ_WRITE_FIXED_WIRE_SIZE + req->length),
        };
        struct zvfs_req_write_body body = {
            .handle_id = req->handle_id,
            .offset = req->offset,
            .length = req->length,
            .flags = req->write_flags,
            .data = req->data,
        };

        if (zvfs_serialize_req_header(&header, hdr_buf, sizeof(hdr_buf)) != sizeof(hdr_buf) ||
            zvfs_serialize_req_write_fixed(&body, meta_buf, sizeof(meta_buf)) != sizeof(meta_buf)) {
            errno = EMSGSIZE;
            return -1;
        }

        iov[0].iov_base = hdr_buf;
        iov[0].iov_len = sizeof(hdr_buf);
        iov[1].iov_base = meta_buf;
        iov[1].iov_len = sizeof(meta_buf);
        iov[2].iov_base = req->data;
        iov[2].iov_len = (size_t)req->length;

        if (writev_all(ctx->fd, iov, 3) != 0) {
            /* A partially sent frame leaves the stream unusable. */
            ipc_close_conn(ctx);
            return -1;
        }
    } else {
        size_t tx_len = zvfs_serialize_req(req, ctx->tx_buf, ZVFS_IPC_BUF_SIZE);

        if (tx_len == 0) {
            errno = EMSGSIZE;
            return -1;
        }
        if (write_all(ctx->fd, ctx->tx_buf, tx_len) != 0) {
            ipc_close_conn(ctx);
            return -1;
        }
    }

    client_send_done_ns = now_mono_ns();

    if (recv_one_resp(ctx, resp_out) != 0) {
        ipc_close_conn(ctx);
        return -1;
    }

    /*
     * recv_one_resp() already decoded the response, so these two samples are
     * taken back to back and the reported "parse" interval only covers the
     * gap between them.
     */
    client_recv_ns = now_mono_ns();
    client_done_ns = now_mono_ns();

    maybe_log_latency_trace(req, resp_out, client_start_ns,
                            client_send_done_ns, client_recv_ns,
                            client_done_ns);

    rc = set_errno_by_status(resp_out->status);
    if (rc != 0) {
        /* Callers bail out without freeing on failure, so release it here. */
        int saved = errno;

        free(resp_out->data);
        resp_out->data = NULL;
        errno = saved;
    }
    return rc;
}

/*
 * Send req and throw away the response payload, for operations whose only
 * result is success or failure.  Returns 0 or -1 with errno set.
 */
static int ipc_do_req_discard(struct zvfs_req *req)
{
    struct zvfs_resp resp;

    memset(&resp, 0, sizeof(resp));
    if (ipc_do_req(req, &resp) != 0) {
        return -1;
    }

    free(resp.data);
    return 0;
}

/* Nothing to set up on the client side; always returns 0. */
int io_engine_init(void)
{
    return 0;
}

/*
 * Send a CREATE request carrying size_hint and open_flags.
 * On success stores the blob id and handle id from the response and returns
 * 0.  Returns -1 with errno set on failure (EINVAL for NULL output pointers).
 */
int blob_create(uint64_t size_hint, int open_flags, uint64_t *blob_id_out,
                uint64_t *handle_id_out)
{
    struct zvfs_req req;
    struct zvfs_resp resp;

    if (!blob_id_out || !handle_id_out) {
        errno = EINVAL;
        return -1;
    }

    memset(&req, 0, sizeof(req));
    req.opcode = ZVFS_OP_CREATE;
    req.size_hint = size_hint;
    req.open_flags = (uint32_t)open_flags;

    memset(&resp, 0, sizeof(resp));
    if (ipc_do_req(&req, &resp) != 0) {
        return -1;
    }

    *blob_id_out = resp.blob_id;
    *handle_id_out = resp.handle_id;
    free(resp.data);
    return 0;
}

/*
 * Send an OPEN request for blob_id with open_flags.
 * On success stores the handle id from the response and returns 0.
 * Returns -1 with errno set on failure (EINVAL for a NULL output pointer).
 */
int blob_open(uint64_t blob_id, int open_flags, uint64_t *handle_id_out)
{
    struct zvfs_req req;
    struct zvfs_resp resp;

    if (!handle_id_out) {
        errno = EINVAL;
        return -1;
    }

    memset(&req, 0, sizeof(req));
    req.opcode = ZVFS_OP_OPEN;
    req.blob_id = blob_id;
    req.open_flags = (uint32_t)open_flags;

    memset(&resp, 0, sizeof(resp));
    if (ipc_do_req(&req, &resp) != 0) {
        return -1;
    }

    *handle_id_out = resp.handle_id;
    free(resp.data);
    return 0;
}

/*
 * Send a WRITE request for len bytes of buf at offset with write_flags.
 * A zero-length write succeeds immediately without contacting the server.
 * Returns 0 when the server reports exactly len bytes written.  Returns -1
 * with errno set otherwise: EINVAL for a NULL buffer or handle 0, EIO for a
 * short write, or the error from the request itself.
 */
int blob_write_ex(uint64_t handle_id, uint64_t offset, const void *buf,
                  size_t len, uint32_t write_flags)
{
    struct zvfs_req req;
    struct zvfs_resp resp;
    int complete;

    if (len == 0) {
        return 0;
    }
    if (!buf || handle_id == 0) {
        errno = EINVAL;
        return -1;
    }

    memset(&req, 0, sizeof(req));
    req.opcode = ZVFS_OP_WRITE;
    req.handle_id = handle_id;
    req.offset = offset;
    req.length = (uint64_t)len;
    req.write_flags = write_flags;
    req.data = (void *)buf; /* only read while the request is being sent */

    memset(&resp, 0, sizeof(resp));
    if (ipc_do_req(&req, &resp) != 0) {
        return -1;
    }

    complete = (resp.bytes_written == (uint64_t)len);
    free(resp.data);
    if (!complete) {
        errno = EIO;
        return -1;
    }
    return 0;
}

/* blob_write_ex() with no write flags. */
int blob_write(uint64_t handle_id, uint64_t offset, const void *buf, size_t len)
{
    return blob_write_ex(handle_id, offset, buf, len, 0);
}

/*
 * blob_write_ex() with ZVFS_RW_F_USE_HANDLE_POS added to write_flags;
 * logical_size is passed in the request's offset field.
 */
int blob_write_shared(uint64_t handle_id, uint64_t logical_size,
                      const void *buf, size_t len, uint32_t write_flags)
{
    return blob_write_ex(handle_id, logical_size, buf, len,
                         write_flags | ZVFS_RW_F_USE_HANDLE_POS);
}

/*
 * Send a READ request for up to len bytes at offset and copy the returned
 * payload into buf.  read_flags travels in the request's write_flags field.
 * A zero-length read returns 0 without contacting the server.
 * Returns the number of bytes copied (may be less than len).  Returns -1 with
 * errno set on failure: EINVAL for a NULL buffer or handle 0, EPROTO when the
 * response is longer than len or claims data it does not carry, or the error
 * from the request itself.
 */
ssize_t blob_read_ex(uint64_t handle_id, uint64_t offset, void *buf,
                     size_t len, uint32_t read_flags)
{
    struct zvfs_req req;
    struct zvfs_resp resp;
    ssize_t nread;

    if (len == 0) {
        return 0;
    }
    if (!buf || handle_id == 0) {
        errno = EINVAL;
        return -1;
    }

    memset(&req, 0, sizeof(req));
    req.opcode = ZVFS_OP_READ;
    req.handle_id = handle_id;
    req.offset = offset;
    req.length = (uint64_t)len;
    req.write_flags = read_flags;

    memset(&resp, 0, sizeof(resp));
    if (ipc_do_req(&req, &resp) != 0) {
        return -1;
    }

    /* Reject a payload we cannot copy safely. */
    if (resp.length > len || (resp.length != 0 && !resp.data)) {
        free(resp.data);
        errno = EPROTO;
        return -1;
    }

    if (resp.length > 0) {
        memcpy(buf, resp.data, (size_t)resp.length);
    }
    nread = (ssize_t)resp.length;
    free(resp.data);
    return nread;
}

/*
 * Read exactly len bytes at offset.  Returns 0 on success, -1 with errno set
 * on failure; a short read is reported as EPROTO.
 */
int blob_read(uint64_t handle_id, uint64_t offset, void *buf, size_t len)
{
    ssize_t nr = blob_read_ex(handle_id, offset, buf, len, 0);

    if (nr < 0) {
        return -1;
    }
    if ((size_t)nr != len) {
        errno = EPROTO;
        return -1;
    }
    return 0;
}

/*
 * blob_read_ex() with ZVFS_RW_F_USE_HANDLE_POS; logical_size is passed in the
 * request's offset field.
 */
ssize_t blob_read_shared(uint64_t handle_id, uint64_t logical_size, void *buf,
                         size_t len)
{
    return blob_read_ex(handle_id, logical_size, buf, len,
                        ZVFS_RW_F_USE_HANDLE_POS);
}

/*
 * Send a RESIZE request; new_size travels in the request's size_hint field.
 * Returns 0 on success, -1 with errno set on failure (EINVAL for handle 0).
 */
int blob_resize(uint64_t handle_id, uint64_t new_size)
{
    struct zvfs_req req;

    if (handle_id == 0) {
        errno = EINVAL;
        return -1;
    }

    memset(&req, 0, sizeof(req));
    req.opcode = ZVFS_OP_RESIZE;
    req.handle_id = handle_id;
    req.size_hint = new_size;

    return ipc_do_req_discard(&req);
}

/*
 * Send a SYNC_MD request for the handle.
 * Returns 0 on success, -1 with errno set on failure (EINVAL for handle 0).
 */
int blob_sync_md(uint64_t handle_id)
{
    struct zvfs_req req;

    if (handle_id == 0) {
        errno = EINVAL;
        return -1;
    }

    memset(&req, 0, sizeof(req));
    req.opcode = ZVFS_OP_SYNC_MD;
    req.handle_id = handle_id;

    return ipc_do_req_discard(&req);
}

/*
 * Send a CLOSE request for the handle.
 * Returns 0 on success, -1 with errno set on failure (EINVAL for handle 0).
 */
int blob_close(uint64_t handle_id)
{
    struct zvfs_req req;

    if (handle_id == 0) {
        errno = EINVAL;
        return -1;
    }

    memset(&req, 0, sizeof(req));
    req.opcode = ZVFS_OP_CLOSE;
    req.handle_id = handle_id;

    return ipc_do_req_discard(&req);
}

/*
 * Send a DELETE request for blob_id.
 * Returns 0 on success, -1 with errno set on failure.
 */
int blob_delete(uint64_t blob_id)
{
    struct zvfs_req req;

    memset(&req, 0, sizeof(req));
    req.opcode = ZVFS_OP_DELETE;
    req.blob_id = blob_id;

    return ipc_do_req_discard(&req);
}

/*
 * Send an ADD_REF request carrying ref_delta for the handle.
 * Returns 0 on success, -1 with errno set on failure (EINVAL for handle 0 or
 * a zero delta).
 */
int blob_add_ref(uint64_t handle_id, uint32_t ref_delta)
{
    struct zvfs_req req;

    if (handle_id == 0 || ref_delta == 0) {
        errno = EINVAL;
        return -1;
    }

    memset(&req, 0, sizeof(req));
    req.opcode = ZVFS_OP_ADD_REF;
    req.handle_id = handle_id;
    req.ref_delta = ref_delta;

    return ipc_do_req_discard(&req);
}

/*
 * Send one ADD_REF_BATCH request carrying count (handle, delta) pairs taken
 * from the two parallel arrays.
 * Returns 0 on success, -1 with errno set on failure: EINVAL for NULL arrays,
 * count 0, or any zero handle/delta; ENOMEM if the temporary item array
 * cannot be allocated; otherwise the error from the request itself.
 */
int blob_add_ref_batch(const uint64_t *handle_ids, const uint32_t *ref_deltas,
                       uint32_t count)
{
    struct zvfs_add_ref_item *items;
    struct zvfs_req req;
    uint32_t i;
    int rc;
    int saved;

    if (!handle_ids || !ref_deltas || count == 0) {
        errno = EINVAL;
        return -1;
    }

    items = calloc(count, sizeof(*items));
    if (!items) {
        errno = ENOMEM;
        return -1;
    }

    for (i = 0; i < count; i++) {
        if (handle_ids[i] == 0 || ref_deltas[i] == 0) {
            free(items);
            errno = EINVAL;
            return -1;
        }
        items[i].handle_id = handle_ids[i];
        items[i].ref_delta = ref_deltas[i];
    }

    memset(&req, 0, sizeof(req));
    req.opcode = ZVFS_OP_ADD_REF_BATCH;
    req.add_ref_count = count;
    req.add_ref_items = items;

    rc = ipc_do_req_discard(&req);

    /* Keep the request's errno intact across the cleanup. */
    saved = errno;
    free(items);
    errno = saved;
    return rc;
}

/*
 * Send a SEEK request with offset and whence; logical_size travels in the
 * request's size_hint field.
 * On success stores the offset from the response and returns 0.
 * Returns -1 with errno set on failure (EINVAL for handle 0 or a NULL output
 * pointer).
 */
int blob_seek(uint64_t handle_id, int64_t offset, int whence,
              uint64_t logical_size, uint64_t *new_offset_out)
{
    struct zvfs_req req;
    struct zvfs_resp resp;

    if (handle_id == 0 || !new_offset_out) {
        errno = EINVAL;
        return -1;
    }

    memset(&req, 0, sizeof(req));
    req.opcode = ZVFS_OP_SEEK;
    req.handle_id = handle_id;
    req.seek_offset = offset;
    req.seek_whence = (uint32_t)whence;
    req.size_hint = logical_size;

    memset(&resp, 0, sizeof(resp));
    if (ipc_do_req(&req, &resp) != 0) {
        return -1;
    }

    *new_offset_out = resp.offset;
    free(resp.data);
    return 0;
}

/*
 * Send a GET_POS request for the handle.
 * On success stores the offset from the response and returns 0.
 * Returns -1 with errno set on failure (EINVAL for handle 0 or a NULL output
 * pointer).
 */
int blob_get_pos(uint64_t handle_id, uint64_t *offset_out)
{
    struct zvfs_req req;
    struct zvfs_resp resp;

    if (handle_id == 0 || !offset_out) {
        errno = EINVAL;
        return -1;
    }

    memset(&req, 0, sizeof(req));
    req.opcode = ZVFS_OP_GET_POS;
    req.handle_id = handle_id;

    memset(&resp, 0, sizeof(resp));
    if (ipc_do_req(&req, &resp) != 0) {
        return -1;
    }

    *offset_out = resp.offset;
    free(resp.data);
    return 0;
}

/*
 * Send a GET_FLAGS request for the handle.
 * On success stores the handle flags from the response and returns 0.
 * Returns -1 with errno set on failure (EINVAL for handle 0 or a NULL output
 * pointer).
 */
int blob_get_flags(uint64_t handle_id, uint32_t *flags_out)
{
    struct zvfs_req req;
    struct zvfs_resp resp;

    if (handle_id == 0 || !flags_out) {
        errno = EINVAL;
        return -1;
    }

    memset(&req, 0, sizeof(req));
    req.opcode = ZVFS_OP_GET_FLAGS;
    req.handle_id = handle_id;

    memset(&resp, 0, sizeof(resp));
    if (ipc_do_req(&req, &resp) != 0) {
        return -1;
    }

    *flags_out = resp.handle_flags;
    free(resp.data);
    return 0;
}

/*
 * Send a SET_FLAGS request carrying flags for the handle.
 * Returns 0 on success, -1 with errno set on failure (EINVAL for handle 0).
 */
int blob_set_flags(uint64_t handle_id, uint32_t flags)
{
    struct zvfs_req req;

    if (handle_id == 0) {
        errno = EINVAL;
        return -1;
    }

    memset(&req, 0, sizeof(req));
    req.opcode = ZVFS_OP_SET_FLAGS;
    req.handle_id = handle_id;
    req.handle_flags = flags;

    return ipc_do_req_discard(&req);
}