From 9731e736dfe4145ee347411f057b16d2ddaeb527 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Fri, 15 Sep 2023 10:39:03 +0800 Subject: [PATCH] idempotency support RDMA --- src/idempotency/client/client_types.h | 9 +++++ src/idempotency/client/receipt_handler.c | 48 ++++++++++++++++++------ src/idempotency/client/receipt_handler.h | 2 +- 3 files changed, 46 insertions(+), 13 deletions(-) diff --git a/src/idempotency/client/client_types.h b/src/idempotency/client/client_types.h index 59fd063..0ba8a7f 100644 --- a/src/idempotency/client/client_types.h +++ b/src/idempotency/client/client_types.h @@ -63,6 +63,15 @@ typedef struct idempotency_receipt_thread_context { } last_check_times; } IdempotencyReceiptThreadContext; +typedef struct idempotency_receipt_global_vars { + struct { + int task_padding_size; + sf_init_connection_callback init_connection; + struct ibv_pd *pd; + } rdma; + IdempotencyReceiptThreadContext *thread_contexts; +} IdempotencyReceiptGlobalVars; + #ifdef __cplusplus extern "C" { #endif diff --git a/src/idempotency/client/receipt_handler.c b/src/idempotency/client/receipt_handler.c index 4ab44cb..78a5d3d 100644 --- a/src/idempotency/client/receipt_handler.c +++ b/src/idempotency/client/receipt_handler.c @@ -41,13 +41,22 @@ #include "client_channel.h" #include "receipt_handler.h" -static IdempotencyReceiptThreadContext *receipt_thread_contexts = NULL; +static IdempotencyReceiptGlobalVars receipt_global_vars; + +#define RECEIPT_THREAD_CONTEXTS receipt_global_vars.thread_contexts +#define TASK_PADDING_SIZE receipt_global_vars.rdma.task_padding_size +#define RDMA_INIT_CONNECTION receipt_global_vars.rdma.init_connection +#define RDMA_PD receipt_global_vars.rdma.pd static int receipt_init_task(struct fast_task_info *task) { task->connect_timeout = SF_G_CONNECT_TIMEOUT; //for client side task->network_timeout = SF_G_NETWORK_TIMEOUT; - return 0; + if (RDMA_INIT_CONNECTION != NULL) { + return RDMA_INIT_CONNECTION(task, RDMA_PD); + } else { + return 0; + } } static int receipt_recv_timeout_callback(struct fast_task_info *task) @@ -83,8 +92,7 @@ static void receipt_task_finish_cleanup(struct fast_task_info *task) if (task->event.fd >= 0) { sf_task_detach_thread(task); - close(task->event.fd); - task->event.fd = -1; + task->handler->close_connection(task); } task->length = 0; @@ -474,35 +482,51 @@ static void *receipt_alloc_thread_extra_data(const int thread_index) { IdempotencyReceiptThreadContext *ctx; - ctx = receipt_thread_contexts + thread_index; + ctx = RECEIPT_THREAD_CONTEXTS + thread_index; FC_INIT_LIST_HEAD(&ctx->head); return ctx; } -static int do_init() +static int do_init(FCAddressPtrArray *address_array) { + int result; int bytes; + SFNetworkHandler *rdma_handler; bytes = sizeof(IdempotencyReceiptThreadContext) * SF_G_WORK_THREADS; - receipt_thread_contexts = (IdempotencyReceiptThreadContext *) + RECEIPT_THREAD_CONTEXTS = (IdempotencyReceiptThreadContext *) fc_malloc(bytes); - if (receipt_thread_contexts == NULL) { + if (RECEIPT_THREAD_CONTEXTS == NULL) { return ENOMEM; } - memset(receipt_thread_contexts, 0, bytes); + memset(RECEIPT_THREAD_CONTEXTS, 0, bytes); + if ((rdma_handler=sf_get_rdma_network_handler(&g_sf_context)) != NULL) { + if ((result=sf_alloc_rdma_pd(&g_sf_context, address_array)) != 0) { + return result; + } + + TASK_PADDING_SIZE = rdma_handler->get_connection_size(); + RDMA_INIT_CONNECTION = rdma_handler->init_connection; + RDMA_PD = rdma_handler->pd; + } else { + TASK_PADDING_SIZE = 0; + RDMA_INIT_CONNECTION = NULL; + RDMA_PD = NULL; + } return sf_service_init_ex2(&g_sf_context, "idemp-receipt", receipt_alloc_thread_extra_data, receipt_thread_loop_callback, NULL, sf_proto_set_body_length, NULL, NULL, receipt_deal_task, receipt_task_finish_cleanup, receipt_recv_timeout_callback, - 1000, sizeof(SFCommonProtoHeader), 0, 0, receipt_init_task, NULL); + 1000, sizeof(SFCommonProtoHeader), TASK_PADDING_SIZE, 0, + receipt_init_task, NULL); } -int receipt_handler_init() +int receipt_handler_init(FCAddressPtrArray *address_array) { int result; - if ((result=do_init()) != 0) { + if ((result=do_init(address_array)) != 0) { return result; } diff --git a/src/idempotency/client/receipt_handler.h b/src/idempotency/client/receipt_handler.h index 953f614..7ea2dbe 100644 --- a/src/idempotency/client/receipt_handler.h +++ b/src/idempotency/client/receipt_handler.h @@ -24,7 +24,7 @@ extern "C" { #endif -int receipt_handler_init(); +int receipt_handler_init(FCAddressPtrArray *address_array); int receipt_handler_destroy(); #ifdef __cplusplus