correct macros for struct SFBinlogBuffer

use_iouring
YuQing 2024-02-15 15:08:28 +08:00
parent 9d3a92d7da
commit d5a9f40a66
4 changed files with 26 additions and 20 deletions

View File

@ -30,8 +30,11 @@ typedef struct {
#define sf_buffered_writer_init(writer, filename) \ #define sf_buffered_writer_init(writer, filename) \
sf_buffered_writer_init_ex(writer, filename, 1024 * 1024) sf_buffered_writer_init_ex(writer, filename, 1024 * 1024)
#define SF_BUFFERED_WRITER_LENGTH(bw) SF_BINLOG_BUFFER_LENGTH((bw).buffer) #define SF_BUFFERED_WRITER_LENGTH(bw) \
#define SF_BUFFERED_WRITER_REMAIN(bw) SF_BINLOG_BUFFER_REMAIN((bw).buffer) SF_BINLOG_BUFFER_PRODUCER_DATA_LENGTH((bw).buffer)
#define SF_BUFFERED_WRITER_REMAIN(bw) \
SF_BINLOG_BUFFER_PRODUCER_BUFF_REMAIN((bw).buffer)
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -56,7 +59,6 @@ extern "C" {
if ((result=sf_binlog_buffer_init(&writer->buffer, buffer_size)) != 0) { if ((result=sf_binlog_buffer_init(&writer->buffer, buffer_size)) != 0) {
return result; return result;
} }
writer->buffer.end = writer->buffer.buff + writer->buffer.size;
return 0; return 0;
} }
@ -65,7 +67,7 @@ extern "C" {
int result; int result;
int length; int length;
length = writer->buffer.current - writer->buffer.buff; length = writer->buffer.data_end - writer->buffer.buff;
if (fc_safe_write(writer->fd, writer->buffer.buff, length) != length) { if (fc_safe_write(writer->fd, writer->buffer.buff, length) != length) {
result = errno != 0 ? errno : EIO; result = errno != 0 ? errno : EIO;
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
@ -74,7 +76,7 @@ extern "C" {
return result; return result;
} }
writer->buffer.current = writer->buffer.buff; writer->buffer.data_end = writer->buffer.buff;
return 0; return 0;
} }

View File

@ -279,7 +279,7 @@ int sf_file_writer_flush(SFFileWriterInfo *writer)
int result; int result;
int len; int len;
len = SF_BINLOG_BUFFER_LENGTH(writer->binlog_buffer); len = SF_BINLOG_BUFFER_PRODUCER_DATA_LENGTH(writer->binlog_buffer);
if (len == 0) { if (len == 0) {
return 0; return 0;
} }
@ -292,7 +292,7 @@ int sf_file_writer_flush(SFFileWriterInfo *writer)
} }
} }
writer->binlog_buffer.end = writer->binlog_buffer.buff; writer->binlog_buffer.data_end = writer->binlog_buffer.buff;
return result; return result;
} }
@ -324,7 +324,7 @@ int sf_file_writer_deal_versioned_buffer(SFFileWriterInfo *writer,
int result; int result;
if (buffer->length >= writer->binlog_buffer.size / 4) { if (buffer->length >= writer->binlog_buffer.size / 4) {
if (SF_BINLOG_BUFFER_LENGTH(writer->binlog_buffer) > 0) { if (SF_BINLOG_BUFFER_PRODUCER_DATA_LENGTH(writer->binlog_buffer) > 0) {
if ((result=sf_file_writer_flush(writer)) != 0) { if ((result=sf_file_writer_flush(writer)) != 0) {
return result; return result;
} }
@ -342,13 +342,13 @@ int sf_file_writer_deal_versioned_buffer(SFFileWriterInfo *writer,
} }
if (writer->cfg.file_rotate_size > 0 && writer->file.size + if (writer->cfg.file_rotate_size > 0 && writer->file.size +
SF_BINLOG_BUFFER_LENGTH(writer->binlog_buffer) + SF_BINLOG_BUFFER_PRODUCER_DATA_LENGTH(writer->binlog_buffer) +
buffer->length > writer->cfg.file_rotate_size) buffer->length > writer->cfg.file_rotate_size)
{ {
if ((result=sf_file_writer_flush(writer)) != 0) { if ((result=sf_file_writer_flush(writer)) != 0) {
return result; return result;
} }
} else if (writer->binlog_buffer.size - SF_BINLOG_BUFFER_LENGTH( } else if (SF_BINLOG_BUFFER_PRODUCER_BUFF_REMAIN(
writer->binlog_buffer) < buffer->length) writer->binlog_buffer) < buffer->length)
{ {
if ((result=sf_file_writer_flush(writer)) != 0) { if ((result=sf_file_writer_flush(writer)) != 0) {
@ -359,8 +359,8 @@ int sf_file_writer_deal_versioned_buffer(SFFileWriterInfo *writer,
if (writer->flags & SF_FILE_WRITER_FLAGS_WANT_DONE_VERSION) { if (writer->flags & SF_FILE_WRITER_FLAGS_WANT_DONE_VERSION) {
writer->last_versions.pending = version; writer->last_versions.pending = version;
} }
memcpy(writer->binlog_buffer.end, buffer->buff, buffer->length); memcpy(writer->binlog_buffer.data_end, buffer->buff, buffer->length);
writer->binlog_buffer.end += buffer->length; writer->binlog_buffer.data_end += buffer->length;
return 0; return 0;
} }

View File

@ -56,7 +56,8 @@ static inline int sf_binlog_buffer_init(SFBinlogBuffer *buffer, const int size)
return ENOMEM; return ENOMEM;
} }
buffer->current = buffer->end = buffer->buff; buffer->current = buffer->data_end = buffer->buff;
buffer->buff_end = buffer->buff + size;
buffer->size = size; buffer->size = size;
return 0; return 0;
} }
@ -65,7 +66,8 @@ static inline void sf_binlog_buffer_destroy(SFBinlogBuffer *buffer)
{ {
if (buffer->buff != NULL) { if (buffer->buff != NULL) {
free(buffer->buff); free(buffer->buff);
buffer->current = buffer->end = buffer->buff = NULL; buffer->current = buffer->buff = NULL;
buffer->data_end = buffer->buff_end = NULL;
buffer->size = 0; buffer->size = 0;
} }
} }

View File

@ -42,8 +42,9 @@
#define SF_SOCKET_NETWORK_HANDLER_INDEX 0 #define SF_SOCKET_NETWORK_HANDLER_INDEX 0
#define SF_RDMACM_NETWORK_HANDLER_INDEX 1 #define SF_RDMACM_NETWORK_HANDLER_INDEX 1
#define SF_BINLOG_BUFFER_LENGTH(buffer) ((buffer).current - (buffer).buff) #define SF_BINLOG_BUFFER_PRODUCER_DATA_LENGTH(bf) ((bf).data_end - (bf).buff)
#define SF_BINLOG_BUFFER_REMAIN(buffer) ((buffer).end - (buffer).current) #define SF_BINLOG_BUFFER_PRODUCER_BUFF_REMAIN(bf) ((bf).buff_end - (bf).data_end)
#define SF_BINLOG_BUFFER_CONSUMER_DATA_REMAIN(bf) ((bf).data_end - (bf).current)
typedef int (*sf_accept_done_callback)(struct fast_task_info *task, typedef int (*sf_accept_done_callback)(struct fast_task_info *task,
const in_addr_64_t client_addr, const bool bInnerPort); const in_addr_64_t client_addr, const bool bInnerPort);
@ -224,7 +225,8 @@ typedef struct sf_binlog_file_position {
typedef struct server_binlog_buffer { typedef struct server_binlog_buffer {
char *buff; //the buffer pointer char *buff; //the buffer pointer
char *current; //for the consumer char *current; //for the consumer
char *end; //data end ptr char *data_end; //data end ptr
char *buff_end; //buffer end ptr
int size; //the buffer size (capacity) int size; //the buffer size (capacity)
} SFBinlogBuffer; } SFBinlogBuffer;