#include "network/chainbuffer.h"

/*
 * NOTE(review): the system header names were missing from the copy of this
 * file under review. The list below is reconstructed from the identifiers
 * the code uses (assert, errno, SIZE_MAX, malloc/free/realloc,
 * memcpy/memset, sendmsg/struct msghdr/struct iovec) -- confirm it against
 * the project's original include list.
 */
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>

#define CHAINBUFFER_DEFAULT_CHUNK 4096
#define CHAINBUFFER_MAX_IOV 16
#define CHAINBUFFER_DEFAULT_FREE_LIMIT 256

/*
 * One link of the chain. The payload area `data` is used as a ring:
 * `rpos` is the read offset, `wpos` the write offset and `used` the number
 * of readable bytes, so readable/writable space may wrap around the end of
 * `data` and is therefore described as up to two segments.
 */
struct chain_buffer_node {
    struct chain_buffer_node *next;
    size_t cap;      /* size of data[] in bytes */
    size_t rpos;     /* offset of the next byte to read */
    size_t wpos;     /* offset of the next byte to write */
    size_t used;     /* readable bytes currently stored */
    unsigned refcnt; /* set to 1 while the node sits in a detached list */
    uint8_t data[];
};

/*
 * Allocate an empty node with `cap` payload bytes.
 * Returns NULL when the size computation would overflow or malloc fails.
 */
static chain_buffer_node_t *alloc_node(size_t cap)
{
    chain_buffer_node_t *node;

    /* sizeof(*node) + cap must not wrap, or malloc would under-allocate. */
    if (cap > SIZE_MAX - sizeof(*node)) {
        errno = ENOMEM;
        return NULL;
    }

    node = (chain_buffer_node_t *)malloc(sizeof(*node) + cap);
    if (!node) {
        return NULL;
    }

    node->next = NULL;
    node->cap = cap;
    node->rpos = 0;
    node->wpos = 0;
    node->used = 0;
    node->refcnt = 0;
    return node;
}

/* Return a node to the "empty, unlinked" state; its capacity is kept. */
static void reset_node_state(chain_buffer_node_t *node)
{
    if (!node) {
        return;
    }

    node->next = NULL;
    node->rpos = 0;
    node->wpos = 0;
    node->used = 0;
    node->refcnt = 0;
}

/* Smaller of two sizes. */
static size_t min_size(size_t a, size_t b)
{
    return a < b ? a : b;
}

/* Length of the first readable segment: from rpos up to the end of data[]. */
static size_t node_read_seg1_len(const chain_buffer_node_t *node)
{
    size_t tail_len;

    if (!node || node->used == 0) {
        return 0;
    }

    tail_len = node->cap - node->rpos;
    return min_size(node->used, tail_len);
}

/* Length of the first writable segment: from wpos up to the end of data[]. */
static size_t node_write_seg1_len(const chain_buffer_node_t *node)
{
    size_t free_total;
    size_t tail_len;

    if (!node || node->used >= node->cap) {
        return 0;
    }

    free_total = node->cap - node->used;
    tail_len = node->cap - node->wpos;
    return min_size(free_total, tail_len);
}

/*
 * Describe the readable bytes of `node` as up to two segments.
 * Segment 1 starts at rpos; segment 2 (if any) is the wrapped part at the
 * start of data[]. Any output pointer may be NULL; an empty segment is
 * reported as pointer NULL / length 0.
 */
static void node_read_segments(const chain_buffer_node_t *node,
                               uint8_t **p1, size_t *l1,
                               uint8_t **p2, size_t *l2)
{
    size_t seg1 = node_read_seg1_len(node);
    size_t seg2 = (node && node->used > seg1) ? (node->used - seg1) : 0;

    if (p1) {
        *p1 = (seg1 > 0) ? (uint8_t *)(node->data + node->rpos) : NULL;
    }
    if (l1) {
        *l1 = seg1;
    }
    if (p2) {
        *p2 = (seg2 > 0) ? (uint8_t *)node->data : NULL;
    }
    if (l2) {
        *l2 = seg2;
    }
}

/*
 * Describe the free space of `node` as up to two segments.
 * Segment 1 starts at wpos; segment 2 (if any) is the wrapped part at the
 * start of data[]. A NULL or full node yields two empty segments.
 */
static void node_write_segments(const chain_buffer_node_t *node,
                                uint8_t **p1, size_t *l1,
                                uint8_t **p2, size_t *l2)
{
    size_t free_total;
    size_t seg1;
    size_t seg2;

    if (!node || node->used >= node->cap) {
        if (p1) {
            *p1 = NULL;
        }
        if (l1) {
            *l1 = 0;
        }
        if (p2) {
            *p2 = NULL;
        }
        if (l2) {
            *l2 = 0;
        }
        return;
    }

    free_total = node->cap - node->used;
    seg1 = node_write_seg1_len(node);
    seg2 = free_total - seg1;

    if (p1) {
        *p1 = (seg1 > 0) ? (uint8_t *)(node->data + node->wpos) : NULL;
    }
    if (l1) {
        *l1 = seg1;
    }
    if (p2) {
        *p2 = (seg2 > 0) ? (uint8_t *)node->data : NULL;
    }
    if (l2) {
        *l2 = seg2;
    }
}

/* Consume `n` readable bytes. The caller guarantees n <= node->used. */
static void node_advance_read(chain_buffer_node_t *node, size_t n)
{
    if (!node || n == 0) {
        return;
    }

    node->rpos = (node->rpos + n) % node->cap;
    node->used -= n;
}

/* Mark `n` more bytes as written. The caller guarantees they fit. */
static void node_advance_write(chain_buffer_node_t *node, size_t n)
{
    if (!node || n == 0) {
        return;
    }

    node->wpos = (node->wpos + n) % node->cap;
    node->used += n;
}

/*
 * Copy `n` bytes from `src` into the free space of `node`, handling the
 * wrap-around, and advance the write position.
 * The caller guarantees that `n` does not exceed the node's free space.
 */
static void node_put(chain_buffer_node_t *node, const uint8_t *src, size_t n)
{
    uint8_t *p1;
    uint8_t *p2;
    size_t l1;
    size_t l2;
    size_t first;

    node_write_segments(node, &p1, &l1, &p2, &l2);
    assert(n <= l1 + l2);

    first = min_size(n, l1);
    if (first > 0) {
        memcpy(p1, src, first);
    }
    if (n > first) {
        memcpy(p2, src + first, n - first);
    }
    node_advance_write(node, n);
}

/*
 * Obtain an empty node of capacity `cap`: reuse one from the owner's free
 * list when `cap` equals the owner's chunk size, otherwise allocate.
 * Returns NULL for a NULL owner, cap == 0, or allocation failure.
 */
static chain_buffer_node_t *acquire_node(chain_buffer_t *buf, size_t cap)
{
    chain_buffer_node_t *node;

    if (!buf || cap == 0) {
        return NULL;
    }

    if (cap == buf->chunk_size && buf->free_list) {
        node = buf->free_list;
        buf->free_list = node->next;
        buf->free_count--;
        reset_node_state(node);
        return node;
    }

    return alloc_node(cap);
}

/*
 * Dispose of a node: push it on the owner's free list when it has the
 * owner's chunk size and the list is below its limit, otherwise free it.
 */
static void recycle_node(chain_buffer_t *buf, chain_buffer_node_t *node)
{
    if (!node) {
        return;
    }

    if (!buf || node->cap != buf->chunk_size ||
        buf->free_count >= buf->free_limit) {
        free(node);
        return;
    }

    reset_node_state(node);
    node->next = buf->free_list;
    buf->free_list = node;
    buf->free_count++;
}

/* Link an already-acquired node at the end of the buffer's chain. */
static void link_tail(chain_buffer_t *buf, chain_buffer_node_t *node)
{
    if (!buf->tail) {
        buf->head = node;
    } else {
        buf->tail->next = node;
    }
    buf->tail = node;
}

/* Acquire a node of capacity `cap` and link it as the new tail. 0 / -1. */
static int append_new_tail(chain_buffer_t *buf, size_t cap)
{
    chain_buffer_node_t *node = acquire_node(buf, cap);

    if (!node) {
        return -1;
    }

    link_tail(buf, node);
    return 0;
}

/* Append `node` to a detached list (does not touch list->total_len). */
static void list_append_node(chain_buffer_list_t *list,
                             chain_buffer_node_t *node)
{
    if (!list || !node) {
        return;
    }

    node->next = NULL;
    if (!list->tail) {
        list->head = node;
    } else {
        list->tail->next = node;
    }
    list->tail = node;
}

/* Free every node of a singly linked chain. */
static void free_node_chain(chain_buffer_node_t *node)
{
    while (node) {
        chain_buffer_node_t *next = node->next;

        free(node);
        node = next;
    }
}

/**
 * Initialise an empty buffer.
 *
 * @param buf        buffer to initialise; NULL is ignored.
 * @param chunk_size capacity of regular nodes; 0 selects
 *                   CHAINBUFFER_DEFAULT_CHUNK.
 */
void chain_buffer_init(chain_buffer_t *buf, size_t chunk_size)
{
    if (!buf) {
        return;
    }

    memset(buf, 0, sizeof(*buf));
    buf->chunk_size = chunk_size ? chunk_size : CHAINBUFFER_DEFAULT_CHUNK;
    buf->free_limit = CHAINBUFFER_DEFAULT_FREE_LIMIT;
}

/**
 * Release every node, the free list and the linearize cache, leaving the
 * buffer empty.
 *
 * The configured chunk size and free-list limit are preserved so the buffer
 * stays usable without another chain_buffer_init() call. Zeroing them would
 * make chain_buffer_prepare_recv_iov() request a 0-byte node and fail, and
 * would disable node recycling.
 */
void chain_buffer_reset(chain_buffer_t *buf)
{
    chain_buffer_t saved;

    if (!buf) {
        return;
    }

    free_node_chain(buf->head);
    free_node_chain(buf->free_list);
    free(buf->linear_cache);

    /* Struct copy keeps this independent of the two fields' exact types. */
    saved = *buf;
    memset(buf, 0, sizeof(*buf));
    buf->chunk_size = saved.chunk_size;
    buf->free_limit = saved.free_limit;
}

/** Number of readable bytes in the buffer; 0 for NULL. */
size_t chain_buffer_len(const chain_buffer_t *buf)
{
    return buf ? buf->total_len : 0;
}

/**
 * Append `len` bytes to the end of the buffer.
 *
 * Free space in the current tail is filled first; whatever does not fit goes
 * into one new node whose capacity is the larger of the remainder and the
 * chunk size.
 *
 * The operation is all-or-nothing: the extra node is reserved before any
 * byte is copied, so on failure the buffer content is unchanged.
 *
 * @return 0 on success; -1 with errno = EINVAL (bad arguments),
 *         EOVERFLOW (total length would overflow size_t) or
 *         ENOMEM (node allocation failed).
 */
int chain_buffer_append(chain_buffer_t *buf, const void *data, size_t len)
{
    const uint8_t *src = (const uint8_t *)data;
    chain_buffer_node_t *tail;
    chain_buffer_node_t *extra = NULL;
    size_t tail_free = 0;
    size_t into_tail;
    size_t into_extra;

    if (!buf || (!data && len > 0)) {
        errno = EINVAL;
        return -1;
    }
    if (len == 0) {
        return 0;
    }
    if (buf->total_len > SIZE_MAX - len) {
        errno = EOVERFLOW;
        return -1;
    }

    tail = buf->tail;
    if (tail && tail->used < tail->cap) {
        tail_free = tail->cap - tail->used;
    }

    into_tail = min_size(len, tail_free);
    into_extra = len - into_tail;

    /* Reserve the overflow node first so a failure has no side effects. */
    if (into_extra > 0) {
        size_t cap = into_extra > buf->chunk_size ? into_extra
                                                  : buf->chunk_size;

        extra = acquire_node(buf, cap);
        if (!extra) {
            errno = ENOMEM;
            return -1;
        }
    }

    if (into_tail > 0) {
        node_put(tail, src, into_tail);
    }
    if (extra) {
        link_tail(buf, extra);
        node_put(extra, src + into_tail, into_extra);
    }

    buf->total_len += len;
    return 0;
}

/**
 * Discard up to `len` bytes from the front of the buffer.
 * Nodes that become empty are unlinked and recycled.
 *
 * @return number of bytes actually discarded.
 */
size_t chain_buffer_drain(chain_buffer_t *buf, size_t len)
{
    size_t remain;
    size_t drained = 0;

    if (!buf || len == 0 || buf->total_len == 0) {
        return 0;
    }

    remain = len;
    while (remain > 0 && buf->head) {
        chain_buffer_node_t *node = buf->head;
        size_t n = min_size(remain, node->used);

        if (n == 0) {
            /* Empty node at the head: drop it and look at the next one. */
            buf->head = node->next;
            if (!buf->head) {
                buf->tail = NULL;
            }
            recycle_node(buf, node);
            continue;
        }

        node_advance_read(node, n);
        buf->total_len -= n;
        drained += n;
        remain -= n;

        if (node->used == 0) {
            assert(node->refcnt == 0);
            buf->head = node->next;
            if (!buf->head) {
                buf->tail = NULL;
            }
            recycle_node(buf, node);
        }
    }

    return drained;
}

/**
 * Expose the whole buffer content as one contiguous byte range.
 *
 * When the content already lies contiguously inside a single node, a pointer
 * into that node is returned without copying. Otherwise the content is
 * copied into buf->linear_cache (grown with realloc as needed) and that
 * cache is returned.
 *
 * @param out_len if non-NULL, receives the buffered length.
 * @return pointer to the bytes, or NULL when the buffer is NULL or empty, or
 *         when growing the cache fails. In the failure case *out_len still
 *         holds the non-zero buffered length, which distinguishes it from an
 *         empty buffer.
 */
const uint8_t *chain_buffer_linearize(chain_buffer_t *buf, size_t *out_len)
{
    size_t offset = 0;

    if (!buf) {
        return NULL;
    }
    if (out_len) {
        *out_len = buf->total_len;
    }
    if (buf->total_len == 0) {
        return NULL;
    }

    if (buf->head == buf->tail && buf->head) {
        chain_buffer_node_t *only = buf->head;

        if (only->used == 0) {
            return NULL;
        }
        if (node_read_seg1_len(only) == only->used) {
            return only->data + only->rpos;
        }
    }

    if (buf->linear_cap < buf->total_len) {
        uint8_t *grown = (uint8_t *)realloc(buf->linear_cache,
                                            buf->total_len);

        if (!grown) {
            return NULL;
        }
        buf->linear_cache = grown;
        buf->linear_cap = buf->total_len;
    }

    for (chain_buffer_node_t *node = buf->head; node; node = node->next) {
        uint8_t *p1;
        uint8_t *p2;
        size_t l1;
        size_t l2;

        node_read_segments(node, &p1, &l1, &p2, &l2);
        if (l1 > 0) {
            memcpy(buf->linear_cache + offset, p1, l1);
            offset += l1;
        }
        if (l2 > 0) {
            memcpy(buf->linear_cache + offset, p2, l2);
            offset += l2;
        }
    }

    return buf->linear_cache;
}

/**
 * Send buffered data on `fd` with a single sendmsg() call and drain whatever
 * was accepted. At most CHAINBUFFER_MAX_IOV segments are offered per call.
 *
 * @return the sendmsg() result (bytes sent, or -1 with errno set by
 *         sendmsg); 0 when there is nothing to send; -1 with errno = EINVAL
 *         for a NULL buffer.
 */
ssize_t chain_buffer_send_fd(chain_buffer_t *buf, int fd, int flags)
{
    struct iovec iov[CHAINBUFFER_MAX_IOV];
    struct msghdr msg;
    size_t iovcnt = 0;
    ssize_t n;

    if (!buf) {
        errno = EINVAL;
        return -1;
    }
    if (buf->total_len == 0 || !buf->head) {
        return 0;
    }

    for (chain_buffer_node_t *node = buf->head;
         node && iovcnt < CHAINBUFFER_MAX_IOV;
         node = node->next) {
        uint8_t *p1;
        uint8_t *p2;
        size_t l1;
        size_t l2;

        node_read_segments(node, &p1, &l1, &p2, &l2);
        if (l1 > 0) {
            iov[iovcnt].iov_base = p1;
            iov[iovcnt].iov_len = l1;
            iovcnt++;
            if (iovcnt >= CHAINBUFFER_MAX_IOV) {
                break;
            }
        }
        if (l2 > 0) {
            iov[iovcnt].iov_base = p2;
            iov[iovcnt].iov_len = l2;
            iovcnt++;
        }
    }

    if (iovcnt == 0) {
        return 0;
    }

    memset(&msg, 0, sizeof(msg));
    msg.msg_iov = iov;
    msg.msg_iovlen = iovcnt;

    n = sendmsg(fd, &msg, flags);
    if (n > 0) {
        chain_buffer_drain(buf, (size_t)n);
    }
    return n;
}

/**
 * Fill `iov` with the free space of the tail node so the caller can receive
 * directly into the buffer. A new chunk-sized tail is added when there is no
 * tail or the tail is full.
 *
 * Only the tail node is described, so at most two entries are produced.
 * With max_iov == 1 only the first free segment is reported even if the free
 * space wraps.
 *
 * @return number of iov entries filled, or -1 with errno = EINVAL / ENOMEM.
 */
int chain_buffer_prepare_recv_iov(chain_buffer_t *buf, struct iovec *iov,
                                  int max_iov)
{
    uint8_t *p1;
    uint8_t *p2;
    size_t l1;
    size_t l2;
    int iovcnt = 0;

    if (!buf || !iov || max_iov <= 0) {
        errno = EINVAL;
        return -1;
    }

    if (!buf->tail || buf->tail->used == buf->tail->cap) {
        if (append_new_tail(buf, buf->chunk_size) < 0) {
            errno = ENOMEM;
            return -1;
        }
    }

    node_write_segments(buf->tail, &p1, &l1, &p2, &l2);

    if (l1 > 0) {
        iov[iovcnt].iov_base = p1;
        iov[iovcnt].iov_len = l1;
        iovcnt++;
    }
    if (l2 > 0 && iovcnt < max_iov) {
        iov[iovcnt].iov_base = p2;
        iov[iovcnt].iov_len = l2;
        iovcnt++;
    }

    return iovcnt;
}

/**
 * Record that `len` bytes were written into the space handed out by
 * chain_buffer_prepare_recv_iov().
 *
 * `len` is clamped to the tail's total free space (both segments). A caller
 * that requested a single iov entry must therefore not commit more than that
 * entry's length, or bytes that were never written become readable.
 *
 * @return number of bytes committed (0 when there is no writable tail).
 */
size_t chain_buffer_commit_recv(chain_buffer_t *buf, size_t len)
{
    chain_buffer_node_t *tail;
    size_t free_total;

    if (!buf || len == 0) {
        return 0;
    }
    if (!buf->tail || buf->tail->used == buf->tail->cap) {
        return 0;
    }

    tail = buf->tail;
    free_total = tail->cap - tail->used;
    if (len > free_total) {
        len = free_total;
    }

    node_advance_write(tail, len);
    buf->total_len += len;
    return len;
}

/** Make `list` an empty detached list; NULL is ignored. */
void chain_buffer_list_init(chain_buffer_list_t *list)
{
    if (!list) {
        return;
    }

    memset(list, 0, sizeof(*list));
}

/** Number of bytes held by a detached list; 0 for NULL. */
size_t chain_buffer_list_len(const chain_buffer_list_t *list)
{
    return list ? list->total_len : 0;
}

/**
 * Describe the content of a detached list as an iovec array.
 * Stops once `max_iov` entries have been written, so the result may cover
 * only a prefix of the list.
 *
 * @return number of entries filled, or -1 with errno = EINVAL.
 */
int chain_buffer_list_iov(const chain_buffer_list_t *list, struct iovec *iov,
                          int max_iov)
{
    int iovcnt = 0;

    if (!list || !iov || max_iov <= 0) {
        errno = EINVAL;
        return -1;
    }

    for (chain_buffer_node_t *node = list->head;
         node && iovcnt < max_iov;
         node = node->next) {
        uint8_t *p1;
        uint8_t *p2;
        size_t l1;
        size_t l2;

        node_read_segments(node, &p1, &l1, &p2, &l2);
        if (l1 > 0) {
            iov[iovcnt].iov_base = p1;
            iov[iovcnt].iov_len = l1;
            iovcnt++;
            if (iovcnt >= max_iov) {
                break;
            }
        }
        if (l2 > 0) {
            iov[iovcnt].iov_base = p2;
            iov[iovcnt].iov_len = l2;
            iovcnt++;
        }
    }

    return iovcnt;
}

/*
 * Copy the first `len` readable bytes of `node` to `dst` without consuming
 * them. Returns 0, or -1 when the arguments are invalid or `len` exceeds the
 * node's readable bytes.
 */
static int copy_prefix_from_node(chain_buffer_node_t *node, size_t len,
                                 uint8_t *dst)
{
    uint8_t *p1;
    uint8_t *p2;
    size_t l1;
    size_t l2;
    size_t first;

    if (!node || !dst || len == 0 || len > node->used) {
        return -1;
    }

    node_read_segments(node, &p1, &l1, &p2, &l2);
    first = min_size(len, l1);
    if (first > 0) {
        memcpy(dst, p1, first);
    }
    if (len > first) {
        memcpy(dst + first, p2, len - first);
    }
    return 0;
}

/**
 * Move the first `len` bytes of `buf` into the detached list `out`.
 *
 * Nodes that lie completely inside the prefix are unlinked and handed over
 * without copying. If the prefix ends inside a node, that part is copied
 * into a freshly acquired node and consumed from the original. Every node
 * placed in `out` gets refcnt = 1.
 *
 * `out` is re-initialised on entry; any previous content is not released.
 *
 * The only step that can fail (acquiring the split node) is performed before
 * the buffer is modified, so on error `buf` is unchanged and `out` is empty.
 *
 * @return 0 on success; -1 with errno = EINVAL (bad arguments or
 *         len > buffered length) or ENOMEM.
 */
int chain_buffer_detach_prefix(chain_buffer_t *buf, size_t len,
                               chain_buffer_list_t *out)
{
    chain_buffer_node_t *scan;
    chain_buffer_node_t *part = NULL;
    size_t split_len;
    size_t remain;

    if (!buf || !out) {
        errno = EINVAL;
        return -1;
    }

    chain_buffer_list_init(out);

    if (len == 0) {
        return 0;
    }
    if (len > buf->total_len) {
        errno = EINVAL;
        return -1;
    }

    /*
     * Pass 1 (read-only): skip the nodes that will be moved whole and find
     * how many bytes must be split off the node after them.
     */
    split_len = len;
    scan = buf->head;
    while (scan && split_len >= scan->used) {
        split_len -= scan->used;
        scan = scan->next;
    }

    if (scan && split_len > 0) {
        part = acquire_node(buf, split_len);
        if (!part) {
            errno = ENOMEM;
            return -1;
        }
    }

    /* Pass 2: hand over whole nodes; nothing below can fail. */
    remain = len;
    while (remain > 0 && buf->head && remain >= buf->head->used) {
        chain_buffer_node_t *node = buf->head;
        size_t take = node->used;

        buf->head = node->next;
        if (!buf->head) {
            buf->tail = NULL;
        }

        node->next = NULL;
        node->refcnt = 1;
        list_append_node(out, node);

        out->total_len += take;
        buf->total_len -= take;
        remain -= take;
    }

    /* Split the node in which the prefix ends (0 < remain < head->used). */
    if (remain > 0 && buf->head && part) {
        chain_buffer_node_t *node = buf->head;
        int rc = copy_prefix_from_node(node, remain, part->data);

        assert(rc == 0);
        (void)rc;

        part->used = remain;
        part->wpos = remain % part->cap;
        part->rpos = 0;
        part->refcnt = 1;
        part->next = NULL;
        list_append_node(out, part);
        out->total_len += remain;
        part = NULL;

        node_advance_read(node, remain);
        buf->total_len -= remain;
    }

    /* Defensive: never leak a reserved node that ended up unused. */
    if (part) {
        recycle_node(buf, part);
    }

    return 0;
}

/**
 * Drop the list's reference on every node and empty the list.
 *
 * A node whose refcnt reaches 0 is recycled into `owner` (or freed when
 * `owner` is NULL). A node whose refcnt stays above 0 is left alone; the
 * list forgets it either way.
 */
void chain_buffer_list_release(chain_buffer_t *owner,
                               chain_buffer_list_t *list)
{
    chain_buffer_node_t *node;

    if (!list) {
        return;
    }

    node = list->head;
    while (node) {
        chain_buffer_node_t *next = node->next;

        if (node->refcnt > 0) {
            node->refcnt--;
        }
        if (node->refcnt == 0) {
            if (owner) {
                recycle_node(owner, node);
            } else {
                free(node);
            }
        }
        node = next;
    }

    chain_buffer_list_init(list);
}