diff --git a/HISTORY b/HISTORY index f77e853..1b88095 100644 --- a/HISTORY +++ b/HISTORY @@ -1,4 +1,7 @@ +Version 6.02 2019-11-06 + * get_file_info calculate CRC32 for appender file type + Version 6.01 2019-10-25 * compress and uncompress binlog file by gzip when need, config items in storage.conf: compress_binlog and compress_binlog_time diff --git a/storage/storage_dio.c b/storage/storage_dio.c index 5936c17..90caa76 100644 --- a/storage/storage_dio.c +++ b/storage/storage_dio.c @@ -144,12 +144,12 @@ void storage_dio_terminate() int storage_dio_queue_push(struct fast_task_info *pTask) { - StorageClientInfo *pClientInfo; + StorageClientInfo *pClientInfo; StorageFileContext *pFileContext; struct storage_dio_context *pContext; int result; - pClientInfo = (StorageClientInfo *)pTask->arg; + pClientInfo = (StorageClientInfo *)pTask->arg; pFileContext = &(pClientInfo->file_context); pContext = g_dio_contexts + pFileContext->dio_thread_index; @@ -245,7 +245,7 @@ int dio_discard_file(struct fast_task_info *pTask) else { pFileContext->buff_offset = 0; - storage_nio_notify(pTask); //notify nio to deal + pFileContext->continue_callback(pTask); } return 0; @@ -353,6 +353,12 @@ int dio_read_file(struct fast_task_info *pTask) break; } + if (pFileContext->calc_crc32) + { + pFileContext->crc32 = CRC32_ex(pTask->data + pTask->length, + read_bytes, pFileContext->crc32); + } + pTask->length += read_bytes; pFileContext->offset += read_bytes; @@ -363,7 +369,7 @@ int dio_read_file(struct fast_task_info *pTask) if (pFileContext->offset < pFileContext->end) { - storage_nio_notify(pTask); //notify nio to deal + pFileContext->continue_callback(pTask); } else { @@ -371,6 +377,11 @@ int dio_read_file(struct fast_task_info *pTask) close(pFileContext->fd); pFileContext->fd = -1; + if (pFileContext->calc_crc32) + { + pFileContext->crc32 = CRC32_FINAL( \ + pFileContext->crc32); + } pFileContext->done_callback(pTask, result); } @@ -475,7 +486,7 @@ int dio_write_file(struct fast_task_info *pTask) if (pFileContext->offset < pFileContext->end) { pFileContext->buff_offset = 0; - storage_nio_notify(pTask); //notify nio to deal + pFileContext->continue_callback(pTask); } else { diff --git a/storage/storage_nio.h b/storage/storage_nio.h index 79094a5..d33870f 100644 --- a/storage/storage_nio.h +++ b/storage/storage_nio.h @@ -46,6 +46,8 @@ typedef void (*DeleteFileLogCallback)(struct fast_task_info *pTask, \ typedef void (*FileDealDoneCallback)(struct fast_task_info *pTask, \ const int err_no); +typedef int (*FileDealContinueCallback)(struct fast_task_info *pTask); + typedef int (*FileBeforeOpenCallback)(struct fast_task_info *pTask); typedef int (*FileBeforeCloseCallback)(struct fast_task_info *pTask); @@ -109,6 +111,7 @@ typedef struct int64_t start; //the start offset of file int64_t end; //the end offset of file int64_t offset; //the current offset of file + FileDealContinueCallback continue_callback; FileDealDoneCallback done_callback; DeleteFileLogCallback log_callback; diff --git a/storage/storage_service.c b/storage/storage_service.c index 4a24202..77e6173 100644 --- a/storage/storage_service.c +++ b/storage/storage_service.c @@ -28,6 +28,7 @@ #include "fastcommon/shared_func.h" #include "fastcommon/pthread_func.h" #include "fastcommon/sched_thread.h" +#include "fastcommon/fast_mblock.h" #include "tracker_types.h" #include "tracker_proto.h" #include "storage_service.h" @@ -59,6 +60,12 @@ #define ACCESS_LOG_ACTION_TRUNCATE_FILE "truncate" #define ACCESS_LOG_ACTION_QUERY_FILE "status" +typedef struct +{ + int storage_id; + time_t mtime; + int64_t fsize; +} StorageFileInfoForCRC32; pthread_mutex_t g_storage_thread_lock; int g_storage_thread_count = 0; @@ -69,6 +76,8 @@ static int64_t temp_file_sequence = 0; static pthread_mutex_t path_index_thread_lock; static pthread_mutex_t stat_count_thread_lock; +static struct fast_mblock_man finfo_for_crc32_allocator; + static void *work_thread_entrance(void* arg); extern int storage_client_create_link(ConnectionInfo *pTrackerServer, \ @@ -1708,6 +1717,9 @@ int storage_service_init() return result; } + result = fast_mblock_init(&finfo_for_crc32_allocator, + sizeof(StorageFileInfoForCRC32), 1024); + return result; } @@ -1901,7 +1913,7 @@ void storage_accept_loop(int server_sock) accept_thread_entrance((void *)(long)server_sock); } -void storage_nio_notify(struct fast_task_info *pTask) +int storage_nio_notify(struct fast_task_info *pTask) { StorageClientInfo *pClientInfo; struct storage_nio_thread_data *pThreadData; @@ -1922,6 +1934,8 @@ void storage_nio_notify(struct fast_task_info *pTask) __LINE__, result, STRERROR(result)); abort(); } + + return 0; } static void *work_thread_entrance(void* arg) @@ -2775,6 +2789,7 @@ static int storage_trunk_do_create_link(struct fast_task_info *pTask, \ pTask, pFileContext->extra_info.upload.trunk_info.path. \ store_path_index, pFileContext->op); + pFileContext->continue_callback = storage_nio_notify; pFileContext->done_callback = done_callback; pClientInfo->clean_func = dio_trunk_write_finish_clean_up; @@ -3432,6 +3447,153 @@ static int storage_server_trunk_sync_binlog(struct fast_task_info *pTask) return trunk_binlog_write_buffer(binlog_buff, nInPackLen); } +static int query_file_info_response(struct fast_task_info *pTask, + const StorageFileInfoForCRC32 *finfo, const int crc32) +{ + char *p; + + p = pTask->data + sizeof(TrackerHeader); + long2buff(finfo->fsize, p); + p += FDFS_PROTO_PKG_LEN_SIZE; + long2buff(finfo->mtime, p); + p += FDFS_PROTO_PKG_LEN_SIZE; + long2buff(crc32, p); + p += FDFS_PROTO_PKG_LEN_SIZE; + + memset(p, 0, IP_ADDRESS_SIZE); + if (fdfs_get_server_id_type(finfo->storage_id) == FDFS_ID_TYPE_SERVER_ID) + { + if (g_use_storage_id) + { + FDFSStorageIdInfo *pStorageIdInfo; + char id[16]; + + sprintf(id, "%d", finfo->storage_id); + pStorageIdInfo = fdfs_get_storage_by_id(id); + if (pStorageIdInfo != NULL) + { + strcpy(p, fdfs_get_ipaddr_by_peer_ip( + &pStorageIdInfo->ip_addrs, + pTask->client_ip)); + } + } + } + else + { + struct in_addr ip_addr; + memset(&ip_addr, 0, sizeof(ip_addr)); + ip_addr.s_addr = finfo->storage_id; + inet_ntop(AF_INET, &ip_addr, p, IP_ADDRESS_SIZE); + } + p += IP_ADDRESS_SIZE; + + ((StorageClientInfo *)pTask->arg)->total_length = p - pTask->data; + return 0; +} + +static void calc_crc32_done_callback(struct fast_task_info *pTask, + const int err_no) +{ + StorageClientInfo *pClientInfo; + StorageFileInfoForCRC32 *crc32_file_info; + StorageFileContext *pFileContext; + TrackerHeader *pHeader; + int result; + + pClientInfo = (StorageClientInfo *)pTask->arg; + pFileContext = &(pClientInfo->file_context); + crc32_file_info = (StorageFileInfoForCRC32 *)pClientInfo->extra_arg; + + if (err_no == 0) + { + result = query_file_info_response(pTask, crc32_file_info, + pFileContext->crc32); + } + else + { + result = err_no; + pClientInfo->total_length = sizeof(TrackerHeader); + } + + fast_mblock_free_object(&finfo_for_crc32_allocator, crc32_file_info); + + pClientInfo->total_offset = 0; + pTask->length = pClientInfo->total_length; + + pHeader = (TrackerHeader *)pTask->data; + pHeader->status = result; + pHeader->cmd = STORAGE_PROTO_CMD_RESP; + long2buff(pTask->length - sizeof(TrackerHeader), pHeader->pkg_len); + + storage_nio_notify(pTask); +} + +static int calc_crc32_continue_callback(struct fast_task_info *pTask) +{ + pTask->length = 0; + return storage_dio_queue_push(pTask); +} + +static int query_file_info_deal_response(struct fast_task_info *pTask, + const char *filename, const char *true_filename, + struct stat *file_stat, const int store_path_index) +{ + char decode_buff[64]; + int buff_len; + int storage_id; + int crc32; + int64_t file_size; + StorageFileInfoForCRC32 finfo; + StorageFileInfoForCRC32 *crc32_file_info; + + memset(decode_buff, 0, sizeof(decode_buff)); + base64_decode_auto(&g_fdfs_base64_context, filename + + FDFS_LOGIC_FILE_PATH_LEN, FDFS_FILENAME_BASE64_LENGTH, + decode_buff, &buff_len); + storage_id = ntohl(buff2int(decode_buff)); + file_size = buff2long(decode_buff + sizeof(int) * 2); + if (IS_APPENDER_FILE(file_size)) + { + StorageClientInfo *pClientInfo; + StorageFileContext *pFileContext; + + pClientInfo = (StorageClientInfo *)pTask->arg; + pFileContext = &(pClientInfo->file_context); + + crc32_file_info = (StorageFileInfoForCRC32 *)fast_mblock_alloc_object( + &finfo_for_crc32_allocator); + if (crc32_file_info == NULL) + { + logError("file: "__FILE__", line: %d, " + "finfo_for_crc32_allocator %d bytes object fail", + __LINE__, (int)sizeof(StorageFileInfoForCRC32)); + return errno != 0 ? errno : ENOMEM; + } + crc32_file_info->storage_id = storage_id; + crc32_file_info->fsize = file_stat->st_size; + crc32_file_info->mtime = file_stat->st_mtime; + + snprintf(pFileContext->fname2log, sizeof(pFileContext->fname2log), + "%s", filename); + snprintf(pFileContext->filename, sizeof(pFileContext->filename), + "%s/data/%s", g_fdfs_store_paths.paths[store_path_index], + true_filename); + pClientInfo->extra_arg = crc32_file_info; + + pFileContext->fd = -1; + pFileContext->calc_crc32 = true; + pFileContext->continue_callback = calc_crc32_continue_callback; + return storage_read_from_file(pTask, 0, file_stat->st_size, + calc_crc32_done_callback, store_path_index); + } + + finfo.storage_id = storage_id; + finfo.fsize = file_stat->st_size; + finfo.mtime = file_stat->st_mtime; + crc32 = buff2int(decode_buff + sizeof(int) * 4); + return query_file_info_response(pTask, &finfo, crc32); +} + /** FDFS_GROUP_NAME_MAX_LEN bytes: group_name filename @@ -3441,11 +3603,9 @@ static int storage_server_query_file_info(struct fast_task_info *pTask) StorageClientInfo *pClientInfo; char *in_buff; char *filename; - char *p; char group_name[FDFS_GROUP_NAME_MAX_LEN + 1]; char true_filename[128]; char src_filename[MAX_PATH_SIZE + 128]; - char decode_buff[64]; struct stat file_lstat; struct stat file_stat; FDFSTrunkFullInfo trunkInfo; @@ -3454,11 +3614,8 @@ static int storage_server_query_file_info(struct fast_task_info *pTask) int store_path_index; int filename_len; int true_filename_len; - int crc32; - int storage_id; int result; int len; - int buff_len; bool bSilence; pClientInfo = (StorageClientInfo *)pTask->arg; @@ -3597,10 +3754,9 @@ static int storage_server_query_file_info(struct fast_task_info *pTask) } else { - char full_filename[MAX_PATH_SIZE + 128]; - - sprintf(full_filename, "%s/data/%s", \ - g_fdfs_store_paths.paths[store_path_index], \ + char full_filename[MAX_PATH_SIZE + 128]; + sprintf(full_filename, "%s/data/%s", + g_fdfs_store_paths.paths[store_path_index], true_filename); if ((len=readlink(full_filename, src_filename, \ sizeof(src_filename))) < 0) @@ -3624,6 +3780,8 @@ static int storage_server_query_file_info(struct fast_task_info *pTask) return result; } } + + file_stat.st_mtime = file_lstat.st_mtime; } else { @@ -3641,50 +3799,8 @@ static int storage_server_query_file_info(struct fast_task_info *pTask) return EINVAL; } - memset(decode_buff, 0, sizeof(decode_buff)); - base64_decode_auto(&g_fdfs_base64_context, filename + \ - FDFS_LOGIC_FILE_PATH_LEN, FDFS_FILENAME_BASE64_LENGTH, \ - decode_buff, &buff_len); - storage_id = ntohl(buff2int(decode_buff)); - crc32 = buff2int(decode_buff + sizeof(int) * 4); - - p = pTask->data + sizeof(TrackerHeader); - long2buff(file_stat.st_size, p); - p += FDFS_PROTO_PKG_LEN_SIZE; - long2buff(file_lstat.st_mtime, p); - p += FDFS_PROTO_PKG_LEN_SIZE; - long2buff(crc32, p); - p += FDFS_PROTO_PKG_LEN_SIZE; - - memset(p, 0, IP_ADDRESS_SIZE); - if (fdfs_get_server_id_type(storage_id) == FDFS_ID_TYPE_SERVER_ID) - { - if (g_use_storage_id) - { - FDFSStorageIdInfo *pStorageIdInfo; - char id[16]; - - sprintf(id, "%d", storage_id); - pStorageIdInfo = fdfs_get_storage_by_id(id); - if (pStorageIdInfo != NULL) - { - strcpy(p, fdfs_get_ipaddr_by_peer_ip( - &pStorageIdInfo->ip_addrs, - pTask->client_ip)); - } - } - } - else - { - struct in_addr ip_addr; - memset(&ip_addr, 0, sizeof(ip_addr)); - ip_addr.s_addr = storage_id; - inet_ntop(AF_INET, &ip_addr, p, IP_ADDRESS_SIZE); - } - p += IP_ADDRESS_SIZE; - - pClientInfo->total_length = p - pTask->data; - return 0; + return query_file_info_deal_response(pTask, filename, + true_filename, &file_stat, store_path_index); } #define CHECK_TRUNK_SERVER(pTask) \ @@ -4563,8 +4679,9 @@ static int storage_upload_file(struct fast_task_info *pTask, bool bAppenderFile) | g_extra_open_file_flags; } - return storage_write_to_file(pTask, file_offset, file_bytes, \ - p - pTask->data, dio_write_file, \ + pFileContext->continue_callback = storage_nio_notify; + return storage_write_to_file(pTask, file_offset, file_bytes, \ + p - pTask->data, dio_write_file, \ storage_upload_file_done_callback, \ clean_func, store_path_index); } @@ -4756,6 +4873,7 @@ static int storage_append_file(struct fast_task_info *pTask) store_path_index; pFileContext->op = FDFS_STORAGE_FILE_OP_APPEND; pFileContext->open_flags = O_WRONLY | O_APPEND | g_extra_open_file_flags; + pFileContext->continue_callback = storage_nio_notify; return storage_write_to_file(pTask, stat_buf.st_size, file_bytes, \ p - pTask->data, dio_write_file, \ @@ -4951,6 +5069,7 @@ static int storage_modify_file(struct fast_task_info *pTask) store_path_index; pFileContext->op = FDFS_STORAGE_FILE_OP_WRITE; pFileContext->open_flags = O_WRONLY | g_extra_open_file_flags; + pFileContext->continue_callback = storage_nio_notify; return storage_write_to_file(pTask, file_offset, file_bytes, \ p - pTask->data, dio_write_file, \ @@ -5134,6 +5253,7 @@ static int storage_do_truncate_file(struct fast_task_info *pTask) store_path_index; pFileContext->op = FDFS_STORAGE_FILE_OP_WRITE; pFileContext->open_flags = O_WRONLY | g_extra_open_file_flags; + pFileContext->continue_callback = storage_nio_notify; return storage_write_to_file(pTask, remain_bytes, \ stat_buf.st_size, 0, dio_truncate_file, \ @@ -5368,6 +5488,7 @@ static int storage_upload_slave_file(struct fast_task_info *pTask) pFileContext->op = FDFS_STORAGE_FILE_OP_WRITE; pFileContext->open_flags = O_WRONLY | O_CREAT | O_TRUNC \ | g_extra_open_file_flags; + pFileContext->continue_callback = storage_nio_notify; return storage_write_to_file(pTask, 0, file_bytes, p - pTask->data, \ dio_write_file, storage_upload_file_done_callback, \ @@ -5680,6 +5801,7 @@ static int storage_sync_copy_file(struct fast_task_info *pTask, \ pFileContext->calc_crc32 = false; pFileContext->calc_file_hash = false; strcpy(pFileContext->fname2log, filename); + pFileContext->continue_callback = storage_nio_notify; if (have_file_content) { @@ -5915,6 +6037,7 @@ static int storage_sync_append_file(struct fast_task_info *pTask) pFileContext->calc_file_hash = false; pFileContext->extra_info.upload.before_open_callback = NULL; pFileContext->extra_info.upload.before_close_callback = NULL; + pFileContext->continue_callback = storage_nio_notify; return storage_write_to_file(pTask, start_offset, append_bytes, \ p - pTask->data, deal_func, \ @@ -6116,6 +6239,7 @@ static int storage_sync_modify_file(struct fast_task_info *pTask) pFileContext->calc_file_hash = false; pFileContext->extra_info.upload.before_open_callback = NULL; pFileContext->extra_info.upload.before_close_callback = NULL; + pFileContext->continue_callback = storage_nio_notify; return storage_write_to_file(pTask, start_offset, modify_bytes, \ p - pTask->data, deal_func, \ @@ -6301,6 +6425,7 @@ static int storage_sync_truncate_file(struct fast_task_info *pTask) pFileContext->calc_file_hash = false; pFileContext->extra_info.upload.before_open_callback = NULL; pFileContext->extra_info.upload.before_close_callback = NULL; + pFileContext->continue_callback = storage_nio_notify; return storage_write_to_file(pTask, new_file_size, \ old_file_size, 0, dio_truncate_file, \ @@ -6761,6 +6886,8 @@ static int storage_server_get_metadata(struct fast_task_info *pTask) } pFileContext->fd = -1; + pFileContext->calc_crc32 = false; + pFileContext->continue_callback = storage_nio_notify; return storage_read_from_file(pTask, 0, file_bytes, \ storage_get_metadata_done_callback, store_path_index); } @@ -6941,6 +7068,8 @@ static int storage_server_download_file(struct fast_task_info *pTask) g_fdfs_store_paths.paths[store_path_index], true_filename); } + pFileContext->calc_crc32 = false; + pFileContext->continue_callback = storage_nio_notify; return storage_read_from_file(pTask, file_offset, download_bytes, \ storage_download_file_done_callback, store_path_index); } @@ -7006,6 +7135,10 @@ static int storage_read_from_file(struct fast_task_info *pTask, \ pHeader->cmd = STORAGE_PROTO_CMD_RESP; long2buff(download_bytes, pHeader->pkg_len); + if (pFileContext->calc_crc32) + { + pFileContext->crc32 = CRC32_XINIT; + } if ((result=storage_dio_queue_push(pTask)) != 0) { if (pFileContext->fd >= 0) diff --git a/storage/storage_service.h b/storage/storage_service.h index fb896b0..ea96ce5 100644 --- a/storage/storage_service.h +++ b/storage/storage_service.h @@ -36,7 +36,7 @@ void storage_service_destroy(); int fdfs_stat_file_sync_func(void *args); int storage_deal_task(struct fast_task_info *pTask); -void storage_nio_notify(struct fast_task_info *pTask); +int storage_nio_notify(struct fast_task_info *pTask); void storage_accept_loop(int server_sock); int storage_terminate_threads();