nio reform for file upload and download

use_libserverframe
YuQing 2022-09-14 10:59:19 +08:00
parent c6a92de3d2
commit 87139983c8
3 changed files with 93 additions and 15 deletions

View File

@ -25,6 +25,7 @@
#include "fastcommon/logger.h"
#include "fastcommon/sockopt.h"
#include "fastcommon/ioevent_loop.h"
#include "sf/sf_service.h"
#include "storage_global.h"
#include "storage_service.h"
#include "trunk_mem.h"
@ -153,8 +154,10 @@ int storage_dio_queue_push(struct fast_task_info *pTask)
pClientInfo = (StorageClientInfo *)pTask->arg;
pFileContext = &(pClientInfo->file_context);
pContext = g_dio_contexts + pFileContext->dio_thread_index;
sf_hold_task(pTask);
if ((result=blocked_queue_push(&(pContext->queue), pTask)) != 0)
{
sf_release_task(pTask);
ioevent_add_to_deleted_list(pTask);
return result;
}
@ -477,8 +480,9 @@ int dio_write_file(struct fast_task_info *pTask)
}
/*
logInfo("###dio write bytes: %d, pTask->length=%d, buff_offset=%d", \
write_bytes, pTask->length, pFileContext->buff_offset);
logInfo("###dio fd: %d, write bytes: %d, pTask->length=%d, "
"buff_offset=%d", pFileContext->fd, write_bytes,
pTask->length, pFileContext->buff_offset);
*/
pFileContext->offset += write_bytes;
@ -744,6 +748,7 @@ static void *dio_thread_entrance(void* arg)
while ((pTask=blocked_queue_pop(&(pContext->queue))) != NULL)
{
((StorageClientInfo *)pTask->arg)->deal_func(pTask);
sf_release_task(pTask);
}
}

View File

@ -155,4 +155,3 @@ int storage_insert_ip_addr_to_multi_ips(FDFSMultiIP *multi_ip,
#endif
#endif

View File

@ -745,7 +745,6 @@ static void storage_download_file_done_callback( \
g_storage_stat.total_download_bytes, \
g_storage_stat.success_download_bytes, \
pFileContext->end - pFileContext->start)
sf_nio_notify(pTask, SF_NIO_STAGE_SEND);
}
}
@ -1576,6 +1575,38 @@ void task_finish_clean_up(struct fast_task_info *pTask)
sf_task_finish_clean_up(pTask);
}
int storage_set_body_length(struct fast_task_info *pTask)
{
StorageClientInfo *pClientInfo;
int64_t total_length;
pClientInfo = (StorageClientInfo *)pTask->arg;
if (pClientInfo->total_length == 0) //header
{
total_length = buff2long(((TrackerHeader *)
pTask->data)->pkg_len);
if (total_length < 0)
{
logError("file: "__FILE__", line: %d, "
"client ip: %s, pkg length: %"PRId64" < 0",
__LINE__, pTask->client_ip, total_length);
return EINVAL;
}
pClientInfo->total_length = total_length + sizeof(TrackerHeader);
if (pClientInfo->total_length > pTask->size)
{
pTask->length = pTask->size - sizeof(TrackerHeader);
}
else
{
pTask->length = total_length;
}
}
return 0;
}
static int sock_accept_done_callback(struct fast_task_info *task,
const in_addr_t client_addr, const bool bInnerPort)
{
@ -1595,6 +1626,38 @@ static int sock_accept_done_callback(struct fast_task_info *task,
return 0;
}
static int sock_send_done_callback(struct fast_task_info *pTask,
const int length)
{
StorageClientInfo *pClientInfo;
pClientInfo = (StorageClientInfo *)pTask->arg;
pClientInfo->total_offset += length;
if (pClientInfo->total_offset >= pClientInfo->total_length)
{
if (pClientInfo->total_length == sizeof(TrackerHeader)
&& ((TrackerHeader *)pTask->data)->status == EINVAL)
{
logDebug("file: "__FILE__", line: %d, "
"close conn: #%d, client ip: %s",
__LINE__, pTask->event.fd,
pTask->client_ip);
return EINVAL;
}
/* response done, try to recv again */
pClientInfo->total_length = 0;
pClientInfo->total_offset = 0;
return 0;
}
else //continue to send file content
{
/* continue read from file */
return storage_dio_queue_push(pTask);
}
}
static void *alloc_thread_extra_data_func(const int thread_index)
{
int result;
@ -1637,9 +1700,9 @@ int storage_service_init()
}
result = sf_service_init("fdfs_storaged", alloc_thread_extra_data_func,
NULL, sock_accept_done_callback, fdfs_set_body_length,
storage_deal_task, task_finish_clean_up, NULL, 1000,
sizeof(TrackerHeader), sizeof(StorageClientInfo));
NULL, sock_accept_done_callback, storage_set_body_length,
sock_send_done_callback, storage_deal_task, task_finish_clean_up,
NULL, 1000, sizeof(TrackerHeader), sizeof(StorageClientInfo));
sf_enable_thread_notify(false);
sf_set_remove_from_ready_list(false);
@ -4240,7 +4303,6 @@ static int storage_upload_file(struct fast_task_info *pTask, bool bAppenderFile)
pClientInfo = (StorageClientInfo *)pTask->arg;
pFileContext = &(pClientInfo->file_context);
nInPackLen = pClientInfo->total_length - sizeof(TrackerHeader);
if (nInPackLen < 1 + FDFS_PROTO_PKG_LEN_SIZE +
FDFS_FILE_EXT_NAME_MAX_LEN)
{
@ -7993,6 +8055,18 @@ static int storage_deal_task(struct fast_task_info *pTask, const int stage)
pClientInfo = (StorageClientInfo *)pTask->arg;
pHeader = (TrackerHeader *)pTask->data;
if (pClientInfo->total_offset == 0)
{
pClientInfo->total_offset = pTask->length;
}
else
{
pClientInfo->total_offset += pTask->length;
/* continue write to file */
return storage_dio_queue_push(pTask);
}
switch(pHeader->cmd)
{
case STORAGE_PROTO_CMD_DOWNLOAD_FILE: