From a265bbbbea32725256c717bb62bf0f38f5637749 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Thu, 17 Mar 2022 20:52:41 +0800 Subject: [PATCH] add function sf_binlog_writer_destroy --- src/sf_binlog_writer.c | 36 ++++++++++++++++++++++++++++-------- src/sf_binlog_writer.h | 29 +++++++++++++++++++++++++++-- src/sf_file_writer.c | 15 ++++++++++++++- src/sf_file_writer.h | 4 +++- src/sf_ordered_writer.c | 2 +- 5 files changed, 73 insertions(+), 13 deletions(-) diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index fa5a2ba..bedda07 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -257,6 +257,10 @@ void sf_binlog_writer_finish(SFBinlogWriterInfo *writer) int count; if (writer->fw.file.name != NULL) { + while (!fc_queue_empty(&writer->thread->queue)) { + fc_sleep_ms(10); + } + fc_queue_terminate(&writer->thread->queue); count = 0; @@ -341,13 +345,22 @@ static int binlog_wbuffer_alloc_init(void *element, void *args) return 0; } +static void binlog_wbuffer_destroy_func(void *element, void *args) +{ + SFBinlogWriterBuffer *wbuffer; + wbuffer = (SFBinlogWriterBuffer *)element; + if (wbuffer->bf.buff != NULL) { + free(wbuffer->bf.buff); + } +} + int sf_binlog_writer_init_normal(SFBinlogWriterInfo *writer, const char *data_path, const char *subdir_name, const int buffer_size) { - writer->flush.in_queue = false; - return sf_file_writer_init_normal(&writer->fw, - data_path, subdir_name, buffer_size); + memset(writer, 0, sizeof(*writer)); + return sf_file_writer_init(&writer->fw, data_path, + subdir_name, buffer_size); } int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer, @@ -369,8 +382,9 @@ int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer, writer->version_ctx.change_count = 0; binlog_writer_set_next_version(writer, next_version); - return sf_binlog_writer_init_normal(writer, - data_path, subdir_name, buffer_size); + writer->flush.in_queue = false; + return sf_file_writer_init(&writer->fw, data_path, + subdir_name, buffer_size); } int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, @@ -379,9 +393,10 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, const int writer_count, const bool use_fixed_buffer_size) { const int alloc_elements_once = 1024; + int result; int element_size; pthread_t tid; - int result; + struct fast_mblock_object_callbacks callbacks; snprintf(thread->name, sizeof(thread->name), "%s", name); thread->order_mode = order_mode; @@ -390,13 +405,18 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, writer->fw.cfg.max_record_size = max_record_size; writer->thread = thread; + callbacks.init_func = binlog_wbuffer_alloc_init; + callbacks.args = writer; element_size = sizeof(SFBinlogWriterBuffer); if (use_fixed_buffer_size) { element_size += max_record_size; + callbacks.destroy_func = NULL; + } else { + callbacks.destroy_func = binlog_wbuffer_destroy_func; } - if ((result=fast_mblock_init_ex1(&thread->mblock, "binlog-wbuffer", + if ((result=fast_mblock_init_ex2(&thread->mblock, "binlog-wbuffer", element_size, alloc_elements_once, 0, - binlog_wbuffer_alloc_init, writer, true)) != 0) + &callbacks, true, NULL)) != 0) { return result; } diff --git a/src/sf_binlog_writer.h b/src/sf_binlog_writer.h index 4834118..03c9de2 100644 --- a/src/sf_binlog_writer.h +++ b/src/sf_binlog_writer.h @@ -132,6 +132,33 @@ static inline int sf_binlog_writer_init(SFBinlogWriterContext *context, max_record_size); } +void sf_binlog_writer_finish(SFBinlogWriterInfo *writer); + +static inline void sf_binlog_writer_destroy_writer( + SFBinlogWriterInfo *writer) +{ + sf_file_writer_destroy(&writer->fw); + if (writer->version_ctx.ring.slots != NULL) { + free(writer->version_ctx.ring.slots); + writer->version_ctx.ring.slots = NULL; + } +} + +static inline void sf_binlog_writer_destroy_thread( + SFBinlogWriterThread *thread) +{ + fast_mblock_destroy(&thread->mblock); + fc_queue_destroy(&thread->queue); +} + +static inline void sf_binlog_writer_destroy( + SFBinlogWriterContext *context) +{ + sf_binlog_writer_finish(&context->writer); + sf_binlog_writer_destroy_writer(&context->writer); + sf_binlog_writer_destroy_thread(&context->thread); +} + int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer, const short order_by); @@ -144,8 +171,6 @@ int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, #define sf_binlog_writer_get_last_version(writer) \ sf_file_writer_get_last_version(&(writer)->fw) -void sf_binlog_writer_finish(SFBinlogWriterInfo *writer); - #define sf_binlog_get_current_write_index(writer) \ sf_file_writer_get_current_index(&(writer)->fw) diff --git a/src/sf_file_writer.c b/src/sf_file_writer.c index 20a07b7..071faf0 100644 --- a/src/sf_file_writer.c +++ b/src/sf_file_writer.c @@ -298,7 +298,7 @@ int sf_file_writer_deal_buffer(SFFileWriterInfo *writer, return 0; } -int sf_file_writer_init_normal(SFFileWriterInfo *writer, +int sf_file_writer_init(SFFileWriterInfo *writer, const char *data_path, const char *subdir_name, const int buffer_size) { @@ -347,6 +347,19 @@ int sf_file_writer_init_normal(SFFileWriterInfo *writer, return 0; } +void sf_file_writer_destroy(SFFileWriterInfo *writer) +{ + if (writer->file.fd >= 0) { + close(writer->file.fd); + writer->file.fd = -1; + } + if (writer->file.name != NULL) { + free(writer->file.name); + writer->file.name = NULL; + } + sf_binlog_buffer_destroy(&writer->binlog_buffer); +} + int sf_file_writer_set_binlog_index(SFFileWriterInfo *writer, const int binlog_index) { diff --git a/src/sf_file_writer.h b/src/sf_file_writer.h index e0b0634..51b5324 100644 --- a/src/sf_file_writer.h +++ b/src/sf_file_writer.h @@ -63,10 +63,12 @@ typedef struct sf_file_writer_info { extern "C" { #endif -int sf_file_writer_init_normal(SFFileWriterInfo *writer, +int sf_file_writer_init(SFFileWriterInfo *writer, const char *data_path, const char *subdir_name, const int buffer_size); +void sf_file_writer_destroy(SFFileWriterInfo *writer); + int sf_file_writer_deal_buffer(SFFileWriterInfo *writer, BufferInfo *buffer, const int64_t version); diff --git a/src/sf_ordered_writer.c b/src/sf_ordered_writer.c index 0fbe1d7..417c16d 100644 --- a/src/sf_ordered_writer.c +++ b/src/sf_ordered_writer.c @@ -239,7 +239,7 @@ int sf_ordered_writer_init(SFOrderedWriterContext *context, const int buffer_size, const int max_record_size) { int result; - if ((result=sf_file_writer_init_normal(&context->writer.fw, + if ((result=sf_file_writer_init(&context->writer.fw, data_path, subdir_name, buffer_size)) != 0) { return result;