sf_file_writer.[hc] support config call_fsync for performance

pull/6/head
YuQing 2023-06-10 14:31:48 +08:00
parent d5139804f9
commit 2a245a06aa
5 changed files with 63 additions and 18 deletions

View File

@ -194,6 +194,16 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
fast_mblock_free_object(&current->writer-> fast_mblock_free_object(&current->writer->
thread->mblock, current); thread->mblock, current);
break; break;
case SF_BINLOG_BUFFER_TYPE_CHANGE_PASSIVE_WRITE:
thread->passive_write = current->version.first;
fast_mblock_free_object(&current->writer->
thread->mblock, current);
break;
case SF_BINLOG_BUFFER_TYPE_CHANGE_CALL_FSYNC:
current->writer->fw.cfg.call_fsync = current->version.first;
fast_mblock_free_object(&current->writer->
thread->mblock, current);
break;
case SF_BINLOG_BUFFER_TYPE_ROTATE_FILE: case SF_BINLOG_BUFFER_TYPE_ROTATE_FILE:
if ((result=sf_file_writer_set_binlog_write_index(&current-> if ((result=sf_file_writer_set_binlog_write_index(&current->
writer->fw, current->writer->fw.binlog. writer->fw, current->writer->fw.binlog.
@ -201,11 +211,15 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
{ {
return result; return result;
} }
fast_mblock_free_object(&current->writer->
thread->mblock, current);
break; break;
case SF_BINLOG_BUFFER_TYPE_FLUSH_FILE: case SF_BINLOG_BUFFER_TYPE_FLUSH_FILE:
if ((result=flush_writer_files(thread)) != 0) { if ((result=flush_writer_files(thread)) != 0) {
return result; return result;
} }
fast_mblock_free_object(&current->writer->
thread->mblock, current);
break; break;
case SF_BINLOG_BUFFER_TYPE_SET_WRITE_INDEX: case SF_BINLOG_BUFFER_TYPE_SET_WRITE_INDEX:
if ((result=sf_file_writer_set_binlog_write_index(&current-> if ((result=sf_file_writer_set_binlog_write_index(&current->
@ -213,9 +227,13 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
{ {
return result; return result;
} }
fast_mblock_free_object(&current->writer->
thread->mblock, current);
break; break;
case SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT: case SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT:
flush_writer_files(thread); flush_writer_files(thread);
fast_mblock_free_object(&current->writer->
thread->mblock, current);
return ERRNO_THREAD_EXIT; return ERRNO_THREAD_EXIT;
case SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION: case SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION:
if (current->writer->order_by != if (current->writer->order_by !=
@ -255,7 +273,6 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
fast_mblock_free_object(&current->writer-> fast_mblock_free_object(&current->writer->
thread->mblock, current); thread->mblock, current);
break; break;
default: default:
current->writer->fw.total_count++; current->writer->fw.total_count++;
add_to_flush_writer_queue(thread, current->writer); add_to_flush_writer_queue(thread, current->writer);
@ -539,6 +556,22 @@ static inline int sf_binlog_writer_push_directive(SFBinlogWriterInfo *writer,
return 0; return 0;
} }
int sf_binlog_writer_change_passive_write(SFBinlogWriterInfo *writer,
const bool passive_write)
{
return sf_binlog_writer_push_directive(writer,
SF_BINLOG_BUFFER_TYPE_CHANGE_PASSIVE_WRITE,
passive_write);
}
int sf_binlog_writer_change_call_fsync(SFBinlogWriterInfo *writer,
const bool call_fsync)
{
return sf_binlog_writer_push_directive(writer,
SF_BINLOG_BUFFER_TYPE_CHANGE_CALL_FSYNC,
call_fsync);
}
int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer,
const int64_t next_version) const int64_t next_version)
{ {

View File

@ -28,13 +28,15 @@
#define SF_BINLOG_WRITER_TYPE_ORDER_BY_NONE 0 #define SF_BINLOG_WRITER_TYPE_ORDER_BY_NONE 0
#define SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION 1 #define SF_BINLOG_WRITER_TYPE_ORDER_BY_VERSION 1
#define SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE 0 //default type, must be 0 #define SF_BINLOG_BUFFER_TYPE_WRITE_TO_FILE 0 //default type, must be 0
#define SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION 1 #define SF_BINLOG_BUFFER_TYPE_SET_NEXT_VERSION 1
#define SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE 2 #define SF_BINLOG_BUFFER_TYPE_CHANGE_ORDER_TYPE 2
#define SF_BINLOG_BUFFER_TYPE_SET_WRITE_INDEX 3 #define SF_BINLOG_BUFFER_TYPE_CHANGE_PASSIVE_WRITE 3
#define SF_BINLOG_BUFFER_TYPE_ROTATE_FILE 4 #define SF_BINLOG_BUFFER_TYPE_CHANGE_CALL_FSYNC 4
#define SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT 5 #define SF_BINLOG_BUFFER_TYPE_SET_WRITE_INDEX 5
#define SF_BINLOG_BUFFER_TYPE_FLUSH_FILE 6 #define SF_BINLOG_BUFFER_TYPE_ROTATE_FILE 6
#define SF_BINLOG_BUFFER_TYPE_NOTIFY_EXIT 7
#define SF_BINLOG_BUFFER_TYPE_FLUSH_FILE 8
#define SF_BINLOG_BUFFER_SET_VERSION(buffer, ver) \ #define SF_BINLOG_BUFFER_SET_VERSION(buffer, ver) \
(buffer)->version.first = (buffer)->version.last = ver (buffer)->version.first = (buffer)->version.last = ver
@ -184,6 +186,12 @@ static inline void sf_binlog_writer_destroy(
int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer, int sf_binlog_writer_change_order_by(SFBinlogWriterInfo *writer,
const short order_by); const short order_by);
int sf_binlog_writer_change_passive_write(SFBinlogWriterInfo *writer,
const bool passive_write);
int sf_binlog_writer_change_call_fsync(SFBinlogWriterInfo *writer,
const bool call_fsync);
int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer, int sf_binlog_writer_change_next_version(SFBinlogWriterInfo *writer,
const int64_t next_version); const int64_t next_version);

View File

@ -232,14 +232,16 @@ static int do_write_to_file(SFFileWriterInfo *writer,
return result; return result;
} }
if (fsync(writer->file.fd) != 0) { if (writer->cfg.call_fsync) {
result = errno != 0 ? errno : EIO; if (fsync(writer->file.fd) != 0) {
logError("file: "__FILE__", line: %d, " result = errno != 0 ? errno : EIO;
"fsync to binlog file \"%s\" fail, " logError("file: "__FILE__", line: %d, "
"errno: %d, error info: %s", "fsync to binlog file \"%s\" fail, "
__LINE__, writer->file.name, "errno: %d, error info: %s",
result, STRERROR(result)); __LINE__, writer->file.name,
return result; result, STRERROR(result));
return result;
}
} }
writer->file.size += len; writer->file.size += len;
@ -382,6 +384,7 @@ int sf_file_writer_init(SFFileWriterInfo *writer, const char *data_path,
return result; return result;
} }
writer->cfg.call_fsync = true;
writer->cfg.file_rotate_size = file_rotate_size; writer->cfg.file_rotate_size = file_rotate_size;
writer->cfg.data_path = data_path; writer->cfg.data_path = data_path;
path_len = snprintf(filepath, sizeof(filepath), path_len = snprintf(filepath, sizeof(filepath),

View File

@ -40,6 +40,7 @@ typedef struct sf_file_writer_info {
char file_prefix[SF_BINLOG_FILE_PREFIX_SIZE]; char file_prefix[SF_BINLOG_FILE_PREFIX_SIZE];
int64_t file_rotate_size; int64_t file_rotate_size;
int max_record_size; int max_record_size;
bool call_fsync;
} cfg; } cfg;
struct { struct {

View File

@ -608,9 +608,9 @@ int sf_client_sock_read(int sock, short event, void *arg)
TCP_SET_QUICK_ACK(sock); TCP_SET_QUICK_ACK(sock);
total_read += bytes; total_read += bytes;
task->offset += bytes; task->offset += bytes;
if (task->length == 0) { //header if (task->length == 0) { //pkg header
if (task->offset < SF_CTX->header_size) { if (task->offset < SF_CTX->header_size) {
break; continue;
} }
if (SF_CTX->set_body_length(task) != 0) { if (SF_CTX->set_body_length(task) != 0) {