sf_file_writer_set_indexes impl.

vote_node
YuQing 2022-05-26 20:14:00 +08:00
parent f490366c03
commit 464573f9ff
4 changed files with 44 additions and 8 deletions

View File

@ -198,7 +198,7 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
thread->mblock, current); thread->mblock, current);
break; break;
case SF_BINLOG_BUFFER_TYPE_ROTATE_FILE: case SF_BINLOG_BUFFER_TYPE_ROTATE_FILE:
if ((result=sf_file_writer_set_binlog_last_index(&current-> if ((result=sf_file_writer_set_binlog_write_index(&current->
writer->fw, current->writer->fw.binlog. writer->fw, current->writer->fw.binlog.
last_index + 1)) != 0) last_index + 1)) != 0)
{ {
@ -206,7 +206,7 @@ static int deal_binlog_records(SFBinlogWriterThread *thread,
} }
break; break;
case SF_BINLOG_BUFFER_TYPE_SET_WRITE_INDEX: case SF_BINLOG_BUFFER_TYPE_SET_WRITE_INDEX:
if ((result=sf_file_writer_set_binlog_last_index(&current-> if ((result=sf_file_writer_set_binlog_write_index(&current->
writer->fw, current->version.first)) != 0) writer->fw, current->version.first)) != 0)
{ {
return result; return result;

View File

@ -248,11 +248,14 @@ static inline SFBinlogWriterBuffer *sf_binlog_writer_alloc_versioned_buffer_ex(
sf_file_writer_get_binlog_last_index(data_path, \ sf_file_writer_get_binlog_last_index(data_path, \
subdir_name, last_index) subdir_name, last_index)
#define sf_binlog_set_indexes(writer, start_index, last_index) \
sf_file_writer_set_indexes(&(writer)->fw, start_index, last_index)
#define sf_binlog_writer_set_binlog_start_index(writer, start_index) \ #define sf_binlog_writer_set_binlog_start_index(writer, start_index) \
sf_file_writer_set_binlog_start_index(&(writer)->fw, start_index) sf_file_writer_set_binlog_start_index(&(writer)->fw, start_index)
#define sf_binlog_writer_set_binlog_last_index(writer, last_index) \ #define sf_binlog_writer_set_binlog_write_index(writer, last_index) \
sf_file_writer_set_binlog_last_index(&(writer)->fw, last_index) sf_file_writer_set_binlog_write_index(&(writer)->fw, last_index)
#define sf_push_to_binlog_thread_queue(thread, buffer) \ #define sf_push_to_binlog_thread_queue(thread, buffer) \
fc_queue_push(&(thread)->queue, buffer) fc_queue_push(&(thread)->queue, buffer)

View File

@ -400,6 +400,24 @@ void sf_file_writer_destroy(SFFileWriterInfo *writer)
sf_binlog_buffer_destroy(&writer->binlog_buffer); sf_binlog_buffer_destroy(&writer->binlog_buffer);
} }
int sf_file_writer_set_indexes(SFFileWriterInfo *writer,
const int start_index, const int last_index)
{
int result;
if (writer->binlog.start_index != start_index ||
writer->binlog.last_index != last_index)
{
writer->binlog.start_index = start_index;
writer->binlog.last_index = last_index;
if ((result=write_to_binlog_index_file(writer)) != 0) {
return result;
}
}
return 0;
}
int sf_file_writer_set_binlog_start_index(SFFileWriterInfo *writer, int sf_file_writer_set_binlog_start_index(SFFileWriterInfo *writer,
const int start_index) const int start_index)
{ {
@ -415,7 +433,7 @@ int sf_file_writer_set_binlog_start_index(SFFileWriterInfo *writer,
return 0; return 0;
} }
int sf_file_writer_set_binlog_last_index(SFFileWriterInfo *writer, int sf_file_writer_set_binlog_write_index(SFFileWriterInfo *writer,
const int last_index) const int last_index)
{ {
int result; int result;
@ -449,8 +467,20 @@ int sf_file_writer_get_last_lines(const char *data_path,
current_count = remain_count; current_count = remain_count;
sf_file_writer_get_filename(data_path, subdir_name, sf_file_writer_get_filename(data_path, subdir_name,
current_index, filename, sizeof(filename)); current_index, filename, sizeof(filename));
result = fc_get_last_lines(filename, buff + *length, if (access(filename, F_OK) == 0) {
buff_size - *length, &lines, &current_count); result = fc_get_last_lines(filename, buff + *length,
buff_size - *length, &lines, &current_count);
} else {
result = errno != 0 ? errno : EPERM;
if (result != ENOENT) {
logError("file: "__FILE__", line: %d, "
"stat file %s fail, errno: %d, error info: %s",
__LINE__, filename, result, STRERROR(result));
*count = 0;
return result;
}
}
if (result == 0) { if (result == 0) {
memmove(buff + *length, lines.str, lines.len); memmove(buff + *length, lines.str, lines.len);
*length += lines.len; *length += lines.len;

View File

@ -118,6 +118,9 @@ static inline int sf_file_writer_get_binlog_last_index(
subdir_name, &start_index, last_index); subdir_name, &start_index, last_index);
} }
int sf_file_writer_set_indexes(SFFileWriterInfo *writer,
const int start_index, const int last_index);
int sf_file_writer_get_indexes(SFFileWriterInfo *writer, int sf_file_writer_get_indexes(SFFileWriterInfo *writer,
int *start_index, int *last_index); int *start_index, int *last_index);
@ -172,7 +175,7 @@ const char *sf_file_writer_get_index_filename(const char *data_path,
int sf_file_writer_set_binlog_start_index(SFFileWriterInfo *writer, int sf_file_writer_set_binlog_start_index(SFFileWriterInfo *writer,
const int start_index); const int start_index);
int sf_file_writer_set_binlog_last_index(SFFileWriterInfo *writer, int sf_file_writer_set_binlog_write_index(SFFileWriterInfo *writer,
const int last_index); const int last_index);
int sf_file_writer_get_last_lines(const char *data_path, int sf_file_writer_get_last_lines(const char *data_path,