diff --git a/src/idempotency/client/receipt_handler.c b/src/idempotency/client/receipt_handler.c index b6db0bf..061d3ef 100644 --- a/src/idempotency/client/receipt_handler.c +++ b/src/idempotency/client/receipt_handler.c @@ -523,6 +523,7 @@ static int do_init(FCAddressPtrArray *address_array) { const int task_arg_size = 0; const bool double_buffers = false; + const bool need_shrink_task_buffer = false; const bool explicit_post_recv = false; int result; int bytes; @@ -555,8 +556,8 @@ static int do_init(FCAddressPtrArray *address_array) NULL, sf_proto_set_body_length, NULL, NULL, receipt_deal_task, receipt_task_finish_cleanup, receipt_recv_timeout_callback, 1000, sizeof(SFCommonProtoHeader), TASK_PADDING_SIZE, - task_arg_size, double_buffers, explicit_post_recv, - receipt_init_task, pd, NULL); + task_arg_size, double_buffers, need_shrink_task_buffer, + explicit_post_recv, receipt_init_task, pd, NULL); } int receipt_handler_init(FCAddressPtrArray *address_array) diff --git a/src/sf_nio.c b/src/sf_nio.c index 44cbc58..291583c 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -269,14 +269,16 @@ static inline int set_read_event(struct fast_task_info *task) int sf_set_read_event(struct fast_task_info *task) { + /* reset recv offset and length */ + task->recv.ptr->offset = 0; + task->recv.ptr->length = 0; + #if IOEVENT_USE_URING if (task->handler->use_io_uring) { return 0; } #endif - task->recv.ptr->offset = 0; - task->recv.ptr->length = 0; task->nio_stages.current = SF_NIO_STAGE_RECV; return set_read_event(task); } diff --git a/src/sf_service.c b/src/sf_service.c index a9adfee..26393b7 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -34,6 +34,7 @@ #include "fastcommon/ioevent_loop.h" #include "fastcommon/fc_memory.h" #include "sf_nio.h" +#include "sf_proto.h" #include "sf_util.h" #include "sf_service.h" @@ -60,9 +61,9 @@ struct worker_thread_context { static void *worker_thread_entrance(void *arg); static int sf_init_free_queue(SFContext *sf_context, const char *name, - const bool double_buffers, const int task_padding_size, - const int task_arg_size, TaskInitCallback init_callback, - void *init_arg) + const bool double_buffers, const bool need_shrink_task_buffer, + const int task_padding_size, const int task_arg_size, + TaskInitCallback init_callback, void *init_arg) { int result; int m; @@ -82,9 +83,9 @@ static int sf_init_free_queue(SFContext *sf_context, const char *name, } alloc_conn_once = 256 / m; return free_queue_init_ex2(&sf_context->free_queue, name, double_buffers, - sf_context->net_buffer_cfg.max_connections, alloc_conn_once, - sf_context->net_buffer_cfg.min_buff_size, sf_context-> - net_buffer_cfg.max_buff_size, task_padding_size, + need_shrink_task_buffer, sf_context->net_buffer_cfg.max_connections, + alloc_conn_once, sf_context->net_buffer_cfg.min_buff_size, + sf_context->net_buffer_cfg.max_buff_size, task_padding_size, task_arg_size, init_callback, init_arg); } @@ -100,8 +101,9 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, const int proto_header_size, const int task_padding_size, const int task_arg_size, const bool double_buffers, - const bool explicit_post_recv, TaskInitCallback init_callback, - void *init_arg, sf_release_buffer_callback release_buffer_callback) + const bool need_shrink_task_buffer, const bool explicit_post_recv, + TaskInitCallback init_callback, void *init_arg, + sf_release_buffer_callback release_buffer_callback) { int result; int bytes; @@ -131,8 +133,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, } if ((result=sf_init_free_queue(sf_context, name, double_buffers, - task_padding_size, task_arg_size, init_callback, - init_arg)) != 0) + need_shrink_task_buffer, task_padding_size, + task_arg_size, init_callback, init_arg)) != 0) { return result; } diff --git a/src/sf_service.h b/src/sf_service.h index eba2c7f..f806806 100644 --- a/src/sf_service.h +++ b/src/sf_service.h @@ -25,6 +25,7 @@ #include "fastcommon/ioevent.h" #include "fastcommon/fast_task_queue.h" #include "sf_types.h" +#include "sf_proto.h" #include "sf_global.h" typedef void* (*sf_alloc_thread_extra_data_callback)(const int thread_index); @@ -46,8 +47,9 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, const int proto_header_size, const int task_padding_size, const int task_arg_size, const bool double_buffers, - const bool explicit_post_recv, TaskInitCallback init_callback, - void *init_arg, sf_release_buffer_callback release_buffer_callback); + const bool need_shrink_task_buffer, const bool explicit_post_recv, + TaskInitCallback init_callback, void *init_arg, + sf_release_buffer_callback release_buffer_callback); #define sf_service_init_ex(sf_context, name, alloc_thread_extra_data_callback,\ thread_loop_callback, accept_done_callback, set_body_length_func, \ @@ -57,7 +59,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, thread_loop_callback, accept_done_callback, set_body_length_func, \ NULL, send_done_callback, deal_func, task_cleanup_func, \ timeout_callback, net_timeout_ms, proto_header_size, \ - 0, task_arg_size, false, false, NULL, NULL, NULL) + 0, task_arg_size, false, true, false, NULL, NULL, NULL) #define sf_service_init(name, alloc_thread_extra_data_callback, \ thread_loop_callback, accept_done_callback, set_body_length_func, \ @@ -66,8 +68,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, sf_service_init_ex2(&g_sf_context, name, alloc_thread_extra_data_callback, \ thread_loop_callback, accept_done_callback, set_body_length_func, NULL,\ send_done_callback, deal_func, task_cleanup_func, timeout_callback, \ - net_timeout_ms, proto_header_size, 0, task_arg_size, false, false, \ - NULL, NULL, NULL) + net_timeout_ms, proto_header_size, 0, task_arg_size, false, true, \ + false, NULL, NULL, NULL) int sf_service_destroy_ex(SFContext *sf_context); @@ -162,6 +164,11 @@ static inline struct fast_task_info *sf_alloc_init_task_ex( return NULL; } + if (task->shrinked) { + task->shrinked = false; + sf_proto_init_task_magic(task); + } + __sync_add_and_fetch(&task->reffer_count, reffer_count); __sync_bool_compare_and_swap(&task->canceled, 1, 0); task->handler = handler;