diff --git a/src/Makefile.in b/src/Makefile.in index 1d73cbf..209dae1 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -8,7 +8,7 @@ TARGET_LIB = $(TARGET_PREFIX)/$(LIB_VERSION) TOP_HEADERS = sf_types.h sf_global.h sf_define.h sf_nio.h sf_service.h \ sf_func.h sf_util.h sf_configs.h sf_proto.h sf_binlog_writer.h \ sf_cluster_cfg.h sf_sharding_htable.h sf_connection_manager.h \ - sf_serialize.h + sf_serialize.h sf_binlog_index.h IDEMP_SERVER_HEADER = idempotency/server/server_types.h \ idempotency/server/server_channel.h \ @@ -27,7 +27,7 @@ SHARED_OBJS = sf_nio.lo sf_service.lo sf_global.lo \ sf_func.lo sf_util.lo sf_configs.lo sf_proto.lo \ sf_binlog_writer.lo sf_sharding_htable.lo \ sf_cluster_cfg.lo sf_connection_manager.lo \ - sf_serialize.lo \ + sf_serialize.lo sf_binlog_index.lo \ idempotency/server/server_channel.lo \ idempotency/server/request_htable.lo \ idempotency/server/channel_htable.lo \ diff --git a/src/sf_binlog_index.c b/src/sf_binlog_index.c new file mode 100644 index 0000000..4cf76e1 --- /dev/null +++ b/src/sf_binlog_index.c @@ -0,0 +1,271 @@ +/* + * Copyright (c) 2020 YuQing <384681@qq.com> + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "fastcommon/logger.h" +#include "fastcommon/shared_func.h" +#include "fastcommon/fc_memory.h" +#include "sf_binlog_index.h" + +#define SF_BINLOG_HEADER_FIELD_COUNT 2 +#define SF_BINLOG_HEADER_FIELD_INDEX_RECORD_COUNT 0 +#define SF_BINLOG_HEADER_FIELD_INDEX_LAST_VERSION 1 + +void sf_binlog_index_init(SFBinlogIndexContext *ctx, const char *name, + const char *filename, const int record_max_size, + const int array_elt_size, pack_record_func pack_record, + unpack_record_func unpack_record) +{ + memset(ctx, 0, sizeof(SFBinlogIndexContext)); + ctx->name = name; + ctx->filename = fc_strdup(filename); + ctx->record_max_size = record_max_size; + ctx->array_elt_size = array_elt_size; + ctx->pack_record = pack_record; + ctx->unpack_record = unpack_record; +} + +static int parse_header(const string_t *line, int *record_count, + int64_t *last_version, char *error_info) +{ + int count; + char *endptr; + string_t cols[SF_BINLOG_HEADER_FIELD_COUNT]; + + count = split_string_ex(line, ' ', cols, + SF_BINLOG_HEADER_FIELD_COUNT, false); + if (count != SF_BINLOG_HEADER_FIELD_COUNT) { + sprintf(error_info, "field count: %d != %d", + count, SF_BINLOG_HEADER_FIELD_COUNT); + return EINVAL; + } + + SF_BINLOG_PARSE_INT_SILENCE(*record_count, "record count", + SF_BINLOG_HEADER_FIELD_INDEX_RECORD_COUNT, ' ', 0); + SF_BINLOG_PARSE_INT_SILENCE(*last_version, "last version", + SF_BINLOG_HEADER_FIELD_INDEX_LAST_VERSION, '\n', 0); + return 0; +} + +static int parse(SFBinlogIndexContext *ctx, const string_t *lines, + const int row_count) +{ + int result; + int record_count; + char error_info[256]; + const string_t *line; + const string_t *end; + void *bindex; + + if (row_count < 1) { + return EINVAL; + } + + if ((result=parse_header(lines, &record_count, &ctx-> + last_version, error_info)) != 0) + { + logError("file: "__FILE__", line: %d, " + "%s index file: %s, parse header fail, error info: %s", + __LINE__, ctx->name, ctx->filename, error_info); + return result; + } + + if (row_count != record_count + 1) { + logError("file: "__FILE__", line: %d, " + "%s index file: %s, line count: %d != record count: " + "%d + 1", __LINE__, ctx->name, ctx->filename, + row_count, record_count + 1); + return EINVAL; + } + + ctx->index_array.alloc = 64; + while (ctx->index_array.alloc < record_count) { + ctx->index_array.alloc *= 2; + } + ctx->index_array.indexes = fc_malloc(ctx->array_elt_size * + ctx->index_array.alloc); + if (ctx->index_array.indexes == NULL) { + return ENOMEM; + } + + end = lines + row_count; + for (line=lines+1, bindex = ctx->index_array.indexes; + lineunpack_record(line, bindex, error_info)) != 0) { + logError("file: "__FILE__", line: %d, " + "%s index file: %s, parse line #%d fail, error " + "info: %s", __LINE__, ctx->name, ctx->filename, + (int)(line - lines) + 1, error_info); + return result; + } + } + + ctx->index_array.count = bindex - ctx->index_array.indexes; + return 0; +} + +static int load(SFBinlogIndexContext *ctx) +{ + int result; + int row_count; + int64_t file_size; + string_t context; + string_t *lines; + + if ((result=getFileContent(ctx->filename, &context.str, + &file_size)) != 0) + { + return result; + } + + context.len = file_size; + row_count = getOccurCount(context.str, '\n'); + lines = (string_t *)fc_malloc(sizeof(string_t) * row_count); + if (lines == NULL) { + free(context.str); + return ENOMEM; + } + + row_count = split_string_ex(&context, '\n', lines, row_count, true); + result = parse(ctx, lines, row_count); + free(lines); + free(context.str); + return result; +} + +int sf_binlog_index_load(SFBinlogIndexContext *ctx) +{ + int result; + + if (access(ctx->filename, F_OK) == 0) { + return load(ctx); + } else if (errno == ENOENT) { + return 0; + } else { + result = errno != 0 ? errno : EPERM; + logError("file: "__FILE__", line: %d, " + "access file %s fail, " + "errno: %d, error info: %s", __LINE__, + ctx->filename, result, STRERROR(result)); + return result; + } +} + +static int save(SFBinlogIndexContext *ctx, const char *filename) +{ + char buff[16 * 1024]; + char *bend; + void *index; + void *end; + char *p; + int fd; + int len; + int result; + + if ((fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, 0644)) < 0) { + result = errno != 0 ? errno : EIO; + logError("file: "__FILE__", line: %d, " + "open file %s fail, errno: %d, error info: %s", + __LINE__, filename, result, STRERROR(result)); + return result; + } + + result = 0; + p = buff; + bend = buff + sizeof(buff); + p += sprintf(p, "%d %"PRId64"\n", + ctx->index_array.count, + ctx->last_version); + + end = ctx->index_array.indexes + ctx->index_array.count; + for (index=ctx->index_array.indexes; indexrecord_max_size) { + len = p - buff; + if (fc_safe_write(fd, buff, len) != len) { + result = errno != 0 ? errno : EIO; + logError("file: "__FILE__", line: %d, " + "write file %s fail, errno: %d, error info: %s", + __LINE__, filename, result, STRERROR(result)); + break; + } + p = buff; + } + + p += ctx->pack_record(p, index); + } + + if (result == 0) { + len = p - buff; + if (len > 0 && fc_safe_write(fd, buff, len) != len) { + result = errno != 0 ? errno : EIO; + logError("file: "__FILE__", line: %d, " + "write file %s fail, errno: %d, error info: %s", + __LINE__, filename, result, STRERROR(result)); + } + } + + close(fd); + return result; +} + +int sf_binlog_index_save(SFBinlogIndexContext *ctx) +{ + int result; + char tmp_filename[PATH_MAX]; + + snprintf(tmp_filename, sizeof(tmp_filename), "%s.tmp", ctx->filename); + if ((result=save(ctx, tmp_filename)) != 0) { + return result; + } + + if (rename(tmp_filename, ctx->filename) != 0) { + result = errno != 0 ? errno : EIO; + logError("file: "__FILE__", line: %d, " + "rename file \"%s\" to \"%s\" fail, " + "errno: %d, error info: %s", + __LINE__, tmp_filename, ctx->filename, + result, STRERROR(result)); + return result; + } + + return 0; +} + +int sf_binlog_index_expand(SFBinlogIndexContext *ctx) +{ + int alloc; + void *indexes; + + if (ctx->index_array.alloc == 0) { + alloc = 64; + } else { + alloc = ctx->index_array.alloc * 2; + } + indexes = fc_malloc(ctx->array_elt_size * alloc); + if (indexes == NULL) { + return ENOMEM; + } + + if (ctx->index_array.count > 0) { + memcpy(indexes, ctx->index_array.indexes, + ctx->array_elt_size * + ctx->index_array.count); + free(ctx->index_array.indexes); + } + + ctx->index_array.indexes = indexes; + ctx->index_array.alloc = alloc; + return 0; +} diff --git a/src/sf_binlog_index.h b/src/sf_binlog_index.h new file mode 100644 index 0000000..5fd0cbe --- /dev/null +++ b/src/sf_binlog_index.h @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2020 YuQing <384681@qq.com> + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +//sf_binlog_index.h + +#ifndef _SF_BINLOG_INDEX_H_ +#define _SF_BINLOG_INDEX_H_ + +#include "fastcommon/common_define.h" + +#define SF_BINLOG_PARSE_INT_SILENCE(var, caption, index, endchr, min_val) \ + do { \ + var = strtol(cols[index].str, &endptr, 10); \ + if (*endptr != endchr || var < min_val) { \ + sprintf(error_info, "invalid %s: %.*s", \ + caption, cols[index].len, cols[index].str); \ + return EINVAL; \ + } \ + } while (0) + +typedef int (*pack_record_func)(char *buff, void *record); +typedef int (*unpack_record_func)(const string_t *line, + void *record, char *error_info); + +typedef struct sf_binlog_index_array { + void *indexes; + int alloc; + int count; +} SFBinlogIndexArray; + +typedef struct sf_binlog_index_context { + const char *name; + char *filename; + int record_max_size; + int array_elt_size; + pack_record_func pack_record; + unpack_record_func unpack_record; + SFBinlogIndexArray index_array; + int64_t last_version; +} SFBinlogIndexContext; + +#ifdef __cplusplus +extern "C" { +#endif + +void sf_binlog_index_init(SFBinlogIndexContext *ctx, const char *name, + const char *filename, const int record_max_size, + const int array_elt_size, pack_record_func pack_record, + unpack_record_func unpack_record); + +int sf_binlog_index_load(SFBinlogIndexContext *ctx); + +int sf_binlog_index_save(SFBinlogIndexContext *ctx); + +int sf_binlog_index_expand(SFBinlogIndexContext *ctx); + +static inline void sf_binlog_index_free(SFBinlogIndexContext *ctx) +{ + if (ctx->index_array.indexes != NULL) { + free(ctx->index_array.indexes); + ctx->index_array.indexes = NULL; + ctx->index_array.alloc = ctx->index_array.count = 0; + } +} + +#ifdef __cplusplus +} +#endif + +#endif