From 2ebb51dcfd4d7868fdecd502d22f798eafa40b67 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Thu, 25 Aug 2022 18:22:16 +0800 Subject: [PATCH] support alloc_recv_buffer callback --- src/idempotency/client/receipt_handler.c | 2 +- src/sf_global.c | 2 +- src/sf_nio.c | 59 +++++++++++++++--------- src/sf_nio.h | 7 +-- src/sf_service.c | 6 ++- src/sf_service.h | 1 + src/sf_types.h | 2 + 7 files changed, 49 insertions(+), 30 deletions(-) diff --git a/src/idempotency/client/receipt_handler.c b/src/idempotency/client/receipt_handler.c index 4424ba9..3fcc4bb 100644 --- a/src/idempotency/client/receipt_handler.c +++ b/src/idempotency/client/receipt_handler.c @@ -493,7 +493,7 @@ static int do_init() return sf_service_init_ex2(&g_sf_context, "idemp-receipt", receipt_alloc_thread_extra_data, receipt_thread_loop_callback, - NULL, sf_proto_set_body_length, receipt_deal_task, + NULL, sf_proto_set_body_length, NULL, receipt_deal_task, receipt_task_finish_cleanup, receipt_recv_timeout_callback, 1000, sizeof(SFCommonProtoHeader), 0, receipt_init_task, NULL); } diff --git a/src/sf_global.c b/src/sf_global.c index c3cf487..29f2180 100644 --- a/src/sf_global.c +++ b/src/sf_global.c @@ -45,7 +45,7 @@ SFGlobalVariables g_sf_global_vars = { SFContext g_sf_context = { {'\0'}, NULL, 0, -1, -1, 0, 0, 1, DEFAULT_WORK_THREADS, - {'\0'}, {'\0'}, 0, true, true, NULL, NULL, NULL, + {'\0'}, {'\0'}, 0, true, true, NULL, NULL, NULL, NULL, sf_task_finish_clean_up, NULL }; diff --git a/src/sf_nio.c b/src/sf_nio.c index aafc6a0..b307ed4 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -45,12 +45,14 @@ void sf_set_parameters_ex(SFContext *sf_context, const int header_size, sf_set_body_length_callback set_body_length_func, + sf_alloc_recv_buffer_callback alloc_recv_buffer_func, sf_deal_task_func deal_func, TaskCleanUpCallback cleanup_func, sf_recv_timeout_callback timeout_callback, sf_release_buffer_callback release_buffer_callback) { sf_context->header_size = header_size; sf_context->set_body_length = set_body_length_func; + sf_context->alloc_recv_buffer = alloc_recv_buffer_func; sf_context->deal_task = deal_func; sf_context->task_cleanup_func = cleanup_func; sf_context->timeout_callback = timeout_callback; @@ -525,11 +527,13 @@ int sf_client_sock_read(int sock, short event, void *arg) task->network_timeout); if (task->length == 0) { //recv header recv_bytes = SF_CTX->header_size - task->offset; + bytes = read(sock, task->data + task->offset, recv_bytes); } else { recv_bytes = task->length - task->offset; + bytes = read(sock, task->recv_body + (task->offset - + SF_CTX->header_size), recv_bytes); } - bytes = read(sock, task->data + task->offset, recv_bytes); if (bytes < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { break; @@ -609,35 +613,44 @@ int sf_client_sock_read(int sock, short event, void *arg) return -1; } - if (task->length > task->size) { - int old_size; - - if (!SF_CTX->realloc_task_buffer) { - logError("file: "__FILE__", line: %d, " - "client ip: %s, pkg length: %d exceeds " - "task size: %d, but realloc buffer disabled", - __LINE__, task->client_ip, task->size, - task->length); - + if (SF_CTX->alloc_recv_buffer != NULL) { + if ((task->recv_body=SF_CTX->alloc_recv_buffer(task)) == NULL) { ioevent_add_to_deleted_list(task); return -1; } + } else { + if (task->length > task->size) { + int old_size; - old_size = task->size; - if (free_queue_realloc_buffer(task, task->length) != 0) { - logError("file: "__FILE__", line: %d, " - "client ip: %s, realloc buffer size " - "from %d to %d fail", __LINE__, - task->client_ip, task->size, task->length); + if (!SF_CTX->realloc_task_buffer) { + logError("file: "__FILE__", line: %d, " + "client ip: %s, pkg length: %d exceeds " + "task size: %d, but realloc buffer disabled", + __LINE__, task->client_ip, task->size, + task->length); - ioevent_add_to_deleted_list(task); - return -1; + ioevent_add_to_deleted_list(task); + return -1; + } + + old_size = task->size; + if (free_queue_realloc_buffer(task, task->length) != 0) { + logError("file: "__FILE__", line: %d, " + "client ip: %s, realloc buffer size " + "from %d to %d fail", __LINE__, + task->client_ip, task->size, task->length); + + ioevent_add_to_deleted_list(task); + return -1; + } + + logDebug("file: "__FILE__", line: %d, " + "client ip: %s, task length: %d, realloc buffer " + "size from %d to %d", __LINE__, task->client_ip, + task->length, old_size, task->size); } - logDebug("file: "__FILE__", line: %d, " - "client ip: %s, task length: %d, realloc buffer size " - "from %d to %d", __LINE__, task->client_ip, - task->length, old_size, task->size); + task->recv_body = task->data + SF_CTX->header_size; } } diff --git a/src/sf_nio.h b/src/sf_nio.h index 91c03df..9863e38 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -32,15 +32,16 @@ extern "C" { void sf_set_parameters_ex(SFContext *sf_context, const int header_size, sf_set_body_length_callback set_body_length_func, + sf_alloc_recv_buffer_callback alloc_recv_buffer_func, sf_deal_task_func deal_func, TaskCleanUpCallback cleanup_func, sf_recv_timeout_callback timeout_callback, sf_release_buffer_callback release_buffer_callback); #define sf_set_parameters(header_size, set_body_length_func, \ - deal_func, cleanup_func, timeout_callback) \ + alloc_recv_buffer_func, deal_func, cleanup_func, timeout_callback) \ sf_set_parameters_ex(&g_sf_context, header_size, \ - set_body_length_func, deal_func, \ - cleanup_func, timeout_callback, NULL) + set_body_length_func, alloc_recv_buffer_func, \ + deal_func, cleanup_func, timeout_callback, NULL) static inline void sf_set_deal_task_func_ex(SFContext *sf_context, sf_deal_task_func deal_func) diff --git a/src/sf_service.c b/src/sf_service.c index e7e788d..6ac2d5e 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -121,6 +121,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, ThreadLoopCallback thread_loop_callback, sf_accept_done_callback accept_done_callback, sf_set_body_length_callback set_body_length_func, + sf_alloc_recv_buffer_callback alloc_recv_buffer_func, sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, const int proto_header_size, const int task_arg_size, @@ -141,8 +142,9 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, min_buff_size < g_sf_global_vars.max_buff_size; sf_context->accept_done_func = accept_done_callback; sf_set_parameters_ex(sf_context, proto_header_size, - set_body_length_func, deal_func, task_cleanup_func, - timeout_callback, release_buffer_callback); + set_body_length_func, alloc_recv_buffer_func, + deal_func, task_cleanup_func, timeout_callback, + release_buffer_callback); if ((result=sf_init_free_queues(task_arg_size, init_callback)) != 0) { return result; diff --git a/src/sf_service.h b/src/sf_service.h index 5cc271b..b5470cb 100644 --- a/src/sf_service.h +++ b/src/sf_service.h @@ -39,6 +39,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, ThreadLoopCallback thread_loop_callback, sf_accept_done_callback accept_done_callback, sf_set_body_length_callback set_body_length_func, + sf_alloc_recv_buffer_callback alloc_recv_buffer_func, sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, const int proto_header_size, const int task_arg_size, diff --git a/src/sf_types.h b/src/sf_types.h index 08573e1..a7ddfdf 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -37,6 +37,7 @@ typedef void (*sf_accept_done_callback)(struct fast_task_info *task, const bool bInnerPort); typedef int (*sf_set_body_length_callback)(struct fast_task_info *task); +typedef char *(*sf_alloc_recv_buffer_callback)(struct fast_task_info *task); typedef int (*sf_deal_task_func)(struct fast_task_info *task, const int stage); typedef int (*sf_recv_timeout_callback)(struct fast_task_info *task); @@ -65,6 +66,7 @@ typedef struct sf_context { bool realloc_task_buffer; sf_deal_task_func deal_task; sf_set_body_length_callback set_body_length; + sf_alloc_recv_buffer_callback alloc_recv_buffer; sf_accept_done_callback accept_done_func; TaskCleanUpCallback task_cleanup_func; sf_recv_timeout_callback timeout_callback;