#include "replica_shm.h"

#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

#define REPLICA_SHM_MAGIC 0x52504C43u /* 'RPLC' */
#define REPLICA_SHM_VER   1

/* Round x up to the next multiple of 8. */
static inline uint64_t align8_u64(uint64_t x)
{
    return (x + 7u) & ~7ull;
}

/*
 * Open (and optionally create) the POSIX shared-memory object `name` and map
 * `total_size` bytes of it read/write into this process.
 *
 * Layout of the mapping: a replica_shm_hdr_t followed by the data area.
 * hdr->capacity is total_size minus the segment header minus one record
 * header; that trailing slack guarantees a wrap marker can always be written
 * at any offset <= capacity.
 *
 * The header is (re)initialised when `create` is nonzero or when the mapped
 * header does not carry REPLICA_SHM_MAGIC.
 *
 * s          - handle to fill in; zeroed first, left zeroed on failure.
 * name       - object name passed to shm_open().
 * total_size - bytes to map; must be >= sizeof(replica_shm_hdr_t) + 4096.
 * create     - nonzero: create the object if needed and resize it to
 *              total_size; zero: attach to an existing object.
 *
 * Returns 0 on success, -EINVAL on bad arguments or when an existing object
 * is smaller than total_size, otherwise the negated errno of the failing
 * system call.
 */
int replica_shm_open(replica_shm_t *s, const char *name, size_t total_size, int create)
{
    if (!s || !name || total_size < (sizeof(replica_shm_hdr_t) + 4096))
        return -EINVAL;

    memset(s, 0, sizeof(*s));

    int flags = O_RDWR;
    if (create)
        flags |= O_CREAT;

    int fd = shm_open(name, flags, 0666);
    if (fd < 0)
        return -errno;

    int rc = 0;
    void *p = MAP_FAILED;

    if (create) {
        if (ftruncate(fd, (off_t)total_size) != 0) {
            rc = -errno;
            goto fail;
        }
    } else {
        /*
         * Attaching: mmap() succeeds even when the object is shorter than
         * the requested length, and the first access past its end raises
         * SIGBUS. Reject undersized objects up front instead.
         */
        struct stat st;
        if (fstat(fd, &st) != 0) {
            rc = -errno;
            goto fail;
        }
        if (st.st_size < 0 || (uint64_t)st.st_size < (uint64_t)total_size) {
            rc = -EINVAL;
            goto fail;
        }
    }

    p = mmap(NULL, total_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    if (p == MAP_FAILED) {
        rc = -errno;
        goto fail;
    }

    s->fd = fd;
    s->map_size = total_size;
    s->hdr = (replica_shm_hdr_t *)p;
    s->data = (uint8_t *)p + sizeof(replica_shm_hdr_t);

    /* Initialise the segment header. */
    if (create || s->hdr->magic != REPLICA_SHM_MAGIC) {
        memset(s->hdr, 0, sizeof(*s->hdr));
        s->hdr->magic = REPLICA_SHM_MAGIC;
        s->hdr->version = REPLICA_SHM_VER;
        s->hdr->capacity = total_size - sizeof(replica_shm_hdr_t) - sizeof(replica_rec_hdr_t);
        s->hdr->write_off = 0;
        s->hdr->last_seq = 0;
    }

    /* Message text kept as-is; only the conversion specifier is corrected. */
    printf("capcity:%llu\n", (unsigned long long)s->hdr->capacity);
    return 0;

fail:
    close(fd);
    return rc;
}

/*
 * Append one record (record header + payload) to the ring.
 *
 * s       - open handle.
 * seq     - sequence number stored in the record header and published as
 *           hdr->last_seq.
 * buf/len - payload; len must be non-zero.
 * out_off - receives the data-area offset at which the record header was
 *           written.
 *
 * Each record occupies sizeof(replica_rec_hdr_t) + len bytes rounded up to a
 * multiple of 8. If it does not fit between the current write offset and
 * capacity, a wrap marker (record header with len == 0) is written at the
 * current offset and the record is placed at offset 0.
 *
 * Returns 0 on success, -EINVAL on bad arguments, -ENOSPC when a single
 * record is larger than the whole ring (nothing is modified in that case).
 */
int replica_shm_append(replica_shm_t *s, uint64_t seq, const void *buf, uint32_t len, uint32_t *out_off)
{
    if (!s || !s->hdr || !s->data || !buf || len == 0 || !out_off)
        return -EINVAL;

    uint64_t cap = s->hdr->capacity;
    uint64_t off = __atomic_load_n(&s->hdr->write_off, __ATOMIC_RELAXED);
    uint64_t need = align8_u64((uint64_t)sizeof(replica_rec_hdr_t) + (uint64_t)len);

    /*
     * A record that can never fit must be rejected before anything is
     * written; otherwise the ring would be wrapped (marker written,
     * write_off reset to 0) for a record that is then refused.
     */
    if (need > cap)
        return -ENOSPC;

    /*
     * Simplification: if the record does not fit in the tail, write a wrap
     * marker and restart at 0. The marker has len == 0 and keeps seq; a
     * consumer that meets it jumps back to offset 0.
     */
    if (off + need > cap) {
        replica_rec_hdr_t wrap = { .seq = seq, .len = 0, .flags = 0, .crc32 = 0, .reserved = 0 };
        memcpy(s->data + off, &wrap, sizeof(wrap));
        __atomic_store_n(&s->hdr->write_off, 0, __ATOMIC_RELEASE);
        off = 0;
    }

    replica_rec_hdr_t h = {0};
    h.seq = seq;
    h.len = len;
    h.flags = 0;
    h.crc32 = 0;

    /* Write header + payload. */
    memcpy(s->data + off, &h, sizeof(h));
    memcpy(s->data + off + sizeof(h), buf, len);

    /*
     * Publish last_seq / write_off with release stores so that a consumer
     * that observes them also observes the payload written above.
     */
    uint64_t new_off = off + need;
    __atomic_store_n(&s->hdr->last_seq, seq, __ATOMIC_RELEASE);
    __atomic_store_n(&s->hdr->write_off, new_off, __ATOMIC_RELEASE);

    *out_off = (uint32_t)off;
    return 0;
}

/*
 * Copy the record header located at data-area offset `off` into *out_hdr.
 *
 * Offsets up to and including hdr->capacity are accepted: capacity excludes
 * one trailing record header, so a header read at any such offset stays
 * inside the data area, and the producer may place a wrap marker there.
 *
 * Returns 0 on success, -EINVAL on bad arguments or an out-of-range offset.
 */
int replica_shm_peek(replica_shm_t *s, uint32_t off, replica_rec_hdr_t *out_hdr)
{
    if (!s || !s->hdr || !s->data || !out_hdr)
        return -EINVAL;
    if ((uint64_t)off > s->hdr->capacity)
        return -EINVAL;

    memcpy(out_hdr, s->data + off, sizeof(*out_hdr));
    return 0;
}

/*
 * Unmap the segment, close its descriptor and zero the handle.
 * Safe to call with NULL or with a zeroed handle.
 */
void replica_shm_close(replica_shm_t *s)
{
    if (!s)
        return;

    int mapped = (s->hdr != NULL) && (s->map_size != 0);
    if (mapped)
        munmap(s->hdr, s->map_size);

    /*
     * A zeroed handle has fd == 0 without owning stdin, so descriptor 0 is
     * closed only when a mapping proves that the handle was really opened.
     */
    if (s->fd > 0 || (mapped && s->fd == 0))
        close(s->fd);

    memset(s, 0, sizeof(*s));
}