support explicit post recv for RDMA

support_rdma
YuQing 2023-09-28 22:20:52 +08:00
parent 60d6b49998
commit 5f5db2b998
7 changed files with 48 additions and 17 deletions

View File

@ -493,6 +493,7 @@ static int do_init(FCAddressPtrArray *address_array)
{ {
const int task_arg_size = 0; const int task_arg_size = 0;
const bool double_buffers = false; const bool double_buffers = false;
const bool explicit_post_recv = false;
int result; int result;
int bytes; int bytes;
SFNetworkHandler *rdma_handler; SFNetworkHandler *rdma_handler;
@ -523,7 +524,8 @@ static int do_init(FCAddressPtrArray *address_array)
NULL, sf_proto_set_body_length, NULL, NULL, receipt_deal_task, NULL, sf_proto_set_body_length, NULL, NULL, receipt_deal_task,
receipt_task_finish_cleanup, receipt_recv_timeout_callback, receipt_task_finish_cleanup, receipt_recv_timeout_callback,
1000, sizeof(SFCommonProtoHeader), TASK_PADDING_SIZE, 1000, sizeof(SFCommonProtoHeader), TASK_PADDING_SIZE,
task_arg_size, double_buffers, receipt_init_task, NULL); task_arg_size, double_buffers, explicit_post_recv,
receipt_init_task, NULL);
} }
int receipt_handler_init(FCAddressPtrArray *address_array) int receipt_handler_init(FCAddressPtrArray *address_array)

View File

@ -462,6 +462,7 @@ static int load_rdma_apis(SFNetworkHandler *handler)
LOAD_API(handler, close_connection); LOAD_API(handler, close_connection);
LOAD_API(handler, send_data); LOAD_API(handler, send_data);
LOAD_API(handler, recv_data); LOAD_API(handler, recv_data);
LOAD_API(handler, post_recv);
return 0; return 0;
} }
@ -474,6 +475,7 @@ static int init_network_handler(SFNetworkHandler *handler,
handler->outer.handler = handler; handler->outer.handler = handler;
handler->inner.is_inner = true; handler->inner.is_inner = true;
handler->outer.is_inner = false; handler->outer.is_inner = false;
handler->explicit_post_recv = false;
if (handler->comm_type == fc_comm_type_sock) { if (handler->comm_type == fc_comm_type_sock) {
handler->inner.sock = -1; handler->inner.sock = -1;
@ -486,6 +488,7 @@ static int init_network_handler(SFNetworkHandler *handler,
handler->close_connection = sf_socket_close_connection; handler->close_connection = sf_socket_close_connection;
handler->send_data = sf_socket_send_data; handler->send_data = sf_socket_send_data;
handler->recv_data = sf_socket_recv_data; handler->recv_data = sf_socket_recv_data;
handler->post_recv = NULL;
return 0; return 0;
} else { } else {
handler->inner.id = NULL; handler->inner.id = NULL;

View File

@ -569,14 +569,16 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action)
logWarning("file: "__FILE__", line: %d, " logWarning("file: "__FILE__", line: %d, "
"client ip: %s, send fail, task offset: %d, length: %d, " "client ip: %s, send fail, task offset: %d, length: %d, "
"errno: %d, error info: %s", __LINE__, task->client_ip, "errno: %d, error info: %s", __LINE__, task->client_ip,
task->send.ptr->offset, task->send.ptr->length, errno, strerror(errno)); task->send.ptr->offset, task->send.ptr->length,
errno, strerror(errno));
return -1; return -1;
} }
} else if (bytes == 0) { } else if (bytes == 0) {
logWarning("file: "__FILE__", line: %d, " logWarning("file: "__FILE__", line: %d, "
"client ip: %s, sock: %d, task length: %d, offset: %d, " "client ip: %s, sock: %d, task length: %d, offset: %d, "
"send failed, connection disconnected", __LINE__, "send failed, connection disconnected", __LINE__,
task->client_ip, task->event.fd, task->send.ptr->length, task->send.ptr->offset); task->client_ip, task->event.fd, task->send.ptr->length,
task->send.ptr->offset);
return -1; return -1;
} }
@ -623,7 +625,8 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action)
return bytes; return bytes;
} }
ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action) ssize_t sf_socket_recv_data(struct fast_task_info *task,
const bool call_post_recv, SFCommAction *action)
{ {
int bytes; int bytes;
int recv_bytes; int recv_bytes;
@ -852,7 +855,9 @@ int sf_rdma_busy_polling_callback(struct nio_thread_data *thread_data)
if (task->canceled) { if (task->canceled) {
continue; continue;
} }
if ((bytes=task->handler->recv_data(task, &action)) < 0) { if ((bytes=task->handler->recv_data(task, !task->handler->
explicit_post_recv, &action)) < 0)
{
ioevent_add_to_deleted_list(task); ioevent_add_to_deleted_list(task);
continue; continue;
} }
@ -860,8 +865,13 @@ int sf_rdma_busy_polling_callback(struct nio_thread_data *thread_data)
if (action == sf_comm_action_finish) { if (action == sf_comm_action_finish) {
task->req_count++; task->req_count++;
task->nio_stages.current = SF_NIO_STAGE_SEND; task->nio_stages.current = SF_NIO_STAGE_SEND;
if (SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND) < 0) { //fatal error if (SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND) < 0) {
/* fatal error */
ioevent_add_to_deleted_list(task); ioevent_add_to_deleted_list(task);
} else if (task->handler->explicit_post_recv) {
if (task->handler->post_recv(task) != 0) {
ioevent_add_to_deleted_list(task);
}
} }
} else { } else {
if (calc_iops_and_remove_polling(task) != 0) { if (calc_iops_and_remove_polling(task) != 0) {
@ -926,7 +936,9 @@ int sf_client_sock_read(int sock, short event, void *arg)
&task->event.timer, g_current_time + &task->event.timer, g_current_time +
task->network_timeout); task->network_timeout);
if ((bytes=task->handler->recv_data(task, &action)) < 0) { if ((bytes=task->handler->recv_data(task, !task->handler->
explicit_post_recv, &action)) < 0)
{
ioevent_add_to_deleted_list(task); ioevent_add_to_deleted_list(task);
return -1; return -1;
} }
@ -938,6 +950,11 @@ int sf_client_sock_read(int sock, short event, void *arg)
if (SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND) < 0) { //fatal error if (SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND) < 0) { //fatal error
ioevent_add_to_deleted_list(task); ioevent_add_to_deleted_list(task);
return -1; return -1;
} else if (task->handler->explicit_post_recv) {
if (task->handler->post_recv(task) != 0) {
ioevent_add_to_deleted_list(task);
return -1;
}
} }
if (SF_CTX->smart_polling.enabled) { if (SF_CTX->smart_polling.enabled) {

View File

@ -144,7 +144,8 @@ int sf_socket_async_connect_server(struct fast_task_info *task);
int sf_socket_async_connect_check(struct fast_task_info *task); int sf_socket_async_connect_check(struct fast_task_info *task);
ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action); ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action);
ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action); ssize_t sf_socket_recv_data(struct fast_task_info *task,
const bool call_post_recv, SFCommAction *action);
int sf_rdma_busy_polling_callback(struct nio_thread_data *thread_data); int sf_rdma_busy_polling_callback(struct nio_thread_data *thread_data);

View File

@ -108,8 +108,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms,
const int proto_header_size, const int task_padding_size, const int proto_header_size, const int task_padding_size,
const int task_arg_size, const bool double_buffers, const int task_arg_size, const bool double_buffers,
TaskInitCallback init_callback, sf_release_buffer_callback const bool explicit_post_recv, TaskInitCallback init_callback,
release_buffer_callback) sf_release_buffer_callback release_buffer_callback)
{ {
int result; int result;
int bytes; int bytes;
@ -130,6 +130,10 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
set_body_length_func, alloc_recv_buffer_func, set_body_length_func, alloc_recv_buffer_func,
send_done_callback, deal_func, task_cleanup_func, send_done_callback, deal_func, task_cleanup_func,
timeout_callback, release_buffer_callback); timeout_callback, release_buffer_callback);
if (explicit_post_recv) {
sf_context->handlers[SF_RDMACM_NETWORK_HANDLER_INDEX].
explicit_post_recv = true;
}
if ((result=sf_init_free_queue(&sf_context->free_queue, if ((result=sf_init_free_queue(&sf_context->free_queue,
name, double_buffers, task_padding_size, name, double_buffers, task_padding_size,

View File

@ -45,8 +45,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms,
const int proto_header_size, const int task_padding_size, const int proto_header_size, const int task_padding_size,
const int task_arg_size, const bool double_buffers, const int task_arg_size, const bool double_buffers,
TaskInitCallback init_callback, sf_release_buffer_callback const bool explicit_post_recv, TaskInitCallback init_callback,
release_buffer_callback); sf_release_buffer_callback release_buffer_callback);
#define sf_service_init_ex(sf_context, name, alloc_thread_extra_data_callback,\ #define sf_service_init_ex(sf_context, name, alloc_thread_extra_data_callback,\
thread_loop_callback, accept_done_callback, set_body_length_func, \ thread_loop_callback, accept_done_callback, set_body_length_func, \
@ -56,7 +56,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
thread_loop_callback, accept_done_callback, set_body_length_func, \ thread_loop_callback, accept_done_callback, set_body_length_func, \
NULL, send_done_callback, deal_func, task_cleanup_func, \ NULL, send_done_callback, deal_func, task_cleanup_func, \
timeout_callback, net_timeout_ms, proto_header_size, \ timeout_callback, net_timeout_ms, proto_header_size, \
0, task_arg_size, false, NULL, NULL) 0, task_arg_size, false, false, NULL, NULL)
#define sf_service_init(name, alloc_thread_extra_data_callback, \ #define sf_service_init(name, alloc_thread_extra_data_callback, \
thread_loop_callback, accept_done_callback, set_body_length_func, \ thread_loop_callback, accept_done_callback, set_body_length_func, \
@ -65,7 +65,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
sf_service_init_ex2(&g_sf_context, name, alloc_thread_extra_data_callback, \ sf_service_init_ex2(&g_sf_context, name, alloc_thread_extra_data_callback, \
thread_loop_callback, accept_done_callback, set_body_length_func, NULL,\ thread_loop_callback, accept_done_callback, set_body_length_func, NULL,\
send_done_callback, deal_func, task_cleanup_func, timeout_callback, \ send_done_callback, deal_func, task_cleanup_func, timeout_callback, \
net_timeout_ms, proto_header_size, 0, task_arg_size, false, NULL, NULL) net_timeout_ms, proto_header_size, 0, task_arg_size, false, false, \
NULL, NULL)
int sf_service_destroy_ex(SFContext *sf_context); int sf_service_destroy_ex(SFContext *sf_context);

View File

@ -81,7 +81,8 @@ typedef void (*sf_close_connection_callback)(struct fast_task_info *task);
typedef ssize_t (*sf_send_data_callback)(struct fast_task_info *task, typedef ssize_t (*sf_send_data_callback)(struct fast_task_info *task,
SFCommAction *action); SFCommAction *action);
typedef ssize_t (*sf_recv_data_callback)(struct fast_task_info *task, typedef ssize_t (*sf_recv_data_callback)(struct fast_task_info *task,
SFCommAction *action); const bool call_post_recv, SFCommAction *action);
typedef int (*sf_post_recv_callback)(struct fast_task_info *task);
struct sf_network_handler; struct sf_network_handler;
typedef struct sf_listener { typedef struct sf_listener {
@ -99,6 +100,7 @@ typedef struct sf_listener {
struct sf_context; struct sf_context;
typedef struct sf_network_handler { typedef struct sf_network_handler {
bool enabled; bool enabled;
bool explicit_post_recv;
FCCommunicationType comm_type; FCCommunicationType comm_type;
struct sf_context *ctx; struct sf_context *ctx;
struct ibv_pd *pd; struct ibv_pd *pd;
@ -123,6 +125,7 @@ typedef struct sf_network_handler {
sf_send_data_callback send_data; sf_send_data_callback send_data;
sf_recv_data_callback recv_data; sf_recv_data_callback recv_data;
sf_post_recv_callback post_recv; //for rdma
} SFNetworkHandler; } SFNetworkHandler;
typedef struct sf_nio_callbacks { typedef struct sf_nio_callbacks {