sf_ordered_writer.[hc] impl

storage_engine
YuQing 2021-09-22 10:53:38 +08:00
parent d2b828bd7a
commit ae832465a1
2 changed files with 92 additions and 68 deletions

View File

@ -52,64 +52,81 @@ static inline int flush_writer_files(SFOrderedWriterInfo *writer)
return 0;
}
static int deal_binlog_records(SFOrderedWriterContext *context,
SFOrderedWriterBuffer *wb_head)
static inline int deal_versioned_binlog(SFOrderedWriterContext *context)
{
SFOrderedWriterBuffer *wb;
int result;
SFOrderedWriterBuffer *wbuffer;
SFOrderedWriterBuffer *current;
wbuffer = wb_head;
do {
current = wbuffer;
wbuffer = wbuffer->next;
context->writer.fw.total_count++;
if ((result=deal_binlog_one_record(&context->writer, current)) != 0) {
while (1) {
if ((wb=sorted_queue_pop(&context->thread.queues.buffer,
&context->thread.waiting)) != NULL)
{
context->writer.fw.total_count++;
result = deal_binlog_one_record(&context->writer, wb);
fast_mblock_free_object(&context->thread.allocators.buffer, wb);
return result;
}
}
fast_mblock_free_object(&context->thread.allocators.buffer, current);
} while (wbuffer != NULL);
return 0;
}
static int deal_version_chain(SFOrderedWriterContext *context,
struct fc_queue_info *qinfo)
{
int result;
SFWriterVersionEntry *current_ver;
struct fast_mblock_node *prev_node;
struct fast_mblock_node *curr_node;
struct fast_mblock_chain node_chain;
current_ver = qinfo->head;
prev_node = NULL;
do {
curr_node = fast_mblock_to_node_ptr(current_ver);
if (prev_node != NULL) {
prev_node->next = curr_node;
}
prev_node = curr_node;
context->thread.waiting.version = current_ver->version;
if ((result=deal_versioned_binlog(context)) != 0) {
return result;
}
} while ((current_ver=current_ver->next) != NULL);
node_chain.head = fast_mblock_to_node_ptr(qinfo->head);
node_chain.tail = prev_node;
prev_node->next = NULL;
fast_mblock_batch_free(&context->thread.allocators.version, &node_chain);
return flush_writer_files(&context->writer);
}
void sf_ordered_writer_finish(SFOrderedWriterInfo *writer)
void sf_ordered_writer_finish(SFOrderedWriterContext *ctx)
{
//SFOrderedWriterBuffer *wb_head;
int count;
if (writer->fw.file.name != NULL) {
sorted_queue_terminate(&writer->thread->queues.buffer);
if (ctx->writer.fw.file.name != NULL) {
fc_queue_terminate(&ctx->thread.queues.version);
count = 0;
while (writer->thread->running && ++count < 300) {
while (ctx->thread.running && ++count < 300) {
fc_sleep_ms(10);
}
if (writer->thread->running) {
if (ctx->thread.running) {
logWarning("file: "__FILE__", line: %d, "
"%s binlog write thread still running, "
"exit anyway!", __LINE__, writer->fw.cfg.subdir_name);
"%s binlog write thread still running, exit anyway!",
__LINE__, ctx->writer.fw.cfg.subdir_name);
}
//TODO
/*
wb_head = (SFOrderedWriterBuffer *)sorted_queue_try_pop_all(
&writer->thread->queue);
if (wb_head != NULL) {
deal_binlog_records(writer->thread, wb_head);
}
*/
free(writer->fw.file.name);
writer->fw.file.name = NULL;
free(ctx->writer.fw.file.name);
ctx->writer.fw.file.name = NULL;
}
if (writer->fw.file.fd >= 0) {
close(writer->fw.file.fd);
writer->fw.file.fd = -1;
if (ctx->writer.fw.file.fd >= 0) {
close(ctx->writer.fw.file.fd);
ctx->writer.fw.file.fd = -1;
}
}
@ -117,7 +134,7 @@ static void *binlog_writer_func(void *arg)
{
SFOrderedWriterContext *context;
SFOrderedWriterThread *thread;
SFOrderedWriterBuffer *wb_head;
struct fc_queue_info qinfo;
context = (SFOrderedWriterContext *)arg;
thread = &context->thread;
@ -133,15 +150,14 @@ static void *binlog_writer_func(void *arg)
thread->running = true;
while (SF_G_CONTINUE_FLAG) {
wb_head = NULL;
//wb_head = (SFOrderedWriterBuffer *)sorted_queue_pop_all(&thread->queue);
if (wb_head == NULL) {
fc_queue_pop_to_queue(&thread->queues.version, &qinfo);
if (qinfo.head== NULL) {
continue;
}
if (deal_binlog_records(context, wb_head) != 0) {
if (deal_version_chain(context, &qinfo) != 0) {
logCrit("file: "__FILE__", line: %d, "
"deal_binlog_records fail, "
"deal_version_chain fail, "
"program exit!", __LINE__);
sf_terminate_myself();
}
@ -186,7 +202,7 @@ static int sf_ordered_writer_init_thread(SFOrderedWriterContext *context,
writer->thread = thread;
if ((result=fast_mblock_init_ex1(&thread->allocators.version,
"writer-ver-info", sizeof(SFWriterVersionInfo),
"writer-ver-info", sizeof(SFWriterVersionEntry),
8 * 1024, 0, NULL, NULL, true)) != 0)
{
return result;
@ -201,7 +217,7 @@ static int sf_ordered_writer_init_thread(SFOrderedWriterContext *context,
}
if ((result=fc_queue_init(&thread->queues.version, (unsigned long)
(&((SFWriterVersionInfo *)NULL)->next))) != 0)
(&((SFWriterVersionEntry *)NULL)->next))) != 0)
{
return result;
}

View File

@ -21,10 +21,10 @@
#include "fastcommon/sorted_queue.h"
#include "sf_file_writer.h"
typedef struct sf_writer_version_info {
typedef struct sf_writer_version_entry {
int64_t version;
struct sf_writer_version_info *next;
} SFWriterVersionInfo;
struct sf_writer_version_entry *next;
} SFWriterVersionEntry;
typedef struct sf_ordered_writer_buffer {
int64_t version;
@ -44,6 +44,7 @@ typedef struct sf_orderd_writer_thread {
} queues;
char name[64];
volatile bool running;
SFOrderedWriterBuffer waiting; //for less equal than object
} SFOrderedWriterThread;
typedef struct sf_ordered_writer_info {
@ -65,33 +66,40 @@ int sf_ordered_writer_init(SFOrderedWriterContext *context,
const char *data_path, const char *subdir_name,
const int buffer_size, const int max_record_size);
#define sf_ordered_writer_set_flags(writer, flags) \
sf_file_writer_set_flags(&(writer)->fw, flags)
#define sf_ordered_writer_set_flags(ctx, flags) \
sf_file_writer_set_flags(&(ctx)->writer.fw, flags)
#define sf_ordered_writer_get_last_version(writer) \
sf_ordered_writer_get_last_version(&(writer)->fw)
#define sf_ordered_writer_get_last_version(ctx) \
sf_ordered_writer_get_last_version(&(ctx)->writer.fw)
void sf_ordered_writer_finish(SFOrderedWriterInfo *writer);
void sf_ordered_writer_finish(SFOrderedWriterContext *ctx);
#define sf_ordered_writer_get_current_index(writer) \
sf_file_writer_get_current_index(&(writer)->fw)
#define sf_ordered_writer_get_current_index(ctx) \
sf_file_writer_get_current_index(&(ctx)->writer.fw)
#define sf_ordered_writer_get_current_position(writer, position) \
sf_file_writer_get_current_position(&(writer)->fw, position)
#define sf_ordered_writer_get_current_position(ctx, position) \
sf_file_writer_get_current_position(&(ctx)->writer.fw, position)
static inline SFOrderedWriterBuffer *sf_ordered_writer_alloc_buffer(
SFOrderedWriterThread *thread)
static inline int sf_ordered_writer_alloc_versions(
SFOrderedWriterContext *ctx, const int count,
struct fc_queue_info *chain)
{
return (SFOrderedWriterBuffer *)fast_mblock_alloc_object(
&thread->allocators.buffer);
return fc_queue_alloc_chain(&ctx->thread.queues.version,
&ctx->thread.allocators.version, count, chain);
}
static inline SFOrderedWriterBuffer *sf_ordered_writer_alloc_versioned_buffer_ex(
SFOrderedWriterInfo *writer, const int64_t version)
static inline void sf_ordered_writer_push_versions(
SFOrderedWriterContext *ctx, struct fc_queue_info *chain)
{
fc_queue_push_queue_to_tail(&ctx->thread.queues.version, chain);
}
static inline SFOrderedWriterBuffer *sf_ordered_writer_alloc_buffer(
SFOrderedWriterContext *ctx, const int64_t version)
{
SFOrderedWriterBuffer *buffer;
buffer = (SFOrderedWriterBuffer *)fast_mblock_alloc_object(
&writer->thread->allocators.buffer);
&ctx->thread.allocators.buffer);
if (buffer != NULL) {
buffer->version = version;
}
@ -106,16 +114,16 @@ static inline SFOrderedWriterBuffer *sf_ordered_writer_alloc_versioned_buffer_ex
sf_file_writer_get_filename(data_path, subdir_name, \
binlog_index, filename, size)
#define sf_ordered_writer_set_binlog_index(writer, binlog_index) \
sf_file_writer_set_binlog_index(&(writer)->fw, binlog_index)
#define sf_ordered_writer_set_binlog_index(ctx, binlog_index) \
sf_file_writer_set_binlog_index(&(ctx)->writer.fw, binlog_index)
#define sf_push_to_binlog_thread_queue(thread, buffer) \
sorted_queue_push(&(thread)->queues.buffer, buffer)
#define sf_push_to_binlog_thread_queue(ctx, buffer) \
sorted_queue_push(&(ctx)->thread.queues.buffer, buffer)
static inline void sf_push_to_binlog_write_queue(SFOrderedWriterInfo *writer,
static inline void sf_push_to_binlog_write_queue(SFOrderedWriterContext *ctx,
SFOrderedWriterBuffer *buffer)
{
sorted_queue_push(&writer->thread->queues.buffer, buffer);
sorted_queue_push(&ctx->thread.queues.buffer, buffer);
}
#ifdef __cplusplus