diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c index 2c01c53..3de5770 100644 --- a/src/sf_binlog_writer.c +++ b/src/sf_binlog_writer.c @@ -13,18 +13,6 @@ * along with this program. If not, see . */ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include "fastcommon/logger.h" #include "fastcommon/sockopt.h" #include "fastcommon/shared_func.h" @@ -637,3 +625,59 @@ int sf_binlog_writer_notify_exit(SFBinlogWriterInfo *writer) return sf_binlog_writer_push_directive(writer, SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT, 0); } + +void sf_push_to_binlog_write_queue(SFBinlogWriterInfo *writer, + SFBinlogWriterBuffer *buffer) +{ + int64_t last_timestamp; + + last_timestamp = FC_ATOMIC_GET(writer->thread->flow_ctrol.last_timestamp); + if (last_timestamp > 0 && g_current_time - last_timestamp > + writer->thread->flow_ctrol.max_delay) + { + time_t start_time; + time_t last_log_timestamp; + int time_used; + int log_level; + + start_time = g_current_time; + PTHREAD_MUTEX_LOCK(&writer->thread->flow_ctrol.lcp.lock); + writer->thread->flow_ctrol.waiting_count++; + last_timestamp = FC_ATOMIC_GET(writer->thread-> + flow_ctrol.last_timestamp); + while (last_timestamp > 0 && g_current_time - last_timestamp > + writer->thread->flow_ctrol.max_delay) + { + pthread_cond_wait(&writer->thread->flow_ctrol.lcp.cond, + &writer->thread->flow_ctrol.lcp.lock); + last_timestamp = FC_ATOMIC_GET(writer->thread-> + flow_ctrol.last_timestamp); + } + writer->thread->flow_ctrol.waiting_count--; + PTHREAD_MUTEX_UNLOCK(&writer->thread->flow_ctrol.lcp.lock); + + time_used = g_current_time - start_time; + if (time_used > 0) { + last_log_timestamp = FC_ATOMIC_GET( + LAST_BINLOG_WRITER_LOG_TIMESTAMP); + if (g_current_time != last_log_timestamp && + __sync_bool_compare_and_swap( + &LAST_BINLOG_WRITER_LOG_TIMESTAMP, + last_log_timestamp, g_current_time)) + { + if (time_used <= writer->thread->flow_ctrol.max_delay) { + log_level = LOG_DEBUG; + } else { + log_level = LOG_WARNING; + } + log_it_ex(&g_log_context, log_level, "file: "__FILE__", line: %d, " + "subdir_name: %s, max_delay: %d s, flow ctrol waiting " + "time: %d s", __LINE__, writer->fw.cfg.subdir_name, + writer->thread->flow_ctrol.max_delay, time_used); + } + } + } + + buffer->timestamp = g_current_time; + fc_queue_push(&writer->thread->queue, buffer); +} diff --git a/src/sf_binlog_writer.h b/src/sf_binlog_writer.h index 23a96c3..6b39577 100644 --- a/src/sf_binlog_writer.h +++ b/src/sf_binlog_writer.h @@ -286,6 +286,9 @@ static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_versioned_buffer_ex( return buffer; } +void sf_push_to_binlog_write_queue(SFBinlogWriterInfo *writer, + SFBinlogWriterBuffer *buffer); + #define sf_binlog_writer_get_filepath(data_path, subdir_name, filepath, size) \ sf_file_writer_get_filepath(data_path, subdir_name, filepath, size) @@ -328,62 +331,6 @@ static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_versioned_buffer_ex( #define sf_binlog_writer_set_binlog_write_index(writer, last_index) \ sf_file_writer_set_binlog_write_index(&(writer)->fw, last_index) -static inline void sf_push_to_binlog_write_queue(SFBinlogWriterInfo *writer, - SFBinlogWriterBuffer *buffer) -{ - int64_t last_timestamp; - - last_timestamp = FC_ATOMIC_GET(writer->thread->flow_ctrol.last_timestamp); - if (last_timestamp > 0 && g_current_time - last_timestamp > - writer->thread->flow_ctrol.max_delay) - { - time_t start_time; - time_t last_log_timestamp; - int time_used; - int log_level; - - start_time = g_current_time; - PTHREAD_MUTEX_LOCK(&writer->thread->flow_ctrol.lcp.lock); - writer->thread->flow_ctrol.waiting_count++; - last_timestamp = FC_ATOMIC_GET(writer->thread-> - flow_ctrol.last_timestamp); - while (last_timestamp > 0 && g_current_time - last_timestamp > - writer->thread->flow_ctrol.max_delay) - { - pthread_cond_wait(&writer->thread->flow_ctrol.lcp.cond, - &writer->thread->flow_ctrol.lcp.lock); - last_timestamp = FC_ATOMIC_GET(writer->thread-> - flow_ctrol.last_timestamp); - } - writer->thread->flow_ctrol.waiting_count--; - PTHREAD_MUTEX_UNLOCK(&writer->thread->flow_ctrol.lcp.lock); - - time_used = g_current_time - start_time; - if (time_used > 0) { - last_log_timestamp = FC_ATOMIC_GET( - LAST_BINLOG_WRITER_LOG_TIMESTAMP); - if (g_current_time != last_log_timestamp && - __sync_bool_compare_and_swap( - &LAST_BINLOG_WRITER_LOG_TIMESTAMP, - last_log_timestamp, g_current_time)) - { - if (time_used <= writer->thread->flow_ctrol.max_delay) { - log_level = LOG_DEBUG; - } else { - log_level = LOG_WARNING; - } - log_it_ex(&g_log_context, log_level, "file: "__FILE__", line: %d, " - "subdir_name: %s, max_delay: %d s, flow ctrol waiting " - "time: %d s", __LINE__, writer->fw.cfg.subdir_name, - writer->thread->flow_ctrol.max_delay, time_used); - } - } - } - - buffer->timestamp = g_current_time; - fc_queue_push(&writer->thread->queue, buffer); -} - #define sf_binlog_writer_get_last_lines(data_path, subdir_name, \ current_write_index, buff, buff_size, count, length) \ sf_file_writer_get_last_lines(data_path, subdir_name, \