diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index bd563d0..d592de2 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -423,56 +423,39 @@ static int deal_record_by_version(SFBinlogWriterBuffer *wb) return 0; } -static inline void add_to_flush_writer_array(SFBinlogWriterThread *thread, +static inline void add_to_flush_writer_queue(SFBinlogWriterThread *thread, SFBinlogWriterInfo *writer) { - struct sf_binlog_writer_info **entry; - struct sf_binlog_writer_info **end; - - if (thread->flush_writers.count == 0) { - thread->flush_writers.entries[thread->flush_writers.count++] = writer; + if (writer->flush.in_queue) { return; } - if (thread->flush_writers.count == thread->flush_writers.alloc) { - return; + writer->flush.in_queue = true; + writer->flush.next = NULL; + if (thread->flush_writers.head == NULL) { + thread->flush_writers.head = writer; + } else { + thread->flush_writers.tail->flush.next = writer; } - if (thread->flush_writers.entries[0] == writer) { - return; - } - - end = thread->flush_writers.entries + thread->flush_writers.count; - for (entry=thread->flush_writers.entries+1; entryflush_writers.entries[thread->flush_writers.count++] = writer; + thread->flush_writers.tail = writer; } static inline int flush_writer_files(SFBinlogWriterThread *thread) { - struct sf_binlog_writer_info **entry; - struct sf_binlog_writer_info **end; + struct sf_binlog_writer_info *writer; int result; - //logInfo("flush_writers count: %d", thread->flush_writers.count); - if (thread->flush_writers.count == 1) { - /* - logInfo("flush_writers filename: %s", - thread->flush_writers.entries[0]->file.name); - */ - return binlog_write_to_file(thread->flush_writers.entries[0]); - } - - end = thread->flush_writers.entries + thread->flush_writers.count; - for (entry=thread->flush_writers.entries; entryflush_writers.head; + while (writer != NULL) { + if ((result=binlog_write_to_file(writer)) != 0) { return result; } + + writer->flush.in_queue = false; + writer = writer->flush.next; } + thread->flush_writers.head = thread->flush_writers.tail = NULL; return 0; } @@ -483,9 +466,7 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, SFBinlogWriterBuffer *wbuffer; SFBinlogWriterBuffer *current; - thread->flush_writers.count = 0; wbuffer = wb_head; - if (thread->order_by == SF_BINLOG_THREAD_TYPE_ORDER_BY_VERSION) { do { current = wbuffer; @@ -511,7 +492,7 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, fast_mblock_free_object(¤t->writer-> thread->mblock, current); } else { - add_to_flush_writer_array(thread, current->writer); + add_to_flush_writer_queue(thread, current->writer); if ((result=deal_record_by_version(current)) != 0) { return result; } @@ -526,7 +507,7 @@ static int deal_binlog_records(SFBinlogWriterThread *thread, current = wbuffer; wbuffer = wbuffer->next; - add_to_flush_writer_array(thread, current->writer); + add_to_flush_writer_queue(thread, current->writer); fast_mblock_free_object(¤t->writer-> thread->mblock, current); } while (wbuffer != NULL); @@ -623,6 +604,7 @@ int sf_binlog_writer_init_normal(SFBinlogWriterInfo *writer, bool create; char filepath[PATH_MAX]; + writer->flush.in_queue = false; if ((result=sf_binlog_buffer_init(&writer->binlog_buffer, buffer_size)) != 0) { @@ -690,7 +672,6 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, int element_size; pthread_t tid; int result; - int bytes; thread->order_mode = order_mode; thread->order_by = order_by; @@ -715,14 +696,7 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread, return result; } - bytes = sizeof(struct sf_binlog_writer_info *) * writer_count; - thread->flush_writers.entries = (struct sf_binlog_writer_info **)fc_malloc(bytes); - if (thread->flush_writers.entries == NULL) { - return ENOMEM; - } - thread->flush_writers.alloc = writer_count; - thread->flush_writers.count = 0; - + thread->flush_writers.head = thread->flush_writers.tail = NULL; return fc_create_thread(&tid, binlog_writer_func, thread, SF_G_THREAD_STACK_SIZE); } diff --git a/src/sf_binlog_writer.h b/src/sf_binlog_writer.h index 5a4eefd..bef3283 100644 --- a/src/sf_binlog_writer.h +++ b/src/sf_binlog_writer.h @@ -40,12 +40,6 @@ struct sf_binlog_writer_info; -typedef struct sf_binlog_writer_ptr_array { - struct sf_binlog_writer_info **entries; - int count; - int alloc; -} SFBinlogWriterPtrArray; - typedef struct sf_binlog_writer_buffer { int64_t version; BufferInfo bf; @@ -70,7 +64,10 @@ typedef struct binlog_writer_thread { bool use_fixed_buffer_size; short order_mode; short order_by; - SFBinlogWriterPtrArray flush_writers; + struct { + struct sf_binlog_writer_info *head; + struct sf_binlog_writer_info *tail; + } flush_writers; } SFBinlogWriterThread; typedef struct sf_binlog_writer_info { @@ -96,6 +93,10 @@ typedef struct sf_binlog_writer_info { } version_ctx; SFBinlogBuffer binlog_buffer; SFBinlogWriterThread *thread; + struct { + bool in_queue; + struct sf_binlog_writer_info *next; + } flush; } SFBinlogWriterInfo; typedef struct sf_binlog_writer_context {