diff --git a/src/Makefile.in b/src/Makefile.in index 48a97cf..2dd611c 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -8,7 +8,8 @@ TARGET_LIB = $(TARGET_PREFIX)/$(LIB_VERSION) TOP_HEADERS = sf_types.h sf_global.h sf_define.h sf_nio.h sf_service.h \ sf_func.h sf_util.h sf_configs.h sf_proto.h sf_binlog_writer.h \ sf_cluster_cfg.h sf_sharding_htable.h sf_connection_manager.h \ - sf_serializer.h sf_binlog_index.h sf_file_writer.h + sf_serializer.h sf_binlog_index.h sf_file_writer.h \ + sf_ordered_writer.h IDEMP_SERVER_HEADER = idempotency/server/server_types.h \ idempotency/server/server_channel.h \ @@ -28,7 +29,7 @@ SHARED_OBJS = sf_nio.lo sf_service.lo sf_global.lo \ sf_binlog_writer.lo sf_sharding_htable.lo \ sf_cluster_cfg.lo sf_connection_manager.lo \ sf_serializer.lo sf_binlog_index.lo \ - sf_file_writer.lo \ + sf_file_writer.lo sf_ordered_writer.lo \ idempotency/server/server_channel.lo \ idempotency/server/request_htable.lo \ idempotency/server/channel_htable.lo \ diff --git a/src/sf_ordered_writer.c b/src/sf_ordered_writer.c new file mode 100644 index 0000000..ba10888 --- /dev/null +++ b/src/sf_ordered_writer.c @@ -0,0 +1,234 @@ +/* + * Copyright (c) 2020 YuQing <384681@qq.com> + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "fastcommon/logger.h" +#include "fastcommon/sockopt.h" +#include "fastcommon/shared_func.h" +#include "fastcommon/pthread_func.h" +#include "fastcommon/sched_thread.h" +#include "sf_global.h" +#include "sf_func.h" +#include "sf_ordered_writer.h" + +#define deal_binlog_one_record(writer, wb) \ + sf_file_writer_deal_buffer(&(writer)->fw, &wb->bf, wb->version) + +static inline int flush_writer_files(SFOrderedWriterInfo *writer) +{ + int result; + + if ((result=sf_file_writer_flush(&writer->fw)) != 0) { + return result; + } + + if (writer->fw.flags & SF_FILE_WRITER_FLAGS_WANT_DONE_VERSION) { + writer->fw.last_versions.done = writer->fw.last_versions.pending; + } + + return 0; +} + +static int deal_binlog_records(SFOrderedWriterContext *context, + SFOrderedWriterBuffer *wb_head) +{ + int result; + SFOrderedWriterBuffer *wbuffer; + SFOrderedWriterBuffer *current; + + wbuffer = wb_head; + do { + current = wbuffer; + wbuffer = wbuffer->next; + + context->writer.fw.total_count++; + if ((result=deal_binlog_one_record(&context->writer, current)) != 0) { + return result; + } + + fast_mblock_free_object(&context->thread.allocators.buffer, current); + } while (wbuffer != NULL); + + return flush_writer_files(&context->writer); +} + +void sf_ordered_writer_finish(SFOrderedWriterInfo *writer) +{ + //SFOrderedWriterBuffer *wb_head; + int count; + + if (writer->fw.file.name != NULL) { + sorted_queue_terminate(&writer->thread->queues.buffer); + + count = 0; + while (writer->thread->running && ++count < 300) { + fc_sleep_ms(10); + } + + if (writer->thread->running) { + logWarning("file: "__FILE__", line: %d, " + "%s binlog write thread still running, " + "exit anyway!", __LINE__, writer->fw.cfg.subdir_name); + } + + //TODO + /* + wb_head = (SFOrderedWriterBuffer *)sorted_queue_try_pop_all( + &writer->thread->queue); + if (wb_head != NULL) { + deal_binlog_records(writer->thread, wb_head); + } + */ + + free(writer->fw.file.name); + writer->fw.file.name = NULL; + } + + if (writer->fw.file.fd >= 0) { + close(writer->fw.file.fd); + writer->fw.file.fd = -1; + } +} + +static void *binlog_writer_func(void *arg) +{ + SFOrderedWriterContext *context; + SFOrderedWriterThread *thread; + SFOrderedWriterBuffer *wb_head; + + context = (SFOrderedWriterContext *)arg; + thread = &context->thread; + +#ifdef OS_LINUX + { + char thread_name[64]; + snprintf(thread_name, sizeof(thread_name), + "%s-writer", thread->name); + prctl(PR_SET_NAME, thread_name); + } +#endif + + thread->running = true; + while (SF_G_CONTINUE_FLAG) { + wb_head = NULL; + //wb_head = (SFOrderedWriterBuffer *)sorted_queue_pop_all(&thread->queue); + if (wb_head == NULL) { + continue; + } + + if (deal_binlog_records(context, wb_head) != 0) { + logCrit("file: "__FILE__", line: %d, " + "deal_binlog_records fail, " + "program exit!", __LINE__); + sf_terminate_myself(); + } + } + + thread->running = false; + return NULL; +} + +static int binlog_wbuffer_alloc_init(void *element, void *args) +{ + SFOrderedWriterBuffer *wbuffer; + SFOrderedWriterInfo *writer; + + wbuffer = (SFOrderedWriterBuffer *)element; + writer = (SFOrderedWriterInfo *)args; + wbuffer->bf.alloc_size = writer->fw.cfg.max_record_size; + wbuffer->bf.buff = (char *)(wbuffer + 1); + return 0; +} + +static int compare_buffer_version(const SFOrderedWriterBuffer *entry1, + const SFOrderedWriterBuffer *entry2) +{ + return fc_compare_int64(entry1->version, entry2->version); +} + +static int sf_ordered_writer_init_thread(SFOrderedWriterContext *context, + const char *name, const int max_record_size) +{ + const int alloc_elements_once = 1024; + SFOrderedWriterThread *thread; + SFOrderedWriterInfo *writer; + int element_size; + pthread_t tid; + int result; + + thread = &context->thread; + writer = &context->writer; + snprintf(thread->name, sizeof(thread->name), "%s", name); + writer->fw.cfg.max_record_size = max_record_size; + writer->thread = thread; + + if ((result=fast_mblock_init_ex1(&thread->allocators.version, + "writer-ver-info", sizeof(SFWriterVersionInfo), + 8 * 1024, 0, NULL, NULL, true)) != 0) + { + return result; + } + + element_size = sizeof(SFOrderedWriterBuffer) + max_record_size; + if ((result=fast_mblock_init_ex1(&thread->allocators.buffer, + "sorted-wbuffer", element_size, alloc_elements_once, + 0, binlog_wbuffer_alloc_init, writer, true)) != 0) + { + return result; + } + + if ((result=fc_queue_init(&thread->queues.version, (unsigned long) + (&((SFWriterVersionInfo *)NULL)->next))) != 0) + { + return result; + } + + if ((result=sorted_queue_init(&thread->queues.buffer, (unsigned long) + (&((SFOrderedWriterBuffer *)NULL)->next), + (int (*)(const void *, const void *)) + compare_buffer_version)) != 0) + { + return result; + } + + return fc_create_thread(&tid, binlog_writer_func, + context, SF_G_THREAD_STACK_SIZE); +} + +int sf_ordered_writer_init(SFOrderedWriterContext *context, + const char *data_path, const char *subdir_name, + const int buffer_size, const int max_record_size) +{ + int result; + if ((result=sf_file_writer_init_normal(&context->writer.fw, + data_path, subdir_name, buffer_size)) != 0) + { + return result; + } + + return sf_ordered_writer_init_thread(context, + subdir_name, max_record_size); +} diff --git a/src/sf_ordered_writer.h b/src/sf_ordered_writer.h new file mode 100644 index 0000000..6f96bb6 --- /dev/null +++ b/src/sf_ordered_writer.h @@ -0,0 +1,125 @@ +/* + * Copyright (c) 2020 YuQing <384681@qq.com> + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +//sf_ordered_writer.h + +#ifndef _SF_ORDERED_WRITER_H_ +#define _SF_ORDERED_WRITER_H_ + +#include "fastcommon/sorted_queue.h" +#include "sf_file_writer.h" + +typedef struct sf_writer_version_info { + int64_t version; + struct sf_writer_version_info *next; +} SFWriterVersionInfo; + +typedef struct sf_ordered_writer_buffer { + int64_t version; + BufferInfo bf; + struct sf_ordered_writer_buffer *next; +} SFOrderedWriterBuffer; + +typedef struct sf_orderd_writer_thread { + struct { + struct fast_mblock_man version; + struct fast_mblock_man buffer; + } allocators; + + struct { + struct fc_queue version; + struct sorted_queue buffer; + } queues; + char name[64]; + volatile bool running; +} SFOrderedWriterThread; + +typedef struct sf_ordered_writer_info { + SFFileWriterInfo fw; + SFBinlogBuffer binlog_buffer; + SFOrderedWriterThread *thread; +} SFOrderedWriterInfo; + +typedef struct sf_ordered_writer_context { + SFOrderedWriterInfo writer; + SFOrderedWriterThread thread; +} SFOrderedWriterContext; + +#ifdef __cplusplus +extern "C" { +#endif + +int sf_ordered_writer_init(SFOrderedWriterContext *context, + const char *data_path, const char *subdir_name, + const int buffer_size, const int max_record_size); + +#define sf_ordered_writer_set_flags(writer, flags) \ + sf_file_writer_set_flags(&(writer)->fw, flags) + +#define sf_ordered_writer_get_last_version(writer) \ + sf_ordered_writer_get_last_version(&(writer)->fw) + +void sf_ordered_writer_finish(SFOrderedWriterInfo *writer); + +#define sf_ordered_writer_get_current_index(writer) \ + sf_file_writer_get_current_index(&(writer)->fw) + +#define sf_ordered_writer_get_current_position(writer, position) \ + sf_file_writer_get_current_position(&(writer)->fw, position) + +static inline SFOrderedWriterBuffer *sf_ordered_writer_alloc_buffer( + SFOrderedWriterThread *thread) +{ + return (SFOrderedWriterBuffer *)fast_mblock_alloc_object( + &thread->allocators.buffer); +} + +static inline SFOrderedWriterBuffer *sf_ordered_writer_alloc_versioned_buffer_ex( + SFOrderedWriterInfo *writer, const int64_t version) +{ + SFOrderedWriterBuffer *buffer; + buffer = (SFOrderedWriterBuffer *)fast_mblock_alloc_object( + &writer->thread->allocators.buffer); + if (buffer != NULL) { + buffer->version = version; + } + return buffer; +} + +#define sf_ordered_writer_get_filepath(data_path, subdir_name, filename, size) \ + sf_file_writer_get_filepath(data_path, subdir_name, filename, size) + +#define sf_ordered_writer_get_filename(data_path, \ + subdir_name, binlog_index, filename, size) \ + sf_file_writer_get_filename(data_path, subdir_name, \ + binlog_index, filename, size) + +#define sf_ordered_writer_set_binlog_index(writer, binlog_index) \ + sf_file_writer_set_binlog_index(&(writer)->fw, binlog_index) + +#define sf_push_to_binlog_thread_queue(thread, buffer) \ + sorted_queue_push(&(thread)->queues.buffer, buffer) + +static inline void sf_push_to_binlog_write_queue(SFOrderedWriterInfo *writer, + SFOrderedWriterBuffer *buffer) +{ + sorted_queue_push(&writer->thread->queues.buffer, buffer); +} + +#ifdef __cplusplus +} +#endif + +#endif