From 024c148700afd13d97fe11c71d8da73a7fc3b8fe Mon Sep 17 00:00:00 2001
From: YuQing <384681@qq.com>
Date: Sat, 21 Aug 2021 16:12:03 +0800
Subject: [PATCH] add files: sf_binlog_index.[hc]
---
src/Makefile.in | 4 +-
src/sf_binlog_index.c | 271 ++++++++++++++++++++++++++++++++++++++++++
src/sf_binlog_index.h | 82 +++++++++++++
3 files changed, 355 insertions(+), 2 deletions(-)
create mode 100644 src/sf_binlog_index.c
create mode 100644 src/sf_binlog_index.h
diff --git a/src/Makefile.in b/src/Makefile.in
index 1d73cbf..209dae1 100644
--- a/src/Makefile.in
+++ b/src/Makefile.in
@@ -8,7 +8,7 @@ TARGET_LIB = $(TARGET_PREFIX)/$(LIB_VERSION)
TOP_HEADERS = sf_types.h sf_global.h sf_define.h sf_nio.h sf_service.h \
sf_func.h sf_util.h sf_configs.h sf_proto.h sf_binlog_writer.h \
sf_cluster_cfg.h sf_sharding_htable.h sf_connection_manager.h \
- sf_serialize.h
+ sf_serialize.h sf_binlog_index.h
IDEMP_SERVER_HEADER = idempotency/server/server_types.h \
idempotency/server/server_channel.h \
@@ -27,7 +27,7 @@ SHARED_OBJS = sf_nio.lo sf_service.lo sf_global.lo \
sf_func.lo sf_util.lo sf_configs.lo sf_proto.lo \
sf_binlog_writer.lo sf_sharding_htable.lo \
sf_cluster_cfg.lo sf_connection_manager.lo \
- sf_serialize.lo \
+ sf_serialize.lo sf_binlog_index.lo \
idempotency/server/server_channel.lo \
idempotency/server/request_htable.lo \
idempotency/server/channel_htable.lo \
diff --git a/src/sf_binlog_index.c b/src/sf_binlog_index.c
new file mode 100644
index 0000000..4cf76e1
--- /dev/null
+++ b/src/sf_binlog_index.c
@@ -0,0 +1,271 @@
+/*
+ * Copyright (c) 2020 YuQing <384681@qq.com>
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include "fastcommon/logger.h"
+#include "fastcommon/shared_func.h"
+#include "fastcommon/fc_memory.h"
+#include "sf_binlog_index.h"
+
+#define SF_BINLOG_HEADER_FIELD_COUNT 2
+#define SF_BINLOG_HEADER_FIELD_INDEX_RECORD_COUNT 0
+#define SF_BINLOG_HEADER_FIELD_INDEX_LAST_VERSION 1
+
+void sf_binlog_index_init(SFBinlogIndexContext *ctx, const char *name,
+ const char *filename, const int record_max_size,
+ const int array_elt_size, pack_record_func pack_record,
+ unpack_record_func unpack_record)
+{
+ memset(ctx, 0, sizeof(SFBinlogIndexContext));
+ ctx->name = name;
+ ctx->filename = fc_strdup(filename);
+ ctx->record_max_size = record_max_size;
+ ctx->array_elt_size = array_elt_size;
+ ctx->pack_record = pack_record;
+ ctx->unpack_record = unpack_record;
+}
+
+static int parse_header(const string_t *line, int *record_count,
+ int64_t *last_version, char *error_info)
+{
+ int count;
+ char *endptr;
+ string_t cols[SF_BINLOG_HEADER_FIELD_COUNT];
+
+ count = split_string_ex(line, ' ', cols,
+ SF_BINLOG_HEADER_FIELD_COUNT, false);
+ if (count != SF_BINLOG_HEADER_FIELD_COUNT) {
+ sprintf(error_info, "field count: %d != %d",
+ count, SF_BINLOG_HEADER_FIELD_COUNT);
+ return EINVAL;
+ }
+
+ SF_BINLOG_PARSE_INT_SILENCE(*record_count, "record count",
+ SF_BINLOG_HEADER_FIELD_INDEX_RECORD_COUNT, ' ', 0);
+ SF_BINLOG_PARSE_INT_SILENCE(*last_version, "last version",
+ SF_BINLOG_HEADER_FIELD_INDEX_LAST_VERSION, '\n', 0);
+ return 0;
+}
+
+static int parse(SFBinlogIndexContext *ctx, const string_t *lines,
+ const int row_count)
+{
+ int result;
+ int record_count;
+ char error_info[256];
+ const string_t *line;
+ const string_t *end;
+ void *bindex;
+
+ if (row_count < 1) {
+ return EINVAL;
+ }
+
+ if ((result=parse_header(lines, &record_count, &ctx->
+ last_version, error_info)) != 0)
+ {
+ logError("file: "__FILE__", line: %d, "
+ "%s index file: %s, parse header fail, error info: %s",
+ __LINE__, ctx->name, ctx->filename, error_info);
+ return result;
+ }
+
+ if (row_count != record_count + 1) {
+ logError("file: "__FILE__", line: %d, "
+ "%s index file: %s, line count: %d != record count: "
+ "%d + 1", __LINE__, ctx->name, ctx->filename,
+ row_count, record_count + 1);
+ return EINVAL;
+ }
+
+ ctx->index_array.alloc = 64;
+ while (ctx->index_array.alloc < record_count) {
+ ctx->index_array.alloc *= 2;
+ }
+ ctx->index_array.indexes = fc_malloc(ctx->array_elt_size *
+ ctx->index_array.alloc);
+ if (ctx->index_array.indexes == NULL) {
+ return ENOMEM;
+ }
+
+ end = lines + row_count;
+ for (line=lines+1, bindex = ctx->index_array.indexes;
+ lineunpack_record(line, bindex, error_info)) != 0) {
+ logError("file: "__FILE__", line: %d, "
+ "%s index file: %s, parse line #%d fail, error "
+ "info: %s", __LINE__, ctx->name, ctx->filename,
+ (int)(line - lines) + 1, error_info);
+ return result;
+ }
+ }
+
+ ctx->index_array.count = bindex - ctx->index_array.indexes;
+ return 0;
+}
+
+static int load(SFBinlogIndexContext *ctx)
+{
+ int result;
+ int row_count;
+ int64_t file_size;
+ string_t context;
+ string_t *lines;
+
+ if ((result=getFileContent(ctx->filename, &context.str,
+ &file_size)) != 0)
+ {
+ return result;
+ }
+
+ context.len = file_size;
+ row_count = getOccurCount(context.str, '\n');
+ lines = (string_t *)fc_malloc(sizeof(string_t) * row_count);
+ if (lines == NULL) {
+ free(context.str);
+ return ENOMEM;
+ }
+
+ row_count = split_string_ex(&context, '\n', lines, row_count, true);
+ result = parse(ctx, lines, row_count);
+ free(lines);
+ free(context.str);
+ return result;
+}
+
+int sf_binlog_index_load(SFBinlogIndexContext *ctx)
+{
+ int result;
+
+ if (access(ctx->filename, F_OK) == 0) {
+ return load(ctx);
+ } else if (errno == ENOENT) {
+ return 0;
+ } else {
+ result = errno != 0 ? errno : EPERM;
+ logError("file: "__FILE__", line: %d, "
+ "access file %s fail, "
+ "errno: %d, error info: %s", __LINE__,
+ ctx->filename, result, STRERROR(result));
+ return result;
+ }
+}
+
+static int save(SFBinlogIndexContext *ctx, const char *filename)
+{
+ char buff[16 * 1024];
+ char *bend;
+ void *index;
+ void *end;
+ char *p;
+ int fd;
+ int len;
+ int result;
+
+ if ((fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, 0644)) < 0) {
+ result = errno != 0 ? errno : EIO;
+ logError("file: "__FILE__", line: %d, "
+ "open file %s fail, errno: %d, error info: %s",
+ __LINE__, filename, result, STRERROR(result));
+ return result;
+ }
+
+ result = 0;
+ p = buff;
+ bend = buff + sizeof(buff);
+ p += sprintf(p, "%d %"PRId64"\n",
+ ctx->index_array.count,
+ ctx->last_version);
+
+ end = ctx->index_array.indexes + ctx->index_array.count;
+ for (index=ctx->index_array.indexes; indexrecord_max_size) {
+ len = p - buff;
+ if (fc_safe_write(fd, buff, len) != len) {
+ result = errno != 0 ? errno : EIO;
+ logError("file: "__FILE__", line: %d, "
+ "write file %s fail, errno: %d, error info: %s",
+ __LINE__, filename, result, STRERROR(result));
+ break;
+ }
+ p = buff;
+ }
+
+ p += ctx->pack_record(p, index);
+ }
+
+ if (result == 0) {
+ len = p - buff;
+ if (len > 0 && fc_safe_write(fd, buff, len) != len) {
+ result = errno != 0 ? errno : EIO;
+ logError("file: "__FILE__", line: %d, "
+ "write file %s fail, errno: %d, error info: %s",
+ __LINE__, filename, result, STRERROR(result));
+ }
+ }
+
+ close(fd);
+ return result;
+}
+
+int sf_binlog_index_save(SFBinlogIndexContext *ctx)
+{
+ int result;
+ char tmp_filename[PATH_MAX];
+
+ snprintf(tmp_filename, sizeof(tmp_filename), "%s.tmp", ctx->filename);
+ if ((result=save(ctx, tmp_filename)) != 0) {
+ return result;
+ }
+
+ if (rename(tmp_filename, ctx->filename) != 0) {
+ result = errno != 0 ? errno : EIO;
+ logError("file: "__FILE__", line: %d, "
+ "rename file \"%s\" to \"%s\" fail, "
+ "errno: %d, error info: %s",
+ __LINE__, tmp_filename, ctx->filename,
+ result, STRERROR(result));
+ return result;
+ }
+
+ return 0;
+}
+
+int sf_binlog_index_expand(SFBinlogIndexContext *ctx)
+{
+ int alloc;
+ void *indexes;
+
+ if (ctx->index_array.alloc == 0) {
+ alloc = 64;
+ } else {
+ alloc = ctx->index_array.alloc * 2;
+ }
+ indexes = fc_malloc(ctx->array_elt_size * alloc);
+ if (indexes == NULL) {
+ return ENOMEM;
+ }
+
+ if (ctx->index_array.count > 0) {
+ memcpy(indexes, ctx->index_array.indexes,
+ ctx->array_elt_size *
+ ctx->index_array.count);
+ free(ctx->index_array.indexes);
+ }
+
+ ctx->index_array.indexes = indexes;
+ ctx->index_array.alloc = alloc;
+ return 0;
+}
diff --git a/src/sf_binlog_index.h b/src/sf_binlog_index.h
new file mode 100644
index 0000000..5fd0cbe
--- /dev/null
+++ b/src/sf_binlog_index.h
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2020 YuQing <384681@qq.com>
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+//sf_binlog_index.h
+
+#ifndef _SF_BINLOG_INDEX_H_
+#define _SF_BINLOG_INDEX_H_
+
+#include "fastcommon/common_define.h"
+
+#define SF_BINLOG_PARSE_INT_SILENCE(var, caption, index, endchr, min_val) \
+ do { \
+ var = strtol(cols[index].str, &endptr, 10); \
+ if (*endptr != endchr || var < min_val) { \
+ sprintf(error_info, "invalid %s: %.*s", \
+ caption, cols[index].len, cols[index].str); \
+ return EINVAL; \
+ } \
+ } while (0)
+
+typedef int (*pack_record_func)(char *buff, void *record);
+typedef int (*unpack_record_func)(const string_t *line,
+ void *record, char *error_info);
+
+typedef struct sf_binlog_index_array {
+ void *indexes;
+ int alloc;
+ int count;
+} SFBinlogIndexArray;
+
+typedef struct sf_binlog_index_context {
+ const char *name;
+ char *filename;
+ int record_max_size;
+ int array_elt_size;
+ pack_record_func pack_record;
+ unpack_record_func unpack_record;
+ SFBinlogIndexArray index_array;
+ int64_t last_version;
+} SFBinlogIndexContext;
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+void sf_binlog_index_init(SFBinlogIndexContext *ctx, const char *name,
+ const char *filename, const int record_max_size,
+ const int array_elt_size, pack_record_func pack_record,
+ unpack_record_func unpack_record);
+
+int sf_binlog_index_load(SFBinlogIndexContext *ctx);
+
+int sf_binlog_index_save(SFBinlogIndexContext *ctx);
+
+int sf_binlog_index_expand(SFBinlogIndexContext *ctx);
+
+static inline void sf_binlog_index_free(SFBinlogIndexContext *ctx)
+{
+ if (ctx->index_array.indexes != NULL) {
+ free(ctx->index_array.indexes);
+ ctx->index_array.indexes = NULL;
+ ctx->index_array.alloc = ctx->index_array.count = 0;
+ }
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif