From 66fe9767f70f901c65037b5aa3fcc4ca4f4c2320 Mon Sep 17 00:00:00 2001
From: YuQing <384681@qq.com>
Date: Mon, 20 Sep 2021 20:55:23 +0800
Subject: [PATCH] add files: sf_file_writer.[hc]
---
src/Makefile.in | 3 +-
src/sf_binlog_writer.c | 2 +-
src/sf_file_writer.c | 396 +++++++++++++++++++++++++++++++++++++++++
src/sf_file_writer.h | 131 ++++++++++++++
4 files changed, 530 insertions(+), 2 deletions(-)
create mode 100644 src/sf_file_writer.c
create mode 100644 src/sf_file_writer.h
diff --git a/src/Makefile.in b/src/Makefile.in
index a9a29be..48a97cf 100644
--- a/src/Makefile.in
+++ b/src/Makefile.in
@@ -8,7 +8,7 @@ TARGET_LIB = $(TARGET_PREFIX)/$(LIB_VERSION)
TOP_HEADERS = sf_types.h sf_global.h sf_define.h sf_nio.h sf_service.h \
sf_func.h sf_util.h sf_configs.h sf_proto.h sf_binlog_writer.h \
sf_cluster_cfg.h sf_sharding_htable.h sf_connection_manager.h \
- sf_serializer.h sf_binlog_index.h
+ sf_serializer.h sf_binlog_index.h sf_file_writer.h
IDEMP_SERVER_HEADER = idempotency/server/server_types.h \
idempotency/server/server_channel.h \
@@ -28,6 +28,7 @@ SHARED_OBJS = sf_nio.lo sf_service.lo sf_global.lo \
sf_binlog_writer.lo sf_sharding_htable.lo \
sf_cluster_cfg.lo sf_connection_manager.lo \
sf_serializer.lo sf_binlog_index.lo \
+ sf_file_writer.lo \
idempotency/server/server_channel.lo \
idempotency/server/request_htable.lo \
idempotency/server/channel_htable.lo \
diff --git a/src/sf_binlog_writer.c b/src/sf_binlog_writer.c
index 5610d03..139612f 100644
--- a/src/sf_binlog_writer.c
+++ b/src/sf_binlog_writer.c
@@ -258,7 +258,7 @@ static inline void binlog_writer_set_next_version(SFBinlogWriterInfo *writer,
writer->version_ctx.next = next_version;
}
-static inline int deal_binlog_one_record(SFBinlogWriterBuffer *wb)
+static int deal_binlog_one_record(SFBinlogWriterBuffer *wb)
{
int result;
diff --git a/src/sf_file_writer.c b/src/sf_file_writer.c
new file mode 100644
index 0000000..88f893c
--- /dev/null
+++ b/src/sf_file_writer.c
@@ -0,0 +1,396 @@
+/*
+ * Copyright (c) 2020 YuQing <384681@qq.com>
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include "fastcommon/logger.h"
+#include "fastcommon/sockopt.h"
+#include "fastcommon/shared_func.h"
+#include "fastcommon/pthread_func.h"
+#include "fastcommon/sched_thread.h"
+#include "sf_global.h"
+#include "sf_func.h"
+#include "sf_file_writer.h"
+
+#define BINLOG_INDEX_FILENAME SF_BINLOG_FILE_PREFIX"_index.dat"
+
+#define BINLOG_INDEX_ITEM_CURRENT_WRITE "current_write"
+#define BINLOG_INDEX_ITEM_CURRENT_COMPRESS "current_compress"
+
+#define GET_BINLOG_FILENAME(writer) \
+ sprintf(writer->file.name, "%s/%s/%s"SF_BINLOG_FILE_EXT_FMT, \
+ writer->cfg.data_path, writer->cfg.subdir_name, \
+ SF_BINLOG_FILE_PREFIX, writer->binlog.index)
+
+static int write_to_binlog_index_file(SFFileWriterInfo *writer)
+{
+ char full_filename[PATH_MAX];
+ char buff[256];
+ int result;
+ int len;
+
+ snprintf(full_filename, sizeof(full_filename), "%s/%s/%s",
+ writer->cfg.data_path, writer->cfg.subdir_name,
+ BINLOG_INDEX_FILENAME);
+
+ len = sprintf(buff, "%s=%d\n"
+ "%s=%d\n",
+ BINLOG_INDEX_ITEM_CURRENT_WRITE,
+ writer->binlog.index,
+ BINLOG_INDEX_ITEM_CURRENT_COMPRESS,
+ writer->binlog.compress_index);
+ if ((result=safeWriteToFile(full_filename, buff, len)) != 0) {
+ logError("file: "__FILE__", line: %d, "
+ "write to file \"%s\" fail, "
+ "errno: %d, error info: %s",
+ __LINE__, full_filename,
+ result, STRERROR(result));
+ }
+
+ return result;
+}
+
+static int get_binlog_index_from_file(SFFileWriterInfo *writer)
+{
+ char full_filename[PATH_MAX];
+ IniContext ini_context;
+ int result;
+
+ snprintf(full_filename, sizeof(full_filename), "%s/%s/%s",
+ writer->cfg.data_path, writer->cfg.subdir_name,
+ BINLOG_INDEX_FILENAME);
+ if (access(full_filename, F_OK) != 0) {
+ if (errno == ENOENT) {
+ writer->binlog.index = 0;
+ return write_to_binlog_index_file(writer);
+ }
+ }
+
+ if ((result=iniLoadFromFile(full_filename, &ini_context)) != 0) {
+ logError("file: "__FILE__", line: %d, "
+ "load from file \"%s\" fail, error code: %d",
+ __LINE__, full_filename, result);
+ return result;
+ }
+
+ writer->binlog.index = iniGetIntValue(NULL,
+ BINLOG_INDEX_ITEM_CURRENT_WRITE, &ini_context, 0);
+ writer->binlog.compress_index = iniGetIntValue(NULL,
+ BINLOG_INDEX_ITEM_CURRENT_COMPRESS, &ini_context, 0);
+
+ iniFreeContext(&ini_context);
+ return 0;
+}
+
+static int open_writable_binlog(SFFileWriterInfo *writer)
+{
+ if (writer->file.fd >= 0) {
+ close(writer->file.fd);
+ }
+
+ GET_BINLOG_FILENAME(writer);
+ writer->file.fd = open(writer->file.name,
+ O_WRONLY | O_CREAT | O_APPEND, 0644);
+ if (writer->file.fd < 0) {
+ logError("file: "__FILE__", line: %d, "
+ "open file \"%s\" fail, "
+ "errno: %d, error info: %s",
+ __LINE__, writer->file.name,
+ errno, STRERROR(errno));
+ return errno != 0 ? errno : EACCES;
+ }
+
+ writer->file.size = lseek(writer->file.fd, 0, SEEK_END);
+ if (writer->file.size < 0) {
+ logError("file: "__FILE__", line: %d, "
+ "lseek file \"%s\" fail, "
+ "errno: %d, error info: %s",
+ __LINE__, writer->file.name,
+ errno, STRERROR(errno));
+ return errno != 0 ? errno : EIO;
+ }
+
+ return 0;
+}
+
+static int open_next_binlog(SFFileWriterInfo *writer)
+{
+ GET_BINLOG_FILENAME(writer);
+ if (access(writer->file.name, F_OK) == 0) {
+ char bak_filename[PATH_MAX];
+ char date_str[32];
+
+ snprintf(bak_filename, sizeof(bak_filename), "%s.%s",
+ writer->file.name, formatDatetime(g_current_time,
+ "%Y%m%d%H%M%S", date_str, sizeof(date_str)));
+ if (rename(writer->file.name, bak_filename) == 0) {
+ logWarning("file: "__FILE__", line: %d, "
+ "binlog file %s exist, rename to %s",
+ __LINE__, writer->file.name, bak_filename);
+ } else {
+ logError("file: "__FILE__", line: %d, "
+ "rename binlog %s to backup %s fail, "
+ "errno: %d, error info: %s",
+ __LINE__, writer->file.name, bak_filename,
+ errno, STRERROR(errno));
+ return errno != 0 ? errno : EPERM;
+ }
+ }
+
+ return open_writable_binlog(writer);
+}
+
+static int do_write_to_file(SFFileWriterInfo *writer,
+ char *buff, const int len)
+{
+ int result;
+
+ if (fc_safe_write(writer->file.fd, buff, len) != len) {
+ result = errno != 0 ? errno : EIO;
+ logError("file: "__FILE__", line: %d, "
+ "write to binlog file \"%s\" fail, "
+ "errno: %d, error info: %s",
+ __LINE__, writer->file.name,
+ result, STRERROR(result));
+ return result;
+ }
+
+ if (fsync(writer->file.fd) != 0) {
+ result = errno != 0 ? errno : EIO;
+ logError("file: "__FILE__", line: %d, "
+ "fsync to binlog file \"%s\" fail, "
+ "errno: %d, error info: %s",
+ __LINE__, writer->file.name,
+ result, STRERROR(result));
+ return result;
+ }
+
+ writer->file.size += len;
+ return 0;
+}
+
+static int check_write_to_file(SFFileWriterInfo *writer,
+ char *buff, const int len)
+{
+ int result;
+
+ if (writer->file.size + len <= SF_BINLOG_FILE_MAX_SIZE) {
+ return do_write_to_file(writer, buff, len);
+ }
+
+ writer->binlog.index++; //binlog rotate
+ if ((result=write_to_binlog_index_file(writer)) == 0) {
+ result = open_next_binlog(writer);
+ }
+
+ if (result != 0) {
+ logError("file: "__FILE__", line: %d, "
+ "open binlog file \"%s\" fail",
+ __LINE__, writer->file.name);
+ return result;
+ }
+
+ return do_write_to_file(writer, buff, len);
+}
+
+int sf_file_writer_flush(SFFileWriterInfo *writer)
+{
+ int result;
+ int len;
+
+ len = SF_BINLOG_BUFFER_LENGTH(writer->binlog_buffer);
+ if (len == 0) {
+ return 0;
+ }
+
+ result = check_write_to_file(writer, writer->binlog_buffer.buff, len);
+ writer->binlog_buffer.end = writer->binlog_buffer.buff;
+ return result;
+}
+
+int sf_file_writer_get_current_index(SFFileWriterInfo *writer)
+{
+ if (writer == NULL) { //for data recovery
+ return 0;
+ }
+
+ if (writer->binlog.index < 0) {
+ get_binlog_index_from_file(writer);
+ }
+
+ return writer->binlog.index;
+}
+
+int sf_file_writer_deal_buffer(SFFileWriterInfo *writer,
+ BufferInfo *buffer, const int64_t version)
+{
+ int result;
+
+ if (buffer->length >= writer->binlog_buffer.size / 4) {
+ if (SF_BINLOG_BUFFER_LENGTH(writer->binlog_buffer) > 0) {
+ if ((result=sf_file_writer_flush(writer)) != 0) {
+ return result;
+ }
+ }
+
+ if ((result=check_write_to_file(writer, buffer->buff,
+ buffer->length)) == 0)
+ {
+ if (writer->flags & SF_FILE_WRITER_FLAGS_WANT_DONE_VERSION) {
+ writer->last_versions.pending = version;
+ }
+ }
+ return result;
+ }
+
+ if (writer->file.size + SF_BINLOG_BUFFER_LENGTH(writer->
+ binlog_buffer) + buffer->length > SF_BINLOG_FILE_MAX_SIZE)
+ {
+ if ((result=sf_file_writer_flush(writer)) != 0) {
+ return result;
+ }
+ } else if (writer->binlog_buffer.size - SF_BINLOG_BUFFER_LENGTH(
+ writer->binlog_buffer) < buffer->length)
+ {
+ if ((result=sf_file_writer_flush(writer)) != 0) {
+ return result;
+ }
+ }
+
+ if (writer->flags & SF_FILE_WRITER_FLAGS_WANT_DONE_VERSION) {
+ writer->last_versions.pending = version;
+ }
+ memcpy(writer->binlog_buffer.end, buffer->buff, buffer->length);
+ writer->binlog_buffer.end += buffer->length;
+
+ return 0;
+}
+
+int sf_file_writer_init_normal(SFFileWriterInfo *writer,
+ const char *data_path, const char *subdir_name,
+ const int buffer_size)
+{
+ int result;
+ int path_len;
+ bool create;
+ char filepath[PATH_MAX];
+
+ writer->total_count = 0;
+ writer->last_versions.pending = 0;
+ writer->last_versions.done = 0;
+ writer->flags = 0;
+ if ((result=sf_binlog_buffer_init(&writer->
+ binlog_buffer, buffer_size)) != 0)
+ {
+ return result;
+ }
+
+ writer->cfg.data_path = data_path;
+ path_len = snprintf(filepath, sizeof(filepath),
+ "%s/%s", data_path, subdir_name);
+ if ((result=fc_check_mkdir_ex(filepath, 0775, &create)) != 0) {
+ return result;
+ }
+ if (create) {
+ SF_CHOWN_TO_RUNBY_RETURN_ON_ERROR(filepath);
+ }
+
+ writer->file.fd = -1;
+ snprintf(writer->cfg.subdir_name,
+ sizeof(writer->cfg.subdir_name),
+ "%s", subdir_name);
+ writer->file.name = (char *)fc_malloc(path_len + 32);
+ if (writer->file.name == NULL) {
+ return ENOMEM;
+ }
+
+ if ((result=get_binlog_index_from_file(writer)) != 0) {
+ return result;
+ }
+
+ if ((result=open_writable_binlog(writer)) != 0) {
+ return result;
+ }
+
+ return 0;
+}
+
+int sf_file_writer_set_binlog_index(SFFileWriterInfo *writer,
+ const int binlog_index)
+{
+ int result;
+
+ if (writer->binlog.index != binlog_index) {
+ writer->binlog.index = binlog_index;
+ if ((result=write_to_binlog_index_file(writer)) != 0) {
+ return result;
+ }
+ }
+
+ return open_writable_binlog(writer);
+}
+
+int sf_file_writer_get_last_lines(const char *data_path,
+ const char *subdir_name, const int current_write_index,
+ char *buff, const int buff_size, int *count, int *length)
+{
+ int result;
+ int remain_count;
+ int current_count;
+ int current_index;
+ int i;
+ char filename[PATH_MAX];
+ string_t lines;
+
+ current_index = current_write_index;
+ *length = 0;
+ remain_count = *count;
+ for (i=0; i<2; i++) {
+ current_count = remain_count;
+ sf_file_writer_get_filename(data_path, subdir_name,
+ current_index, filename, sizeof(filename));
+ result = fc_get_last_lines(filename, buff + *length,
+ buff_size - *length, &lines, ¤t_count);
+ if (result == 0) {
+ memmove(buff + *length, lines.str, lines.len);
+ *length += lines.len;
+ remain_count -= current_count;
+ if (remain_count == 0) {
+ break;
+ }
+ } else if (result != ENOENT) {
+ *count = 0;
+ return result;
+ }
+ if (current_index == 0) {
+ break;
+ }
+
+ --current_index; //try previous binlog file
+ }
+
+ *count -= remain_count;
+ return 0;
+}
diff --git a/src/sf_file_writer.h b/src/sf_file_writer.h
new file mode 100644
index 0000000..74a7fca
--- /dev/null
+++ b/src/sf_file_writer.h
@@ -0,0 +1,131 @@
+/*
+ * Copyright (c) 2020 YuQing <384681@qq.com>
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+//sf_file_writer.h
+
+#ifndef _SF_FILE_WRITER_H_
+#define _SF_FILE_WRITER_H_
+
+#include "fastcommon/fc_queue.h"
+#include "sf_types.h"
+
+#define SF_FILE_WRITER_FLAGS_WANT_DONE_VERSION 1
+
+#define SF_BINLOG_SUBDIR_NAME_SIZE 128
+#define SF_BINLOG_FILE_MAX_SIZE (1024 * 1024 * 1024) //for binlog rotating by size
+#define SF_BINLOG_FILE_PREFIX "binlog"
+#define SF_BINLOG_FILE_EXT_FMT ".%06d"
+
+#define SF_BINLOG_BUFFER_LENGTH(buffer) ((buffer).end - (buffer).buff)
+#define SF_BINLOG_BUFFER_REMAIN(buffer) ((buffer).end - (buffer).current)
+
+typedef struct sf_file_writer_info {
+ struct {
+ const char *data_path;
+ char subdir_name[SF_BINLOG_SUBDIR_NAME_SIZE];
+ int max_record_size;
+ } cfg;
+
+ struct {
+ int index;
+ int compress_index;
+ } binlog;
+
+ struct {
+ int fd;
+ int64_t size;
+ char *name;
+ } file;
+
+ int64_t total_count;
+ SFBinlogBuffer binlog_buffer;
+
+ short flags;
+ struct {
+ int64_t pending;
+ volatile int64_t done;
+ } last_versions;
+} SFFileWriterInfo;
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+int sf_file_writer_init_normal(SFFileWriterInfo *writer,
+ const char *data_path, const char *subdir_name,
+ const int buffer_size);
+
+int sf_file_writer_deal_buffer(SFFileWriterInfo *writer,
+ BufferInfo *buffer, const int64_t version);
+
+int sf_file_writer_flush(SFFileWriterInfo *writer);
+
+static inline void sf_file_writer_set_flags(
+ SFFileWriterInfo *writer, const short flags)
+{
+ writer->flags = flags;
+}
+
+static inline int64_t sf_file_writer_get_last_version(
+ SFFileWriterInfo *writer)
+{
+ if (writer->flags & SF_FILE_WRITER_FLAGS_WANT_DONE_VERSION) {
+ return writer->last_versions.done;
+ } else {
+ logError("file: "__FILE__", line: %d, "
+ "should set writer flags to %d!", __LINE__,
+ SF_FILE_WRITER_FLAGS_WANT_DONE_VERSION);
+ return -1;
+ }
+}
+
+int sf_file_writer_get_current_index(SFFileWriterInfo *writer);
+
+static inline void sf_file_writer_get_current_position(
+ SFFileWriterInfo *writer, SFBinlogFilePosition *position)
+{
+ position->index = writer->binlog.index;
+ position->offset = writer->file.size;
+}
+
+static inline const char *sf_file_writer_get_filepath(
+ const char *data_path, const char *subdir_name,
+ char *filename, const int size)
+{
+ snprintf(filename, size, "%s/%s", data_path, subdir_name);
+ return filename;
+}
+
+static inline const char *sf_file_writer_get_filename(
+ const char *data_path, const char *subdir_name,
+ const int binlog_index, char *filename, const int size)
+{
+ snprintf(filename, size, "%s/%s/%s"SF_BINLOG_FILE_EXT_FMT, data_path,
+ subdir_name, SF_BINLOG_FILE_PREFIX, binlog_index);
+ return filename;
+}
+
+int sf_file_writer_set_binlog_index(SFFileWriterInfo *writer,
+ const int binlog_index);
+
+int sf_file_writer_get_last_lines(const char *data_path,
+ const char *subdir_name, const int current_write_index,
+ char *buff, const int buff_size, int *count, int *length);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif