add function: sf_binlog_writer_notify_exit

recovery_and_balance
YuQing 2022-03-18 16:48:26 +08:00
parent a265bbbbea
commit a727f382bc
3 changed files with 37 additions and 12 deletions

View File

@ -34,6 +34,8 @@
#include "sf_func.h" #include "sf_func.h"
#include "sf_binlog_writer.h" #include "sf_binlog_writer.h"
#define ERRNO_THREAD_EXIT -1000
static inline void binlog_writer_set_next_version(SFBinlogWriterInfo *writer, static inline void binlog_writer_set_next_version(SFBinlogWriterInfo *writer,
const uint64_t next_version) const uint64_t next_version)
{ {
@ -194,7 +196,9 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
fast_mblock_free_object(&current->writer-> fast_mblock_free_object(&current->writer->
thread->mblock, current); thread->mblock, current);
break; break;
case SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT:
flush_writer_files(thread);
return ERRNO_THREAD_EXIT;
case SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION: case SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION:
if (thread->order_by != SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION) { if (thread->order_by != SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION) {
logWarning("file: "__FILE__", line: %d, " logWarning("file: "__FILE__", line: %d, "
@ -260,8 +264,7 @@ void sf_binlog_writer_finish(SFBinlogWriterInfo *writer)
while (!fc_queue_empty(&writer->thread->queue)) { while (!fc_queue_empty(&writer->thread->queue)) {
fc_sleep_ms(10); fc_sleep_ms(10);
} }
sf_binlog_writer_notify_exit(writer);
fc_queue_terminate(&writer->thread->queue);
count = 0; count = 0;
while (writer->thread->running && ++count < 300) { while (writer->thread->running && ++count < 300) {
@ -294,6 +297,7 @@ static void *binlog_writer_func(void *arg)
{ {
SFBinlogWriterThread *thread; SFBinlogWriterThread *thread;
SFBinlogWriterBuffer *wb_head; SFBinlogWriterBuffer *wb_head;
int result;
thread = (SFBinlogWriterThread *)arg; thread = (SFBinlogWriterThread *)arg;
@ -313,11 +317,14 @@ static void *binlog_writer_func(void *arg)
continue; continue;
} }
if (deal_binlog_records(thread, wb_head) != 0) { if ((result=deal_binlog_records(thread, wb_head)) != 0) {
logCrit("file: "__FILE__", line: %d, " if (result != ERRNO_THREAD_EXIT) {
"deal_binlog_records fail, " logCrit("file: "__FILE__", line: %d, "
"program exit!", __LINE__); "deal_binlog_records fail, "
sf_terminate_myself(); "program exit!", __LINE__);
sf_terminate_myself();
}
break;
} }
} }
@ -467,13 +474,13 @@ int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer,
return 0; return 0;
} }
int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, static inline int sf_binlog_writer_push_directive(SFBinlogWriterInfo *writer,
const int64_t next_version) const int buffer_type, const int64_t version)
{ {
SFBinlogWriterBuffer *buffer; SFBinlogWriterBuffer *buffer;
if ((buffer=sf_binlog_writer_alloc_versioned_buffer_ex(writer, next_version, if ((buffer=sf_binlog_writer_alloc_versioned_buffer_ex(writer,
next_version, SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION)) == NULL) version, version, buffer_type)) == NULL)
{ {
return ENOMEM; return ENOMEM;
} }
@ -481,3 +488,17 @@ int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer,
fc_queue_push(&writer->thread->queue, buffer); fc_queue_push(&writer->thread->queue, buffer);
return 0; return 0;
} }
int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer,
const int64_t next_version)
{
return sf_binlog_writer_push_directive(writer,
SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION,
next_version);
}
int sf_binlog_writer_notify_exit(SFBinlogWriterInfo *writer)
{
return sf_binlog_writer_push_directive(writer,
SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT, 0);
}

View File

@ -31,6 +31,7 @@
#define SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE 0 //default type, must be 0 #define SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE 0 //default type, must be 0
#define SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION 1 #define SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION 1
#define SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE 2 #define SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE 2
#define SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT 3
#define SF_BINLOG_BUFFER_SET_VERSION(buffer, ver) \ #define SF_BINLOG_BUFFER_SET_VERSION(buffer, ver) \
(buffer)->version.first = (buffer)->version.last = ver (buffer)->version.first = (buffer)->version.last = ver
@ -165,6 +166,8 @@ int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer,
int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer,
const int64_t next_version); const int64_t next_version);
int sf_binlog_writer_notify_exit(SFBinlogWriterInfo *writer);
#define sf_binlog_writer_set_flags(writer, flags) \ #define sf_binlog_writer_set_flags(writer, flags) \
sf_file_writer_set_flags(&(writer)->fw, flags) sf_file_writer_set_flags(&(writer)->fw, flags)

View File

@ -94,6 +94,7 @@ static int get_binlog_index_from_file(SFFileWriterInfo *writer)
if (access(full_filename, F_OK) != 0) { if (access(full_filename, F_OK) != 0) {
if (errno == ENOENT) { if (errno == ENOENT) {
writer->binlog.index = 0; writer->binlog.index = 0;
writer->binlog.compress_index = 0;
return write_to_binlog_index_file(writer); return write_to_binlog_index_file(writer);
} }
} }