flush_writers: use chain instead of ptr array

connection_manager
YuQing 2020-10-26 16:48:48 +08:00
parent 99f80b847e
commit b9e22e61fc
2 changed files with 29 additions and 54 deletions

View File

@ -423,56 +423,39 @@ static int deal_record_by_version(SFBinlogWriterBuffer *wb)
return 0;
}
static inline void add_to_flush_writer_array(SFBinlogWriterThread *thread,
static inline void add_to_flush_writer_queue(SFBinlogWriterThread *thread,
SFBinlogWriterInfo *writer)
{
struct sf_binlog_writer_info **entry;
struct sf_binlog_writer_info **end;
if (thread->flush_writers.count == 0) {
thread->flush_writers.entries[thread->flush_writers.count++] = writer;
if (writer->flush.in_queue) {
return;
}
if (thread->flush_writers.count == thread->flush_writers.alloc) {
return;
writer->flush.in_queue = true;
writer->flush.next = NULL;
if (thread->flush_writers.head == NULL) {
thread->flush_writers.head = writer;
} else {
thread->flush_writers.tail->flush.next = writer;
}
if (thread->flush_writers.entries[0] == writer) {
return;
}
end = thread->flush_writers.entries + thread->flush_writers.count;
for (entry=thread->flush_writers.entries+1; entry<end; entry++) {
if (*entry == writer) {
return;
}
}
thread->flush_writers.entries[thread->flush_writers.count++] = writer;
thread->flush_writers.tail = writer;
}
static inline int flush_writer_files(SFBinlogWriterThread *thread)
{
struct sf_binlog_writer_info **entry;
struct sf_binlog_writer_info **end;
struct sf_binlog_writer_info *writer;
int result;
//logInfo("flush_writers count: %d", thread->flush_writers.count);
if (thread->flush_writers.count == 1) {
/*
logInfo("flush_writers filename: %s",
thread->flush_writers.entries[0]->file.name);
*/
return binlog_write_to_file(thread->flush_writers.entries[0]);
}
end = thread->flush_writers.entries + thread->flush_writers.count;
for (entry=thread->flush_writers.entries; entry<end; entry++) {
if ((result=binlog_write_to_file(*entry)) != 0) {
writer = thread->flush_writers.head;
while (writer != NULL) {
if ((result=binlog_write_to_file(writer)) != 0) {
return result;
}
writer->flush.in_queue = false;
writer = writer->flush.next;
}
thread->flush_writers.head = thread->flush_writers.tail = NULL;
return 0;
}
@ -483,9 +466,7 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
SFBinlogWriterBuffer *wbuffer;
SFBinlogWriterBuffer *current;
thread->flush_writers.count = 0;
wbuffer = wb_head;
if (thread->order_by == SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION) {
do {
current = wbuffer;
@ -511,7 +492,7 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
fast_mblock_free_object(&current->writer->
thread->mblock, current);
} else {
add_to_flush_writer_array(thread, current->writer);
add_to_flush_writer_queue(thread, current->writer);
if ((result=deal_record_by_version(current)) != 0) {
return result;
}
@ -526,7 +507,7 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
current = wbuffer;
wbuffer = wbuffer->next;
add_to_flush_writer_array(thread, current->writer);
add_to_flush_writer_queue(thread, current->writer);
fast_mblock_free_object(&current->writer->
thread->mblock, current);
} while (wbuffer != NULL);
@ -623,6 +604,7 @@ int sf_binlog_writer_init_normal(SFBinlogWriterInfo *writer,
bool create;
char filepath[PATH_MAX];
writer->flush.in_queue = false;
if ((result=sf_binlog_buffer_init(&writer->binlog_buffer,
buffer_size)) != 0)
{
@ -690,7 +672,6 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
int element_size;
pthread_t tid;
int result;
int bytes;
thread->order_mode = order_mode;
thread->order_by = order_by;
@ -715,14 +696,7 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
return result;
}
bytes = sizeof(struct sf_binlog_writer_info *) * writer_count;
thread->flush_writers.entries = (struct sf_binlog_writer_info **)fc_malloc(bytes);
if (thread->flush_writers.entries == NULL) {
return ENOMEM;
}
thread->flush_writers.alloc = writer_count;
thread->flush_writers.count = 0;
thread->flush_writers.head = thread->flush_writers.tail = NULL;
return fc_create_thread(&tid, binlog_writer_func, thread,
SF_G_THREAD_STACK_SIZE);
}

View File

@ -40,12 +40,6 @@
struct sf_binlog_writer_info;
typedef struct sf_binlog_writer_ptr_array {
struct sf_binlog_writer_info **entries;
int count;
int alloc;
} SFBinlogWriterPtrArray;
typedef struct sf_binlog_writer_buffer {
int64_t version;
BufferInfo bf;
@ -70,7 +64,10 @@ typedef struct binlog_writer_thread {
bool use_fixed_buffer_size;
short order_mode;
short order_by;
SFBinlogWriterPtrArray flush_writers;
struct {
struct sf_binlog_writer_info *head;
struct sf_binlog_writer_info *tail;
} flush_writers;
} SFBinlogWriterThread;
typedef struct sf_binlog_writer_info {
@ -96,6 +93,10 @@ typedef struct sf_binlog_writer_info {
} version_ctx;
SFBinlogBuffer binlog_buffer;
SFBinlogWriterThread *thread;
struct {
bool in_queue;
struct sf_binlog_writer_info *next;
} flush;
} SFBinlogWriterInfo;
typedef struct sf_binlog_writer_context {