function sf_push_to_binlog_write_queue changed

pull/6/head
YuQing 2023-06-30 10:40:05 +08:00
parent 1abf7402ca
commit 5e8535db9c
2 changed files with 59 additions and 68 deletions

View File

@ -13,18 +13,6 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <limits.h>
#include <fcntl.h>
#include <pthread.h>
#include "fastcommon/logger.h" #include "fastcommon/logger.h"
#include "fastcommon/sockopt.h" #include "fastcommon/sockopt.h"
#include "fastcommon/shared_func.h" #include "fastcommon/shared_func.h"
@ -637,3 +625,59 @@ int sf_binlog_writer_notify_exit(SFBinlogWriterInfo *writer)
return sf_binlog_writer_push_directive(writer, return sf_binlog_writer_push_directive(writer,
SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT, 0); SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT, 0);
} }
void sf_push_to_binlog_write_queue(SFBinlogWriterInfo *writer,
SFBinlogWriterBuffer *buffer)
{
int64_t last_timestamp;
last_timestamp = FC_ATOMIC_GET(writer->thread->flow_ctrol.last_timestamp);
if (last_timestamp > 0 && g_current_time - last_timestamp >
writer->thread->flow_ctrol.max_delay)
{
time_t start_time;
time_t last_log_timestamp;
int time_used;
int log_level;
start_time = g_current_time;
PTHREAD_MUTEX_LOCK(&writer->thread->flow_ctrol.lcp.lock);
writer->thread->flow_ctrol.waiting_count++;
last_timestamp = FC_ATOMIC_GET(writer->thread->
flow_ctrol.last_timestamp);
while (last_timestamp > 0 && g_current_time - last_timestamp >
writer->thread->flow_ctrol.max_delay)
{
pthread_cond_wait(&writer->thread->flow_ctrol.lcp.cond,
&writer->thread->flow_ctrol.lcp.lock);
last_timestamp = FC_ATOMIC_GET(writer->thread->
flow_ctrol.last_timestamp);
}
writer->thread->flow_ctrol.waiting_count--;
PTHREAD_MUTEX_UNLOCK(&writer->thread->flow_ctrol.lcp.lock);
time_used = g_current_time - start_time;
if (time_used > 0) {
last_log_timestamp = FC_ATOMIC_GET(
LAST_BINLOG_WRITER_LOG_TIMESTAMP);
if (g_current_time != last_log_timestamp &&
__sync_bool_compare_and_swap(
&LAST_BINLOG_WRITER_LOG_TIMESTAMP,
last_log_timestamp, g_current_time))
{
if (time_used <= writer->thread->flow_ctrol.max_delay) {
log_level = LOG_DEBUG;
} else {
log_level = LOG_WARNING;
}
log_it_ex(&g_log_context, log_level, "file: "__FILE__", line: %d, "
"subdir_name: %s, max_delay: %d s, flow ctrol waiting "
"time: %d s", __LINE__, writer->fw.cfg.subdir_name,
writer->thread->flow_ctrol.max_delay, time_used);
}
}
}
buffer->timestamp = g_current_time;
fc_queue_push(&writer->thread->queue, buffer);
}

View File

@ -286,6 +286,9 @@ static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_versioned_buffer_ex(
return buffer; return buffer;
} }
void sf_push_to_binlog_write_queue(SFBinlogWriterInfo *writer,
SFBinlogWriterBuffer *buffer);
#define sf_binlog_writer_get_filepath(data_path, subdir_name, filepath, size) \ #define sf_binlog_writer_get_filepath(data_path, subdir_name, filepath, size) \
sf_file_writer_get_filepath(data_path, subdir_name, filepath, size) sf_file_writer_get_filepath(data_path, subdir_name, filepath, size)
@ -328,62 +331,6 @@ static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_versioned_buffer_ex(
#define sf_binlog_writer_set_binlog_write_index(writer, last_index) \ #define sf_binlog_writer_set_binlog_write_index(writer, last_index) \
sf_file_writer_set_binlog_write_index(&(writer)->fw, last_index) sf_file_writer_set_binlog_write_index(&(writer)->fw, last_index)
static inline void sf_push_to_binlog_write_queue(SFBinlogWriterInfo *writer,
SFBinlogWriterBuffer *buffer)
{
int64_t last_timestamp;
last_timestamp = FC_ATOMIC_GET(writer->thread->flow_ctrol.last_timestamp);
if (last_timestamp > 0 && g_current_time - last_timestamp >
writer->thread->flow_ctrol.max_delay)
{
time_t start_time;
time_t last_log_timestamp;
int time_used;
int log_level;
start_time = g_current_time;
PTHREAD_MUTEX_LOCK(&writer->thread->flow_ctrol.lcp.lock);
writer->thread->flow_ctrol.waiting_count++;
last_timestamp = FC_ATOMIC_GET(writer->thread->
flow_ctrol.last_timestamp);
while (last_timestamp > 0 && g_current_time - last_timestamp >
writer->thread->flow_ctrol.max_delay)
{
pthread_cond_wait(&writer->thread->flow_ctrol.lcp.cond,
&writer->thread->flow_ctrol.lcp.lock);
last_timestamp = FC_ATOMIC_GET(writer->thread->
flow_ctrol.last_timestamp);
}
writer->thread->flow_ctrol.waiting_count--;
PTHREAD_MUTEX_UNLOCK(&writer->thread->flow_ctrol.lcp.lock);
time_used = g_current_time - start_time;
if (time_used > 0) {
last_log_timestamp = FC_ATOMIC_GET(
LAST_BINLOG_WRITER_LOG_TIMESTAMP);
if (g_current_time != last_log_timestamp &&
__sync_bool_compare_and_swap(
&LAST_BINLOG_WRITER_LOG_TIMESTAMP,
last_log_timestamp, g_current_time))
{
if (time_used <= writer->thread->flow_ctrol.max_delay) {
log_level = LOG_DEBUG;
} else {
log_level = LOG_WARNING;
}
log_it_ex(&g_log_context, log_level, "file: "__FILE__", line: %d, "
"subdir_name: %s, max_delay: %d s, flow ctrol waiting "
"time: %d s", __LINE__, writer->fw.cfg.subdir_name,
writer->thread->flow_ctrol.max_delay, time_used);
}
}
}
buffer->timestamp = g_current_time;
fc_queue_push(&writer->thread->queue, buffer);
}
#define sf_binlog_writer_get_last_lines(data_path, subdir_name, \ #define sf_binlog_writer_get_last_lines(data_path, subdir_name, \
current_write_index, buff, buff_size, count, length) \ current_write_index, buff, buff_size, count, length) \
sf_file_writer_get_last_lines(data_path, subdir_name, \ sf_file_writer_get_last_lines(data_path, subdir_name, \