call sf_proto_init_task_magic when task->shrinked
parent
cf0950ea62
commit
3dcc1c570d
|
|
@ -523,6 +523,7 @@ static int do_init(FCAddressPtrArray *address_array)
|
||||||
{
|
{
|
||||||
const int task_arg_size = 0;
|
const int task_arg_size = 0;
|
||||||
const bool double_buffers = false;
|
const bool double_buffers = false;
|
||||||
|
const bool need_shrink_task_buffer = false;
|
||||||
const bool explicit_post_recv = false;
|
const bool explicit_post_recv = false;
|
||||||
int result;
|
int result;
|
||||||
int bytes;
|
int bytes;
|
||||||
|
|
@ -555,8 +556,8 @@ static int do_init(FCAddressPtrArray *address_array)
|
||||||
NULL, sf_proto_set_body_length, NULL, NULL, receipt_deal_task,
|
NULL, sf_proto_set_body_length, NULL, NULL, receipt_deal_task,
|
||||||
receipt_task_finish_cleanup, receipt_recv_timeout_callback,
|
receipt_task_finish_cleanup, receipt_recv_timeout_callback,
|
||||||
1000, sizeof(SFCommonProtoHeader), TASK_PADDING_SIZE,
|
1000, sizeof(SFCommonProtoHeader), TASK_PADDING_SIZE,
|
||||||
task_arg_size, double_buffers, explicit_post_recv,
|
task_arg_size, double_buffers, need_shrink_task_buffer,
|
||||||
receipt_init_task, pd, NULL);
|
explicit_post_recv, receipt_init_task, pd, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
int receipt_handler_init(FCAddressPtrArray *address_array)
|
int receipt_handler_init(FCAddressPtrArray *address_array)
|
||||||
|
|
|
||||||
|
|
@ -269,14 +269,16 @@ static inline int set_read_event(struct fast_task_info *task)
|
||||||
|
|
||||||
int sf_set_read_event(struct fast_task_info *task)
|
int sf_set_read_event(struct fast_task_info *task)
|
||||||
{
|
{
|
||||||
|
/* reset recv offset and length */
|
||||||
|
task->recv.ptr->offset = 0;
|
||||||
|
task->recv.ptr->length = 0;
|
||||||
|
|
||||||
#if IOEVENT_USE_URING
|
#if IOEVENT_USE_URING
|
||||||
if (task->handler->use_io_uring) {
|
if (task->handler->use_io_uring) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
task->recv.ptr->offset = 0;
|
|
||||||
task->recv.ptr->length = 0;
|
|
||||||
task->nio_stages.current = SF_NIO_STAGE_RECV;
|
task->nio_stages.current = SF_NIO_STAGE_RECV;
|
||||||
return set_read_event(task);
|
return set_read_event(task);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -34,6 +34,7 @@
|
||||||
#include "fastcommon/ioevent_loop.h"
|
#include "fastcommon/ioevent_loop.h"
|
||||||
#include "fastcommon/fc_memory.h"
|
#include "fastcommon/fc_memory.h"
|
||||||
#include "sf_nio.h"
|
#include "sf_nio.h"
|
||||||
|
#include "sf_proto.h"
|
||||||
#include "sf_util.h"
|
#include "sf_util.h"
|
||||||
#include "sf_service.h"
|
#include "sf_service.h"
|
||||||
|
|
||||||
|
|
@ -60,9 +61,9 @@ struct worker_thread_context {
|
||||||
static void *worker_thread_entrance(void *arg);
|
static void *worker_thread_entrance(void *arg);
|
||||||
|
|
||||||
static int sf_init_free_queue(SFContext *sf_context, const char *name,
|
static int sf_init_free_queue(SFContext *sf_context, const char *name,
|
||||||
const bool double_buffers, const int task_padding_size,
|
const bool double_buffers, const bool need_shrink_task_buffer,
|
||||||
const int task_arg_size, TaskInitCallback init_callback,
|
const int task_padding_size, const int task_arg_size,
|
||||||
void *init_arg)
|
TaskInitCallback init_callback, void *init_arg)
|
||||||
{
|
{
|
||||||
int result;
|
int result;
|
||||||
int m;
|
int m;
|
||||||
|
|
@ -82,9 +83,9 @@ static int sf_init_free_queue(SFContext *sf_context, const char *name,
|
||||||
}
|
}
|
||||||
alloc_conn_once = 256 / m;
|
alloc_conn_once = 256 / m;
|
||||||
return free_queue_init_ex2(&sf_context->free_queue, name, double_buffers,
|
return free_queue_init_ex2(&sf_context->free_queue, name, double_buffers,
|
||||||
sf_context->net_buffer_cfg.max_connections, alloc_conn_once,
|
need_shrink_task_buffer, sf_context->net_buffer_cfg.max_connections,
|
||||||
sf_context->net_buffer_cfg.min_buff_size, sf_context->
|
alloc_conn_once, sf_context->net_buffer_cfg.min_buff_size,
|
||||||
net_buffer_cfg.max_buff_size, task_padding_size,
|
sf_context->net_buffer_cfg.max_buff_size, task_padding_size,
|
||||||
task_arg_size, init_callback, init_arg);
|
task_arg_size, init_callback, init_arg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -100,8 +101,9 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
||||||
sf_recv_timeout_callback timeout_callback, const int net_timeout_ms,
|
sf_recv_timeout_callback timeout_callback, const int net_timeout_ms,
|
||||||
const int proto_header_size, const int task_padding_size,
|
const int proto_header_size, const int task_padding_size,
|
||||||
const int task_arg_size, const bool double_buffers,
|
const int task_arg_size, const bool double_buffers,
|
||||||
const bool explicit_post_recv, TaskInitCallback init_callback,
|
const bool need_shrink_task_buffer, const bool explicit_post_recv,
|
||||||
void *init_arg, sf_release_buffer_callback release_buffer_callback)
|
TaskInitCallback init_callback, void *init_arg,
|
||||||
|
sf_release_buffer_callback release_buffer_callback)
|
||||||
{
|
{
|
||||||
int result;
|
int result;
|
||||||
int bytes;
|
int bytes;
|
||||||
|
|
@ -131,8 +133,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((result=sf_init_free_queue(sf_context, name, double_buffers,
|
if ((result=sf_init_free_queue(sf_context, name, double_buffers,
|
||||||
task_padding_size, task_arg_size, init_callback,
|
need_shrink_task_buffer, task_padding_size,
|
||||||
init_arg)) != 0)
|
task_arg_size, init_callback, init_arg)) != 0)
|
||||||
{
|
{
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,7 @@
|
||||||
#include "fastcommon/ioevent.h"
|
#include "fastcommon/ioevent.h"
|
||||||
#include "fastcommon/fast_task_queue.h"
|
#include "fastcommon/fast_task_queue.h"
|
||||||
#include "sf_types.h"
|
#include "sf_types.h"
|
||||||
|
#include "sf_proto.h"
|
||||||
#include "sf_global.h"
|
#include "sf_global.h"
|
||||||
|
|
||||||
typedef void* (*sf_alloc_thread_extra_data_callback)(const int thread_index);
|
typedef void* (*sf_alloc_thread_extra_data_callback)(const int thread_index);
|
||||||
|
|
@ -46,8 +47,9 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
||||||
sf_recv_timeout_callback timeout_callback, const int net_timeout_ms,
|
sf_recv_timeout_callback timeout_callback, const int net_timeout_ms,
|
||||||
const int proto_header_size, const int task_padding_size,
|
const int proto_header_size, const int task_padding_size,
|
||||||
const int task_arg_size, const bool double_buffers,
|
const int task_arg_size, const bool double_buffers,
|
||||||
const bool explicit_post_recv, TaskInitCallback init_callback,
|
const bool need_shrink_task_buffer, const bool explicit_post_recv,
|
||||||
void *init_arg, sf_release_buffer_callback release_buffer_callback);
|
TaskInitCallback init_callback, void *init_arg,
|
||||||
|
sf_release_buffer_callback release_buffer_callback);
|
||||||
|
|
||||||
#define sf_service_init_ex(sf_context, name, alloc_thread_extra_data_callback,\
|
#define sf_service_init_ex(sf_context, name, alloc_thread_extra_data_callback,\
|
||||||
thread_loop_callback, accept_done_callback, set_body_length_func, \
|
thread_loop_callback, accept_done_callback, set_body_length_func, \
|
||||||
|
|
@ -57,7 +59,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
||||||
thread_loop_callback, accept_done_callback, set_body_length_func, \
|
thread_loop_callback, accept_done_callback, set_body_length_func, \
|
||||||
NULL, send_done_callback, deal_func, task_cleanup_func, \
|
NULL, send_done_callback, deal_func, task_cleanup_func, \
|
||||||
timeout_callback, net_timeout_ms, proto_header_size, \
|
timeout_callback, net_timeout_ms, proto_header_size, \
|
||||||
0, task_arg_size, false, false, NULL, NULL, NULL)
|
0, task_arg_size, false, true, false, NULL, NULL, NULL)
|
||||||
|
|
||||||
#define sf_service_init(name, alloc_thread_extra_data_callback, \
|
#define sf_service_init(name, alloc_thread_extra_data_callback, \
|
||||||
thread_loop_callback, accept_done_callback, set_body_length_func, \
|
thread_loop_callback, accept_done_callback, set_body_length_func, \
|
||||||
|
|
@ -66,8 +68,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
||||||
sf_service_init_ex2(&g_sf_context, name, alloc_thread_extra_data_callback, \
|
sf_service_init_ex2(&g_sf_context, name, alloc_thread_extra_data_callback, \
|
||||||
thread_loop_callback, accept_done_callback, set_body_length_func, NULL,\
|
thread_loop_callback, accept_done_callback, set_body_length_func, NULL,\
|
||||||
send_done_callback, deal_func, task_cleanup_func, timeout_callback, \
|
send_done_callback, deal_func, task_cleanup_func, timeout_callback, \
|
||||||
net_timeout_ms, proto_header_size, 0, task_arg_size, false, false, \
|
net_timeout_ms, proto_header_size, 0, task_arg_size, false, true, \
|
||||||
NULL, NULL, NULL)
|
false, NULL, NULL, NULL)
|
||||||
|
|
||||||
int sf_service_destroy_ex(SFContext *sf_context);
|
int sf_service_destroy_ex(SFContext *sf_context);
|
||||||
|
|
||||||
|
|
@ -162,6 +164,11 @@ static inline struct fast_task_info *sf_alloc_init_task_ex(
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (task->shrinked) {
|
||||||
|
task->shrinked = false;
|
||||||
|
sf_proto_init_task_magic(task);
|
||||||
|
}
|
||||||
|
|
||||||
__sync_add_and_fetch(&task->reffer_count, reffer_count);
|
__sync_add_and_fetch(&task->reffer_count, reffer_count);
|
||||||
__sync_bool_compare_and_swap(&task->canceled, 1, 0);
|
__sync_bool_compare_and_swap(&task->canceled, 1, 0);
|
||||||
task->handler = handler;
|
task->handler = handler;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue