add function sf_binlog_writer_destroy

recovery_and_balance
YuQing 2022-03-17 20:52:41 +08:00
parent e061a3dfad
commit a265bbbbea
5 changed files with 73 additions and 13 deletions

View File

@ -257,6 +257,10 @@ void sf_binlog_writer_finish(SFBinlogWriterInfo *writer)
int count; int count;
if (writer->fw.file.name != NULL) { if (writer->fw.file.name != NULL) {
while (!fc_queue_empty(&writer->thread->queue)) {
fc_sleep_ms(10);
}
fc_queue_terminate(&writer->thread->queue); fc_queue_terminate(&writer->thread->queue);
count = 0; count = 0;
@ -341,13 +345,22 @@ static int binlog_wbuffer_alloc_init(void *element, void *args)
return 0; return 0;
} }
static void binlog_wbuffer_destroy_func(void *element, void *args)
{
SFBinlogWriterBuffer *wbuffer;
wbuffer = (SFBinlogWriterBuffer *)element;
if (wbuffer->bf.buff != NULL) {
free(wbuffer->bf.buff);
}
}
int sf_binlog_writer_init_normal(SFBinlogWriterInfo *writer, int sf_binlog_writer_init_normal(SFBinlogWriterInfo *writer,
const char *data_path, const char *subdir_name, const char *data_path, const char *subdir_name,
const int buffer_size) const int buffer_size)
{ {
writer->flush.in_queue = false; memset(writer, 0, sizeof(*writer));
return sf_file_writer_init_normal(&writer->fw, return sf_file_writer_init(&writer->fw, data_path,
data_path, subdir_name, buffer_size); subdir_name, buffer_size);
} }
int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer, int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer,
@ -369,8 +382,9 @@ int sf_binlog_writer_init_by_version(SFBinlogWriterInfo *writer,
writer->version_ctx.change_count = 0; writer->version_ctx.change_count = 0;
binlog_writer_set_next_version(writer, next_version); binlog_writer_set_next_version(writer, next_version);
return sf_binlog_writer_init_normal(writer, writer->flush.in_queue = false;
data_path, subdir_name, buffer_size); return sf_file_writer_init(&writer->fw, data_path,
subdir_name, buffer_size);
} }
int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
@ -379,9 +393,10 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
const int writer_count, const bool use_fixed_buffer_size) const int writer_count, const bool use_fixed_buffer_size)
{ {
const int alloc_elements_once = 1024; const int alloc_elements_once = 1024;
int result;
int element_size; int element_size;
pthread_t tid; pthread_t tid;
int result; struct fast_mblock_object_callbacks callbacks;
snprintf(thread->name, sizeof(thread->name), "%s", name); snprintf(thread->name, sizeof(thread->name), "%s", name);
thread->order_mode = order_mode; thread->order_mode = order_mode;
@ -390,13 +405,18 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
writer->fw.cfg.max_record_size = max_record_size; writer->fw.cfg.max_record_size = max_record_size;
writer->thread = thread; writer->thread = thread;
callbacks.init_func = binlog_wbuffer_alloc_init;
callbacks.args = writer;
element_size = sizeof(SFBinlogWriterBuffer); element_size = sizeof(SFBinlogWriterBuffer);
if (use_fixed_buffer_size) { if (use_fixed_buffer_size) {
element_size += max_record_size; element_size += max_record_size;
callbacks.destroy_func = NULL;
} else {
callbacks.destroy_func = binlog_wbuffer_destroy_func;
} }
if ((result=fast_mblock_init_ex1(&thread->mblock, "binlog-wbuffer", if ((result=fast_mblock_init_ex2(&thread->mblock, "binlog-wbuffer",
element_size, alloc_elements_once, 0, element_size, alloc_elements_once, 0,
binlog_wbuffer_alloc_init, writer, true)) != 0) &callbacks, true, NULL)) != 0)
{ {
return result; return result;
} }

View File

@ -132,6 +132,33 @@ static inline int sf_binlog_writer_init(SFBinlogWriterContext *context,
max_record_size); max_record_size);
} }
void sf_binlog_writer_finish(SFBinlogWriterInfo *writer);
static inline void sf_binlog_writer_destroy_writer(
SFBinlogWriterInfo *writer)
{
sf_file_writer_destroy(&writer->fw);
if (writer->version_ctx.ring.slots != NULL) {
free(writer->version_ctx.ring.slots);
writer->version_ctx.ring.slots = NULL;
}
}
static inline void sf_binlog_writer_destroy_thread(
SFBinlogWriterThread *thread)
{
fast_mblock_destroy(&thread->mblock);
fc_queue_destroy(&thread->queue);
}
static inline void sf_binlog_writer_destroy(
SFBinlogWriterContext *context)
{
sf_binlog_writer_finish(&context->writer);
sf_binlog_writer_destroy_writer(&context->writer);
sf_binlog_writer_destroy_thread(&context->thread);
}
int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer, int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer,
const short order_by); const short order_by);
@ -144,8 +171,6 @@ int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer,
#define sf_binlog_writer_get_last_version(writer) \ #define sf_binlog_writer_get_last_version(writer) \
sf_file_writer_get_last_version(&(writer)->fw) sf_file_writer_get_last_version(&(writer)->fw)
void sf_binlog_writer_finish(SFBinlogWriterInfo *writer);
#define sf_binlog_get_current_write_index(writer) \ #define sf_binlog_get_current_write_index(writer) \
sf_file_writer_get_current_index(&(writer)->fw) sf_file_writer_get_current_index(&(writer)->fw)

View File

@ -298,7 +298,7 @@ int sf_file_writer_deal_buffer(SFFileWriterInfo *writer,
return 0; return 0;
} }
int sf_file_writer_init_normal(SFFileWriterInfo *writer, int sf_file_writer_init(SFFileWriterInfo *writer,
const char *data_path, const char *subdir_name, const char *data_path, const char *subdir_name,
const int buffer_size) const int buffer_size)
{ {
@ -347,6 +347,19 @@ int sf_file_writer_init_normal(SFFileWriterInfo *writer,
return 0; return 0;
} }
void sf_file_writer_destroy(SFFileWriterInfo *writer)
{
if (writer->file.fd >= 0) {
close(writer->file.fd);
writer->file.fd = -1;
}
if (writer->file.name != NULL) {
free(writer->file.name);
writer->file.name = NULL;
}
sf_binlog_buffer_destroy(&writer->binlog_buffer);
}
int sf_file_writer_set_binlog_index(SFFileWriterInfo *writer, int sf_file_writer_set_binlog_index(SFFileWriterInfo *writer,
const int binlog_index) const int binlog_index)
{ {

View File

@ -63,10 +63,12 @@ typedef struct sf_file_writer_info {
extern "C" { extern "C" {
#endif #endif
int sf_file_writer_init_normal(SFFileWriterInfo *writer, int sf_file_writer_init(SFFileWriterInfo *writer,
const char *data_path, const char *subdir_name, const char *data_path, const char *subdir_name,
const int buffer_size); const int buffer_size);
void sf_file_writer_destroy(SFFileWriterInfo *writer);
int sf_file_writer_deal_buffer(SFFileWriterInfo *writer, int sf_file_writer_deal_buffer(SFFileWriterInfo *writer,
BufferInfo *buffer, const int64_t version); BufferInfo *buffer, const int64_t version);

View File

@ -239,7 +239,7 @@ int sf_ordered_writer_init(SFOrderedWriterContext *context,
const int buffer_size, const int max_record_size) const int buffer_size, const int max_record_size)
{ {
int result; int result;
if ((result=sf_file_writer_init_normal(&context->writer.fw, if ((result=sf_file_writer_init(&context->writer.fw,
data_path, subdir_name, buffer_size)) != 0) data_path, subdir_name, buffer_size)) != 0)
{ {
return result; return result;