task init callback support extra argument
parent
a01ccf66dc
commit
14d0a1c014
|
|
@ -67,7 +67,6 @@ typedef struct idempotency_receipt_global_vars {
|
||||||
struct {
|
struct {
|
||||||
int task_padding_size;
|
int task_padding_size;
|
||||||
sf_init_connection_callback init_connection;
|
sf_init_connection_callback init_connection;
|
||||||
struct ibv_pd *pd;
|
|
||||||
} rdma;
|
} rdma;
|
||||||
IdempotencyReceiptThreadContext *thread_contexts;
|
IdempotencyReceiptThreadContext *thread_contexts;
|
||||||
} IdempotencyReceiptGlobalVars;
|
} IdempotencyReceiptGlobalVars;
|
||||||
|
|
|
||||||
|
|
@ -46,12 +46,11 @@ static IdempotencyReceiptGlobalVars receipt_global_vars;
|
||||||
#define RECEIPT_THREAD_CONTEXTS receipt_global_vars.thread_contexts
|
#define RECEIPT_THREAD_CONTEXTS receipt_global_vars.thread_contexts
|
||||||
#define TASK_PADDING_SIZE receipt_global_vars.rdma.task_padding_size
|
#define TASK_PADDING_SIZE receipt_global_vars.rdma.task_padding_size
|
||||||
#define RDMA_INIT_CONNECTION receipt_global_vars.rdma.init_connection
|
#define RDMA_INIT_CONNECTION receipt_global_vars.rdma.init_connection
|
||||||
#define RDMA_PD receipt_global_vars.rdma.pd
|
|
||||||
|
|
||||||
static int receipt_init_task(struct fast_task_info *task)
|
static int receipt_init_task(struct fast_task_info *task, void *arg)
|
||||||
{
|
{
|
||||||
if (RDMA_INIT_CONNECTION != NULL) {
|
if (RDMA_INIT_CONNECTION != NULL) {
|
||||||
return RDMA_INIT_CONNECTION(task, RDMA_PD);
|
return RDMA_INIT_CONNECTION(task, arg);
|
||||||
} else {
|
} else {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
@ -525,6 +524,7 @@ static int do_init(FCAddressPtrArray *address_array)
|
||||||
int result;
|
int result;
|
||||||
int bytes;
|
int bytes;
|
||||||
SFNetworkHandler *rdma_handler;
|
SFNetworkHandler *rdma_handler;
|
||||||
|
struct ibv_pd *pd;
|
||||||
|
|
||||||
bytes = sizeof(IdempotencyReceiptThreadContext) * SF_G_WORK_THREADS;
|
bytes = sizeof(IdempotencyReceiptThreadContext) * SF_G_WORK_THREADS;
|
||||||
RECEIPT_THREAD_CONTEXTS = (IdempotencyReceiptThreadContext *)
|
RECEIPT_THREAD_CONTEXTS = (IdempotencyReceiptThreadContext *)
|
||||||
|
|
@ -541,11 +541,11 @@ static int do_init(FCAddressPtrArray *address_array)
|
||||||
|
|
||||||
TASK_PADDING_SIZE = rdma_handler->get_connection_size();
|
TASK_PADDING_SIZE = rdma_handler->get_connection_size();
|
||||||
RDMA_INIT_CONNECTION = rdma_handler->init_connection;
|
RDMA_INIT_CONNECTION = rdma_handler->init_connection;
|
||||||
RDMA_PD = rdma_handler->pd;
|
pd = rdma_handler->pd;
|
||||||
} else {
|
} else {
|
||||||
TASK_PADDING_SIZE = 0;
|
TASK_PADDING_SIZE = 0;
|
||||||
RDMA_INIT_CONNECTION = NULL;
|
RDMA_INIT_CONNECTION = NULL;
|
||||||
RDMA_PD = NULL;
|
pd = NULL;
|
||||||
}
|
}
|
||||||
return sf_service_init_ex2(&g_sf_context, "idemp-receipt",
|
return sf_service_init_ex2(&g_sf_context, "idemp-receipt",
|
||||||
receipt_alloc_thread_extra_data, receipt_thread_loop_callback,
|
receipt_alloc_thread_extra_data, receipt_thread_loop_callback,
|
||||||
|
|
@ -553,7 +553,7 @@ static int do_init(FCAddressPtrArray *address_array)
|
||||||
receipt_task_finish_cleanup, receipt_recv_timeout_callback,
|
receipt_task_finish_cleanup, receipt_recv_timeout_callback,
|
||||||
1000, sizeof(SFCommonProtoHeader), TASK_PADDING_SIZE,
|
1000, sizeof(SFCommonProtoHeader), TASK_PADDING_SIZE,
|
||||||
task_arg_size, double_buffers, explicit_post_recv,
|
task_arg_size, double_buffers, explicit_post_recv,
|
||||||
receipt_init_task, NULL);
|
receipt_init_task, pd, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
int receipt_handler_init(FCAddressPtrArray *address_array)
|
int receipt_handler_init(FCAddressPtrArray *address_array)
|
||||||
|
|
|
||||||
|
|
@ -62,7 +62,8 @@ static void *worker_thread_entrance(void *arg);
|
||||||
|
|
||||||
static int sf_init_free_queue(SFContext *sf_context, const char *name,
|
static int sf_init_free_queue(SFContext *sf_context, const char *name,
|
||||||
const bool double_buffers, const int task_padding_size,
|
const bool double_buffers, const int task_padding_size,
|
||||||
const int task_arg_size, TaskInitCallback init_callback)
|
const int task_arg_size, TaskInitCallback init_callback,
|
||||||
|
void *init_arg)
|
||||||
{
|
{
|
||||||
int result;
|
int result;
|
||||||
int m;
|
int m;
|
||||||
|
|
@ -85,7 +86,7 @@ static int sf_init_free_queue(SFContext *sf_context, const char *name,
|
||||||
sf_context->net_buffer_cfg.max_connections, alloc_conn_once,
|
sf_context->net_buffer_cfg.max_connections, alloc_conn_once,
|
||||||
sf_context->net_buffer_cfg.min_buff_size, sf_context->
|
sf_context->net_buffer_cfg.min_buff_size, sf_context->
|
||||||
net_buffer_cfg.max_buff_size, task_padding_size,
|
net_buffer_cfg.max_buff_size, task_padding_size,
|
||||||
task_arg_size, init_callback);
|
task_arg_size, init_callback, init_arg);
|
||||||
}
|
}
|
||||||
|
|
||||||
int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
||||||
|
|
@ -101,7 +102,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
||||||
const int proto_header_size, const int task_padding_size,
|
const int proto_header_size, const int task_padding_size,
|
||||||
const int task_arg_size, const bool double_buffers,
|
const int task_arg_size, const bool double_buffers,
|
||||||
const bool explicit_post_recv, TaskInitCallback init_callback,
|
const bool explicit_post_recv, TaskInitCallback init_callback,
|
||||||
sf_release_buffer_callback release_buffer_callback)
|
void *init_arg, sf_release_buffer_callback release_buffer_callback)
|
||||||
{
|
{
|
||||||
int result;
|
int result;
|
||||||
int bytes;
|
int bytes;
|
||||||
|
|
@ -131,7 +132,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((result=sf_init_free_queue(sf_context, name, double_buffers,
|
if ((result=sf_init_free_queue(sf_context, name, double_buffers,
|
||||||
task_padding_size, task_arg_size, init_callback)) != 0)
|
task_padding_size, task_arg_size, init_callback,
|
||||||
|
init_arg)) != 0)
|
||||||
{
|
{
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -46,7 +46,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
||||||
const int proto_header_size, const int task_padding_size,
|
const int proto_header_size, const int task_padding_size,
|
||||||
const int task_arg_size, const bool double_buffers,
|
const int task_arg_size, const bool double_buffers,
|
||||||
const bool explicit_post_recv, TaskInitCallback init_callback,
|
const bool explicit_post_recv, TaskInitCallback init_callback,
|
||||||
sf_release_buffer_callback release_buffer_callback);
|
void *init_arg, sf_release_buffer_callback release_buffer_callback);
|
||||||
|
|
||||||
#define sf_service_init_ex(sf_context, name, alloc_thread_extra_data_callback,\
|
#define sf_service_init_ex(sf_context, name, alloc_thread_extra_data_callback,\
|
||||||
thread_loop_callback, accept_done_callback, set_body_length_func, \
|
thread_loop_callback, accept_done_callback, set_body_length_func, \
|
||||||
|
|
@ -56,7 +56,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
||||||
thread_loop_callback, accept_done_callback, set_body_length_func, \
|
thread_loop_callback, accept_done_callback, set_body_length_func, \
|
||||||
NULL, send_done_callback, deal_func, task_cleanup_func, \
|
NULL, send_done_callback, deal_func, task_cleanup_func, \
|
||||||
timeout_callback, net_timeout_ms, proto_header_size, \
|
timeout_callback, net_timeout_ms, proto_header_size, \
|
||||||
0, task_arg_size, false, false, NULL, NULL)
|
0, task_arg_size, false, false, NULL, NULL, NULL)
|
||||||
|
|
||||||
#define sf_service_init(name, alloc_thread_extra_data_callback, \
|
#define sf_service_init(name, alloc_thread_extra_data_callback, \
|
||||||
thread_loop_callback, accept_done_callback, set_body_length_func, \
|
thread_loop_callback, accept_done_callback, set_body_length_func, \
|
||||||
|
|
@ -66,7 +66,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
|
||||||
thread_loop_callback, accept_done_callback, set_body_length_func, NULL,\
|
thread_loop_callback, accept_done_callback, set_body_length_func, NULL,\
|
||||||
send_done_callback, deal_func, task_cleanup_func, timeout_callback, \
|
send_done_callback, deal_func, task_cleanup_func, timeout_callback, \
|
||||||
net_timeout_ms, proto_header_size, 0, task_arg_size, false, false, \
|
net_timeout_ms, proto_header_size, 0, task_arg_size, false, false, \
|
||||||
NULL, NULL)
|
NULL, NULL, NULL)
|
||||||
|
|
||||||
int sf_service_destroy_ex(SFContext *sf_context);
|
int sf_service_destroy_ex(SFContext *sf_context);
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue