idempotency support RDMA

support_rdma
YuQing 2023-09-15 10:39:03 +08:00
parent 0eb842dc09
commit 9731e736df
3 changed files with 46 additions and 13 deletions

View File

@ -63,6 +63,15 @@ typedef struct idempotency_receipt_thread_context {
} last_check_times;
} IdempotencyReceiptThreadContext;
typedef struct idempotency_receipt_global_vars {
struct {
int task_padding_size;
sf_init_connection_callback init_connection;
struct ibv_pd *pd;
} rdma;
IdempotencyReceiptThreadContext *thread_contexts;
} IdempotencyReceiptGlobalVars;
#ifdef __cplusplus
extern "C" {
#endif

View File

@ -41,13 +41,22 @@
#include "client_channel.h"
#include "receipt_handler.h"
static IdempotencyReceiptThreadContext *receipt_thread_contexts = NULL;
static IdempotencyReceiptGlobalVars receipt_global_vars;
#define RECEIPT_THREAD_CONTEXTS receipt_global_vars.thread_contexts
#define TASK_PADDING_SIZE receipt_global_vars.rdma.task_padding_size
#define RDMA_INIT_CONNECTION receipt_global_vars.rdma.init_connection
#define RDMA_PD receipt_global_vars.rdma.pd
static int receipt_init_task(struct fast_task_info *task)
{
task->connect_timeout = SF_G_CONNECT_TIMEOUT; //for client side
task->network_timeout = SF_G_NETWORK_TIMEOUT;
return 0;
if (RDMA_INIT_CONNECTION != NULL) {
return RDMA_INIT_CONNECTION(task, RDMA_PD);
} else {
return 0;
}
}
static int receipt_recv_timeout_callback(struct fast_task_info *task)
@ -83,8 +92,7 @@ static void receipt_task_finish_cleanup(struct fast_task_info *task)
if (task->event.fd >= 0) {
sf_task_detach_thread(task);
close(task->event.fd);
task->event.fd = -1;
task->handler->close_connection(task);
}
task->length = 0;
@ -474,35 +482,51 @@ static void *receipt_alloc_thread_extra_data(const int thread_index)
{
IdempotencyReceiptThreadContext *ctx;
ctx = receipt_thread_contexts + thread_index;
ctx = RECEIPT_THREAD_CONTEXTS + thread_index;
FC_INIT_LIST_HEAD(&ctx->head);
return ctx;
}
static int do_init()
static int do_init(FCAddressPtrArray *address_array)
{
int result;
int bytes;
SFNetworkHandler *rdma_handler;
bytes = sizeof(IdempotencyReceiptThreadContext) * SF_G_WORK_THREADS;
receipt_thread_contexts = (IdempotencyReceiptThreadContext *)
RECEIPT_THREAD_CONTEXTS = (IdempotencyReceiptThreadContext *)
fc_malloc(bytes);
if (receipt_thread_contexts == NULL) {
if (RECEIPT_THREAD_CONTEXTS == NULL) {
return ENOMEM;
}
memset(receipt_thread_contexts, 0, bytes);
memset(RECEIPT_THREAD_CONTEXTS, 0, bytes);
if ((rdma_handler=sf_get_rdma_network_handler(&g_sf_context)) != NULL) {
if ((result=sf_alloc_rdma_pd(&g_sf_context, address_array)) != 0) {
return result;
}
TASK_PADDING_SIZE = rdma_handler->get_connection_size();
RDMA_INIT_CONNECTION = rdma_handler->init_connection;
RDMA_PD = rdma_handler->pd;
} else {
TASK_PADDING_SIZE = 0;
RDMA_INIT_CONNECTION = NULL;
RDMA_PD = NULL;
}
return sf_service_init_ex2(&g_sf_context, "idemp-receipt",
receipt_alloc_thread_extra_data, receipt_thread_loop_callback,
NULL, sf_proto_set_body_length, NULL, NULL, receipt_deal_task,
receipt_task_finish_cleanup, receipt_recv_timeout_callback,
1000, sizeof(SFCommonProtoHeader), 0, 0, receipt_init_task, NULL);
1000, sizeof(SFCommonProtoHeader), TASK_PADDING_SIZE, 0,
receipt_init_task, NULL);
}
int receipt_handler_init()
int receipt_handler_init(FCAddressPtrArray *address_array)
{
int result;
if ((result=do_init()) != 0) {
if ((result=do_init(address_array)) != 0) {
return result;
}

View File

@ -24,7 +24,7 @@
extern "C" {
#endif
int receipt_handler_init();
int receipt_handler_init(FCAddressPtrArray *address_array);
int receipt_handler_destroy();
#ifdef __cplusplus