From a727f382bc59790db9e92594515ac039bed4d1e8 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Fri, 18 Mar 2022 16:48:26 +0800 Subject: [PATCH] add function: sf_binlog_writer_notify_exit --- src/sf_binlog_writer.c | 45 +++++++++++++++++++++++++++++++----------- src/sf_binlog_writer.h | 3 +++ src/sf_file_writer.c | 1 + 3 files changed, 37 insertions(+), 12 deletions(-) diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index bedda07..d7c8ac2 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -34,6 +34,8 @@ #include "sf_func.h" #include "sf_binlog_writer.h" +#define ERRNO_THREAD_EXIT -1000 + static inline void binlog_writer_set_next_version(SFBinlogWriterInfo *writer, const uint64_t next_version) { @@ -194,7 +196,9 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, fast_mblock_free_object(¤t->writer-> thread->mblock, current); break; - + case SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT: + flush_writer_files(thread); + return ERRNO_THREAD_EXIT; case SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION: if (thread->order_by != SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION) { logWarning("file: "__FILE__", line: %d, " @@ -260,8 +264,7 @@ void sf_binlog_writer_finish(SFBinlogWriterInfo *writer) while (!fc_queue_empty(&writer->thread->queue)) { fc_sleep_ms(10); } - - fc_queue_terminate(&writer->thread->queue); + sf_binlog_writer_notify_exit(writer); count = 0; while (writer->thread->running && ++count < 300) { @@ -294,6 +297,7 @@ static void *binlog_writer_func(void *arg) { SFBinlogWriterThread *thread; SFBinlogWriterBuffer *wb_head; + int result; thread = (SFBinlogWriterThread *)arg; @@ -313,11 +317,14 @@ static void *binlog_writer_func(void *arg) continue; } - if (deal_binlog_records(thread, wb_head) != 0) { - logCrit("file: "__FILE__", line: %d, " - "deal_binlog_records fail, " - "program exit!", __LINE__); - sf_terminate_myself(); + if ((result=deal_binlog_records(thread, wb_head)) != 0) { + if (result != ERRNO_THREAD_EXIT) { + logCrit("file: "__FILE__", line: %d, " + "deal_binlog_records fail, " + "program exit!", __LINE__); + sf_terminate_myself(); + } + break; } } @@ -467,13 +474,13 @@ int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer, return 0; } -int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, - const int64_t next_version) +static inline int sf_binlog_writer_push_directive(SFBinlogWriterInfo *writer, + const int buffer_type, const int64_t version) { SFBinlogWriterBuffer *buffer; - if ((buffer=sf_binlog_writer_alloc_versioned_buffer_ex(writer, next_version, - next_version, SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION)) == NULL) + if ((buffer=sf_binlog_writer_alloc_versioned_buffer_ex(writer, + version, version, buffer_type)) == NULL) { return ENOMEM; } @@ -481,3 +488,17 @@ int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, fc_queue_push(&writer->thread->queue, buffer); return 0; } + +int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, + const int64_t next_version) +{ + return sf_binlog_writer_push_directive(writer, + SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION, + next_version); +} + +int sf_binlog_writer_notify_exit(SFBinlogWriterInfo *writer) +{ + return sf_binlog_writer_push_directive(writer, + SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT, 0); +} diff --git a/src/sf_binlog_writer.h b/src/sf_binlog_writer.h index 03c9de2..5c79e4d 100644 --- a/src/sf_binlog_writer.h +++ b/src/sf_binlog_writer.h @@ -31,6 +31,7 @@ #define SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE 0 //default type, must be 0 #define SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION 1 #define SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE 2 +#define SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT 3 #define SF_BINLOG_BUFFER_SET_VERSION(buffer, ver) \ (buffer)->version.first = (buffer)->version.last = ver @@ -165,6 +166,8 @@ int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer, int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, const int64_t next_version); +int sf_binlog_writer_notify_exit(SFBinlogWriterInfo *writer); + #define sf_binlog_writer_set_flags(writer, flags) \ sf_file_writer_set_flags(&(writer)->fw, flags) diff --git a/src/sf_file_writer.c b/src/sf_file_writer.c index 071faf0..794697a 100644 --- a/src/sf_file_writer.c +++ b/src/sf_file_writer.c @@ -94,6 +94,7 @@ static int get_binlog_index_from_file(SFFileWriterInfo *writer) if (access(full_filename, F_OK) != 0) { if (errno == ENOENT) { writer->binlog.index = 0; + writer->binlog.compress_index = 0; return write_to_binlog_index_file(writer); } }