sf_file_writer.[hc]: support write done callback

use_iouring
YuQing 2025-02-09 09:26:34 +08:00
parent b6e24d0548
commit 318640572f
3 changed files with 28 additions and 0 deletions

View File

@ -247,6 +247,9 @@ int sf_binlog_writer_notify_exit(SFBinlogWriterInfo *writer);
#define sf_binlog_writer_set_flags(writer, flags) \ #define sf_binlog_writer_set_flags(writer, flags) \
sf_file_writer_set_flags(&(writer)->fw, flags) sf_file_writer_set_flags(&(writer)->fw, flags)
#define sf_binlog_writer_set_write_done_callback(writer, callback, args) \
sf_file_writer_set_write_done_callback(&(writer)->fw, callback, args)
#define sf_binlog_writer_get_last_version_ex(writer, log_level) \ #define sf_binlog_writer_get_last_version_ex(writer, log_level) \
sf_file_writer_get_last_version_ex(&(writer)->fw, log_level) sf_file_writer_get_last_version_ex(&(writer)->fw, log_level)

View File

@ -244,6 +244,11 @@ static int do_write_to_file(SFFileWriterInfo *writer,
} }
writer->file.size += len; writer->file.size += len;
if (writer->write_done_callback.func != NULL) {
writer->write_done_callback.func(writer,
writer->write_done_callback.args);
}
return 0; return 0;
} }
@ -422,6 +427,7 @@ int sf_file_writer_init(SFFileWriterInfo *writer, const char *data_path,
writer->last_versions.pending = 0; writer->last_versions.pending = 0;
writer->last_versions.done = 0; writer->last_versions.done = 0;
writer->flags = 0; writer->flags = 0;
sf_file_writer_set_write_done_callback(writer, NULL, NULL);
if ((result=sf_binlog_buffer_init(&writer-> if ((result=sf_binlog_buffer_init(&writer->
binlog_buffer, buffer_size)) != 0) binlog_buffer, buffer_size)) != 0)
{ {

View File

@ -30,6 +30,11 @@
#define SF_BINLOG_FILE_PREFIX "binlog" #define SF_BINLOG_FILE_PREFIX "binlog"
#define SF_BINLOG_FILE_EXT_FMT ".%06d" #define SF_BINLOG_FILE_EXT_FMT ".%06d"
struct sf_file_writer_info;
typedef void (*sf_file_write_done_callback)(
struct sf_file_writer_info *writer, void *args);
typedef struct sf_file_writer_info { typedef struct sf_file_writer_info {
struct { struct {
const char *data_path; const char *data_path;
@ -60,6 +65,12 @@ typedef struct sf_file_writer_info {
int64_t pending; int64_t pending;
volatile int64_t done; volatile int64_t done;
} last_versions; } last_versions;
struct {
sf_file_write_done_callback func;
void *args;
} write_done_callback;
} SFFileWriterInfo; } SFFileWriterInfo;
#ifdef __cplusplus #ifdef __cplusplus
@ -103,6 +114,14 @@ static inline void sf_file_writer_set_call_fsync(
writer->cfg.call_fsync = call_fsync; writer->cfg.call_fsync = call_fsync;
} }
static inline void sf_file_writer_set_write_done_callback (
SFFileWriterInfo *writer, sf_file_write_done_callback callback,
void *args)
{
writer->write_done_callback.func = callback;
writer->write_done_callback.args = args;
}
static inline int64_t sf_file_writer_get_last_version_ex( static inline int64_t sf_file_writer_get_last_version_ex(
SFFileWriterInfo *writer, const int log_level) SFFileWriterInfo *writer, const int log_level)
{ {