diff --git a/src/sf_ordered_writer.c b/src/sf_ordered_writer.c index ba10888..0fbe1d7 100644 --- a/src/sf_ordered_writer.c +++ b/src/sf_ordered_writer.c @@ -52,64 +52,81 @@ static inline int flush_writer_files(SFOrderedWriterInfo *writer) return 0; } -static int deal_binlog_records(SFOrderedWriterContext *context, - SFOrderedWriterBuffer *wb_head) +static inline int deal_versioned_binlog(SFOrderedWriterContext *context) { + SFOrderedWriterBuffer *wb; int result; - SFOrderedWriterBuffer *wbuffer; - SFOrderedWriterBuffer *current; - wbuffer = wb_head; - do { - current = wbuffer; - wbuffer = wbuffer->next; - - context->writer.fw.total_count++; - if ((result=deal_binlog_one_record(&context->writer, current)) != 0) { + while (1) { + if ((wb=sorted_queue_pop(&context->thread.queues.buffer, + &context->thread.waiting)) != NULL) + { + context->writer.fw.total_count++; + result = deal_binlog_one_record(&context->writer, wb); + fast_mblock_free_object(&context->thread.allocators.buffer, wb); return result; } + } - fast_mblock_free_object(&context->thread.allocators.buffer, current); - } while (wbuffer != NULL); + return 0; +} +static int deal_version_chain(SFOrderedWriterContext *context, + struct fc_queue_info *qinfo) +{ + int result; + SFWriterVersionEntry *current_ver; + struct fast_mblock_node *prev_node; + struct fast_mblock_node *curr_node; + struct fast_mblock_chain node_chain; + + current_ver = qinfo->head; + prev_node = NULL; + do { + curr_node = fast_mblock_to_node_ptr(current_ver); + if (prev_node != NULL) { + prev_node->next = curr_node; + } + prev_node = curr_node; + + context->thread.waiting.version = current_ver->version; + if ((result=deal_versioned_binlog(context)) != 0) { + return result; + } + } while ((current_ver=current_ver->next) != NULL); + + node_chain.head = fast_mblock_to_node_ptr(qinfo->head); + node_chain.tail = prev_node; + prev_node->next = NULL; + fast_mblock_batch_free(&context->thread.allocators.version, &node_chain); return flush_writer_files(&context->writer); } -void sf_ordered_writer_finish(SFOrderedWriterInfo *writer) +void sf_ordered_writer_finish(SFOrderedWriterContext *ctx) { - //SFOrderedWriterBuffer *wb_head; int count; - if (writer->fw.file.name != NULL) { - sorted_queue_terminate(&writer->thread->queues.buffer); + if (ctx->writer.fw.file.name != NULL) { + fc_queue_terminate(&ctx->thread.queues.version); count = 0; - while (writer->thread->running && ++count < 300) { + while (ctx->thread.running && ++count < 300) { fc_sleep_ms(10); } - if (writer->thread->running) { + if (ctx->thread.running) { logWarning("file: "__FILE__", line: %d, " - "%s binlog write thread still running, " - "exit anyway!", __LINE__, writer->fw.cfg.subdir_name); + "%s binlog write thread still running, exit anyway!", + __LINE__, ctx->writer.fw.cfg.subdir_name); } - //TODO - /* - wb_head = (SFOrderedWriterBuffer *)sorted_queue_try_pop_all( - &writer->thread->queue); - if (wb_head != NULL) { - deal_binlog_records(writer->thread, wb_head); - } - */ - - free(writer->fw.file.name); - writer->fw.file.name = NULL; + free(ctx->writer.fw.file.name); + ctx->writer.fw.file.name = NULL; } - if (writer->fw.file.fd >= 0) { - close(writer->fw.file.fd); - writer->fw.file.fd = -1; + if (ctx->writer.fw.file.fd >= 0) { + close(ctx->writer.fw.file.fd); + ctx->writer.fw.file.fd = -1; } } @@ -117,7 +134,7 @@ static void *binlog_writer_func(void *arg) { SFOrderedWriterContext *context; SFOrderedWriterThread *thread; - SFOrderedWriterBuffer *wb_head; + struct fc_queue_info qinfo; context = (SFOrderedWriterContext *)arg; thread = &context->thread; @@ -133,15 +150,14 @@ static void *binlog_writer_func(void *arg) thread->running = true; while (SF_G_CONTINUE_FLAG) { - wb_head = NULL; - //wb_head = (SFOrderedWriterBuffer *)sorted_queue_pop_all(&thread->queue); - if (wb_head == NULL) { + fc_queue_pop_to_queue(&thread->queues.version, &qinfo); + if (qinfo.head== NULL) { continue; } - if (deal_binlog_records(context, wb_head) != 0) { + if (deal_version_chain(context, &qinfo) != 0) { logCrit("file: "__FILE__", line: %d, " - "deal_binlog_records fail, " + "deal_version_chain fail, " "program exit!", __LINE__); sf_terminate_myself(); } @@ -186,7 +202,7 @@ static int sf_ordered_writer_init_thread(SFOrderedWriterContext *context, writer->thread = thread; if ((result=fast_mblock_init_ex1(&thread->allocators.version, - "writer-ver-info", sizeof(SFWriterVersionInfo), + "writer-ver-info", sizeof(SFWriterVersionEntry), 8 * 1024, 0, NULL, NULL, true)) != 0) { return result; @@ -201,7 +217,7 @@ static int sf_ordered_writer_init_thread(SFOrderedWriterContext *context, } if ((result=fc_queue_init(&thread->queues.version, (unsigned long) - (&((SFWriterVersionInfo *)NULL)->next))) != 0) + (&((SFWriterVersionEntry *)NULL)->next))) != 0) { return result; } diff --git a/src/sf_ordered_writer.h b/src/sf_ordered_writer.h index 6f96bb6..8e84f82 100644 --- a/src/sf_ordered_writer.h +++ b/src/sf_ordered_writer.h @@ -21,10 +21,10 @@ #include "fastcommon/sorted_queue.h" #include "sf_file_writer.h" -typedef struct sf_writer_version_info { +typedef struct sf_writer_version_entry { int64_t version; - struct sf_writer_version_info *next; -} SFWriterVersionInfo; + struct sf_writer_version_entry *next; +} SFWriterVersionEntry; typedef struct sf_ordered_writer_buffer { int64_t version; @@ -44,6 +44,7 @@ typedef struct sf_orderd_writer_thread { } queues; char name[64]; volatile bool running; + SFOrderedWriterBuffer waiting; //for less equal than object } SFOrderedWriterThread; typedef struct sf_ordered_writer_info { @@ -65,33 +66,40 @@ int sf_ordered_writer_init(SFOrderedWriterContext *context, const char *data_path, const char *subdir_name, const int buffer_size, const int max_record_size); -#define sf_ordered_writer_set_flags(writer, flags) \ - sf_file_writer_set_flags(&(writer)->fw, flags) +#define sf_ordered_writer_set_flags(ctx, flags) \ + sf_file_writer_set_flags(&(ctx)->writer.fw, flags) -#define sf_ordered_writer_get_last_version(writer) \ - sf_ordered_writer_get_last_version(&(writer)->fw) +#define sf_ordered_writer_get_last_version(ctx) \ + sf_ordered_writer_get_last_version(&(ctx)->writer.fw) -void sf_ordered_writer_finish(SFOrderedWriterInfo *writer); +void sf_ordered_writer_finish(SFOrderedWriterContext *ctx); -#define sf_ordered_writer_get_current_index(writer) \ - sf_file_writer_get_current_index(&(writer)->fw) +#define sf_ordered_writer_get_current_index(ctx) \ + sf_file_writer_get_current_index(&(ctx)->writer.fw) -#define sf_ordered_writer_get_current_position(writer, position) \ - sf_file_writer_get_current_position(&(writer)->fw, position) +#define sf_ordered_writer_get_current_position(ctx, position) \ + sf_file_writer_get_current_position(&(ctx)->writer.fw, position) -static inline SFOrderedWriterBuffer *sf_ordered_writer_alloc_buffer( - SFOrderedWriterThread *thread) +static inline int sf_ordered_writer_alloc_versions( + SFOrderedWriterContext *ctx, const int count, + struct fc_queue_info *chain) { - return (SFOrderedWriterBuffer *)fast_mblock_alloc_object( - &thread->allocators.buffer); + return fc_queue_alloc_chain(&ctx->thread.queues.version, + &ctx->thread.allocators.version, count, chain); } -static inline SFOrderedWriterBuffer *sf_ordered_writer_alloc_versioned_buffer_ex( - SFOrderedWriterInfo *writer, const int64_t version) +static inline void sf_ordered_writer_push_versions( + SFOrderedWriterContext *ctx, struct fc_queue_info *chain) +{ + fc_queue_push_queue_to_tail(&ctx->thread.queues.version, chain); +} + +static inline SFOrderedWriterBuffer *sf_ordered_writer_alloc_buffer( + SFOrderedWriterContext *ctx, const int64_t version) { SFOrderedWriterBuffer *buffer; buffer = (SFOrderedWriterBuffer *)fast_mblock_alloc_object( - &writer->thread->allocators.buffer); + &ctx->thread.allocators.buffer); if (buffer != NULL) { buffer->version = version; } @@ -106,16 +114,16 @@ static inline SFOrderedWriterBuffer *sf_ordered_writer_alloc_versioned_buffer_ex sf_file_writer_get_filename(data_path, subdir_name, \ binlog_index, filename, size) -#define sf_ordered_writer_set_binlog_index(writer, binlog_index) \ - sf_file_writer_set_binlog_index(&(writer)->fw, binlog_index) +#define sf_ordered_writer_set_binlog_index(ctx, binlog_index) \ + sf_file_writer_set_binlog_index(&(ctx)->writer.fw, binlog_index) -#define sf_push_to_binlog_thread_queue(thread, buffer) \ - sorted_queue_push(&(thread)->queues.buffer, buffer) +#define sf_push_to_binlog_thread_queue(ctx, buffer) \ + sorted_queue_push(&(ctx)->thread.queues.buffer, buffer) -static inline void sf_push_to_binlog_write_queue(SFOrderedWriterInfo *writer, +static inline void sf_push_to_binlog_write_queue(SFOrderedWriterContext *ctx, SFOrderedWriterBuffer *buffer) { - sorted_queue_push(&writer->thread->queues.buffer, buffer); + sorted_queue_push(&ctx->thread.queues.buffer, buffer); } #ifdef __cplusplus