From 9c0bbce9df40ec5f9b4f075db55009f92361877f Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Sun, 10 Nov 2019 20:38:36 +0800 Subject: [PATCH] support regenerate filename for appender file --- HISTORY | 4 +- client/Makefile.in | 2 +- client/client_func.h | 2 + client/fdfs_file_info.c | 22 +- client/fdfs_regenerate_filename.c | 68 ++++ client/storage_client.c | 147 +++++++- client/storage_client.h | 17 + client/storage_client1.h | 16 + common/fdfs_global.c | 2 +- storage/storage_service.c | 556 +++++++++++++++++++----------- storage/storage_sync.h | 30 +- tracker/tracker_proto.h | 12 +- tracker/tracker_types.h | 4 + 13 files changed, 652 insertions(+), 230 deletions(-) create mode 100644 client/fdfs_regenerate_filename.c diff --git a/HISTORY b/HISTORY index 3c9faf5..0bde129 100644 --- a/HISTORY +++ b/HISTORY @@ -1,8 +1,10 @@ -Version 6.02 2019-11-07 +Version 6.02 2019-11-10 * get_file_info calculate CRC32 for appender file type * disk recovery download file to local temp file then rename it when the local file exists + * support regenerate filename for appender file + NOTE: the regenerated file will be a normal file! Version 6.01 2019-10-25 * compress and uncompress binlog file by gzip when need, diff --git a/client/Makefile.in b/client/Makefile.in index 859b8b8..346ca44 100644 --- a/client/Makefile.in +++ b/client/Makefile.in @@ -39,7 +39,7 @@ ALL_OBJS = $(STATIC_OBJS) $(FDFS_SHARED_OBJS) ALL_PRGS = fdfs_monitor fdfs_test fdfs_test1 fdfs_crc32 fdfs_upload_file \ fdfs_download_file fdfs_delete_file fdfs_file_info \ fdfs_appender_test fdfs_appender_test1 fdfs_append_file \ - fdfs_upload_appender + fdfs_upload_appender fdfs_regenerate_filename STATIC_LIBS = libfdfsclient.a diff --git a/client/client_func.h b/client/client_func.h index 01c338c..f3f2dfd 100644 --- a/client/client_func.h +++ b/client/client_func.h @@ -16,6 +16,8 @@ #define _CLIENT_FUNC_H_ typedef struct { + short file_type; + bool get_from_server; time_t create_timestamp; int crc32; int source_id; //source storage id diff --git a/client/fdfs_file_info.c b/client/fdfs_file_info.c index 2e153ad..050a6c2 100644 --- a/client/fdfs_file_info.c +++ b/client/fdfs_file_info.c @@ -19,6 +19,7 @@ int main(int argc, char *argv[]) { char *conf_filename; + const char *file_type_str; char file_id[128]; int result; FDFSFileInfo file_info; @@ -44,7 +45,7 @@ int main(int argc, char *argv[]) result = fdfs_get_file_info_ex1(file_id, true, &file_info); if (result != 0) { - printf("query file info fail, " \ + fprintf(stderr, "query file info fail, " \ "error no: %d, error info: %s\n", \ result, STRERROR(result)); } @@ -52,6 +53,25 @@ int main(int argc, char *argv[]) { char szDatetime[32]; + switch (file_info.file_type) + { + case FDFS_FILE_TYPE_NORMAL: + file_type_str = "normal"; + break; + case FDFS_FILE_TYPE_SLAVE: + file_type_str = "slave"; + break; + case FDFS_FILE_TYPE_APPENDER: + file_type_str = "appender"; + break; + default: + file_type_str = "unkown"; + break; + } + + printf("GET FROM SERVER: %s\n\n", + file_info.get_from_server ? "true" : "false"); + printf("file type: %s\n", file_type_str); printf("source storage id: %d\n", file_info.source_id); printf("source ip address: %s\n", file_info.source_ip_addr); printf("file create timestamp: %s\n", formatDatetime( diff --git a/client/fdfs_regenerate_filename.c b/client/fdfs_regenerate_filename.c new file mode 100644 index 0000000..ef07462 --- /dev/null +++ b/client/fdfs_regenerate_filename.c @@ -0,0 +1,68 @@ +/** +* Copyright (C) 2008 Happy Fish / YuQing +* +* FastDFS may be copied only under the terms of the GNU General +* Public License V3, which may be found in the FastDFS source kit. +* Please visit the FastDFS Home Page http://www.csource.org/ for more detail. +**/ + +#include +#include +#include +#include +#include +#include +#include +#include "fdfs_client.h" +#include "fastcommon/logger.h" + +int main(int argc, char *argv[]) +{ + char *conf_filename; + ConnectionInfo *pTrackerServer; + int result; + char appender_file_id[128]; + char new_file_id[128]; + + if (argc < 3) + { + fprintf(stderr, "regenerate filename for the appender file.\n" + "NOTE: the regenerated file will be a normal file!\n" + "Usage: %s \n", + argv[0]); + return 1; + } + + log_init(); + g_log_context.log_level = LOG_ERR; + + conf_filename = argv[1]; + if ((result=fdfs_client_init(conf_filename)) != 0) + { + return result; + } + + pTrackerServer = tracker_get_connection(); + if (pTrackerServer == NULL) + { + fdfs_client_destroy(); + return errno != 0 ? errno : ECONNREFUSED; + } + + snprintf(appender_file_id, sizeof(appender_file_id), "%s", argv[2]); + if ((result=storage_regenerate_appender_filename1(pTrackerServer, + NULL, appender_file_id, new_file_id)) != 0) + { + fprintf(stderr, "regenerate file %s fail, " + "error no: %d, error info: %s\n", + appender_file_id, result, STRERROR(result)); + return result; + } + + printf("%s\n", new_file_id); + + tracker_close_connection_ex(pTrackerServer, true); + fdfs_client_destroy(); + + return result; +} diff --git a/client/storage_client.c b/client/storage_client.c index ce7912a..37d5710 100644 --- a/client/storage_client.c +++ b/client/storage_client.c @@ -35,20 +35,20 @@ static struct base64_context the_base64_context; static int the_base64_context_inited = 0; #define FDFS_SPLIT_GROUP_NAME_AND_FILENAME(file_id) \ - char new_file_id[FDFS_GROUP_NAME_MAX_LEN + 128]; \ + char in_file_id[FDFS_GROUP_NAME_MAX_LEN + 128]; \ char *group_name; \ char *filename; \ char *pSeperator; \ \ - snprintf(new_file_id, sizeof(new_file_id), "%s", file_id); \ - pSeperator = strchr(new_file_id, FDFS_FILE_ID_SEPERATOR); \ + snprintf(in_file_id, sizeof(in_file_id), "%s", file_id); \ + pSeperator = strchr(in_file_id, FDFS_FILE_ID_SEPERATOR); \ if (pSeperator == NULL) \ { \ return EINVAL; \ } \ \ *pSeperator = '\0'; \ - group_name = new_file_id; \ + group_name = in_file_id; \ filename = pSeperator + 1; \ #define storage_get_read_connection(pTrackerServer, \ @@ -2203,8 +2203,21 @@ int fdfs_get_file_info_ex(const char *group_name, const char *remote_filename, \ pFileInfo->create_timestamp = buff2int(buff + sizeof(int)); pFileInfo->file_size = buff2long(buff + sizeof(int) * 2); - if (IS_SLAVE_FILE(filename_len, pFileInfo->file_size) || \ - IS_APPENDER_FILE(pFileInfo->file_size) || \ + if (IS_APPENDER_FILE(pFileInfo->file_size)) + { + pFileInfo->file_type = FDFS_FILE_TYPE_APPENDER; + } + else if (IS_SLAVE_FILE(filename_len, pFileInfo->file_size)) + { + pFileInfo->file_type = FDFS_FILE_TYPE_SLAVE; + } + else + { + pFileInfo->file_type = FDFS_FILE_TYPE_NORMAL; + } + + if (pFileInfo->file_type == FDFS_FILE_TYPE_SLAVE || + pFileInfo->file_type == FDFS_FILE_TYPE_APPENDER || (*(pFileInfo->source_ip_addr) == '\0' && get_from_server)) { //slave file or appender file if (get_from_server) @@ -2218,21 +2231,24 @@ int fdfs_get_file_info_ex(const char *group_name, const char *remote_filename, \ return result; } - result = storage_query_file_info(conn, \ + result = storage_query_file_info(conn, NULL, group_name, remote_filename, pFileInfo); - tracker_close_connection_ex(conn, result != 0 && \ + tracker_close_connection_ex(conn, result != 0 && result != ENOENT); + pFileInfo->get_from_server = true; return result; } else { + pFileInfo->get_from_server = false; pFileInfo->file_size = -1; return 0; } } else //master file (normal file) { + pFileInfo->get_from_server = false; if ((pFileInfo->file_size >> 63) != 0) { pFileInfo->file_size &= 0xFFFFFFFF; //low 32 bits is file size @@ -2352,3 +2368,118 @@ int storage_truncate_file(ConnectionInfo *pTrackerServer, \ return result; } +int storage_regenerate_appender_filename(ConnectionInfo *pTrackerServer, + ConnectionInfo *pStorageServer, const char *group_name, + const char *appender_filename, char *new_group_name, + char *new_remote_filename) +{ + TrackerHeader *pHeader; + int result; + char out_buff[512]; + char in_buff[256]; + char *p; + char *pInBuff; + int64_t in_bytes; + ConnectionInfo storageServer; + bool new_connection; + int appender_filename_len; + + appender_filename_len = strlen(appender_filename); + if ((result=storage_get_update_connection(pTrackerServer, + &pStorageServer, group_name, appender_filename, + &storageServer, &new_connection)) != 0) + { + return result; + } + + do + { + pHeader = (TrackerHeader *)out_buff; + p = out_buff + sizeof(TrackerHeader); + + snprintf(p, sizeof(out_buff) - sizeof(TrackerHeader), + "%s", group_name); + p += FDFS_GROUP_NAME_MAX_LEN; + memcpy(p, appender_filename, appender_filename_len); + p += appender_filename_len; + + long2buff((p - out_buff) - sizeof(TrackerHeader), + pHeader->pkg_len); + pHeader->cmd = STORAGE_PROTO_CMD_REGENERATE_APPENDER_FILENAME; + pHeader->status = 0; + + if ((result=tcpsenddata_nb(pStorageServer->sock, out_buff, + p - out_buff, g_fdfs_network_timeout)) != 0) + { + logError("file: "__FILE__", line: %d, " + "send data to storage server %s:%d fail, " + "errno: %d, error info: %s", __LINE__, + pStorageServer->ip_addr, pStorageServer->port, + result, STRERROR(result)); + break; + } + + pInBuff = in_buff; + if ((result=fdfs_recv_response(pStorageServer, + &pInBuff, sizeof(in_buff), &in_bytes)) != 0) + { + logError("file: "__FILE__", line: %d, " + "fdfs_recv_response fail, result: %d", + __LINE__, result); + break; + } + + if (in_bytes <= FDFS_GROUP_NAME_MAX_LEN) + { + logError("file: "__FILE__", line: %d, " + "storage server %s:%d response data " + "length: %"PRId64" is invalid, " + "should > %d", __LINE__, + pStorageServer->ip_addr, pStorageServer->port, + in_bytes, FDFS_GROUP_NAME_MAX_LEN); + result = EINVAL; + break; + } + + in_buff[in_bytes] = '\0'; + memcpy(new_group_name, in_buff, FDFS_GROUP_NAME_MAX_LEN); + new_group_name[FDFS_GROUP_NAME_MAX_LEN] = '\0'; + + memcpy(new_remote_filename, in_buff + FDFS_GROUP_NAME_MAX_LEN, + in_bytes - FDFS_GROUP_NAME_MAX_LEN + 1); + + } while (0); + + if (new_connection) + { + tracker_close_connection_ex(pStorageServer, result != 0); + } + + return result; +} + +int storage_regenerate_appender_filename1(ConnectionInfo *pTrackerServer, + ConnectionInfo *pStorageServer, const char *appender_file_id, + char *new_file_id) +{ + int result; + char new_group_name[FDFS_GROUP_NAME_MAX_LEN + 1]; + char new_remote_filename[128]; + + FDFS_SPLIT_GROUP_NAME_AND_FILENAME(appender_file_id) + + result = storage_regenerate_appender_filename(pTrackerServer, + pStorageServer, group_name, filename, + new_group_name, new_remote_filename); + if (result == 0) + { + sprintf(new_file_id, "%s%c%s", new_group_name, + FDFS_FILE_ID_SEPERATOR, new_remote_filename); + } + else + { + new_file_id[0] = '\0'; + } + + return result; +} diff --git a/client/storage_client.h b/client/storage_client.h index 12489a9..fd4849e 100644 --- a/client/storage_client.h +++ b/client/storage_client.h @@ -563,6 +563,23 @@ int fdfs_get_file_info_ex(const char *group_name, const char *remote_filename, \ const bool get_from_server, FDFSFileInfo *pFileInfo); +/** +* regenerate normal filename for appender file +* Note: the appender file will change to normal file +* params: +* pTrackerServer: the tracker server +* pStorageServer: the storage server +* group_name: the group name +* appender_filename: the appender filename +* new_group_name: return the new group name +* new_remote_filename: return the new filename +* return: 0 success, !=0 fail, return the error code +**/ +int storage_regenerate_appender_filename(ConnectionInfo *pTrackerServer, + ConnectionInfo *pStorageServer, const char *group_name, + const char *appender_filename, char *new_group_name, + char *new_remote_filename); + #ifdef __cplusplus } #endif diff --git a/client/storage_client1.h b/client/storage_client1.h index 63b5e4c..7a4bf77 100644 --- a/client/storage_client1.h +++ b/client/storage_client1.h @@ -524,6 +524,22 @@ int fdfs_get_file_info_ex1(const char *file_id, const bool get_from_server, \ int storage_file_exist1(ConnectionInfo *pTrackerServer, \ ConnectionInfo *pStorageServer, \ const char *file_id); + +/** +* regenerate normal filename for appender file +* Note: the appender file will change to normal file +* params: +* pTrackerServer: the tracker server +* pStorageServer: the storage server +* group_name: the group name +* appender_file_id: the appender file id +* file_id: regenerated file id return by storage server +* return: 0 success, !=0 fail, return the error code +**/ +int storage_regenerate_appender_filename1(ConnectionInfo *pTrackerServer, + ConnectionInfo *pStorageServer, const char *appender_file_id, + char *new_file_id); + #ifdef __cplusplus } #endif diff --git a/common/fdfs_global.c b/common/fdfs_global.c index e33dab4..9963451 100644 --- a/common/fdfs_global.c +++ b/common/fdfs_global.c @@ -23,7 +23,7 @@ int g_fdfs_connect_timeout = DEFAULT_CONNECT_TIMEOUT; int g_fdfs_network_timeout = DEFAULT_NETWORK_TIMEOUT; char g_fdfs_base_path[MAX_PATH_SIZE] = {'/', 't', 'm', 'p', '\0'}; -Version g_fdfs_version = {6, 1}; +Version g_fdfs_version = {6, 2}; bool g_use_connection_pool = false; ConnectionPool g_connection_pool; int g_connection_pool_max_idle_time = 3600; diff --git a/storage/storage_service.c b/storage/storage_service.c index 77e6173..bd49875 100644 --- a/storage/storage_service.c +++ b/storage/storage_service.c @@ -59,6 +59,7 @@ #define ACCESS_LOG_ACTION_APPEND_FILE "append" #define ACCESS_LOG_ACTION_TRUNCATE_FILE "truncate" #define ACCESS_LOG_ACTION_QUERY_FILE "status" +#define ACCESS_LOG_ACTION_RENAME_FILE "rename" typedef struct { @@ -2111,9 +2112,9 @@ void storage_get_store_path(const char *filename, const int filename_len, \ } while (0) -static int storage_gen_filename(StorageClientInfo *pClientInfo, \ - const int64_t file_size, const int crc32, \ - const char *szFormattedExt, const int ext_name_len, \ +static int storage_gen_filename(StorageClientInfo *pClientInfo, + const int64_t file_size, const int crc32, + const char *szFormattedExt, const int ext_name_len, const time_t timestamp, char *filename, int *filename_len) { char buff[sizeof(int) * 5]; @@ -2143,18 +2144,18 @@ static int storage_gen_filename(StorageClientInfo *pClientInfo, \ { int sub_path_high; int sub_path_low; - storage_get_store_path(encoded, *filename_len, \ + storage_get_store_path(encoded, *filename_len, &sub_path_high, &sub_path_low); pTrunkInfo->path.sub_path_high = sub_path_high; pTrunkInfo->path.sub_path_low = sub_path_low; - pClientInfo->file_context.extra_info.upload. \ + pClientInfo->file_context.extra_info.upload. if_sub_path_alloced = true; } - len = sprintf(filename, FDFS_STORAGE_DATA_DIR_FORMAT"/" \ - FDFS_STORAGE_DATA_DIR_FORMAT"/", \ + len = sprintf(filename, FDFS_STORAGE_DATA_DIR_FORMAT"/" + FDFS_STORAGE_DATA_DIR_FORMAT"/", pTrunkInfo->path.sub_path_high, pTrunkInfo->path.sub_path_low); memcpy(filename+len, encoded, *filename_len); @@ -2187,7 +2188,7 @@ static int storage_sort_metadata_buff(char *meta_buff, const int meta_size) return 0; } -static void storage_format_ext_name(const char *file_ext_name, \ +static void storage_format_ext_name(const char *file_ext_name, char *szFormattedExt) { int i; @@ -2220,9 +2221,9 @@ static void storage_format_ext_name(const char *file_ext_name, \ *p = '\0'; } -static int storage_get_filename(StorageClientInfo *pClientInfo, \ - const int start_time, const int64_t file_size, const int crc32, \ - const char *szFormattedExt, char *filename, \ +static int storage_get_filename(StorageClientInfo *pClientInfo, + const int start_time, const int64_t file_size, const int crc32, + const char *szFormattedExt, char *filename, int *filename_len, char *full_filename) { int i; @@ -2233,14 +2234,14 @@ static int storage_get_filename(StorageClientInfo *pClientInfo, \ trunk_info.path.store_path_index; for (i=0; i<10; i++) { - if ((result=storage_gen_filename(pClientInfo, file_size, \ - crc32, szFormattedExt, FDFS_FILE_EXT_NAME_MAX_LEN+1, \ + if ((result=storage_gen_filename(pClientInfo, file_size, + crc32, szFormattedExt, FDFS_FILE_EXT_NAME_MAX_LEN + 1, start_time, filename, filename_len)) != 0) { return result; } - sprintf(full_filename, "%s/data/%s", \ + sprintf(full_filename, "%s/data/%s", g_fdfs_store_paths.paths[store_path_index], filename); if (!fileExists(full_filename)) { @@ -2252,7 +2253,7 @@ static int storage_get_filename(StorageClientInfo *pClientInfo, \ if (*full_filename == '\0') { - logError("file: "__FILE__", line: %d, " \ + logError("file: "__FILE__", line: %d, " "Can't generate uniq filename", __LINE__); *filename = '\0'; *filename_len = 0; @@ -3491,8 +3492,8 @@ static int query_file_info_response(struct fast_task_info *pTask, return 0; } -static void calc_crc32_done_callback(struct fast_task_info *pTask, - const int err_no) +static void calc_crc32_done_callback_for_query_finfo( + struct fast_task_info *pTask, const int err_no) { StorageClientInfo *pClientInfo; StorageFileInfoForCRC32 *crc32_file_info; @@ -3525,6 +3526,7 @@ static void calc_crc32_done_callback(struct fast_task_info *pTask, pHeader->cmd = STORAGE_PROTO_CMD_RESP; long2buff(pTask->length - sizeof(TrackerHeader), pHeader->pkg_len); + STORAGE_ACCESS_LOG(pTask, ACCESS_LOG_ACTION_QUERY_FILE, result); storage_nio_notify(pTask); } @@ -3534,6 +3536,39 @@ static int calc_crc32_continue_callback(struct fast_task_info *pTask) return storage_dio_queue_push(pTask); } +static int push_calc_crc32_to_dio_queue(struct fast_task_info *pTask, + FileDealDoneCallback done_callback, const int store_path_index, + const struct stat *file_stat, const int storage_id) +{ + StorageClientInfo *pClientInfo; + StorageFileContext *pFileContext; + StorageFileInfoForCRC32 *crc32_file_info; + + pClientInfo = (StorageClientInfo *)pTask->arg; + pFileContext = &(pClientInfo->file_context); + + crc32_file_info = (StorageFileInfoForCRC32 *)fast_mblock_alloc_object( + &finfo_for_crc32_allocator); + if (crc32_file_info == NULL) + { + logError("file: "__FILE__", line: %d, " + "finfo_for_crc32_allocator %d bytes object fail", + __LINE__, (int)sizeof(StorageFileInfoForCRC32)); + return errno != 0 ? errno : ENOMEM; + } + + crc32_file_info->storage_id = storage_id; + crc32_file_info->fsize = file_stat->st_size; + crc32_file_info->mtime = file_stat->st_mtime; + pClientInfo->extra_arg = crc32_file_info; + + pFileContext->fd = -1; + pFileContext->calc_crc32 = true; + pFileContext->continue_callback = calc_crc32_continue_callback; + return storage_read_from_file(pTask, 0, file_stat->st_size, + done_callback, store_path_index); +} + static int query_file_info_deal_response(struct fast_task_info *pTask, const char *filename, const char *true_filename, struct stat *file_stat, const int store_path_index) @@ -3544,7 +3579,6 @@ static int query_file_info_deal_response(struct fast_task_info *pTask, int crc32; int64_t file_size; StorageFileInfoForCRC32 finfo; - StorageFileInfoForCRC32 *crc32_file_info; memset(decode_buff, 0, sizeof(decode_buff)); base64_decode_auto(&g_fdfs_base64_context, filename + @@ -3560,31 +3594,15 @@ static int query_file_info_deal_response(struct fast_task_info *pTask, pClientInfo = (StorageClientInfo *)pTask->arg; pFileContext = &(pClientInfo->file_context); - crc32_file_info = (StorageFileInfoForCRC32 *)fast_mblock_alloc_object( - &finfo_for_crc32_allocator); - if (crc32_file_info == NULL) - { - logError("file: "__FILE__", line: %d, " - "finfo_for_crc32_allocator %d bytes object fail", - __LINE__, (int)sizeof(StorageFileInfoForCRC32)); - return errno != 0 ? errno : ENOMEM; - } - crc32_file_info->storage_id = storage_id; - crc32_file_info->fsize = file_stat->st_size; - crc32_file_info->mtime = file_stat->st_mtime; - snprintf(pFileContext->fname2log, sizeof(pFileContext->fname2log), "%s", filename); snprintf(pFileContext->filename, sizeof(pFileContext->filename), "%s/data/%s", g_fdfs_store_paths.paths[store_path_index], true_filename); - pClientInfo->extra_arg = crc32_file_info; - pFileContext->fd = -1; - pFileContext->calc_crc32 = true; - pFileContext->continue_callback = calc_crc32_continue_callback; - return storage_read_from_file(pTask, 0, file_stat->st_size, - calc_crc32_done_callback, store_path_index); + return push_calc_crc32_to_dio_queue(pTask, + calc_crc32_done_callback_for_query_finfo, + store_path_index, file_stat, storage_id); } finfo.storage_id = storage_id; @@ -4708,6 +4726,274 @@ static int storage_deal_active_test(struct fast_task_info *pTask) return 0; } +static int storage_server_check_appender_file(struct fast_task_info *pTask, + char *appender_filename, const int appender_filename_len, + char *true_filename, struct stat *stat_buf, int *store_path_index) +{ + StorageFileContext *pFileContext; + char decode_buff[64]; + int result; + int filename_len; + int buff_len; + int64_t appender_file_size; + + pFileContext = &(((StorageClientInfo *)pTask->arg)->file_context); + filename_len = appender_filename_len; + if ((result=storage_split_filename_ex(appender_filename, + &filename_len, true_filename, store_path_index)) != 0) + { + return result; + } + if ((result=fdfs_check_data_filename(true_filename, filename_len)) != 0) + { + return result; + } + + snprintf(pFileContext->filename, sizeof(pFileContext->filename), + "%s/data/%s", g_fdfs_store_paths.paths[*store_path_index], + true_filename); + if (lstat(pFileContext->filename, stat_buf) == 0) + { + if (!S_ISREG(stat_buf->st_mode)) + { + logError("file: "__FILE__", line: %d, " + "client ip: %s, appender file: %s " + "is not a regular file", __LINE__, + pTask->client_ip, pFileContext->filename); + + return EINVAL; + } + } + else + { + result = errno != 0 ? errno : ENOENT; + if (result == ENOENT) + { + logWarning("file: "__FILE__", line: %d, " + "client ip: %s, appender file: %s " + "not exist", __LINE__, pTask->client_ip, + pFileContext->filename); + } + else + { + logError("file: "__FILE__", line: %d, " + "client ip: %s, stat appednder file %s fail, " + "errno: %d, error info: %s", + __LINE__, pTask->client_ip, + pFileContext->filename, result, STRERROR(result)); + } + + return result; + } + + strcpy(pFileContext->fname2log, appender_filename); + + memset(decode_buff, 0, sizeof(decode_buff)); + base64_decode_auto(&g_fdfs_base64_context, pFileContext->fname2log + + FDFS_LOGIC_FILE_PATH_LEN, FDFS_FILENAME_BASE64_LENGTH, + decode_buff, &buff_len); + + appender_file_size = buff2long(decode_buff + sizeof(int) * 2); + if (!IS_APPENDER_FILE(appender_file_size)) + { + logError("file: "__FILE__", line: %d, " + "client ip: %s, file: %s is not a valid " + "appender file, file size: %"PRId64, + __LINE__, pTask->client_ip, appender_filename, + appender_file_size); + + return EINVAL; + } + + return 0; +} + +static void calc_crc32_done_callback_for_regenerate( + struct fast_task_info *pTask, const int err_no) +{ + StorageClientInfo *pClientInfo; + StorageFileInfoForCRC32 *crc32_file_info; + StorageFileContext *pFileContext; + TrackerHeader *pHeader; + char full_filename[MAX_PATH_SIZE + 128]; + int result; + + pClientInfo = (StorageClientInfo *)pTask->arg; + pFileContext = &(pClientInfo->file_context); + crc32_file_info = (StorageFileInfoForCRC32 *)pClientInfo->extra_arg; + pHeader = (TrackerHeader *)pTask->data; + + if (err_no == 0) + { + char *formatted_ext_name; + char return_filename[128]; + char filename[128]; + int filename_len; + + *filename = '\0'; + filename_len = 0; + formatted_ext_name = pFileContext->filename + + (strlen(pFileContext->filename) - + (FDFS_FILE_EXT_NAME_MAX_LEN + 1)); + + pFileContext->timestamp2log = g_current_time; + if ((result=storage_get_filename(pClientInfo, + pFileContext->timestamp2log, + crc32_file_info->fsize, pFileContext->crc32, + formatted_ext_name, filename, &filename_len, + full_filename)) == 0) + { + if (*full_filename == '\0') + { + result = EBUSY; + logWarning("file: "__FILE__", line: %d, " + "Can't generate uniq filename", __LINE__); + } + else + { + if (rename(pFileContext->filename, full_filename) != 0) + { + result = errno != 0 ? errno : EPERM; + logWarning("file: "__FILE__", line: %d, " + "rename %s to %s fail, " + "errno: %d, error info: %s", __LINE__, + pFileContext->filename, full_filename, + result, STRERROR(result)); + } + else + { + char *p; + char binlog_msg[256]; + + filename_len = snprintf(return_filename, + sizeof(return_filename), + "%c"FDFS_STORAGE_DATA_DIR_FORMAT"/%s", + FDFS_STORAGE_STORE_PATH_PREFIX_CHAR, + pFileContext->extra_info.upload.trunk_info. + path.store_path_index, filename); + + sprintf(binlog_msg, "%s %s", + pFileContext->fname2log, return_filename); + result = storage_binlog_write( + pFileContext->timestamp2log, + STORAGE_OP_TYPE_SOURCE_RENAME_FILE, + binlog_msg); + + pClientInfo->total_length = sizeof(TrackerHeader) + + FDFS_GROUP_NAME_MAX_LEN + filename_len; + p = pTask->data + sizeof(TrackerHeader); + memcpy(p, g_group_name, FDFS_GROUP_NAME_MAX_LEN); + p += FDFS_GROUP_NAME_MAX_LEN; + memcpy(p, return_filename, filename_len); + + if (g_use_access_log) + { + snprintf(pFileContext->fname2log + + strlen(pFileContext->fname2log), + sizeof(pFileContext->fname2log), + "=>%s", return_filename); + } + } + } + } + } + else + { + result = err_no; + } + + fast_mblock_free_object(&finfo_for_crc32_allocator, crc32_file_info); + if (result != 0) + { + pClientInfo->total_length = sizeof(TrackerHeader); + } + + pClientInfo->total_offset = 0; + pTask->length = pClientInfo->total_length; + + pHeader->status = result; + pHeader->cmd = STORAGE_PROTO_CMD_RESP; + long2buff(pTask->length - sizeof(TrackerHeader), pHeader->pkg_len); + + STORAGE_ACCESS_LOG(pTask, ACCESS_LOG_ACTION_RENAME_FILE, result); + storage_nio_notify(pTask); +} + +/** +FDFS_GROUP_NAME_MAX_LEN: group name +body length - FDFS_GROUP_NAME_MAX_LEN: appender filename +**/ +static int storage_server_regenerate_appender_filename(struct fast_task_info *pTask) +{ + StorageClientInfo *pClientInfo; + StorageFileContext *pFileContext; + FDFSTrunkFullInfo *pTrunkInfo; + char *p; + char group_name[FDFS_GROUP_NAME_MAX_LEN + 1]; + char appender_filename[128]; + char true_filename[128]; + struct stat stat_buf; + int appender_filename_len; + int64_t nInPackLen; + int result; + int store_path_index; + + pClientInfo = (StorageClientInfo *)pTask->arg; + pFileContext = &(pClientInfo->file_context); + + nInPackLen = pClientInfo->total_length - sizeof(TrackerHeader); + appender_filename_len = nInPackLen - FDFS_GROUP_NAME_MAX_LEN; + if ((nInPackLen < FDFS_GROUP_NAME_MAX_LEN + + FDFS_LOGIC_FILE_PATH_LEN + FDFS_FILENAME_BASE64_LENGTH + + FDFS_FILE_EXT_NAME_MAX_LEN + 1) + || (appender_filename_len >= sizeof(appender_filename))) + { + logError("file: "__FILE__", line: %d, " + "client ip: %s, invalid package length: %"PRId64, + __LINE__, pTask->client_ip, nInPackLen); + return EINVAL; + } + + p = pTask->data + sizeof(TrackerHeader); + memcpy(group_name, p, FDFS_GROUP_NAME_MAX_LEN); + *(group_name + FDFS_GROUP_NAME_MAX_LEN) = '\0'; + if (strcmp(group_name, g_group_name) != 0) + { + logError("file: "__FILE__", line: %d, " + "client ip:%s, group_name: %s " + "not correct, should be: %s", + __LINE__, pTask->client_ip, + group_name, g_group_name); + return EINVAL; + } + + p += FDFS_GROUP_NAME_MAX_LEN; + memcpy(appender_filename, p, appender_filename_len); + *(appender_filename + appender_filename_len) = '\0'; + + STORAGE_ACCESS_STRCPY_FNAME2LOG(appender_filename, + appender_filename_len, pClientInfo); + + if ((result=storage_server_check_appender_file(pTask, + appender_filename, appender_filename_len, + true_filename, &stat_buf, &store_path_index)) != 0) + { + return result; + } + + pTrunkInfo = &(pClientInfo->file_context.extra_info.upload.trunk_info); + pClientInfo->file_context.extra_info.upload.if_sub_path_alloced = true; + pFileContext->extra_info.upload.trunk_info.path.store_path_index = + store_path_index; + pTrunkInfo->path.sub_path_high = strtol(true_filename, NULL, 16); + pTrunkInfo->path.sub_path_low = strtol(true_filename + 3, NULL, 16); + + return push_calc_crc32_to_dio_queue(pTask, + calc_crc32_done_callback_for_regenerate, + store_path_index, &stat_buf, + g_server_id_in_filename); +} + /** 8 bytes: appender filename length 8 bytes: file size @@ -4721,16 +5007,12 @@ static int storage_append_file(struct fast_task_info *pTask) char *p; char appender_filename[128]; char true_filename[128]; - char decode_buff[64]; struct stat stat_buf; int appender_filename_len; int64_t nInPackLen; int64_t file_bytes; - int64_t appender_file_size; int result; int store_path_index; - int filename_len; - int buff_len; pClientInfo = (StorageClientInfo *)pTask->arg; pFileContext = &(pClientInfo->file_context); @@ -4778,76 +5060,16 @@ static int storage_append_file(struct fast_task_info *pTask) memcpy(appender_filename, p, appender_filename_len); *(appender_filename + appender_filename_len) = '\0'; p += appender_filename_len; - filename_len = appender_filename_len; - STORAGE_ACCESS_STRCPY_FNAME2LOG(appender_filename, \ + STORAGE_ACCESS_STRCPY_FNAME2LOG(appender_filename, appender_filename_len, pClientInfo); - if ((result=storage_split_filename_ex(appender_filename, \ - &filename_len, true_filename, &store_path_index)) != 0) - { - return result; - } - if ((result=fdfs_check_data_filename(true_filename, filename_len)) != 0) - { - return result; - } - - snprintf(pFileContext->filename, sizeof(pFileContext->filename), \ - "%s/data/%s", g_fdfs_store_paths.paths[store_path_index], true_filename); - if (lstat(pFileContext->filename, &stat_buf) == 0) - { - if (!S_ISREG(stat_buf.st_mode)) - { - logError("file: "__FILE__", line: %d, " \ - "client ip: %s, appender file: %s " \ - "is not a regular file", __LINE__, \ - pTask->client_ip, pFileContext->filename); - - return EINVAL; - } - } - else - { - result = errno != 0 ? errno : ENOENT; - if (result == ENOENT) - { - logWarning("file: "__FILE__", line: %d, " \ - "client ip: %s, appender file: %s " \ - "not exist", __LINE__, \ - pTask->client_ip, pFileContext->filename); - } - else - { - logError("file: "__FILE__", line: %d, " \ - "client ip: %s, stat appednder file %s fail" \ - ", errno: %d, error info: %s.", \ - __LINE__, pTask->client_ip, \ - pFileContext->filename, \ - result, STRERROR(result)); - } - - return result; - } - - strcpy(pFileContext->fname2log, appender_filename); - - memset(decode_buff, 0, sizeof(decode_buff)); - base64_decode_auto(&g_fdfs_base64_context, pFileContext->fname2log + \ - FDFS_LOGIC_FILE_PATH_LEN, FDFS_FILENAME_BASE64_LENGTH, \ - decode_buff, &buff_len); - - appender_file_size = buff2long(decode_buff + sizeof(int) * 2); - if (!IS_APPENDER_FILE(appender_file_size)) - { - logError("file: "__FILE__", line: %d, " \ - "client ip: %s, file: %s is not a valid " \ - "appender file, file size: %"PRId64, \ - __LINE__, pTask->client_ip, appender_filename, \ - appender_file_size); - - return EINVAL; - } + if ((result=storage_server_check_appender_file(pTask, + appender_filename, appender_filename_len, + true_filename, &stat_buf, &store_path_index)) != 0) + { + return result; + } if (file_bytes == 0) { @@ -4860,24 +5082,20 @@ static int storage_append_file(struct fast_task_info *pTask) pFileContext->calc_crc32 = false; pFileContext->calc_file_hash = false; - snprintf(pFileContext->filename, sizeof(pFileContext->filename), \ - "%s/data/%s", g_fdfs_store_paths.paths[store_path_index], \ - true_filename); - pFileContext->sync_flag = STORAGE_OP_TYPE_SOURCE_APPEND_FILE; pFileContext->timestamp2log = pFileContext->extra_info.upload.start_time; pFileContext->extra_info.upload.file_type = _FILE_TYPE_APPENDER; pFileContext->extra_info.upload.before_open_callback = NULL; pFileContext->extra_info.upload.before_close_callback = NULL; - pFileContext->extra_info.upload.trunk_info.path.store_path_index = \ + pFileContext->extra_info.upload.trunk_info.path.store_path_index = store_path_index; pFileContext->op = FDFS_STORAGE_FILE_OP_APPEND; pFileContext->open_flags = O_WRONLY | O_APPEND | g_extra_open_file_flags; pFileContext->continue_callback = storage_nio_notify; - return storage_write_to_file(pTask, stat_buf.st_size, file_bytes, \ - p - pTask->data, dio_write_file, \ - storage_append_file_done_callback, \ + return storage_write_to_file(pTask, stat_buf.st_size, file_bytes, + p - pTask->data, dio_write_file, + storage_append_file_done_callback, dio_append_finish_clean_up, store_path_index); } @@ -4895,17 +5113,13 @@ static int storage_modify_file(struct fast_task_info *pTask) char *p; char appender_filename[128]; char true_filename[128]; - char decode_buff[64]; struct stat stat_buf; int appender_filename_len; int64_t nInPackLen; int64_t file_offset; int64_t file_bytes; - int64_t appender_file_size; int result; int store_path_index; - int filename_len; - int buff_len; pClientInfo = (StorageClientInfo *)pTask->arg; pFileContext = &(pClientInfo->file_context); @@ -4964,83 +5178,23 @@ static int storage_modify_file(struct fast_task_info *pTask) memcpy(appender_filename, p, appender_filename_len); *(appender_filename + appender_filename_len) = '\0'; p += appender_filename_len; - filename_len = appender_filename_len; STORAGE_ACCESS_STRCPY_FNAME2LOG(appender_filename, \ appender_filename_len, pClientInfo); - if ((result=storage_split_filename_ex(appender_filename, \ - &filename_len, true_filename, &store_path_index)) != 0) - { - return result; - } - if ((result=fdfs_check_data_filename(true_filename, filename_len)) != 0) - { - return result; - } - - snprintf(pFileContext->filename, sizeof(pFileContext->filename), \ - "%s/data/%s", g_fdfs_store_paths.paths[store_path_index], true_filename); - if (lstat(pFileContext->filename, &stat_buf) == 0) - { - if (!S_ISREG(stat_buf.st_mode)) - { - logError("file: "__FILE__", line: %d, " \ - "client ip: %s, appender file: %s " \ - "is not a regular file", __LINE__, \ - pTask->client_ip, pFileContext->filename); - - return EINVAL; - } - } - else - { - result = errno != 0 ? errno : ENOENT; - if (result == ENOENT) - { - logWarning("file: "__FILE__", line: %d, " \ - "client ip: %s, appender file: %s " \ - "not exist", __LINE__, \ - pTask->client_ip, pFileContext->filename); - } - else - { - logError("file: "__FILE__", line: %d, " \ - "client ip: %s, stat appednder file %s fail" \ - ", errno: %d, error info: %s.", \ - __LINE__, pTask->client_ip, \ - pFileContext->filename, \ - result, STRERROR(result)); - } - - return result; - } - - strcpy(pFileContext->fname2log, appender_filename); - - memset(decode_buff, 0, sizeof(decode_buff)); - base64_decode_auto(&g_fdfs_base64_context, pFileContext->fname2log + \ - FDFS_LOGIC_FILE_PATH_LEN, FDFS_FILENAME_BASE64_LENGTH, \ - decode_buff, &buff_len); - - appender_file_size = buff2long(decode_buff + sizeof(int) * 2); - if (!IS_APPENDER_FILE(appender_file_size)) - { - logError("file: "__FILE__", line: %d, " \ - "client ip: %s, file: %s is not a valid " \ - "appender file, file size: %"PRId64, \ - __LINE__, pTask->client_ip, appender_filename, \ - appender_file_size); - - return EINVAL; - } + if ((result=storage_server_check_appender_file(pTask, + appender_filename, appender_filename_len, + true_filename, &stat_buf, &store_path_index)) != 0) + { + return result; + } if (file_offset > stat_buf.st_size) { - logError("file: "__FILE__", line: %d, " \ - "client ip: %s, file offset: %"PRId64 \ - " is invalid, which > appender file size: %" \ - PRId64, __LINE__, pTask->client_ip, \ + logError("file: "__FILE__", line: %d, " + "client ip: %s, file offset: %"PRId64 + " is invalid, which > appender file size: %" + PRId64, __LINE__, pTask->client_ip, file_offset, (int64_t)stat_buf.st_size); return EINVAL; @@ -5057,23 +5211,20 @@ static int storage_modify_file(struct fast_task_info *pTask) pFileContext->calc_crc32 = false; pFileContext->calc_file_hash = false; - snprintf(pFileContext->filename, sizeof(pFileContext->filename), \ - "%s/data/%s", g_fdfs_store_paths.paths[store_path_index], true_filename); - pFileContext->sync_flag = STORAGE_OP_TYPE_SOURCE_MODIFY_FILE; pFileContext->timestamp2log = pFileContext->extra_info.upload.start_time; pFileContext->extra_info.upload.file_type = _FILE_TYPE_APPENDER; pFileContext->extra_info.upload.before_open_callback = NULL; pFileContext->extra_info.upload.before_close_callback = NULL; - pFileContext->extra_info.upload.trunk_info.path.store_path_index = \ + pFileContext->extra_info.upload.trunk_info.path.store_path_index = store_path_index; pFileContext->op = FDFS_STORAGE_FILE_OP_WRITE; pFileContext->open_flags = O_WRONLY | g_extra_open_file_flags; pFileContext->continue_callback = storage_nio_notify; - return storage_write_to_file(pTask, file_offset, file_bytes, \ - p - pTask->data, dio_write_file, \ - storage_modify_file_done_callback, \ + return storage_write_to_file(pTask, file_offset, file_bytes, + p - pTask->data, dio_write_file, + storage_modify_file_done_callback, dio_modify_finish_clean_up, store_path_index); } @@ -8137,8 +8288,8 @@ int storage_deal_task(struct fast_task_info *pTask) case STORAGE_PROTO_CMD_QUERY_FILE_INFO: ACCESS_LOG_INIT_FIELDS(); result = storage_server_query_file_info(pTask); - STORAGE_ACCESS_LOG(pTask, \ - ACCESS_LOG_ACTION_QUERY_FILE, \ + STORAGE_ACCESS_LOG(pTask, + ACCESS_LOG_ACTION_QUERY_FILE, result); break; case STORAGE_PROTO_CMD_CREATE_LINK: @@ -8198,6 +8349,13 @@ int storage_deal_task(struct fast_task_info *pTask) case STORAGE_PROTO_CMD_TRUNK_TRUNCATE_BINLOG_FILE: result = storage_server_trunk_truncate_binlog_file(pTask); break; + case STORAGE_PROTO_CMD_REGENERATE_APPENDER_FILENAME: + ACCESS_LOG_INIT_FIELDS(); + result = storage_server_regenerate_appender_filename(pTask); + STORAGE_ACCESS_LOG(pTask, + ACCESS_LOG_ACTION_RENAME_FILE, + result); + break; default: logError("file: "__FILE__", line: %d, " \ "client ip: %s, unkown cmd: %d", \ diff --git a/storage/storage_sync.h b/storage/storage_sync.h index 5cc10b6..96b2230 100644 --- a/storage/storage_sync.h +++ b/storage/storage_sync.h @@ -14,20 +14,22 @@ #include "fastcommon/fc_list.h" #include "storage_func.h" -#define STORAGE_OP_TYPE_SOURCE_CREATE_FILE 'C' //upload file -#define STORAGE_OP_TYPE_SOURCE_APPEND_FILE 'A' //append file -#define STORAGE_OP_TYPE_SOURCE_DELETE_FILE 'D' //delete file -#define STORAGE_OP_TYPE_SOURCE_UPDATE_FILE 'U' //for whole file update such as metadata file -#define STORAGE_OP_TYPE_SOURCE_MODIFY_FILE 'M' //for part modify -#define STORAGE_OP_TYPE_SOURCE_TRUNCATE_FILE 'T' //truncate file -#define STORAGE_OP_TYPE_SOURCE_CREATE_LINK 'L' //create symbol link -#define STORAGE_OP_TYPE_REPLICA_CREATE_FILE 'c' -#define STORAGE_OP_TYPE_REPLICA_APPEND_FILE 'a' -#define STORAGE_OP_TYPE_REPLICA_DELETE_FILE 'd' -#define STORAGE_OP_TYPE_REPLICA_UPDATE_FILE 'u' -#define STORAGE_OP_TYPE_REPLICA_MODIFY_FILE 'm' -#define STORAGE_OP_TYPE_REPLICA_TRUNCATE_FILE 't' -#define STORAGE_OP_TYPE_REPLICA_CREATE_LINK 'l' +#define STORAGE_OP_TYPE_SOURCE_CREATE_FILE 'C' //upload file +#define STORAGE_OP_TYPE_SOURCE_APPEND_FILE 'A' //append file +#define STORAGE_OP_TYPE_SOURCE_DELETE_FILE 'D' //delete file +#define STORAGE_OP_TYPE_SOURCE_UPDATE_FILE 'U' //for whole file update such as metadata file +#define STORAGE_OP_TYPE_SOURCE_MODIFY_FILE 'M' //for part modify +#define STORAGE_OP_TYPE_SOURCE_TRUNCATE_FILE 'T' //truncate file +#define STORAGE_OP_TYPE_SOURCE_CREATE_LINK 'L' //create symbol link +#define STORAGE_OP_TYPE_SOURCE_RENAME_FILE 'R' //rename appender file to normal file +#define STORAGE_OP_TYPE_REPLICA_CREATE_FILE 'c' +#define STORAGE_OP_TYPE_REPLICA_APPEND_FILE 'a' +#define STORAGE_OP_TYPE_REPLICA_DELETE_FILE 'd' +#define STORAGE_OP_TYPE_REPLICA_UPDATE_FILE 'u' +#define STORAGE_OP_TYPE_REPLICA_MODIFY_FILE 'm' +#define STORAGE_OP_TYPE_REPLICA_TRUNCATE_FILE 't' +#define STORAGE_OP_TYPE_REPLICA_CREATE_LINK 'l' +#define STORAGE_OP_TYPE_REPLICA_RENAME_FILE 'r' #define STORAGE_BINLOG_BUFFER_SIZE 64 * 1024 #define STORAGE_BINLOG_LINE_SIZE 256 diff --git a/tracker/tracker_proto.h b/tracker/tracker_proto.h index e37fd6c..2d754dc 100644 --- a/tracker/tracker_proto.h +++ b/tracker/tracker_proto.h @@ -78,7 +78,7 @@ #define STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE 21 #define STORAGE_PROTO_CMD_QUERY_FILE_INFO 22 #define STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE 23 //create appender file -#define STORAGE_PROTO_CMD_APPEND_FILE 24 //append file +#define STORAGE_PROTO_CMD_APPEND_FILE 24 //append file #define STORAGE_PROTO_CMD_SYNC_APPEND_FILE 25 #define STORAGE_PROTO_CMD_FETCH_ONE_PATH_BINLOG 26 //fetch binlog of one store path #define STORAGE_PROTO_CMD_RESP TRACKER_PROTO_CMD_RESP @@ -92,10 +92,12 @@ #define STORAGE_PROTO_CMD_TRUNK_DELETE_BINLOG_MARKS 32 //since V3.07, tracker to storage #define STORAGE_PROTO_CMD_TRUNK_TRUNCATE_BINLOG_FILE 33 //since V3.07, trunk storage to storage -#define STORAGE_PROTO_CMD_MODIFY_FILE 34 //since V3.08 -#define STORAGE_PROTO_CMD_SYNC_MODIFY_FILE 35 //since V3.08 -#define STORAGE_PROTO_CMD_TRUNCATE_FILE 36 //since V3.08 -#define STORAGE_PROTO_CMD_SYNC_TRUNCATE_FILE 37 //since V3.08 +#define STORAGE_PROTO_CMD_MODIFY_FILE 34 //since V3.08 +#define STORAGE_PROTO_CMD_SYNC_MODIFY_FILE 35 //since V3.08 +#define STORAGE_PROTO_CMD_TRUNCATE_FILE 36 //since V3.08 +#define STORAGE_PROTO_CMD_SYNC_TRUNCATE_FILE 37 //since V3.08 +#define STORAGE_PROTO_CMD_REGENERATE_APPENDER_FILENAME 38 //since V6.02, rename appender file to normal file +#define STORAGE_PROTO_CMD_SYNC_RENAME_FILE 40 //since V6.02 //for overwrite all old metadata #define STORAGE_SET_METADATA_FLAG_OVERWRITE 'O' diff --git a/tracker/tracker_types.h b/tracker/tracker_types.h index 04fc51e..40cb765 100644 --- a/tracker/tracker_types.h +++ b/tracker/tracker_types.h @@ -117,6 +117,10 @@ #define FDFS_TRUNK_FILE_TRUE_SIZE(file_size) \ (file_size & 0xFFFFFFFF) +#define FDFS_FILE_TYPE_NORMAL 1 //normal file +#define FDFS_FILE_TYPE_APPENDER 2 //appender file +#define FDFS_FILE_TYPE_SLAVE 4 //slave file + #define FDFS_STORAGE_ID_MAX_SIZE 16 #define TRACKER_STORAGE_RESERVED_SPACE_FLAG_MB 0