diff --git a/src/idempotency/client/receipt_handler.c b/src/idempotency/client/receipt_handler.c index 6adcf19..b98a8fa 100644 --- a/src/idempotency/client/receipt_handler.c +++ b/src/idempotency/client/receipt_handler.c @@ -493,6 +493,7 @@ static int do_init(FCAddressPtrArray *address_array) { const int task_arg_size = 0; const bool double_buffers = false; + const bool explicit_post_recv = false; int result; int bytes; SFNetworkHandler *rdma_handler; @@ -523,7 +524,8 @@ static int do_init(FCAddressPtrArray *address_array) NULL, sf_proto_set_body_length, NULL, NULL, receipt_deal_task, receipt_task_finish_cleanup, receipt_recv_timeout_callback, 1000, sizeof(SFCommonProtoHeader), TASK_PADDING_SIZE, - task_arg_size, double_buffers, receipt_init_task, NULL); + task_arg_size, double_buffers, explicit_post_recv, + receipt_init_task, NULL); } int receipt_handler_init(FCAddressPtrArray *address_array) diff --git a/src/sf_global.c b/src/sf_global.c index f2b800c..018f61b 100644 --- a/src/sf_global.c +++ b/src/sf_global.c @@ -462,6 +462,7 @@ static int load_rdma_apis(SFNetworkHandler *handler) LOAD_API(handler, close_connection); LOAD_API(handler, send_data); LOAD_API(handler, recv_data); + LOAD_API(handler, post_recv); return 0; } @@ -474,6 +475,7 @@ static int init_network_handler(SFNetworkHandler *handler, handler->outer.handler = handler; handler->inner.is_inner = true; handler->outer.is_inner = false; + handler->explicit_post_recv = false; if (handler->comm_type == fc_comm_type_sock) { handler->inner.sock = -1; @@ -486,6 +488,7 @@ static int init_network_handler(SFNetworkHandler *handler, handler->close_connection = sf_socket_close_connection; handler->send_data = sf_socket_send_data; handler->recv_data = sf_socket_recv_data; + handler->post_recv = NULL; return 0; } else { handler->inner.id = NULL; diff --git a/src/sf_nio.c b/src/sf_nio.c index 306e4ad..21bcef0 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -569,14 +569,16 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action) logWarning("file: "__FILE__", line: %d, " "client ip: %s, send fail, task offset: %d, length: %d, " "errno: %d, error info: %s", __LINE__, task->client_ip, - task->send.ptr->offset, task->send.ptr->length, errno, strerror(errno)); + task->send.ptr->offset, task->send.ptr->length, + errno, strerror(errno)); return -1; } } else if (bytes == 0) { logWarning("file: "__FILE__", line: %d, " "client ip: %s, sock: %d, task length: %d, offset: %d, " "send failed, connection disconnected", __LINE__, - task->client_ip, task->event.fd, task->send.ptr->length, task->send.ptr->offset); + task->client_ip, task->event.fd, task->send.ptr->length, + task->send.ptr->offset); return -1; } @@ -623,7 +625,8 @@ ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action) return bytes; } -ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action) +ssize_t sf_socket_recv_data(struct fast_task_info *task, + const bool call_post_recv, SFCommAction *action) { int bytes; int recv_bytes; @@ -852,7 +855,9 @@ int sf_rdma_busy_polling_callback(struct nio_thread_data *thread_data) if (task->canceled) { continue; } - if ((bytes=task->handler->recv_data(task, &action)) < 0) { + if ((bytes=task->handler->recv_data(task, !task->handler-> + explicit_post_recv, &action)) < 0) + { ioevent_add_to_deleted_list(task); continue; } @@ -860,8 +865,13 @@ int sf_rdma_busy_polling_callback(struct nio_thread_data *thread_data) if (action == sf_comm_action_finish) { task->req_count++; task->nio_stages.current = SF_NIO_STAGE_SEND; - if (SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND) < 0) { //fatal error + if (SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND) < 0) { + /* fatal error */ ioevent_add_to_deleted_list(task); + } else if (task->handler->explicit_post_recv) { + if (task->handler->post_recv(task) != 0) { + ioevent_add_to_deleted_list(task); + } } } else { if (calc_iops_and_remove_polling(task) != 0) { @@ -926,7 +936,9 @@ int sf_client_sock_read(int sock, short event, void *arg) &task->event.timer, g_current_time + task->network_timeout); - if ((bytes=task->handler->recv_data(task, &action)) < 0) { + if ((bytes=task->handler->recv_data(task, !task->handler-> + explicit_post_recv, &action)) < 0) + { ioevent_add_to_deleted_list(task); return -1; } @@ -938,6 +950,11 @@ int sf_client_sock_read(int sock, short event, void *arg) if (SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND) < 0) { //fatal error ioevent_add_to_deleted_list(task); return -1; + } else if (task->handler->explicit_post_recv) { + if (task->handler->post_recv(task) != 0) { + ioevent_add_to_deleted_list(task); + return -1; + } } if (SF_CTX->smart_polling.enabled) { diff --git a/src/sf_nio.h b/src/sf_nio.h index 1822842..14a80f5 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -144,7 +144,8 @@ int sf_socket_async_connect_server(struct fast_task_info *task); int sf_socket_async_connect_check(struct fast_task_info *task); ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action); -ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action); +ssize_t sf_socket_recv_data(struct fast_task_info *task, + const bool call_post_recv, SFCommAction *action); int sf_rdma_busy_polling_callback(struct nio_thread_data *thread_data); diff --git a/src/sf_service.c b/src/sf_service.c index ab931f5..a7cbe2e 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -108,8 +108,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, const int proto_header_size, const int task_padding_size, const int task_arg_size, const bool double_buffers, - TaskInitCallback init_callback, sf_release_buffer_callback - release_buffer_callback) + const bool explicit_post_recv, TaskInitCallback init_callback, + sf_release_buffer_callback release_buffer_callback) { int result; int bytes; @@ -130,6 +130,10 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, set_body_length_func, alloc_recv_buffer_func, send_done_callback, deal_func, task_cleanup_func, timeout_callback, release_buffer_callback); + if (explicit_post_recv) { + sf_context->handlers[SF_RDMACM_NETWORK_HANDLER_INDEX]. + explicit_post_recv = true; + } if ((result=sf_init_free_queue(&sf_context->free_queue, name, double_buffers, task_padding_size, diff --git a/src/sf_service.h b/src/sf_service.h index 6b74e05..b0d3eb1 100644 --- a/src/sf_service.h +++ b/src/sf_service.h @@ -45,8 +45,8 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, const int proto_header_size, const int task_padding_size, const int task_arg_size, const bool double_buffers, - TaskInitCallback init_callback, sf_release_buffer_callback - release_buffer_callback); + const bool explicit_post_recv, TaskInitCallback init_callback, + sf_release_buffer_callback release_buffer_callback); #define sf_service_init_ex(sf_context, name, alloc_thread_extra_data_callback,\ thread_loop_callback, accept_done_callback, set_body_length_func, \ @@ -56,16 +56,17 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, thread_loop_callback, accept_done_callback, set_body_length_func, \ NULL, send_done_callback, deal_func, task_cleanup_func, \ timeout_callback, net_timeout_ms, proto_header_size, \ - 0, task_arg_size, false, NULL, NULL) + 0, task_arg_size, false, false, NULL, NULL) #define sf_service_init(name, alloc_thread_extra_data_callback, \ thread_loop_callback, accept_done_callback, set_body_length_func, \ send_done_callback, deal_func, task_cleanup_func, timeout_callback, \ net_timeout_ms, proto_header_size, task_arg_size) \ - sf_service_init_ex2(&g_sf_context, name, alloc_thread_extra_data_callback, \ - thread_loop_callback, accept_done_callback, set_body_length_func, NULL, \ + sf_service_init_ex2(&g_sf_context, name, alloc_thread_extra_data_callback, \ + thread_loop_callback, accept_done_callback, set_body_length_func, NULL,\ send_done_callback, deal_func, task_cleanup_func, timeout_callback, \ - net_timeout_ms, proto_header_size, 0, task_arg_size, false, NULL, NULL) + net_timeout_ms, proto_header_size, 0, task_arg_size, false, false, \ + NULL, NULL) int sf_service_destroy_ex(SFContext *sf_context); diff --git a/src/sf_types.h b/src/sf_types.h index 7230c56..ef52625 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -81,7 +81,8 @@ typedef void (*sf_close_connection_callback)(struct fast_task_info *task); typedef ssize_t (*sf_send_data_callback)(struct fast_task_info *task, SFCommAction *action); typedef ssize_t (*sf_recv_data_callback)(struct fast_task_info *task, - SFCommAction *action); + const bool call_post_recv, SFCommAction *action); +typedef int (*sf_post_recv_callback)(struct fast_task_info *task); struct sf_network_handler; typedef struct sf_listener { @@ -99,6 +100,7 @@ typedef struct sf_listener { struct sf_context; typedef struct sf_network_handler { bool enabled; + bool explicit_post_recv; FCCommunicationType comm_type; struct sf_context *ctx; struct ibv_pd *pd; @@ -123,6 +125,7 @@ typedef struct sf_network_handler { sf_send_data_callback send_data; sf_recv_data_callback recv_data; + sf_post_recv_callback post_recv; //for rdma } SFNetworkHandler; typedef struct sf_nio_callbacks {