From d2b828bd7a693628fea3a9e5ee0964565f5c28d6 Mon Sep 17 00:00:00 2001
From: YuQing <384681@qq.com>
Date: Tue, 21 Sep 2021 17:07:11 +0800
Subject: [PATCH] add files: sf_ordered_writer.[hc]
---
src/Makefile.in | 5 +-
src/sf_ordered_writer.c | 234 ++++++++++++++++++++++++++++++++++++++++
src/sf_ordered_writer.h | 125 +++++++++++++++++++++
3 files changed, 362 insertions(+), 2 deletions(-)
create mode 100644 src/sf_ordered_writer.c
create mode 100644 src/sf_ordered_writer.h
diff --git a/src/Makefile.in b/src/Makefile.in
index 48a97cf..2dd611c 100644
--- a/src/Makefile.in
+++ b/src/Makefile.in
@@ -8,7 +8,8 @@ TARGET_LIB = $(TARGET_PREFIX)/$(LIB_VERSION)
TOP_HEADERS = sf_types.h sf_global.h sf_define.h sf_nio.h sf_service.h \
sf_func.h sf_util.h sf_configs.h sf_proto.h sf_binlog_writer.h \
sf_cluster_cfg.h sf_sharding_htable.h sf_connection_manager.h \
- sf_serializer.h sf_binlog_index.h sf_file_writer.h
+ sf_serializer.h sf_binlog_index.h sf_file_writer.h \
+ sf_ordered_writer.h
IDEMP_SERVER_HEADER = idempotency/server/server_types.h \
idempotency/server/server_channel.h \
@@ -28,7 +29,7 @@ SHARED_OBJS = sf_nio.lo sf_service.lo sf_global.lo \
sf_binlog_writer.lo sf_sharding_htable.lo \
sf_cluster_cfg.lo sf_connection_manager.lo \
sf_serializer.lo sf_binlog_index.lo \
- sf_file_writer.lo \
+ sf_file_writer.lo sf_ordered_writer.lo \
idempotency/server/server_channel.lo \
idempotency/server/request_htable.lo \
idempotency/server/channel_htable.lo \
diff --git a/src/sf_ordered_writer.c b/src/sf_ordered_writer.c
new file mode 100644
index 0000000..ba10888
--- /dev/null
+++ b/src/sf_ordered_writer.c
@@ -0,0 +1,234 @@
+/*
+ * Copyright (c) 2020 YuQing <384681@qq.com>
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include "fastcommon/logger.h"
+#include "fastcommon/sockopt.h"
+#include "fastcommon/shared_func.h"
+#include "fastcommon/pthread_func.h"
+#include "fastcommon/sched_thread.h"
+#include "sf_global.h"
+#include "sf_func.h"
+#include "sf_ordered_writer.h"
+
+#define deal_binlog_one_record(writer, wb) \
+ sf_file_writer_deal_buffer(&(writer)->fw, &wb->bf, wb->version)
+
+static inline int flush_writer_files(SFOrderedWriterInfo *writer)
+{
+ int result;
+
+ if ((result=sf_file_writer_flush(&writer->fw)) != 0) {
+ return result;
+ }
+
+ if (writer->fw.flags & SF_FILE_WRITER_FLAGS_WANT_DONE_VERSION) {
+ writer->fw.last_versions.done = writer->fw.last_versions.pending;
+ }
+
+ return 0;
+}
+
+static int deal_binlog_records(SFOrderedWriterContext *context,
+ SFOrderedWriterBuffer *wb_head)
+{
+ int result;
+ SFOrderedWriterBuffer *wbuffer;
+ SFOrderedWriterBuffer *current;
+
+ wbuffer = wb_head;
+ do {
+ current = wbuffer;
+ wbuffer = wbuffer->next;
+
+ context->writer.fw.total_count++;
+ if ((result=deal_binlog_one_record(&context->writer, current)) != 0) {
+ return result;
+ }
+
+ fast_mblock_free_object(&context->thread.allocators.buffer, current);
+ } while (wbuffer != NULL);
+
+ return flush_writer_files(&context->writer);
+}
+
+void sf_ordered_writer_finish(SFOrderedWriterInfo *writer)
+{
+ //SFOrderedWriterBuffer *wb_head;
+ int count;
+
+ if (writer->fw.file.name != NULL) {
+ sorted_queue_terminate(&writer->thread->queues.buffer);
+
+ count = 0;
+ while (writer->thread->running && ++count < 300) {
+ fc_sleep_ms(10);
+ }
+
+ if (writer->thread->running) {
+ logWarning("file: "__FILE__", line: %d, "
+ "%s binlog write thread still running, "
+ "exit anyway!", __LINE__, writer->fw.cfg.subdir_name);
+ }
+
+ //TODO
+ /*
+ wb_head = (SFOrderedWriterBuffer *)sorted_queue_try_pop_all(
+ &writer->thread->queue);
+ if (wb_head != NULL) {
+ deal_binlog_records(writer->thread, wb_head);
+ }
+ */
+
+ free(writer->fw.file.name);
+ writer->fw.file.name = NULL;
+ }
+
+ if (writer->fw.file.fd >= 0) {
+ close(writer->fw.file.fd);
+ writer->fw.file.fd = -1;
+ }
+}
+
+static void *binlog_writer_func(void *arg)
+{
+ SFOrderedWriterContext *context;
+ SFOrderedWriterThread *thread;
+ SFOrderedWriterBuffer *wb_head;
+
+ context = (SFOrderedWriterContext *)arg;
+ thread = &context->thread;
+
+#ifdef OS_LINUX
+ {
+ char thread_name[64];
+ snprintf(thread_name, sizeof(thread_name),
+ "%s-writer", thread->name);
+ prctl(PR_SET_NAME, thread_name);
+ }
+#endif
+
+ thread->running = true;
+ while (SF_G_CONTINUE_FLAG) {
+ wb_head = NULL;
+ //wb_head = (SFOrderedWriterBuffer *)sorted_queue_pop_all(&thread->queue);
+ if (wb_head == NULL) {
+ continue;
+ }
+
+ if (deal_binlog_records(context, wb_head) != 0) {
+ logCrit("file: "__FILE__", line: %d, "
+ "deal_binlog_records fail, "
+ "program exit!", __LINE__);
+ sf_terminate_myself();
+ }
+ }
+
+ thread->running = false;
+ return NULL;
+}
+
+static int binlog_wbuffer_alloc_init(void *element, void *args)
+{
+ SFOrderedWriterBuffer *wbuffer;
+ SFOrderedWriterInfo *writer;
+
+ wbuffer = (SFOrderedWriterBuffer *)element;
+ writer = (SFOrderedWriterInfo *)args;
+ wbuffer->bf.alloc_size = writer->fw.cfg.max_record_size;
+ wbuffer->bf.buff = (char *)(wbuffer + 1);
+ return 0;
+}
+
+static int compare_buffer_version(const SFOrderedWriterBuffer *entry1,
+ const SFOrderedWriterBuffer *entry2)
+{
+ return fc_compare_int64(entry1->version, entry2->version);
+}
+
+static int sf_ordered_writer_init_thread(SFOrderedWriterContext *context,
+ const char *name, const int max_record_size)
+{
+ const int alloc_elements_once = 1024;
+ SFOrderedWriterThread *thread;
+ SFOrderedWriterInfo *writer;
+ int element_size;
+ pthread_t tid;
+ int result;
+
+ thread = &context->thread;
+ writer = &context->writer;
+ snprintf(thread->name, sizeof(thread->name), "%s", name);
+ writer->fw.cfg.max_record_size = max_record_size;
+ writer->thread = thread;
+
+ if ((result=fast_mblock_init_ex1(&thread->allocators.version,
+ "writer-ver-info", sizeof(SFWriterVersionInfo),
+ 8 * 1024, 0, NULL, NULL, true)) != 0)
+ {
+ return result;
+ }
+
+ element_size = sizeof(SFOrderedWriterBuffer) + max_record_size;
+ if ((result=fast_mblock_init_ex1(&thread->allocators.buffer,
+ "sorted-wbuffer", element_size, alloc_elements_once,
+ 0, binlog_wbuffer_alloc_init, writer, true)) != 0)
+ {
+ return result;
+ }
+
+ if ((result=fc_queue_init(&thread->queues.version, (unsigned long)
+ (&((SFWriterVersionInfo *)NULL)->next))) != 0)
+ {
+ return result;
+ }
+
+ if ((result=sorted_queue_init(&thread->queues.buffer, (unsigned long)
+ (&((SFOrderedWriterBuffer *)NULL)->next),
+ (int (*)(const void *, const void *))
+ compare_buffer_version)) != 0)
+ {
+ return result;
+ }
+
+ return fc_create_thread(&tid, binlog_writer_func,
+ context, SF_G_THREAD_STACK_SIZE);
+}
+
+int sf_ordered_writer_init(SFOrderedWriterContext *context,
+ const char *data_path, const char *subdir_name,
+ const int buffer_size, const int max_record_size)
+{
+ int result;
+ if ((result=sf_file_writer_init_normal(&context->writer.fw,
+ data_path, subdir_name, buffer_size)) != 0)
+ {
+ return result;
+ }
+
+ return sf_ordered_writer_init_thread(context,
+ subdir_name, max_record_size);
+}
diff --git a/src/sf_ordered_writer.h b/src/sf_ordered_writer.h
new file mode 100644
index 0000000..6f96bb6
--- /dev/null
+++ b/src/sf_ordered_writer.h
@@ -0,0 +1,125 @@
+/*
+ * Copyright (c) 2020 YuQing <384681@qq.com>
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+//sf_ordered_writer.h
+
+#ifndef _SF_ORDERED_WRITER_H_
+#define _SF_ORDERED_WRITER_H_
+
+#include "fastcommon/sorted_queue.h"
+#include "sf_file_writer.h"
+
+typedef struct sf_writer_version_info {
+ int64_t version;
+ struct sf_writer_version_info *next;
+} SFWriterVersionInfo;
+
+typedef struct sf_ordered_writer_buffer {
+ int64_t version;
+ BufferInfo bf;
+ struct sf_ordered_writer_buffer *next;
+} SFOrderedWriterBuffer;
+
+typedef struct sf_orderd_writer_thread {
+ struct {
+ struct fast_mblock_man version;
+ struct fast_mblock_man buffer;
+ } allocators;
+
+ struct {
+ struct fc_queue version;
+ struct sorted_queue buffer;
+ } queues;
+ char name[64];
+ volatile bool running;
+} SFOrderedWriterThread;
+
+typedef struct sf_ordered_writer_info {
+ SFFileWriterInfo fw;
+ SFBinlogBuffer binlog_buffer;
+ SFOrderedWriterThread *thread;
+} SFOrderedWriterInfo;
+
+typedef struct sf_ordered_writer_context {
+ SFOrderedWriterInfo writer;
+ SFOrderedWriterThread thread;
+} SFOrderedWriterContext;
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+int sf_ordered_writer_init(SFOrderedWriterContext *context,
+ const char *data_path, const char *subdir_name,
+ const int buffer_size, const int max_record_size);
+
+#define sf_ordered_writer_set_flags(writer, flags) \
+ sf_file_writer_set_flags(&(writer)->fw, flags)
+
+#define sf_ordered_writer_get_last_version(writer) \
+ sf_ordered_writer_get_last_version(&(writer)->fw)
+
+void sf_ordered_writer_finish(SFOrderedWriterInfo *writer);
+
+#define sf_ordered_writer_get_current_index(writer) \
+ sf_file_writer_get_current_index(&(writer)->fw)
+
+#define sf_ordered_writer_get_current_position(writer, position) \
+ sf_file_writer_get_current_position(&(writer)->fw, position)
+
+static inline SFOrderedWriterBuffer *sf_ordered_writer_alloc_buffer(
+ SFOrderedWriterThread *thread)
+{
+ return (SFOrderedWriterBuffer *)fast_mblock_alloc_object(
+ &thread->allocators.buffer);
+}
+
+static inline SFOrderedWriterBuffer *sf_ordered_writer_alloc_versioned_buffer_ex(
+ SFOrderedWriterInfo *writer, const int64_t version)
+{
+ SFOrderedWriterBuffer *buffer;
+ buffer = (SFOrderedWriterBuffer *)fast_mblock_alloc_object(
+ &writer->thread->allocators.buffer);
+ if (buffer != NULL) {
+ buffer->version = version;
+ }
+ return buffer;
+}
+
+#define sf_ordered_writer_get_filepath(data_path, subdir_name, filename, size) \
+ sf_file_writer_get_filepath(data_path, subdir_name, filename, size)
+
+#define sf_ordered_writer_get_filename(data_path, \
+ subdir_name, binlog_index, filename, size) \
+ sf_file_writer_get_filename(data_path, subdir_name, \
+ binlog_index, filename, size)
+
+#define sf_ordered_writer_set_binlog_index(writer, binlog_index) \
+ sf_file_writer_set_binlog_index(&(writer)->fw, binlog_index)
+
+#define sf_push_to_binlog_thread_queue(thread, buffer) \
+ sorted_queue_push(&(thread)->queues.buffer, buffer)
+
+static inline void sf_push_to_binlog_write_queue(SFOrderedWriterInfo *writer,
+ SFOrderedWriterBuffer *buffer)
+{
+ sorted_queue_push(&writer->thread->queues.buffer, buffer);
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif