sf_binlog_writer.c: flow control more rigorously

pull/6/head
YuQing 2023-07-30 10:11:00 +08:00
parent 27a7696867
commit e440273f35
1 changed files with 8 additions and 6 deletions

View File

@ -629,26 +629,28 @@ int sf_binlog_writer_notify_exit(SFBinlogWriterInfo *writer)
void sf_push_to_binlog_write_queue(SFBinlogWriterInfo *writer, void sf_push_to_binlog_write_queue(SFBinlogWriterInfo *writer,
SFBinlogWriterBuffer *buffer) SFBinlogWriterBuffer *buffer)
{ {
time_t current_time;
int64_t last_timestamp; int64_t last_timestamp;
current_time = g_current_time;
last_timestamp = FC_ATOMIC_GET(writer->thread->flow_ctrol.last_timestamp); last_timestamp = FC_ATOMIC_GET(writer->thread->flow_ctrol.last_timestamp);
if ((last_timestamp > 0 && g_current_time - last_timestamp > writer-> if ((last_timestamp > 0 && current_time - last_timestamp > writer->
thread->flow_ctrol.max_delay) && !(writer->order_by == thread->flow_ctrol.max_delay) && !(writer->order_by ==
SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION && buffer-> SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION && buffer->
version.first - writer->version_ctx.next < 128)) version.first - writer->version_ctx.next < 128))
{ {
time_t start_time;
time_t last_log_timestamp; time_t last_log_timestamp;
int time_used; int time_used;
int log_level; int log_level;
start_time = g_current_time;
PTHREAD_MUTEX_LOCK(&writer->thread->flow_ctrol.lcp.lock); PTHREAD_MUTEX_LOCK(&writer->thread->flow_ctrol.lcp.lock);
writer->thread->flow_ctrol.waiting_count++; writer->thread->flow_ctrol.waiting_count++;
last_timestamp = FC_ATOMIC_GET(writer->thread-> last_timestamp = FC_ATOMIC_GET(writer->thread->
flow_ctrol.last_timestamp); flow_ctrol.last_timestamp);
while (last_timestamp > 0 && g_current_time - last_timestamp > while ((last_timestamp > 0 && current_time - last_timestamp > writer->
writer->thread->flow_ctrol.max_delay) thread->flow_ctrol.max_delay) && !(writer->order_by ==
SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION && buffer->
version.first - writer->version_ctx.next < 128))
{ {
pthread_cond_wait(&writer->thread->flow_ctrol.lcp.cond, pthread_cond_wait(&writer->thread->flow_ctrol.lcp.cond,
&writer->thread->flow_ctrol.lcp.lock); &writer->thread->flow_ctrol.lcp.lock);
@ -658,7 +660,7 @@ void sf_push_to_binlog_write_queue(SFBinlogWriterInfo *writer,
writer->thread->flow_ctrol.waiting_count--; writer->thread->flow_ctrol.waiting_count--;
PTHREAD_MUTEX_UNLOCK(&writer->thread->flow_ctrol.lcp.lock); PTHREAD_MUTEX_UNLOCK(&writer->thread->flow_ctrol.lcp.lock);
time_used = g_current_time - start_time; time_used = g_current_time - current_time;
if (time_used > 0) { if (time_used > 0) {
last_log_timestamp = FC_ATOMIC_GET( last_log_timestamp = FC_ATOMIC_GET(
LAST_BINLOG_WRITER_LOG_TIMESTAMP); LAST_BINLOG_WRITER_LOG_TIMESTAMP);