diff --git a/storage/storage_dio.c b/storage/storage_dio.c index bec3431..aa88a22 100644 --- a/storage/storage_dio.c +++ b/storage/storage_dio.c @@ -25,6 +25,7 @@ #include "fastcommon/logger.h" #include "fastcommon/sockopt.h" #include "fastcommon/ioevent_loop.h" +#include "sf/sf_service.h" #include "storage_global.h" #include "storage_service.h" #include "trunk_mem.h" @@ -153,8 +154,10 @@ int storage_dio_queue_push(struct fast_task_info *pTask) pClientInfo = (StorageClientInfo *)pTask->arg; pFileContext = &(pClientInfo->file_context); pContext = g_dio_contexts + pFileContext->dio_thread_index; + sf_hold_task(pTask); if ((result=blocked_queue_push(&(pContext->queue), pTask)) != 0) { + sf_release_task(pTask); ioevent_add_to_deleted_list(pTask); return result; } @@ -476,10 +479,11 @@ int dio_write_file(struct fast_task_info *pTask) } } - /* - logInfo("###dio write bytes: %d, pTask->length=%d, buff_offset=%d", \ - write_bytes, pTask->length, pFileContext->buff_offset); - */ + /* + logInfo("###dio fd: %d, write bytes: %d, pTask->length=%d, " + "buff_offset=%d", pFileContext->fd, write_bytes, + pTask->length, pFileContext->buff_offset); + */ pFileContext->offset += write_bytes; if (pFileContext->offset < pFileContext->end) @@ -742,9 +746,10 @@ static void *dio_thread_entrance(void* arg) while (SF_G_CONTINUE_FLAG) { while ((pTask=blocked_queue_pop(&(pContext->queue))) != NULL) - { - ((StorageClientInfo *)pTask->arg)->deal_func(pTask); - } + { + ((StorageClientInfo *)pTask->arg)->deal_func(pTask); + sf_release_task(pTask); + } } if ((result=pthread_mutex_lock(&g_dio_thread_lock)) != 0) diff --git a/storage/storage_global.h b/storage/storage_global.h index 4134540..bfad481 100644 --- a/storage/storage_global.h +++ b/storage/storage_global.h @@ -155,4 +155,3 @@ int storage_insert_ip_addr_to_multi_ips(FDFSMultiIP *multi_ip, #endif #endif - diff --git a/storage/storage_service.c b/storage/storage_service.c index 4a00245..4142380 100644 --- a/storage/storage_service.c +++ b/storage/storage_service.c @@ -745,8 +745,7 @@ static void storage_download_file_done_callback( \ g_storage_stat.total_download_bytes, \ g_storage_stat.success_download_bytes, \ pFileContext->end - pFileContext->start) - - sf_nio_notify(pTask, SF_NIO_STAGE_SEND); + sf_nio_notify(pTask, SF_NIO_STAGE_SEND); } } @@ -1576,6 +1575,38 @@ void task_finish_clean_up(struct fast_task_info *pTask) sf_task_finish_clean_up(pTask); } +int storage_set_body_length(struct fast_task_info *pTask) +{ + StorageClientInfo *pClientInfo; + int64_t total_length; + + pClientInfo = (StorageClientInfo *)pTask->arg; + if (pClientInfo->total_length == 0) //header + { + total_length = buff2long(((TrackerHeader *) + pTask->data)->pkg_len); + if (total_length < 0) + { + logError("file: "__FILE__", line: %d, " + "client ip: %s, pkg length: %"PRId64" < 0", + __LINE__, pTask->client_ip, total_length); + return EINVAL; + } + + pClientInfo->total_length = total_length + sizeof(TrackerHeader); + if (pClientInfo->total_length > pTask->size) + { + pTask->length = pTask->size - sizeof(TrackerHeader); + } + else + { + pTask->length = total_length; + } + } + + return 0; +} + static int sock_accept_done_callback(struct fast_task_info *task, const in_addr_t client_addr, const bool bInnerPort) { @@ -1595,6 +1626,38 @@ static int sock_accept_done_callback(struct fast_task_info *task, return 0; } +static int sock_send_done_callback(struct fast_task_info *pTask, + const int length) +{ + StorageClientInfo *pClientInfo; + + pClientInfo = (StorageClientInfo *)pTask->arg; + pClientInfo->total_offset += length; + + if (pClientInfo->total_offset >= pClientInfo->total_length) + { + if (pClientInfo->total_length == sizeof(TrackerHeader) + && ((TrackerHeader *)pTask->data)->status == EINVAL) + { + logDebug("file: "__FILE__", line: %d, " + "close conn: #%d, client ip: %s", + __LINE__, pTask->event.fd, + pTask->client_ip); + return EINVAL; + } + + /* response done, try to recv again */ + pClientInfo->total_length = 0; + pClientInfo->total_offset = 0; + return 0; + } + else //continue to send file content + { + /* continue read from file */ + return storage_dio_queue_push(pTask); + } +} + static void *alloc_thread_extra_data_func(const int thread_index) { int result; @@ -1637,9 +1700,9 @@ int storage_service_init() } result = sf_service_init("fdfs_storaged", alloc_thread_extra_data_func, - NULL, sock_accept_done_callback, fdfs_set_body_length, - storage_deal_task, task_finish_clean_up, NULL, 1000, - sizeof(TrackerHeader), sizeof(StorageClientInfo)); + NULL, sock_accept_done_callback, storage_set_body_length, + sock_send_done_callback, storage_deal_task, task_finish_clean_up, + NULL, 1000, sizeof(TrackerHeader), sizeof(StorageClientInfo)); sf_enable_thread_notify(false); sf_set_remove_from_ready_list(false); @@ -4240,7 +4303,6 @@ static int storage_upload_file(struct fast_task_info *pTask, bool bAppenderFile) pClientInfo = (StorageClientInfo *)pTask->arg; pFileContext = &(pClientInfo->file_context); nInPackLen = pClientInfo->total_length - sizeof(TrackerHeader); - if (nInPackLen < 1 + FDFS_PROTO_PKG_LEN_SIZE + FDFS_FILE_EXT_NAME_MAX_LEN) { @@ -4348,7 +4410,7 @@ static int storage_upload_file(struct fast_task_info *pTask, bool bAppenderFile) clean_func = dio_trunk_write_finish_clean_up; file_offset = TRUNK_FILE_START_OFFSET((*pTrunkInfo)); - pFileContext->extra_info.upload.if_gen_filename = true; + pFileContext->extra_info.upload.if_gen_filename = true; trunk_get_full_filename(pTrunkInfo, pFileContext->filename, \ sizeof(pFileContext->filename)); pFileContext->extra_info.upload.before_open_callback = \ @@ -7993,6 +8055,18 @@ static int storage_deal_task(struct fast_task_info *pTask, const int stage) pClientInfo = (StorageClientInfo *)pTask->arg; pHeader = (TrackerHeader *)pTask->data; + if (pClientInfo->total_offset == 0) + { + pClientInfo->total_offset = pTask->length; + } + else + { + pClientInfo->total_offset += pTask->length; + + /* continue write to file */ + return storage_dio_queue_push(pTask); + } + switch(pHeader->cmd) { case STORAGE_PROTO_CMD_DOWNLOAD_FILE: