/*
 * Operation log (oplog): length-prefixed command records appended to a file
 * through the diskuring workers (global_uring_ctx).
 *
 * On-disk record format: a 4-byte big-endian length followed by that many
 * bytes of a RESP command (see kvs_replay_log()).
 *
 * Small records are staged in KVS_OPLOG_PAGE_SIZE page buffers and written in
 * batches; records larger than one page are written directly. Every write is
 * positional: each record/page reserves its byte range from g_log_off, so the
 * file stays consistent even if writes complete out of order.
 *
 * In OPLOG_SYNC_EVERY_SEC mode a background thread flushes pending pages and
 * fsyncs the log at most once per second.
 */
#include "kvs_dump.h"
#include "kvs_rw_tools.h"
#include "memory/alloc_dispatch.h"
#include "kvs_protocol_resp.h"
#include "diskuring/diskuring.h"
#include "common/config.h"

#include <arpa/inet.h>
#include <fcntl.h>
#include <limits.h>
#include <pthread.h>
#include <stdint.h>
#include <string.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

int global_oplog_fd = -1;

/* Next file offset to hand out to a record; -1 while no log is open. */
static off_t g_log_off = -1;

extern AppConfig global_cfg;

#define KVS_OPLOG_PAGE_SIZE (64u * 1024u)

/* One staging page; `off` is the file offset of data[0]. */
typedef struct oplog_buf {
    struct oplog_buf *next;
    off_t off;
    size_t used;
    uint8_t data[KVS_OPLOG_PAGE_SIZE];
} oplog_buf_t;

/* Pages available for reuse. */
static oplog_buf_t *g_oplog_idle_head = NULL;
static oplog_buf_t *g_oplog_idle_tail = NULL;
/* Sealed pages waiting to be submitted for writing. */
static oplog_buf_t *g_oplog_ready_head = NULL;
static oplog_buf_t *g_oplog_ready_tail = NULL;
/* Page currently being filled by kvs_oplog_buffer_append(). */
static oplog_buf_t *g_oplog_cur = NULL;

/* Protects the page lists, g_log_off and the sync-thread state below. */
static pthread_mutex_t g_oplog_mu = PTHREAD_MUTEX_INITIALIZER;
/* Used only to wake the sync thread when it must stop. */
static pthread_cond_t g_sync_cv = PTHREAD_COND_INITIALIZER;
static pthread_t g_sync_th;
static int g_sync_started = 0;
static int g_sync_stop = 0;
static int g_sync_logfd = -1;
/* g_sync_gen counts appends; g_sync_synced_gen is the last count fsynced. */
static uint64_t g_sync_gen = 0;
static uint64_t g_sync_synced_gen = 0;

/*
 * Record that new bytes were appended. Caller holds g_oplog_mu.
 * The sync thread is intentionally not woken here: it runs on a one-second
 * timer, and waking it on every append made it flush and fsync per append.
 */
static inline void oplog_mark_dirty_locked(void)
{
    g_sync_gen++;
}

/* Shared completion state for one kvs_oplog_fsync_all_workers() call. */
typedef struct {
    pthread_mutex_t mu;
    pthread_cond_t cv;
    int pending;       /* fsync tasks submitted but not yet completed */
    int submit_failed; /* a submission returned NULL */
    int fsync_err;     /* first negative task result, 0 if none */
} oplog_fsync_wait_t;

/* Per-task callback argument pointing back at the shared waiter. */
typedef struct {
    oplog_fsync_wait_t *waiter;
} oplog_fsync_arg_t;

/* Fill *ts with an absolute CLOCK_REALTIME deadline `ms` milliseconds ahead. */
static void make_timeout_ms(struct timespec *ts, long ms)
{
    if (!ts) {
        return;
    }
    clock_gettime(CLOCK_REALTIME, ts);
    ts->tv_sec += ms / 1000;
    ts->tv_nsec += (ms % 1000) * 1000000L;
    if (ts->tv_nsec >= 1000000000L) {
        ts->tv_sec += 1;
        ts->tv_nsec -= 1000000000L;
    }
}

/*
 * Completion callback for one fsync task: records the first error, decrements
 * the pending count and wakes the waiter when it reaches zero.
 */
static void oplog_fsync_done(task_t *t, void *arg)
{
    oplog_fsync_arg_t *a = (oplog_fsync_arg_t *)arg;
    oplog_fsync_wait_t *w;

    if (!a || !a->waiter) {
        return;
    }
    w = a->waiter;

    pthread_mutex_lock(&w->mu);
    if (t && t->res < 0 && w->fsync_err == 0) {
        w->fsync_err = t->res;
    }
    if (w->pending > 0) {
        w->pending--;
    }
    if (w->pending == 0) {
        pthread_cond_signal(&w->cv);
    }
    pthread_mutex_unlock(&w->mu);
}

/*
 * Submit an fsync of `fd` to every diskuring worker and block until all
 * submitted fsyncs complete.
 *
 * Returns 0 if every fsync was submitted and none reported a negative result,
 * -1 otherwise (including when there are no workers or allocation fails).
 *
 * All callers invoke this without g_oplog_mu held; keep it that way, since
 * completion callbacks such as oplog_recycle_done() take g_oplog_mu
 * (assumption: callbacks may run from cleanup_finished_iouring_tasks()).
 */
static int kvs_oplog_fsync_all_workers(int fd)
{
    oplog_fsync_wait_t w;
    oplog_fsync_arg_t *args;
    int n;
    int i;
    int rc = 0;

    if (fd < 0 || !global_uring_ctx.workers || global_uring_ctx.worker_nr <= 0) {
        return -1;
    }

    memset(&w, 0, sizeof(w));
    pthread_mutex_init(&w.mu, NULL);
    pthread_cond_init(&w.cv, NULL);

    n = global_uring_ctx.worker_nr;
    args = (oplog_fsync_arg_t *)kvs_malloc(sizeof(*args) * (size_t)n);
    if (!args) {
        pthread_cond_destroy(&w.cv);
        pthread_mutex_destroy(&w.mu);
        return -1;
    }

    for (i = 0; i < n; i++) {
        task_t *t;

        args[i].waiter = &w;

        /* Count the task before submitting it so an early completion cannot
         * drive `pending` to zero while later submissions are still coming. */
        pthread_mutex_lock(&w.mu);
        w.pending++;
        pthread_mutex_unlock(&w.mu);

        /* Presumably targets worker index i; the meaning of the `1` flag is
         * defined by diskuring (TODO confirm). */
        t = submit_fsync_ref(&global_uring_ctx, fd, i, 1, oplog_fsync_done, &args[i]);
        if (!t) {
            pthread_mutex_lock(&w.mu);
            w.pending--;
            w.submit_failed = 1;
            if (w.pending == 0) {
                pthread_cond_signal(&w.cv);
            }
            pthread_mutex_unlock(&w.mu);
            rc = -1;
            break;
        }
    }

    /* `w` and `args` live in this frame, so every submitted task must finish
     * before returning. Wake every 10 ms to reap finished tasks while waiting. */
    pthread_mutex_lock(&w.mu);
    while (w.pending > 0) {
        struct timespec ts;

        make_timeout_ms(&ts, 10);
        (void)pthread_cond_timedwait(&w.cv, &w.mu, &ts);
        if (w.pending > 0) {
            pthread_mutex_unlock(&w.mu);
            cleanup_finished_iouring_tasks(&global_uring_ctx);
            pthread_mutex_lock(&w.mu);
        }
    }
    if (w.fsync_err < 0 || w.submit_failed) {
        rc = -1;
    }
    pthread_mutex_unlock(&w.mu);

    cleanup_finished_iouring_tasks(&global_uring_ctx);
    kvs_free(args);
    pthread_cond_destroy(&w.cv);
    pthread_mutex_destroy(&w.mu);
    return rc;
}

/* Append `buf` to the singly linked list (*head, *tail). */
static void oplog_push_tail(oplog_buf_t **head, oplog_buf_t **tail, oplog_buf_t *buf)
{
    if (!head || !tail || !buf) {
        return;
    }
    buf->next = NULL;
    if (!*tail) {
        *head = *tail = buf;
        return;
    }
    (*tail)->next = buf;
    *tail = buf;
}

/* Prepend `buf` to the singly linked list (*head, *tail). */
static void oplog_push_front(oplog_buf_t **head, oplog_buf_t **tail, oplog_buf_t *buf)
{
    if (!head || !tail || !buf) {
        return;
    }
    if (!*head) {
        buf->next = NULL;
        *head = *tail = buf;
        return;
    }
    buf->next = *head;
    *head = buf;
}

/* Detach and return the first element of the list, or NULL if it is empty. */
static oplog_buf_t *oplog_pop_head(oplog_buf_t **head, oplog_buf_t **tail)
{
    oplog_buf_t *buf;

    if (!head || !tail || !*head) {
        return NULL;
    }
    buf = *head;
    *head = buf->next;
    if (!*head) {
        *tail = NULL;
    }
    buf->next = NULL;
    return buf;
}

/* Allocate a fresh, empty page; NULL on allocation failure. */
static oplog_buf_t *oplog_alloc_buf(void)
{
    oplog_buf_t *buf = (oplog_buf_t *)kvs_malloc(sizeof(*buf));

    if (!buf) {
        return NULL;
    }
    buf->next = NULL;
    buf->off = 0;
    buf->used = 0;
    return buf;
}

/* Take an empty page from the idle pool, allocating one if the pool is empty.
 * Caller holds g_oplog_mu. */
static oplog_buf_t *oplog_borrow_buf(void)
{
    oplog_buf_t *buf = oplog_pop_head(&g_oplog_idle_head, &g_oplog_idle_tail);

    if (buf) {
        buf->off = 0;
        buf->used = 0;
        return buf;
    }
    return oplog_alloc_buf();
}

/* Free every page in the list and leave it empty. */
static void oplog_free_list(oplog_buf_t **head, oplog_buf_t **tail)
{
    oplog_buf_t *buf;

    if (!head || !tail) {
        return;
    }
    while ((buf = oplog_pop_head(head, tail)) != NULL) {
        kvs_free(buf);
    }
}

/* Free the current, idle and ready pages. Pages still owned by in-flight
 * write tasks are not in any list and are not freed here.
 * Caller holds g_oplog_mu. */
static void oplog_pool_release_all(void)
{
    kvs_free(g_oplog_cur);
    g_oplog_cur = NULL;
    oplog_free_list(&g_oplog_idle_head, &g_oplog_idle_tail);
    oplog_free_list(&g_oplog_ready_head, &g_oplog_ready_tail);
}

/*
 * Write-completion callback for a page submitted by
 * kvs_oplog_submit_ready_buf(): returns the page to the idle pool.
 * NOTE(review): t->res is ignored, so a failed page write is not reported
 * anywhere; consider recording it so the data loss can be surfaced.
 */
static void oplog_recycle_done(task_t *t, void *arg)
{
    oplog_buf_t *buf = (oplog_buf_t *)arg;

    (void)t;
    if (!buf) {
        return;
    }
    pthread_mutex_lock(&g_oplog_mu);
    buf->off = 0;
    buf->used = 0;
    oplog_push_tail(&g_oplog_idle_head, &g_oplog_idle_tail, buf);
    pthread_mutex_unlock(&g_oplog_mu);
}

/*
 * Submit one sealed page for writing at buf->off. On success, ownership of
 * the page passes to the write task (it is recycled by oplog_recycle_done());
 * on failure (-1) the caller still owns it. Empty pages go straight back to
 * the idle pool. Caller holds g_oplog_mu.
 */
static int kvs_oplog_submit_ready_buf(oplog_buf_t *buf, int logfd)
{
    void *bufs[1];
    size_t lens[1];
    task_t *t;

    if (!buf || logfd < 0) {
        return -1;
    }
    if (buf->used == 0) {
        oplog_push_tail(&g_oplog_idle_head, &g_oplog_idle_tail, buf);
        return 0;
    }

    bufs[0] = (void *)buf->data;
    lens[0] = buf->used;
    t = submit_write_ref(&global_uring_ctx, logfd, bufs, lens, 1, buf->off, 0,
                         oplog_recycle_done, buf);
    if (!t) {
        return -1;
    }
    return 0;
}

/*
 * Submit every ready page for writing. With `force`, the partially filled
 * current page is sealed and submitted as well. On a submission failure the
 * page is put back at the head of the ready list (order preserved) and -1 is
 * returned. Caller holds g_oplog_mu.
 */
static int kvs_oplog_flush_internal(int logfd, int force)
{
    oplog_buf_t *buf;

    if (logfd < 0) {
        return -1;
    }
    if (force && g_oplog_cur && g_oplog_cur->used > 0) {
        oplog_push_tail(&g_oplog_ready_head, &g_oplog_ready_tail, g_oplog_cur);
        g_oplog_cur = NULL;
    }
    while ((buf = oplog_pop_head(&g_oplog_ready_head, &g_oplog_ready_tail)) != NULL) {
        if (kvs_oplog_submit_ready_buf(buf, logfd) < 0) {
            oplog_push_front(&g_oplog_ready_head, &g_oplog_ready_tail, buf);
            return -1;
        }
    }
    return 0;
}

/* Fill *ts with an absolute CLOCK_REALTIME deadline one second ahead. */
static void make_timeout_1s(struct timespec *ts)
{
    make_timeout_ms(ts, 1000);
}

/*
 * Body of the OPLOG_SYNC_EVERY_SEC thread: once per second, if anything was
 * appended since the last successful sync, flush all pending pages and fsync
 * the log on every worker. Exits when g_sync_stop is set.
 */
static void *oplog_sync_main(void *arg)
{
    (void)arg;

    pthread_mutex_lock(&g_oplog_mu);
    while (!g_sync_stop) {
        struct timespec deadline;
        uint64_t target_gen;
        int fd;
        int flush_ok;
        int synced;

        /* Sleep until the one-second deadline. Re-waiting on the same absolute
         * deadline absorbs spurious wakeups; checking g_sync_stop before every
         * wait means a stop request issued while this thread was fsyncing
         * (without the lock) is never missed. */
        make_timeout_1s(&deadline);
        while (!g_sync_stop) {
            if (pthread_cond_timedwait(&g_sync_cv, &g_oplog_mu, &deadline) != 0) {
                break; /* deadline reached (or wait error): go sync */
            }
        }
        if (g_sync_stop) {
            break;
        }

        if (global_cfg.oplog_sync_mode != OPLOG_SYNC_EVERY_SEC ||
            g_sync_logfd < 0 || g_sync_synced_gen >= g_sync_gen) {
            continue;
        }

        target_gen = g_sync_gen;
        fd = g_sync_logfd;
        flush_ok = (kvs_oplog_flush_internal(fd, 1) == 0);
        pthread_mutex_unlock(&g_oplog_mu);

        /* fsync runs without the lock so appenders are not blocked on disk. */
        synced = flush_ok && kvs_oplog_fsync_all_workers(fd) == 0;

        pthread_mutex_lock(&g_oplog_mu);
        if (synced && g_sync_synced_gen < target_gen) {
            g_sync_synced_gen = target_gen;
        }
    }
    pthread_mutex_unlock(&g_oplog_mu);
    return NULL;
}

/* Stop and join the sync thread if it is running. Caller holds g_oplog_mu;
 * the lock is released while joining and re-acquired before returning. */
static void oplog_sync_thread_stop_locked(void)
{
    if (!g_sync_started) {
        return;
    }
    g_sync_stop = 1;
    pthread_cond_broadcast(&g_sync_cv);
    pthread_mutex_unlock(&g_oplog_mu);
    pthread_join(g_sync_th, NULL);
    pthread_mutex_lock(&g_oplog_mu);
    g_sync_started = 0;
    g_sync_logfd = -1;
}

/* Start the sync thread for `logfd` unless it is already running.
 * Returns 0 on success, -1 if the thread could not be created.
 * Caller holds g_oplog_mu. */
static int oplog_sync_thread_start_locked(int logfd)
{
    if (g_sync_started) {
        return 0;
    }
    g_sync_stop = 0;
    g_sync_logfd = logfd;
    if (pthread_create(&g_sync_th, NULL, oplog_sync_main, NULL) != 0) {
        g_sync_logfd = -1;
        return -1;
    }
    g_sync_started = 1;
    return 0;
}

/*
 * Write one record (length prefix + payload) directly at the next log offset,
 * bypassing the page buffers. The caller must have sealed/flushed any open
 * page first so its reserved range cannot overlap this record.
 * Returns 0 on successful submission, -1 otherwise; g_log_off is advanced
 * only on success so a failed submission never leaves a hole in the file.
 * Caller holds g_oplog_mu and has checked len <= UINT32_MAX.
 *
 * NOTE(review): `nlen` lives on this stack frame, so this relies on
 * submit_write() copying the payload (unlike submit_write_ref()) — confirm.
 */
static int kvs_oplog_append_direct(const uint8_t *cmd, size_t len, int logfd)
{
    uint32_t nlen = htonl((uint32_t)len);
    void *bufs[2];
    size_t lens[2];
    task_t *t;

    bufs[0] = (void *)&nlen;
    lens[0] = sizeof(nlen);
    bufs[1] = (void *)cmd;
    lens[1] = len;

    t = submit_write(&global_uring_ctx, logfd, bufs, lens, 2, g_log_off);
    if (!t) {
        return -1;
    }
    g_log_off += (off_t)(sizeof(nlen) + len);
    oplog_mark_dirty_locked();
    return 0;
}

/*
 * Open (creating if needed) the log `file` for appending and reset all oplog
 * state; in OPLOG_SYNC_EVERY_SEC mode also start the sync thread.
 * Buffered pages belonging to a previously opened log are discarded.
 *
 * Returns 0 and stores the fd in *logfd on success; -1 on bad arguments,
 * -2 if the file cannot be opened or seeked, -3 if the sync thread fails to
 * start (the fd is closed and the log is left not open).
 */
int init_cmd_log(const char *file, int *logfd)
{
    int fd;
    int rc = 0;
    off_t off;

    if (!file || !logfd) {
        return -1;
    }

    fd = open(file, O_RDWR | O_CREAT, 0644);
    if (fd < 0) {
        return -2;
    }
    off = lseek(fd, 0, SEEK_END);
    if (off < 0) {
        close(fd);
        return -2;
    }

    pthread_mutex_lock(&g_oplog_mu);
    oplog_sync_thread_stop_locked();
    g_log_off = off;
    g_sync_gen = 0;
    g_sync_synced_gen = 0;
    g_sync_logfd = fd;
    oplog_pool_release_all();
    if (global_cfg.oplog_sync_mode == OPLOG_SYNC_EVERY_SEC) {
        rc = oplog_sync_thread_start_locked(fd);
    }
    if (rc != 0) {
        /* fd is about to be closed: leave the log in the "not open" state so
         * later appends fail with -3 instead of targeting a dead offset. */
        g_log_off = -1;
        g_sync_logfd = -1;
    }
    pthread_mutex_unlock(&g_oplog_mu);

    if (rc != 0) {
        close(fd);
        return -3;
    }
    *logfd = fd;
    return 0;
}

/*
 * Stop the sync thread, flush and fsync all pending data, close `logfd` and
 * reset oplog state.
 * Returns 0 on success; -1 for a bad fd; -2 if pending pages could not be
 * submitted; -3 if the fsync failed (in both error cases logfd stays open).
 */
int destroy_cmd_log(int logfd)
{
    if (logfd < 0) {
        return -1;
    }

    pthread_mutex_lock(&g_oplog_mu);
    oplog_sync_thread_stop_locked();
    if (kvs_oplog_flush_internal(logfd, 1) < 0) {
        pthread_mutex_unlock(&g_oplog_mu);
        return -2;
    }
    pthread_mutex_unlock(&g_oplog_mu);

    if (kvs_oplog_fsync_all_workers(logfd) < 0) {
        return -3;
    }
    cleanup_finished_iouring_tasks(&global_uring_ctx);
    close(logfd);

    pthread_mutex_lock(&g_oplog_mu);
    oplog_pool_release_all();
    g_log_off = -1;
    g_sync_gen = 0;
    g_sync_synced_gen = 0;
    g_sync_logfd = -1;
    pthread_mutex_unlock(&g_oplog_mu);

    global_oplog_fd = -1;
    return 0;
}

/*
 * Append one record to the page buffer (records larger than a page are
 * written directly after flushing pending pages).
 *
 * Returns KVS_OPLOG_BUF_FULL when a page was sealed into the ready list by
 * this call, KVS_OPLOG_BUF_NOT_FULL otherwise; -1 on bad arguments, -2 if
 * `len` is too large to encode, -3 if no log is open, -4 on allocation or
 * write-submission failure.
 */
int kvs_oplog_buffer_append(const uint8_t *cmd, size_t len, int logfd)
{
    size_t need;
    uint32_t nlen;
    int became_full = 0;

    if (logfd < 0 || !cmd || len == 0) {
        return -1;
    }
    /* Length must fit the 32-bit prefix, and prefix + payload must not wrap
     * size_t (possible when size_t is 32-bit). */
    if (len > UINT32_MAX || len > SIZE_MAX - sizeof(uint32_t)) {
        return -2;
    }
    need = sizeof(nlen) + len;
    nlen = htonl((uint32_t)len);

    pthread_mutex_lock(&g_oplog_mu);
    if (g_log_off < 0) {
        pthread_mutex_unlock(&g_oplog_mu);
        return -3;
    }

    if (need > KVS_OPLOG_PAGE_SIZE) {
        /* Flush first: the open page's reserved range must end before this
         * record, and the next buffered record must start a fresh page at the
         * advanced g_log_off. Otherwise the two writes overlap on disk. */
        int rc = kvs_oplog_flush_internal(logfd, 1);
        if (rc == 0) {
            rc = kvs_oplog_append_direct(cmd, len, logfd);
        }
        pthread_mutex_unlock(&g_oplog_mu);
        return (rc == 0) ? KVS_OPLOG_BUF_NOT_FULL : -4;
    }

    /* Seal the current page if the record does not fit in what is left. */
    if (g_oplog_cur && g_oplog_cur->used + need > KVS_OPLOG_PAGE_SIZE) {
        oplog_push_tail(&g_oplog_ready_head, &g_oplog_ready_tail, g_oplog_cur);
        g_oplog_cur = NULL;
        became_full = 1;
    }
    if (!g_oplog_cur) {
        g_oplog_cur = oplog_borrow_buf();
        if (!g_oplog_cur) {
            pthread_mutex_unlock(&g_oplog_mu);
            return -4;
        }
    }
    /* An empty page maps to wherever the log currently ends. */
    if (g_oplog_cur->used == 0) {
        g_oplog_cur->off = g_log_off;
    }

    memcpy(g_oplog_cur->data + g_oplog_cur->used, &nlen, sizeof(nlen));
    g_oplog_cur->used += sizeof(nlen);
    memcpy(g_oplog_cur->data + g_oplog_cur->used, cmd, len);
    g_oplog_cur->used += len;
    g_log_off += (off_t)need;
    oplog_mark_dirty_locked();

    if (g_oplog_cur->used == KVS_OPLOG_PAGE_SIZE) {
        oplog_push_tail(&g_oplog_ready_head, &g_oplog_ready_tail, g_oplog_cur);
        g_oplog_cur = NULL;
        became_full = 1;
    }
    pthread_mutex_unlock(&g_oplog_mu);

    return became_full ? KVS_OPLOG_BUF_FULL : KVS_OPLOG_BUF_NOT_FULL;
}

/*
 * Submit ready pages for writing (with `force`, also the partial current
 * page). Returns 0 on success, -1 on failure.
 */
int kvs_oplog_flush(int logfd, int force)
{
    int rc;

    pthread_mutex_lock(&g_oplog_mu);
    rc = kvs_oplog_flush_internal(logfd, force);
    pthread_mutex_unlock(&g_oplog_mu);

    return (rc < 0) ? -1 : 0;
}

/*
 * Append one record unbuffered: flush all pending pages, then write the
 * record directly. Returns 0 on success; -1 on bad arguments, -2 if `len` is
 * too large, -3 if no log is open, -4 on write-submission failure.
 */
int kvs_oplog_append(const uint8_t *cmd, size_t len, int logfd)
{
    if (logfd < 0 || !cmd || len == 0) {
        return -1;
    }
    if (len > UINT32_MAX || len > SIZE_MAX - sizeof(uint32_t)) {
        return -2;
    }

    pthread_mutex_lock(&g_oplog_mu);
    if (g_log_off < 0) {
        pthread_mutex_unlock(&g_oplog_mu);
        return -3;
    }
    if (kvs_oplog_flush_internal(logfd, 1) < 0 ||
        kvs_oplog_append_direct(cmd, len, logfd) < 0) {
        pthread_mutex_unlock(&g_oplog_mu);
        return -4;
    }
    pthread_mutex_unlock(&g_oplog_mu);
    return 0;
}

/*
 * Re-execute every record in the log from the beginning via resp_dispatch(),
 * then set the append offset to the end of what was read.
 *
 * Returns 0 on success; -1 bad fd or seek failure; -2 header read error;
 * -3 invalid record length (0 or > INT_MAX); -5 allocation failure;
 * -6 payload read failure; -7 record is not exactly one RESP command;
 * -8 dispatch failure.
 */
int kvs_replay_log(int logfd)
{
    off_t end;

    if (logfd < 0) {
        return -1;
    }
    if (lseek(logfd, 0, SEEK_SET) < 0) {
        return -1;
    }

    for (;;) {
        uint32_t nlen = 0;
        uint32_t len;
        uint8_t *cmd_bytes;
        resp_cmd_t cmd;
        resp_value_t outvalue;
        int hr;
        int pr;
        int clen;
        int dr;

        hr = read_full(logfd, &nlen, sizeof(nlen));
        if (hr == 0) {
            break; /* treated as end of log */
        }
        if (hr < 0) {
            return -2;
        }

        len = ntohl(nlen);
        /* The length comes from disk: reject 0 and anything that cannot be
         * passed to resp_parse_one_cmd() as an int (corrupt record). */
        if (len == 0 || len > (uint32_t)INT_MAX) {
            return -3;
        }

        cmd_bytes = (uint8_t *)kvs_malloc(len);
        if (!cmd_bytes) {
            return -5;
        }

        pr = read_full(logfd, cmd_bytes, len);
        if (pr <= 0) {
            kvs_free(cmd_bytes);
            return -6;
        }

        memset(&cmd, 0, sizeof(cmd));
        clen = resp_parse_one_cmd(cmd_bytes, (int)len, &cmd);
        if (clen <= 0 || clen != (int)len) {
            kvs_free(cmd_bytes);
            return -7;
        }

        memset(&outvalue, 0, sizeof(outvalue));
        dr = resp_dispatch(&cmd, &outvalue);
        kvs_free(cmd_bytes);
        if (dr < 0) {
            return -8;
        }
    }

    end = lseek(logfd, 0, SEEK_CUR);
    if (end < 0) {
        return -1;
    }

    pthread_mutex_lock(&g_oplog_mu);
    g_log_off = end;
    /* Replayed content is already on disk; nothing new to sync. */
    if (g_sync_synced_gen < g_sync_gen) {
        g_sync_synced_gen = g_sync_gen;
    }
    pthread_mutex_unlock(&g_oplog_mu);
    return 0;
}

/**
 * Flush and fsync pending data, then truncate the log to zero length without
 * closing it.
 * Returns 0 on success; -1 bad fd; -2 pending pages could not be submitted;
 * -3 fsync failed; -4 truncate or seek failed (log state left unchanged).
 */
int ksv_clear_log(int logfd)
{
    if (logfd < 0) {
        return -1;
    }

    pthread_mutex_lock(&g_oplog_mu);
    if (kvs_oplog_flush_internal(logfd, 1) < 0) {
        pthread_mutex_unlock(&g_oplog_mu);
        return -2;
    }
    pthread_mutex_unlock(&g_oplog_mu);

    if (kvs_oplog_fsync_all_workers(logfd) < 0) {
        return -3;
    }
    cleanup_finished_iouring_tasks(&global_uring_ctx);

    if (ftruncate(logfd, 0) < 0 || lseek(logfd, 0, SEEK_SET) < 0) {
        return -4;
    }

    pthread_mutex_lock(&g_oplog_mu);
    g_log_off = 0;
    g_sync_gen = 0;
    g_sync_synced_gen = 0;
    pthread_mutex_unlock(&g_oplog_mu);
    return 0;
}