diff --git a/src/idempotency/client/client_types.h b/src/idempotency/client/client_types.h index 0ba8a7f..238ce51 100644 --- a/src/idempotency/client/client_types.h +++ b/src/idempotency/client/client_types.h @@ -67,7 +67,6 @@ typedef struct idempotency_receipt_global_vars { struct { int task_padding_size; sf_init_connection_callback init_connection; - struct ibv_pd *pd; } rdma; IdempotencyReceiptThreadContext *thread_contexts; } IdempotencyReceiptGlobalVars; diff --git a/src/idempotency/client/receipt_handler.c b/src/idempotency/client/receipt_handler.c index b013cd2..0a44d01 100644 --- a/src/idempotency/client/receipt_handler.c +++ b/src/idempotency/client/receipt_handler.c @@ -46,12 +46,11 @@ static IdempotencyReceiptGlobalVars receipt_global_vars; #define RECEIPT_THREAD_CONTEXTS receipt_global_vars.thread_contexts #define TASK_PADDING_SIZE receipt_global_vars.rdma.task_padding_size #define RDMA_INIT_CONNECTION receipt_global_vars.rdma.init_connection -#define RDMA_PD receipt_global_vars.rdma.pd -static int receipt_init_task(struct fast_task_info *task) +static int receipt_init_task(struct fast_task_info *task, void *arg) { if (RDMA_INIT_CONNECTION != NULL) { - return RDMA_INIT_CONNECTION(task, RDMA_PD); + return RDMA_INIT_CONNECTION(task, arg); } else { return 0; } @@ -525,6 +524,7 @@ static int do_init(FCAddressPtrArray *address_array) int result; int bytes; SFNetworkHandler *rdma_handler; + struct ibv_pd *pd; bytes = sizeof(IdempotencyReceiptThreadContext) * SF_G_WORK_THREADS; RECEIPT_THREAD_CONTEXTS = (IdempotencyReceiptThreadContext *) @@ -541,11 +541,11 @@ static int do_init(FCAddressPtrArray *address_array) TASK_PADDING_SIZE = rdma_handler->get_connection_size(); RDMA_INIT_CONNECTION = rdma_handler->init_connection; - RDMA_PD = rdma_handler->pd; + pd = rdma_handler->pd; } else { TASK_PADDING_SIZE = 0; RDMA_INIT_CONNECTION = NULL; - RDMA_PD = NULL; + pd = NULL; } return sf_service_init_ex2(&g_sf_context, "idemp-receipt", receipt_alloc_thread_extra_data, receipt_thread_loop_callback, @@ -553,7 +553,7 @@ static int do_init(FCAddressPtrArray *address_array) receipt_task_finish_cleanup, receipt_recv_timeout_callback, 1000, sizeof(SFCommonProtoHeader), TASK_PADDING_SIZE, task_arg_size, double_buffers, explicit_post_recv, - receipt_init_task, NULL); + receipt_init_task, pd, NULL); } int receipt_handler_init(FCAddressPtrArray *address_array) diff --git a/src/sf_service.c b/src/sf_service.c index ae52739..d89a754 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -62,7 +62,8 @@ static void *worker_thread_entrance(void *arg); static int sf_init_free_queue(SFContext *sf_context, const char *name, const bool double_buffers, const int task_padding_size, - const int task_arg_size, TaskInitCallback init_callback) + const int task_arg_size, TaskInitCallback init_callback, + void *init_arg) { int result; int m; @@ -85,7 +86,7 @@ static int sf_init_free_queue(SFContext *sf_context, const char *name, sf_context->net_buffer_cfg.max_connections, alloc_conn_once, sf_context->net_buffer_cfg.min_buff_size, sf_context-> net_buffer_cfg.max_buff_size, task_padding_size, - task_arg_size, init_callback); + task_arg_size, init_callback, init_arg); } int sf_service_init_ex2(SFContext *sf_context, const char *name, @@ -101,7 +102,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, const int proto_header_size, const int task_padding_size, const int task_arg_size, const bool double_buffers, const bool explicit_post_recv, TaskInitCallback init_callback, - sf_release_buffer_callback release_buffer_callback) + void *init_arg, sf_release_buffer_callback release_buffer_callback) { int result; int bytes; @@ -131,7 +132,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, } if ((result=sf_init_free_queue(sf_context, name, double_buffers, - task_padding_size, task_arg_size, init_callback)) != 0) + task_padding_size, task_arg_size, init_callback, + init_arg)) != 0) { return result; } diff --git a/src/sf_service.h b/src/sf_service.h index 136fe1d..0980328 100644 --- a/src/sf_service.h +++ b/src/sf_service.h @@ -46,7 +46,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, const int proto_header_size, const int task_padding_size, const int task_arg_size, const bool double_buffers, const bool explicit_post_recv, TaskInitCallback init_callback, - sf_release_buffer_callback release_buffer_callback); + void *init_arg, sf_release_buffer_callback release_buffer_callback); #define sf_service_init_ex(sf_context, name, alloc_thread_extra_data_callback,\ thread_loop_callback, accept_done_callback, set_body_length_func, \ @@ -56,7 +56,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, thread_loop_callback, accept_done_callback, set_body_length_func, \ NULL, send_done_callback, deal_func, task_cleanup_func, \ timeout_callback, net_timeout_ms, proto_header_size, \ - 0, task_arg_size, false, false, NULL, NULL) + 0, task_arg_size, false, false, NULL, NULL, NULL) #define sf_service_init(name, alloc_thread_extra_data_callback, \ thread_loop_callback, accept_done_callback, set_body_length_func, \ @@ -66,7 +66,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, thread_loop_callback, accept_done_callback, set_body_length_func, NULL,\ send_done_callback, deal_func, task_cleanup_func, timeout_callback, \ net_timeout_ms, proto_header_size, 0, task_arg_size, false, false, \ - NULL, NULL) + NULL, NULL, NULL) int sf_service_destroy_ex(SFContext *sf_context);