sf_binlog_writer.[hc]: use config max_delay for flow control
parent
c9fba3b9a7
commit
a95f4cc725
|
|
@ -177,7 +177,7 @@ static inline int flush_writer_files(SFBinlogWriterThread *thread)
|
|||
}
|
||||
|
||||
static int deal_binlog_records(SFBinlogWriterThread *thread,
|
||||
SFBinlogWriterBuffer *wb_head)
|
||||
SFBinlogWriterBuffer *wb_head, uint32_t *last_timestamp)
|
||||
{
|
||||
int result;
|
||||
SFBinlogWriterBuffer *wbuffer;
|
||||
|
|
@ -187,6 +187,9 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
|
|||
do {
|
||||
current = wbuffer;
|
||||
wbuffer = wbuffer->next;
|
||||
if (wbuffer == NULL) {
|
||||
*last_timestamp = current->timestamp;
|
||||
}
|
||||
|
||||
switch (current->type) {
|
||||
case SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE:
|
||||
|
|
@ -306,6 +309,7 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
|
|||
void sf_binlog_writer_finish(SFBinlogWriterInfo *writer)
|
||||
{
|
||||
SFBinlogWriterBuffer *wb_head;
|
||||
uint32_t last_timestamp;
|
||||
int count;
|
||||
|
||||
if (writer->fw.file.name != NULL) {
|
||||
|
|
@ -332,7 +336,8 @@ void sf_binlog_writer_finish(SFBinlogWriterInfo *writer)
|
|||
wb_head = (SFBinlogWriterBuffer *)fc_queue_try_pop_all(
|
||||
&writer->thread->queue);
|
||||
if (wb_head != NULL) {
|
||||
deal_binlog_records(writer->thread, wb_head);
|
||||
last_timestamp = 0;
|
||||
deal_binlog_records(writer->thread, wb_head, &last_timestamp);
|
||||
}
|
||||
|
||||
free(writer->fw.file.name);
|
||||
|
|
@ -349,6 +354,8 @@ static void *binlog_writer_func(void *arg)
|
|||
{
|
||||
SFBinlogWriterThread *thread;
|
||||
SFBinlogWriterBuffer *wb_head;
|
||||
uint32_t current_timestamp;
|
||||
uint32_t last_timestamp;
|
||||
int result;
|
||||
|
||||
thread = (SFBinlogWriterThread *)arg;
|
||||
|
|
@ -362,6 +369,7 @@ static void *binlog_writer_func(void *arg)
|
|||
}
|
||||
#endif
|
||||
|
||||
current_timestamp = last_timestamp = 0;
|
||||
thread->running = true;
|
||||
while (SF_G_CONTINUE_FLAG) {
|
||||
wb_head = (SFBinlogWriterBuffer *)fc_queue_pop_all(&thread->queue);
|
||||
|
|
@ -369,7 +377,9 @@ static void *binlog_writer_func(void *arg)
|
|||
continue;
|
||||
}
|
||||
|
||||
if ((result=deal_binlog_records(thread, wb_head)) != 0) {
|
||||
if ((result=deal_binlog_records(thread, wb_head,
|
||||
¤t_timestamp)) != 0)
|
||||
{
|
||||
if (result != ERRNO_THREAD_EXIT) {
|
||||
logCrit("file: "__FILE__", line: %d, "
|
||||
"deal_binlog_records fail, "
|
||||
|
|
@ -378,6 +388,23 @@ static void *binlog_writer_func(void *arg)
|
|||
}
|
||||
break;
|
||||
}
|
||||
|
||||
if (fc_queue_empty(&thread->queue)) {
|
||||
current_timestamp = 0;
|
||||
}
|
||||
if (current_timestamp == 0 || current_timestamp > last_timestamp) {
|
||||
if (current_timestamp != last_timestamp) {
|
||||
last_timestamp = current_timestamp;
|
||||
FC_ATOMIC_SET(thread->flow_ctrol.last_timestamp,
|
||||
current_timestamp);
|
||||
}
|
||||
|
||||
PTHREAD_MUTEX_LOCK(&thread->flow_ctrol.lcp.lock);
|
||||
if (thread->flow_ctrol.waiting_count > 0) {
|
||||
pthread_cond_broadcast(&thread->flow_ctrol.lcp.cond);
|
||||
}
|
||||
PTHREAD_MUTEX_UNLOCK(&thread->flow_ctrol.lcp.lock);
|
||||
}
|
||||
}
|
||||
|
||||
thread->running = false;
|
||||
|
|
@ -452,8 +479,8 @@ int sf_binlog_writer_init_by_version_ex(SFBinlogWriterInfo *writer,
|
|||
|
||||
int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
|
||||
const char *name, SFBinlogWriterInfo *writer, const short order_mode,
|
||||
const int max_record_size, const bool use_fixed_buffer_size,
|
||||
const bool passive_write)
|
||||
const int max_delay, const int max_record_size, const bool
|
||||
use_fixed_buffer_size, const bool passive_write)
|
||||
{
|
||||
const int alloc_elements_once = 1024;
|
||||
const int64_t alloc_elements_limit = 0;
|
||||
|
|
@ -467,9 +494,9 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
|
|||
thread->order_mode = order_mode;
|
||||
thread->use_fixed_buffer_size = use_fixed_buffer_size;
|
||||
thread->passive_write = passive_write;
|
||||
thread->flow_ctrol.max_delay = max_delay;
|
||||
writer->fw.cfg.max_record_size = max_record_size;
|
||||
writer->thread = thread;
|
||||
|
||||
callbacks.init_func = binlog_wbuffer_alloc_init;
|
||||
callbacks.args = writer;
|
||||
element_size = sizeof(SFBinlogWriterBuffer);
|
||||
|
|
@ -492,6 +519,12 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
|
|||
return result;
|
||||
}
|
||||
|
||||
thread->flow_ctrol.last_timestamp = 0;
|
||||
thread->flow_ctrol.waiting_count = 0;
|
||||
if ((result=init_pthread_lock_cond_pair(&thread->flow_ctrol.lcp)) != 0) {
|
||||
return result;
|
||||
}
|
||||
|
||||
thread->flush_writers.head = thread->flush_writers.tail = NULL;
|
||||
return fc_create_thread(&tid, binlog_writer_func, thread,
|
||||
SF_G_THREAD_STACK_SIZE);
|
||||
|
|
@ -537,7 +570,7 @@ int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer,
|
|||
return ENOMEM;
|
||||
}
|
||||
|
||||
fc_queue_push(&writer->thread->queue, buffer);
|
||||
sf_push_to_binlog_write_queue(writer, buffer);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
@ -552,7 +585,7 @@ static inline int sf_binlog_writer_push_directive(SFBinlogWriterInfo *writer,
|
|||
return ENOMEM;
|
||||
}
|
||||
|
||||
fc_queue_push(&writer->thread->queue, buffer);
|
||||
sf_push_to_binlog_write_queue(writer, buffer);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@
|
|||
#define _SF_BINLOG_WRITER_H_
|
||||
|
||||
#include "fastcommon/fc_queue.h"
|
||||
#include "fastcommon/fc_atomic.h"
|
||||
#include "sf_types.h"
|
||||
#include "sf_file_writer.h"
|
||||
|
||||
|
|
@ -46,7 +47,8 @@ struct sf_binlog_writer_info;
|
|||
typedef struct sf_binlog_writer_buffer {
|
||||
SFVersionRange version;
|
||||
BufferInfo bf;
|
||||
int type; //for versioned writer
|
||||
int type;
|
||||
uint32_t timestamp; //for flow ctrol
|
||||
struct sf_binlog_writer_info *writer;
|
||||
struct sf_binlog_writer_buffer *next;
|
||||
} SFBinlogWriterBuffer;
|
||||
|
|
@ -70,6 +72,12 @@ typedef struct binlog_writer_thread {
|
|||
bool use_fixed_buffer_size;
|
||||
bool passive_write;
|
||||
char order_mode;
|
||||
struct {
|
||||
int max_delay; //in seconds
|
||||
volatile uint32_t last_timestamp;
|
||||
int waiting_count;
|
||||
pthread_lock_cond_pair_t lcp;
|
||||
} flow_ctrol;
|
||||
struct {
|
||||
struct sf_binlog_writer_info *head;
|
||||
struct sf_binlog_writer_info *tail;
|
||||
|
|
@ -115,8 +123,8 @@ int sf_binlog_writer_init_by_version_ex(SFBinlogWriterInfo *writer,
|
|||
|
||||
int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
|
||||
const char *name, SFBinlogWriterInfo *writer, const short order_mode,
|
||||
const int max_record_size, const bool use_fixed_buffer_size,
|
||||
const bool passive_write);
|
||||
const int max_delay, const int max_record_size, const bool
|
||||
use_fixed_buffer_size, const bool passive_write);
|
||||
|
||||
#define sf_binlog_writer_init_normal(writer, \
|
||||
data_path, subdir_name, buffer_size) \
|
||||
|
|
@ -129,15 +137,16 @@ int sf_binlog_writer_init_thread_ex(SFBinlogWriterThread *thread,
|
|||
SF_BINLOG_FILE_PREFIX, next_version, buffer_size, \
|
||||
ring_size, SF_BINLOG_DEFAULT_ROTATE_SIZE)
|
||||
|
||||
#define sf_binlog_writer_init_thread(thread, name, writer, max_record_size) \
|
||||
#define sf_binlog_writer_init_thread(thread, name, \
|
||||
writer, max_delay, max_record_size) \
|
||||
sf_binlog_writer_init_thread_ex(thread, name, writer, \
|
||||
SF_BINLOG_THREAD_ORDER_MODE_FIXED, \
|
||||
SF_BINLOG_THREAD_ORDER_MODE_FIXED, max_delay, \
|
||||
max_record_size, true, false)
|
||||
|
||||
static inline int sf_binlog_writer_init_ex(SFBinlogWriterContext *context,
|
||||
const char *data_path, const char *subdir_name,
|
||||
const char *file_prefix, const int buffer_size,
|
||||
const int max_record_size)
|
||||
const int max_delay, const int max_record_size)
|
||||
{
|
||||
int result;
|
||||
if ((result=sf_binlog_writer_init_normal_ex(&context->writer,
|
||||
|
|
@ -147,14 +156,14 @@ static inline int sf_binlog_writer_init_ex(SFBinlogWriterContext *context,
|
|||
return result;
|
||||
}
|
||||
|
||||
return sf_binlog_writer_init_thread(&context->thread,
|
||||
subdir_name, &context->writer, max_record_size);
|
||||
return sf_binlog_writer_init_thread(&context->thread, subdir_name,
|
||||
&context->writer, max_delay, max_record_size);
|
||||
}
|
||||
|
||||
#define sf_binlog_writer_init(context, data_path, \
|
||||
subdir_name, buffer_size, max_record_size) \
|
||||
#define sf_binlog_writer_init(context, data_path, subdir_name, \
|
||||
buffer_size, max_delay, max_record_size) \
|
||||
sf_binlog_writer_init_ex(context, data_path, subdir_name, \
|
||||
SF_BINLOG_FILE_PREFIX, buffer_size, max_record_size)
|
||||
SF_BINLOG_FILE_PREFIX, buffer_size, max_delay, max_record_size)
|
||||
|
||||
void sf_binlog_writer_finish(SFBinlogWriterInfo *writer);
|
||||
|
||||
|
|
@ -240,7 +249,12 @@ int sf_binlog_writer_notify_exit(SFBinlogWriterInfo *writer);
|
|||
static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_buffer(
|
||||
SFBinlogWriterThread *thread)
|
||||
{
|
||||
return (SFBinlogWriterBuffer *)fast_mblock_alloc_object(&thread->mblock);
|
||||
SFBinlogWriterBuffer *buffer;
|
||||
|
||||
if ((buffer=fast_mblock_alloc_object(&thread->mblock)) != NULL) {
|
||||
buffer->type = SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE;
|
||||
}
|
||||
return buffer;
|
||||
}
|
||||
|
||||
#define sf_binlog_writer_alloc_one_version_buffer(writer, version) \
|
||||
|
|
@ -257,6 +271,7 @@ static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_versioned_buffer_ex(
|
|||
const int64_t last_version, const int type)
|
||||
{
|
||||
SFBinlogWriterBuffer *buffer;
|
||||
|
||||
buffer = (SFBinlogWriterBuffer *)fast_mblock_alloc_object(
|
||||
&writer->thread->mblock);
|
||||
if (buffer != NULL) {
|
||||
|
|
@ -310,13 +325,32 @@ static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_versioned_buffer_ex(
|
|||
#define sf_binlog_writer_set_binlog_write_index(writer, last_index) \
|
||||
sf_file_writer_set_binlog_write_index(&(writer)->fw, last_index)
|
||||
|
||||
#define sf_push_to_binlog_thread_queue(thread, buffer) \
|
||||
fc_queue_push(&(thread)->queue, buffer)
|
||||
|
||||
static inline void sf_push_to_binlog_write_queue(SFBinlogWriterInfo *writer,
|
||||
SFBinlogWriterBuffer *buffer)
|
||||
{
|
||||
buffer->type = SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE;
|
||||
int64_t last_timestamp;
|
||||
|
||||
last_timestamp = FC_ATOMIC_GET(writer->thread->flow_ctrol.last_timestamp);
|
||||
if (last_timestamp > 0 && g_current_time - last_timestamp >
|
||||
writer->thread->flow_ctrol.max_delay)
|
||||
{
|
||||
PTHREAD_MUTEX_LOCK(&writer->thread->flow_ctrol.lcp.lock);
|
||||
writer->thread->flow_ctrol.waiting_count++;
|
||||
last_timestamp = FC_ATOMIC_GET(writer->thread->
|
||||
flow_ctrol.last_timestamp);
|
||||
while (last_timestamp > 0 && g_current_time - last_timestamp >
|
||||
writer->thread->flow_ctrol.max_delay)
|
||||
{
|
||||
pthread_cond_wait(&writer->thread->flow_ctrol.lcp.cond,
|
||||
&writer->thread->flow_ctrol.lcp.lock);
|
||||
last_timestamp = FC_ATOMIC_GET(writer->thread->
|
||||
flow_ctrol.last_timestamp);
|
||||
}
|
||||
writer->thread->flow_ctrol.waiting_count--;
|
||||
PTHREAD_MUTEX_UNLOCK(&writer->thread->flow_ctrol.lcp.lock);
|
||||
}
|
||||
|
||||
buffer->timestamp = g_current_time;
|
||||
fc_queue_push(&writer->thread->queue, buffer);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -124,10 +124,11 @@ static inline SFOrderedWriterBuffer *sf_ordered_writer_alloc_buffer(
|
|||
#define sf_ordered_writer_set_binlog_index(ctx, binlog_index) \
|
||||
sf_file_writer_set_binlog_index(&(ctx)->writer.fw, binlog_index)
|
||||
|
||||
#define sf_push_to_binlog_thread_queue(ctx, buffer) \
|
||||
#define sf_ordered_writer_push_to_thread_queue(ctx, buffer) \
|
||||
sorted_queue_push(&(ctx)->thread.queues.buffer, buffer)
|
||||
|
||||
static inline void sf_push_to_binlog_write_queue(SFOrderedWriterContext *ctx,
|
||||
static inline void sf_ordered_writer_push_to_queue(
|
||||
SFOrderedWriterContext *ctx,
|
||||
SFOrderedWriterBuffer *buffer)
|
||||
{
|
||||
sorted_queue_push(&ctx->thread.queues.buffer, buffer);
|
||||
|
|
|
|||
Loading…
Reference in New Issue