From 9bc762bffba23eb9eb53d36c8810f7321435a3a6 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Tue, 12 Nov 2019 19:07:49 +0800 Subject: [PATCH] sync regenerated appender file --- HISTORY | 2 +- storage/storage_func.c | 23 ++++ storage/storage_func.h | 4 + storage/storage_service.c | 246 +++++++++++++++++++++++++++----------- storage/storage_sync.c | 164 ++++++++++++++++++++++--- 5 files changed, 353 insertions(+), 86 deletions(-) diff --git a/HISTORY b/HISTORY index 6eb47f6..5ae2c36 100644 --- a/HISTORY +++ b/HISTORY @@ -1,5 +1,5 @@ -Version 6.02 2019-11-11 +Version 6.02 2019-11-12 * get_file_info calculate CRC32 for appender file type * disk recovery download file to local temp file then rename it when the local file exists diff --git a/storage/storage_func.c b/storage/storage_func.c index 4229b15..3102a15 100644 --- a/storage/storage_func.c +++ b/storage/storage_func.c @@ -2180,6 +2180,29 @@ int storage_set_tracker_client_ips(ConnectionInfo *conn, return 0; } +int storage_logic_to_local_full_filename(const char *logic_filename, + const int logic_filename_len, int *store_path_index, + char *full_filename, const int filename_size) +{ + int result; + int filename_len; + char true_filename[128]; + + filename_len = logic_filename_len; + if ((result=storage_split_filename_ex(logic_filename, + &filename_len, true_filename, store_path_index)) != 0) + { + return result; + } + if ((result=fdfs_check_data_filename(true_filename, filename_len)) != 0) + { + return result; + } + + snprintf(full_filename, filename_size, "%s/data/%s", + g_fdfs_store_paths.paths[*store_path_index], true_filename); + return 0; +} /* int write_serialized(int fd, const char *buff, size_t count, const bool bSync) diff --git a/storage/storage_func.h b/storage/storage_func.h index 5960ac4..712d394 100644 --- a/storage/storage_func.h +++ b/storage/storage_func.h @@ -39,6 +39,10 @@ int storage_set_tracker_client_ips(ConnectionInfo *conn, int storage_check_and_make_data_path(); +int storage_logic_to_local_full_filename(const char *logic_filename, + const int logic_filename_len, int *store_path_index, + char *full_filename, const int filename_size); + #define STORAGE_CHOWN(path, current_uid, current_gid) \ if (!(g_run_by_gid == current_gid && g_run_by_uid == current_uid)) \ { \ diff --git a/storage/storage_service.c b/storage/storage_service.c index 4700c92..6245d83 100644 --- a/storage/storage_service.c +++ b/storage/storage_service.c @@ -489,32 +489,30 @@ static void storage_sync_truncate_file_done_callback( \ storage_nio_notify(pTask); } -static int storage_sync_copy_file_rename_filename( \ +static int storage_sync_copy_file_rename_filename( StorageFileContext *pFileContext) { char full_filename[MAX_PATH_SIZE + 256]; - char true_filename[128]; int filename_len; int result; int store_path_index; filename_len = strlen(pFileContext->fname2log); - if ((result=storage_split_filename_ex(pFileContext->fname2log, \ - &filename_len, true_filename, &store_path_index)) != 0) + if ((result=storage_logic_to_local_full_filename( + pFileContext->fname2log, filename_len, + &store_path_index, full_filename, + sizeof(full_filename))) != 0) { return result; } - snprintf(full_filename, sizeof(full_filename), \ - "%s/data/%s", g_fdfs_store_paths.paths[store_path_index], \ - true_filename); if (rename(pFileContext->filename, full_filename) != 0) { result = errno != 0 ? errno : EPERM; - logWarning("file: "__FILE__", line: %d, " \ - "rename %s to %s fail, " \ - "errno: %d, error info: %s", __LINE__, \ - pFileContext->filename, full_filename, \ + logWarning("file: "__FILE__", line: %d, " + "rename %s to %s fail, " + "errno: %d, error info: %s", __LINE__, + pFileContext->filename, full_filename, result, STRERROR(result)); return result; } @@ -4212,7 +4210,9 @@ static int storage_server_fetch_one_path_binlog_dealer( \ if (!(record.op_type == STORAGE_OP_TYPE_SOURCE_CREATE_FILE || record.op_type == STORAGE_OP_TYPE_REPLICA_CREATE_FILE || record.op_type == STORAGE_OP_TYPE_SOURCE_CREATE_LINK - || record.op_type == STORAGE_OP_TYPE_REPLICA_CREATE_LINK)) + || record.op_type == STORAGE_OP_TYPE_REPLICA_CREATE_LINK + || record.op_type == STORAGE_OP_TYPE_SOURCE_RENAME_FILE + || record.op_type == STORAGE_OP_TYPE_REPLICA_RENAME_FILE)) { continue; } @@ -4230,8 +4230,8 @@ static int storage_server_fetch_one_path_binlog_dealer( \ } else { - snprintf(full_filename, sizeof(full_filename), "%s/data/%s", \ - g_fdfs_store_paths.paths[record.store_path_index], \ + snprintf(full_filename, sizeof(full_filename), "%s/data/%s", + g_fdfs_store_paths.paths[record.store_path_index], record.true_filename); if (lstat(full_filename, &stat_buf) != 0) { @@ -4241,10 +4241,10 @@ static int storage_server_fetch_one_path_binlog_dealer( \ } else { - logError("file: "__FILE__", line: %d, " \ - "call stat fail, file: %s, "\ - "error no: %d, error info: %s", \ - __LINE__, full_filename, \ + logError("file: "__FILE__", line: %d, " + "call stat fail, file: %s, " + "error no: %d, error info: %s", + __LINE__, full_filename, errno, STRERROR(errno)); result = errno != 0 ? errno : EPERM; break; @@ -4291,11 +4291,20 @@ static int storage_server_fetch_one_path_binlog_dealer( \ } } + if (record.op_type == STORAGE_OP_TYPE_SOURCE_RENAME_FILE) + { + record.op_type = STORAGE_OP_TYPE_SOURCE_CREATE_FILE; + } + else if (record.op_type == STORAGE_OP_TYPE_REPLICA_RENAME_FILE) + { + record.op_type = STORAGE_OP_TYPE_REPLICA_CREATE_FILE; + } + if (record.op_type == STORAGE_OP_TYPE_SOURCE_CREATE_FILE || record.op_type == STORAGE_OP_TYPE_REPLICA_CREATE_FILE) { - pOutBuff += sprintf(pOutBuff, "%d %c %s\n", \ - (int)record.timestamp, \ + pOutBuff += sprintf(pOutBuff, "%d %c %s\n", + (int)record.timestamp, record.op_type, record.filename); } else @@ -4344,7 +4353,7 @@ static int storage_server_fetch_one_path_binlog_dealer( \ src_filename + base_path_len + 6); } - if (pTask->size - (pOutBuff - pTask->data) < \ + if (pTask->size - (pOutBuff - pTask->data) < STORAGE_BINLOG_LINE_SIZE + FDFS_PROTO_PKG_LEN_SIZE) { break; @@ -4873,7 +4882,7 @@ static void calc_crc32_done_callback_for_regenerate( path.store_path_index, filename); sprintf(binlog_msg, "%s %s", - pFileContext->fname2log, return_filename); + return_filename, pFileContext->fname2log); result = storage_binlog_write( pFileContext->timestamp2log, STORAGE_OP_TYPE_SOURCE_RENAME_FILE, @@ -5223,7 +5232,6 @@ static int storage_do_truncate_file(struct fast_task_info *pTask) StorageFileContext *pFileContext; char *p; char appender_filename[128]; - char true_filename[128]; char decode_buff[64]; struct stat stat_buf; int appender_filename_len; @@ -5285,18 +5293,14 @@ static int storage_do_truncate_file(struct fast_task_info *pTask) STORAGE_ACCESS_STRCPY_FNAME2LOG(appender_filename, \ appender_filename_len, pClientInfo); - if ((result=storage_split_filename_ex(appender_filename, \ - &filename_len, true_filename, &store_path_index)) != 0) - { - return result; - } - if ((result=fdfs_check_data_filename(true_filename, filename_len)) != 0) + if ((result=storage_logic_to_local_full_filename( + appender_filename, filename_len, + &store_path_index, pFileContext->filename, + sizeof(pFileContext->filename))) != 0) { return result; } - snprintf(pFileContext->filename, sizeof(pFileContext->filename), \ - "%s/data/%s", g_fdfs_store_paths.paths[store_path_index], true_filename); if (lstat(pFileContext->filename, &stat_buf) == 0) { if (!S_ISREG(stat_buf.st_mode)) @@ -5376,9 +5380,6 @@ static int storage_do_truncate_file(struct fast_task_info *pTask) pFileContext->calc_crc32 = false; pFileContext->calc_file_hash = false; - snprintf(pFileContext->filename, sizeof(pFileContext->filename), \ - "%s/data/%s", g_fdfs_store_paths.paths[store_path_index], true_filename); - pFileContext->sync_flag = STORAGE_OP_TYPE_SOURCE_TRUNCATE_FILE; pFileContext->timestamp2log = pFileContext->extra_info.upload.start_time; pFileContext->extra_info.upload.file_type = _FILE_TYPE_APPENDER; @@ -5968,7 +5969,6 @@ static int storage_sync_append_file(struct fast_task_info *pTask) TaskDealFunc deal_func; char *p; char group_name[FDFS_GROUP_NAME_MAX_LEN + 1]; - char true_filename[128]; char filename[128]; bool need_write_file; int filename_len; @@ -6064,19 +6064,13 @@ static int storage_sync_append_file(struct fast_task_info *pTask) *(filename + filename_len) = '\0'; p += filename_len; - if ((result=storage_split_filename_ex(filename, \ - &filename_len, true_filename, &store_path_index)) != 0) + if ((result=storage_logic_to_local_full_filename( + filename, filename_len, + &store_path_index, pFileContext->filename, + sizeof(pFileContext->filename))) != 0) { return result; } - if ((result=fdfs_check_data_filename(true_filename, filename_len)) != 0) - { - return result; - } - - snprintf(pFileContext->filename, sizeof(pFileContext->filename), \ - "%s/data/%s", g_fdfs_store_paths.paths[store_path_index], \ - true_filename); if (lstat(pFileContext->filename, &stat_buf) != 0) { @@ -6196,7 +6190,6 @@ static int storage_sync_modify_file(struct fast_task_info *pTask) TaskDealFunc deal_func; char *p; char group_name[FDFS_GROUP_NAME_MAX_LEN + 1]; - char true_filename[128]; char filename[128]; bool need_write_file; int filename_len; @@ -6293,19 +6286,13 @@ static int storage_sync_modify_file(struct fast_task_info *pTask) *(filename + filename_len) = '\0'; p += filename_len; - if ((result=storage_split_filename_ex(filename, \ - &filename_len, true_filename, &store_path_index)) != 0) + if ((result=storage_logic_to_local_full_filename( + filename, filename_len, + &store_path_index, pFileContext->filename, + sizeof(pFileContext->filename))) != 0) { return result; } - if ((result=fdfs_check_data_filename(true_filename, filename_len)) != 0) - { - return result; - } - - snprintf(pFileContext->filename, sizeof(pFileContext->filename), \ - "%s/data/%s", g_fdfs_store_paths.paths[store_path_index], \ - true_filename); if (lstat(pFileContext->filename, &stat_buf) != 0) { @@ -6396,7 +6383,6 @@ static int storage_sync_truncate_file(struct fast_task_info *pTask) StorageFileContext *pFileContext; char *p; char group_name[FDFS_GROUP_NAME_MAX_LEN + 1]; - char true_filename[128]; char filename[128]; int filename_len; int64_t nInPackLen; @@ -6494,19 +6480,13 @@ static int storage_sync_truncate_file(struct fast_task_info *pTask) *(filename + filename_len) = '\0'; p += filename_len; - if ((result=storage_split_filename_ex(filename, \ - &filename_len, true_filename, &store_path_index)) != 0) + if ((result=storage_logic_to_local_full_filename( + filename, filename_len, + &store_path_index, pFileContext->filename, + sizeof(pFileContext->filename))) != 0) { return result; } - if ((result=fdfs_check_data_filename(true_filename, filename_len)) != 0) - { - return result; - } - - snprintf(pFileContext->filename, sizeof(pFileContext->filename), \ - "%s/data/%s", g_fdfs_store_paths.paths[store_path_index], \ - true_filename); if (lstat(pFileContext->filename, &stat_buf) != 0) { @@ -6905,6 +6885,135 @@ static int storage_sync_link_file(struct fast_task_info *pTask) return STORAGE_STATUE_DEAL_FILE; } +static int storage_sync_rename_file(struct fast_task_info *pTask) +{ + StorageClientInfo *pClientInfo; + StorageFileContext *pFileContext; + char *p; + char group_name[FDFS_GROUP_NAME_MAX_LEN + 1]; + char dest_filename[128]; + char dest_full_filename[MAX_PATH_SIZE]; + char src_full_filename[MAX_PATH_SIZE]; + char src_filename[128]; + int64_t nInPackLen; + int dest_filename_len; + int src_filename_len; + int result; + int dest_store_path_index; + int src_store_path_index; + + pClientInfo = (StorageClientInfo *)pTask->arg; + pFileContext = &(pClientInfo->file_context); + + nInPackLen = pClientInfo->total_length - sizeof(TrackerHeader); + if (nInPackLen <= 2 * FDFS_PROTO_PKG_LEN_SIZE + + 4 + FDFS_GROUP_NAME_MAX_LEN) + { + logError("file: "__FILE__", line: %d, " + "client ip: %s, package size " + "%"PRId64" is not correct, " + "expect length > %d", __LINE__, + pTask->client_ip, nInPackLen, + 2 * FDFS_PROTO_PKG_LEN_SIZE + + 4 + FDFS_GROUP_NAME_MAX_LEN); + return EINVAL; + } + + p = pTask->data + sizeof(TrackerHeader); + + dest_filename_len = buff2long(p); + p += FDFS_PROTO_PKG_LEN_SIZE; + + src_filename_len = buff2long(p); + p += FDFS_PROTO_PKG_LEN_SIZE; + + if (dest_filename_len < 0 || dest_filename_len >= sizeof(dest_filename)) + { + logError("file: "__FILE__", line: %d, " + "client ip: %s, in request pkg, " + "filename length: %d is invalid, " + "which < 0 or >= %d", __LINE__, pTask->client_ip, + dest_filename_len, (int)sizeof(dest_filename)); + return EINVAL; + } + + pFileContext->timestamp2log = buff2int(p); + p += 4; + + memcpy(group_name, p, FDFS_GROUP_NAME_MAX_LEN); + *(group_name + FDFS_GROUP_NAME_MAX_LEN) = '\0'; + p += FDFS_GROUP_NAME_MAX_LEN; + if (strcmp(group_name, g_group_name) != 0) + { + logError("file: "__FILE__", line: %d, " + "client ip:%s, group_name: %s " + "not correct, should be: %s", + __LINE__, pTask->client_ip, + group_name, g_group_name); + return EINVAL; + } + + if (nInPackLen != 2 * FDFS_PROTO_PKG_LEN_SIZE + 4 + + FDFS_GROUP_NAME_MAX_LEN + dest_filename_len + src_filename_len) + { + logError("file: "__FILE__", line: %d, " + "client ip: %s, in request pkg, " + "pgk length: %"PRId64" != bytes: %d", + __LINE__, pTask->client_ip, + nInPackLen, 2 * FDFS_PROTO_PKG_LEN_SIZE + + FDFS_GROUP_NAME_MAX_LEN + dest_filename_len + + src_filename_len); + return EINVAL; + } + + memcpy(dest_filename, p, dest_filename_len); + *(dest_filename + dest_filename_len) = '\0'; + p += dest_filename_len; + + memcpy(src_filename, p, src_filename_len); + *(src_filename + src_filename_len) = '\0'; + + if ((result=storage_logic_to_local_full_filename( + dest_filename, dest_filename_len, + &dest_store_path_index, dest_full_filename, + sizeof(dest_full_filename))) != 0) + { + return result; + } + + if (access(dest_full_filename, F_OK) == 0) + { + logDebug("file: "__FILE__", line: %d, " + "client ip: %s, dest file: %s " + "already exist", __LINE__, + pTask->client_ip, dest_full_filename); + return EEXIST; + } + + if ((result=storage_logic_to_local_full_filename( + src_filename, src_filename_len, + &src_store_path_index, src_full_filename, + sizeof(src_full_filename))) != 0) + { + return result; + } + + if (rename(src_full_filename, dest_full_filename) != 0) + { + result = errno != 0 ? errno : EPERM; + logWarning("file: "__FILE__", line: %d, " + "client ip: %s, rename %s to %s fail, " + "errno: %d, error info: %s", __LINE__, + pTask->client_ip, src_full_filename, + dest_full_filename, result, STRERROR(result)); + return result; + } + + return storage_binlog_write_ex(pFileContext->timestamp2log, + STORAGE_OP_TYPE_REPLICA_RENAME_FILE, + dest_filename, src_filename); +} + /** pkg format: Header @@ -8297,6 +8406,9 @@ int storage_deal_task(struct fast_task_info *pTask) case STORAGE_PROTO_CMD_SYNC_TRUNCATE_FILE: result = storage_sync_truncate_file(pTask); break; + case STORAGE_PROTO_CMD_SYNC_RENAME_FILE: + result = storage_sync_rename_file(pTask); + break; case STORAGE_PROTO_CMD_SYNC_CREATE_LINK: result = storage_sync_link_file(pTask); break; diff --git a/storage/storage_sync.c b/storage/storage_sync.c index 706e641..f771efe 100644 --- a/storage/storage_sync.c +++ b/storage/storage_sync.c @@ -105,8 +105,8 @@ static int storage_sync_copy_file(ConnectionInfo *pStorageServer, \ int result; bool need_sync_file; - if ((result=trunk_file_stat(pRecord->store_path_index, \ - pRecord->true_filename, pRecord->true_filename_len, \ + if ((result=trunk_file_stat(pRecord->store_path_index, + pRecord->true_filename, pRecord->true_filename_len, &stat_buf, &trunkInfo, &trunkHeader)) != 0) { if (result == ENOENT) @@ -138,7 +138,7 @@ static int storage_sync_copy_file(ConnectionInfo *pStorageServer, \ { FDFSFileInfo file_info; result = storage_query_file_info_ex(NULL, \ - pStorageServer, g_group_name, \ + pStorageServer, g_group_name, \ pRecord->filename, &file_info, true); if (result == 0) { @@ -312,11 +312,11 @@ static int storage_sync_modify_file(ConnectionInfo *pStorageServer, \ StorageBinLogReader *pReader, StorageBinLogRecord *pRecord, \ const char cmd) { -#define FIELD_COUNT 3 +#define SYNC_MODIFY_FIELD_COUNT 3 TrackerHeader *pHeader; char *p; char *pBuff; - char *fields[FIELD_COUNT]; + char *fields[SYNC_MODIFY_FIELD_COUNT]; char full_filename[MAX_PATH_SIZE]; char out_buff[sizeof(TrackerHeader)+FDFS_GROUP_NAME_MAX_LEN+256]; char in_buff[1]; @@ -328,8 +328,8 @@ static int storage_sync_modify_file(ConnectionInfo *pStorageServer, \ int result; int count; - if ((count=splitEx(pRecord->filename, ' ', fields, FIELD_COUNT)) \ - != FIELD_COUNT) + if ((count=splitEx(pRecord->filename, ' ', fields, SYNC_MODIFY_FIELD_COUNT)) + != SYNC_MODIFY_FIELD_COUNT) { logError("file: "__FILE__", line: %d, " \ "the format of binlog not correct, filename: %s", \ @@ -483,11 +483,11 @@ filename bytes : filename static int storage_sync_truncate_file(ConnectionInfo *pStorageServer, \ StorageBinLogReader *pReader, StorageBinLogRecord *pRecord) { -#define FIELD_COUNT 3 +#define SYNC_TRUNCATE_FIELD_COUNT 3 TrackerHeader *pHeader; char *p; char *pBuff; - char *fields[FIELD_COUNT]; + char *fields[SYNC_TRUNCATE_FIELD_COUNT]; char full_filename[MAX_PATH_SIZE]; char out_buff[sizeof(TrackerHeader)+FDFS_GROUP_NAME_MAX_LEN+256]; char in_buff[1]; @@ -498,8 +498,8 @@ static int storage_sync_truncate_file(ConnectionInfo *pStorageServer, \ int result; int count; - if ((count=splitEx(pRecord->filename, ' ', fields, FIELD_COUNT)) \ - != FIELD_COUNT) + if ((count=splitEx(pRecord->filename, ' ', fields, + SYNC_TRUNCATE_FIELD_COUNT)) != SYNC_TRUNCATE_FIELD_COUNT) { logError("file: "__FILE__", line: %d, " \ "the format of binlog not correct, filename: %s", \ @@ -968,6 +968,123 @@ static int storage_sync_link_file(ConnectionInfo *pStorageServer, \ return result; } +/** +8 bytes: dest filename length +8 bytes: source filename length +4 bytes: source op timestamp +FDFS_GROUP_NAME_MAX_LEN bytes: group_name +dest filename length: dest filename +source filename length: source filename +**/ +static int storage_sync_rename_file(ConnectionInfo *pStorageServer, + StorageBinLogReader *pReader, StorageBinLogRecord *pRecord) +{ + TrackerHeader *pHeader; + int result; + char out_buff[sizeof(TrackerHeader) + 2 * FDFS_PROTO_PKG_LEN_SIZE + + 4 + FDFS_GROUP_NAME_MAX_LEN + 256]; + char in_buff[1]; + int out_body_len; + int64_t in_bytes; + char *pBuff; + char full_filename[MAX_PATH_SIZE]; + struct stat stat_buf; + + if (pRecord->op_type == STORAGE_OP_TYPE_REPLICA_RENAME_FILE) + { + return storage_sync_copy_file(pStorageServer, + pReader, pRecord, + STORAGE_PROTO_CMD_SYNC_CREATE_FILE); + } + + snprintf(full_filename, sizeof(full_filename), "%s/data/%s", + g_fdfs_store_paths.paths[pRecord->store_path_index], + pRecord->true_filename); + if (lstat(full_filename, &stat_buf) != 0) + { + if (errno == ENOENT) + { + logWarning("file: "__FILE__", line: %d, " + "sync file rename, file: %s not exists, " + "maybe deleted later?", __LINE__, full_filename); + + return 0; + } + else + { + result = errno != 0 ? errno : EPERM; + logError("file: "__FILE__", line: %d, " + "call stat fail, file: %s, " + "error no: %d, error info: %s", + __LINE__, full_filename, + result, STRERROR(result)); + return result; + } + } + + pHeader = (TrackerHeader *)out_buff; + memset(out_buff, 0, sizeof(out_buff)); + long2buff(pRecord->filename_len, out_buff + sizeof(TrackerHeader)); + long2buff(pRecord->src_filename_len, out_buff + sizeof(TrackerHeader) + + FDFS_PROTO_PKG_LEN_SIZE); + int2buff(pRecord->timestamp, out_buff + sizeof(TrackerHeader) + + 2 * FDFS_PROTO_PKG_LEN_SIZE); + sprintf(out_buff + sizeof(TrackerHeader) + 2 * FDFS_PROTO_PKG_LEN_SIZE + + 4, "%s", g_group_name); + memcpy(out_buff + sizeof(TrackerHeader) + 2 * FDFS_PROTO_PKG_LEN_SIZE + + 4 + FDFS_GROUP_NAME_MAX_LEN, + pRecord->filename, pRecord->filename_len); + memcpy(out_buff + sizeof(TrackerHeader) + 2 * FDFS_PROTO_PKG_LEN_SIZE + + 4 + FDFS_GROUP_NAME_MAX_LEN + pRecord->filename_len, + pRecord->src_filename, pRecord->src_filename_len); + + out_body_len = 2 * FDFS_PROTO_PKG_LEN_SIZE + 4 + + FDFS_GROUP_NAME_MAX_LEN + pRecord->filename_len + + pRecord->src_filename_len; + long2buff(out_body_len, pHeader->pkg_len); + pHeader->cmd = STORAGE_PROTO_CMD_SYNC_RENAME_FILE; + + if ((result=tcpsenddata_nb(pStorageServer->sock, out_buff, + sizeof(TrackerHeader) + out_body_len, + g_fdfs_network_timeout)) != 0) + { + logError("FILE: "__FILE__", line: %d, " + "send data to storage server %s:%d fail, " + "errno: %d, error info: %s", + __LINE__, pStorageServer->ip_addr, + pStorageServer->port, result, STRERROR(result)); + return result; + } + + pBuff = in_buff; + result = fdfs_recv_response(pStorageServer, &pBuff, 0, &in_bytes); + if (result != 0) + { + if (result == ENOENT) + { + return storage_sync_copy_file(pStorageServer, + pReader, pRecord, + STORAGE_PROTO_CMD_SYNC_CREATE_FILE); + } + else if (result == EEXIST) + { + logDebug("file: "__FILE__", line: %d, " + "storage server ip: %s:%d, data file: %s " + "already exists", __LINE__, pStorageServer->ip_addr, + pStorageServer->port, pRecord->filename); + return 0; + } + else + { + logError("file: "__FILE__", line: %d, " + "fdfs_recv_response fail, result: %d", + __LINE__, result); + } + } + + return result; +} + #define STARAGE_CHECK_IF_NEED_SYNC_OLD(pReader, pRecord) \ if ((!pReader->need_sync_old) || pReader->sync_old_done || \ (pRecord->timestamp > pReader->until_timestamp)) \ @@ -1022,6 +1139,10 @@ static int storage_sync_data(StorageBinLogReader *pReader, \ result = storage_sync_truncate_file(pStorageServer, \ pReader, pRecord); break; + case STORAGE_OP_TYPE_SOURCE_RENAME_FILE: + result = storage_sync_rename_file(pStorageServer, + pReader, pRecord); + break; case STORAGE_OP_TYPE_SOURCE_CREATE_LINK: result = storage_sync_link_file(pStorageServer, \ pRecord); @@ -1048,6 +1169,11 @@ static int storage_sync_data(StorageBinLogReader *pReader, \ result = storage_sync_link_file(pStorageServer, \ pRecord); break; + case STORAGE_OP_TYPE_REPLICA_RENAME_FILE: + STARAGE_CHECK_IF_NEED_SYNC_OLD(pReader, pRecord) + result = storage_sync_rename_file(pStorageServer, + pReader, pRecord); + break; case STORAGE_OP_TYPE_REPLICA_APPEND_FILE: return 0; case STORAGE_OP_TYPE_REPLICA_MODIFY_FILE: @@ -2560,8 +2686,10 @@ int storage_binlog_read(StorageBinLogReader *pReader, \ memcpy(pRecord->filename, cols[2], pRecord->filename_len); *(pRecord->filename + pRecord->filename_len) = '\0'; - if (pRecord->op_type == STORAGE_OP_TYPE_SOURCE_CREATE_LINK || \ - pRecord->op_type == STORAGE_OP_TYPE_REPLICA_CREATE_LINK) + if (pRecord->op_type == STORAGE_OP_TYPE_SOURCE_CREATE_LINK || + pRecord->op_type == STORAGE_OP_TYPE_REPLICA_CREATE_LINK || + pRecord->op_type == STORAGE_OP_TYPE_SOURCE_RENAME_FILE || + pRecord->op_type == STORAGE_OP_TYPE_REPLICA_RENAME_FILE) { char *p; @@ -2573,14 +2701,14 @@ int storage_binlog_read(StorageBinLogReader *pReader, \ } else { - pRecord->src_filename_len = pRecord->filename_len - \ + pRecord->src_filename_len = pRecord->filename_len - (p - pRecord->filename) - 1; pRecord->filename_len = p - pRecord->filename; *p = '\0'; - memcpy(pRecord->src_filename, p + 1, \ + memcpy(pRecord->src_filename, p + 1, pRecord->src_filename_len); - *(pRecord->src_filename + \ + *(pRecord->src_filename + pRecord->src_filename_len) = '\0'; } } @@ -2591,8 +2719,8 @@ int storage_binlog_read(StorageBinLogReader *pReader, \ } pRecord->true_filename_len = pRecord->filename_len; - if ((result=storage_split_filename_ex(pRecord->filename, \ - &pRecord->true_filename_len, pRecord->true_filename, \ + if ((result=storage_split_filename_ex(pRecord->filename, + &pRecord->true_filename_len, pRecord->true_filename, &pRecord->store_path_index)) != 0) { return result;