From be9b71422ff1e97a70dcd8db187b8d70bbeff995 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Fri, 22 Sep 2023 18:27:12 +0800 Subject: [PATCH] nio support callback connect_done for client --- src/sf_global.c | 9 ++-- src/sf_nio.c | 123 +++++++++++++++++++++++++++-------------------- src/sf_nio.h | 39 +++++++++------ src/sf_service.c | 11 +++-- src/sf_service.h | 12 ++++- src/sf_types.h | 31 +++++++----- 6 files changed, 137 insertions(+), 88 deletions(-) diff --git a/src/sf_global.c b/src/sf_global.c index d69c770..f2b800c 100644 --- a/src/sf_global.c +++ b/src/sf_global.c @@ -47,9 +47,8 @@ SFGlobalVariables g_sf_global_vars = { SFContext g_sf_context = {{'\0'}, NULL, 0, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}, - 1, DEFAULT_WORK_THREADS, {'\0'}, {'\0'}, 0, true, true, - {false, 0, 0}, NULL, NULL, NULL, NULL, NULL, - sf_task_finish_clean_up, NULL + 1, DEFAULT_WORK_THREADS, {'\0'}, {'\0'}, 0, true, true, true, + {false, 0, 0}, {sf_task_finish_clean_up} }; static inline void set_config_str_value(const char *value, @@ -459,7 +458,7 @@ static int load_rdma_apis(SFNetworkHandler *handler) LOAD_API_EX(handler, , close_server); LOAD_API(handler, accept_connection); LOAD_API_EX(handler, , async_connect_server); - LOAD_API_EX(handler, , connect_server_done); + LOAD_API_EX(handler, , async_connect_check); LOAD_API(handler, close_connection); LOAD_API(handler, send_data); LOAD_API(handler, recv_data); @@ -483,7 +482,7 @@ static int init_network_handler(SFNetworkHandler *handler, handler->close_server = sf_socket_close_server; handler->accept_connection = sf_socket_accept_connection; handler->async_connect_server = sf_socket_async_connect_server; - handler->connect_server_done = sf_socket_connect_server_done; + handler->async_connect_check = sf_socket_async_connect_check; handler->close_connection = sf_socket_close_connection; handler->send_data = sf_socket_send_data; handler->recv_data = sf_socket_recv_data; diff --git a/src/sf_nio.c b/src/sf_nio.c index d3e1b2b..3fec70f 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -45,18 +45,18 @@ void sf_set_parameters_ex(SFContext *sf_context, const int header_size, sf_set_body_length_callback set_body_length_func, sf_alloc_recv_buffer_callback alloc_recv_buffer_func, sf_send_done_callback send_done_callback, - sf_deal_task_func deal_func, TaskCleanUpCallback cleanup_func, + sf_deal_task_callback deal_func, TaskCleanUpCallback cleanup_func, sf_recv_timeout_callback timeout_callback, sf_release_buffer_callback release_buffer_callback) { sf_context->header_size = header_size; - sf_context->set_body_length = set_body_length_func; - sf_context->alloc_recv_buffer = alloc_recv_buffer_func; - sf_context->send_done_callback = send_done_callback; - sf_context->deal_task = deal_func; - sf_context->task_cleanup_func = cleanup_func; - sf_context->timeout_callback = timeout_callback; - sf_context->release_buffer_callback = release_buffer_callback; + sf_context->callbacks.set_body_length = set_body_length_func; + sf_context->callbacks.alloc_recv_buffer = alloc_recv_buffer_func; + sf_context->callbacks.send_done = send_done_callback; + sf_context->callbacks.deal_task = deal_func; + sf_context->callbacks.task_cleanup = cleanup_func; + sf_context->callbacks.task_timeout = timeout_callback; + sf_context->callbacks.release_buffer = release_buffer_callback; } void sf_task_detach_thread(struct fast_task_info *task) @@ -84,8 +84,8 @@ void sf_task_switch_thread(struct fast_task_info *task, static inline void release_iovec_buffer(struct fast_task_info *task) { if (task->iovec_array.iovs != NULL) { - if (SF_CTX->release_buffer_callback != NULL) { - SF_CTX->release_buffer_callback(task); + if (SF_CTX->callbacks.release_buffer != NULL) { + SF_CTX->callbacks.release_buffer(task); } task->iovec_array.iovs = NULL; task->iovec_array.count = 0; @@ -189,7 +189,7 @@ static inline int sf_nio_init(struct fast_task_info *task) task->network_timeout); } -int sf_socket_connect_server_done(struct fast_task_info *task) +int sf_socket_async_connect_check(struct fast_task_info *task) { int result; socklen_t len; @@ -201,34 +201,46 @@ int sf_socket_connect_server_done(struct fast_task_info *task) return result; } -static int sf_client_sock_connect(int sock, short event, void *arg) +static int sf_client_connect_done(int sock, short event, void *arg) { int result; struct fast_task_info *task; task = (struct fast_task_info *)arg; + if (task->canceled) { + return ENOTCONN; + } + if (event & IOEVENT_TIMEOUT) { result = ETIMEDOUT; } else { - result = task->handler->connect_server_done(task); + result = task->handler->async_connect_check(task); if (result == EINPROGRESS) { return 0; } } + if (SF_CTX->callbacks.connect_done != NULL) { + SF_CTX->callbacks.connect_done(task, result); + } + if (result != 0) { - logError("file: "__FILE__", line: %d, " - "connect to server %s:%u fail, errno: %d, " - "error info: %s", __LINE__, task->server_ip, - task->port, result, STRERROR(result)); + if (SF_CTX->connect_need_log) { + logError("file: "__FILE__", line: %d, " + "connect to server %s:%u fail, errno: %d, " + "error info: %s", __LINE__, task->server_ip, + task->port, result, STRERROR(result)); + } ioevent_add_to_deleted_list(task); return -1; } - logInfo("file: "__FILE__", line: %d, " - "connect to server %s:%u successfully", - __LINE__, task->server_ip, task->port); - return SF_CTX->deal_task(task, SF_NIO_STAGE_HANDSHAKE); + if (SF_CTX->connect_need_log) { + logInfo("file: "__FILE__", line: %d, " + "connect to server %s:%u successfully", + __LINE__, task->server_ip, task->port); + } + return SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_HANDSHAKE); } int sf_socket_async_connect_server(struct fast_task_info *task) @@ -248,30 +260,39 @@ static int sf_async_connect_server(struct fast_task_info *task) { int result; - result = task->handler->async_connect_server(task); - if (result == 0) { - if ((result=sf_ioevent_add(task, (IOEventCallback) - sf_client_sock_read, task->network_timeout)) != 0) - { - return result; - } - - logInfo("file: "__FILE__", line: %d, " - "connect to server %s:%u successfully", - __LINE__, task->server_ip, task->port); - return SF_CTX->deal_task(task, SF_NIO_STAGE_HANDSHAKE); - } else if (result == EINPROGRESS) { + if ((result=task->handler->async_connect_server(task)) == EINPROGRESS) { result = ioevent_set(task, task->thread_data, task->event.fd, IOEVENT_READ | IOEVENT_WRITE, (IOEventCallback) - sf_client_sock_connect, task->connect_timeout); + sf_client_connect_done, task->connect_timeout); return result > 0 ? -1 * result : result; } else { - task->handler->close_connection(task); - logError("file: "__FILE__", line: %d, " - "connect to server %s:%u fail, errno: %d, " - "error info: %s", __LINE__, task->server_ip, - task->port, result, STRERROR(result)); - return result > 0 ? -1 * result : result; + if (SF_CTX->callbacks.connect_done != NULL) { + SF_CTX->callbacks.connect_done(task, result); + } + + if (result == 0) { + if ((result=sf_ioevent_add(task, (IOEventCallback) + sf_client_sock_read, task->network_timeout)) != 0) + { + return result; + } + + if (SF_CTX->connect_need_log) { + logInfo("file: "__FILE__", line: %d, " + "connect to server %s:%u successfully", + __LINE__, task->server_ip, task->port); + } + return SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_HANDSHAKE); + } else { + task->handler->close_connection(task); + if (SF_CTX->connect_need_log) { + logError("file: "__FILE__", line: %d, " + "connect to server %s:%u fail, errno: %d, " + "error info: %s", __LINE__, task->server_ip, + task->port, result, STRERROR(result)); + } + return result > 0 ? -1 * result : result; + } } } @@ -300,14 +321,14 @@ static int sf_nio_deal_task(struct fast_task_info *task, const int stage) result = sf_send_add_event(task); break; case SF_NIO_STAGE_CONTINUE: //continue deal - result = SF_CTX->deal_task(task, SF_NIO_STAGE_CONTINUE); + result = SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_CONTINUE); break; case SF_NIO_STAGE_FORWARDED: //forward by other thread if ((result=sf_ioevent_add(task, (IOEventCallback) sf_client_sock_read, task->network_timeout)) == 0) { - result = SF_CTX->deal_task(task, SF_NIO_STAGE_SEND); + result = SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND); } break; case SF_NIO_STAGE_CLOSE: @@ -665,8 +686,8 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action) return -1; } - if (SF_CTX->alloc_recv_buffer != NULL) { - task->recv_body = SF_CTX->alloc_recv_buffer(task, + if (SF_CTX->callbacks.alloc_recv_buffer != NULL) { + task->recv_body = SF_CTX->callbacks.alloc_recv_buffer(task, task->length - SF_CTX->header_size, &new_alloc); if (new_alloc && task->recv_body == NULL) { return -1; @@ -823,7 +844,7 @@ int sf_rdma_busy_polling_callback(struct nio_thread_data *thread_data) if (action == sf_comm_action_finish) { task->req_count++; task->nio_stages.current = SF_NIO_STAGE_SEND; - if (SF_CTX->deal_task(task, SF_NIO_STAGE_SEND) < 0) { //fatal error + if (SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND) < 0) { //fatal error ioevent_add_to_deleted_list(task); } } else { @@ -851,8 +872,8 @@ int sf_client_sock_read(int sock, short event, void *arg) if (event & IOEVENT_TIMEOUT) { if (task->offset == 0 && task->req_count > 0) { - if (SF_CTX->timeout_callback != NULL) { - if (SF_CTX->timeout_callback(task) != 0) { + if (SF_CTX->callbacks.task_timeout != NULL) { + if (SF_CTX->callbacks.task_timeout(task) != 0) { ioevent_add_to_deleted_list(task); return -1; } @@ -898,7 +919,7 @@ int sf_client_sock_read(int sock, short event, void *arg) if (action == sf_comm_action_finish) { task->req_count++; task->nio_stages.current = SF_NIO_STAGE_SEND; - if (SF_CTX->deal_task(task, SF_NIO_STAGE_SEND) < 0) { //fatal error + if (SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_SEND) < 0) { //fatal error ioevent_add_to_deleted_list(task); return -1; } @@ -966,8 +987,8 @@ int sf_client_sock_write(int sock, short event, void *arg) return -1; } - if (SF_CTX->send_done_callback != NULL) { - if (SF_CTX->send_done_callback(task, length) != 0) { + if (SF_CTX->callbacks.send_done != NULL) { + if (SF_CTX->callbacks.send_done(task, length) != 0) { ioevent_add_to_deleted_list(task); return -1; } diff --git a/src/sf_nio.h b/src/sf_nio.h index a64a406..d4a8a1f 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -37,7 +37,7 @@ void sf_set_parameters_ex(SFContext *sf_context, const int header_size, sf_set_body_length_callback set_body_length_func, sf_alloc_recv_buffer_callback alloc_recv_buffer_func, sf_send_done_callback send_done_callback, - sf_deal_task_func deal_func, TaskCleanUpCallback cleanup_func, + sf_deal_task_callback deal_func, TaskCleanUpCallback cleanup_func, sf_recv_timeout_callback timeout_callback, sf_release_buffer_callback release_buffer_callback); @@ -47,17 +47,28 @@ void sf_set_parameters_ex(SFContext *sf_context, const int header_size, set_body_length_func, alloc_recv_buffer_func, \ deal_func, cleanup_func, timeout_callback, NULL) -static inline void sf_set_deal_task_func_ex(SFContext *sf_context, - sf_deal_task_func deal_func) +static inline void sf_set_deal_task_callback_ex(SFContext *sf_context, + sf_deal_task_callback deal_func) { - sf_context->deal_task = deal_func; + sf_context->callbacks.deal_task = deal_func; } -#define sf_set_deal_task_func(deal_func) \ - sf_set_deal_task_func_ex(&g_sf_context, deal_func) +#define sf_set_deal_task_callback(deal_func) \ + sf_set_deal_task_callback_ex(&g_sf_context, deal_func) -static inline void sf_set_remove_from_ready_list_ex(SFContext *sf_context, - const bool enabled) + +static inline void sf_set_connect_done_callback_ex(SFContext *sf_context, + sf_connect_done_callback done_callback) +{ + sf_context->callbacks.connect_done = done_callback; +} + +#define sf_set_connect_done_callback(done_callback) \ + sf_set_connect_done_callback_ex(&g_sf_context, done_callback) + + +static inline void sf_set_remove_from_ready_list_ex( + SFContext *sf_context, const bool enabled) { sf_context->remove_from_ready_list = enabled; } @@ -65,14 +76,14 @@ static inline void sf_set_remove_from_ready_list_ex(SFContext *sf_context, #define sf_set_remove_from_ready_list(enabled) \ sf_set_remove_from_ready_list_ex(&g_sf_context, enabled); -static inline TaskCleanUpCallback sf_get_task_cleanup_func_ex( +static inline TaskCleanUpCallback sf_get_task_cleanup_callback_ex( SFContext *sf_context) { - return sf_context->task_cleanup_func; + return sf_context->callbacks.task_cleanup; } -#define sf_get_task_cleanup_func() \ - sf_get_task_cleanup_func_ex(&g_sf_context) +#define sf_get_task_cleanup_callback() \ + sf_get_task_cleanup_callback_ex(&g_sf_context) #define sf_nio_task_is_idle(task) \ (task->offset == 0 && task->length == 0) @@ -95,7 +106,7 @@ void sf_task_detach_thread(struct fast_task_info *task); static inline int sf_set_body_length(struct fast_task_info *task) { - if (SF_CTX->set_body_length(task) != 0) { + if (SF_CTX->callbacks.set_body_length(task) != 0) { return -1; } if (task->length < 0) { @@ -120,7 +131,7 @@ static inline int sf_set_body_length(struct fast_task_info *task) } int sf_socket_async_connect_server(struct fast_task_info *task); -int sf_socket_connect_server_done(struct fast_task_info *task); +int sf_socket_async_connect_check(struct fast_task_info *task); ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action); ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action); diff --git a/src/sf_service.c b/src/sf_service.c index a953419..e883a16 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -117,7 +117,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, sf_set_body_length_callback set_body_length_func, sf_alloc_recv_buffer_callback alloc_recv_buffer_func, sf_send_done_callback send_done_callback, - sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func, + sf_deal_task_callback deal_func, TaskCleanUpCallback task_cleanup_func, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, const int proto_header_size, const int task_padding_size, const int task_arg_size, TaskInitCallback init_callback, @@ -134,9 +134,10 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, pthread_attr_t thread_attr; snprintf(sf_context->name, sizeof(sf_context->name), "%s", name); + sf_context->connect_need_log = true; sf_context->realloc_task_buffer = g_sf_global_vars. min_buff_size < g_sf_global_vars.max_buff_size; - sf_context->accept_done_func = accept_done_callback; + sf_context->callbacks.accept_done = accept_done_callback; sf_set_parameters_ex(sf_context, proto_header_size, set_body_length_func, alloc_recv_buffer_func, send_done_callback, deal_func, task_cleanup_func, @@ -335,7 +336,7 @@ static void *worker_thread_entrance(void *arg) ioevent_loop(thread_ctx->thread_data, sf_recv_notify_read, - thread_ctx->sf_context->task_cleanup_func, + thread_ctx->sf_context->callbacks.task_cleanup, &g_sf_global_vars.continue_flag); ioevent_destroy(&thread_ctx->thread_data->ev_puller); @@ -530,8 +531,8 @@ static void accept_run(SFListener *listener) task->thread_data = listener->handler->ctx->thread_data + task->event.fd % listener->handler->ctx->work_threads; - if (listener->handler->ctx->accept_done_func != NULL) { - if (listener->handler->ctx->accept_done_func(task, + if (listener->handler->ctx->callbacks.accept_done != NULL) { + if (listener->handler->ctx->callbacks.accept_done(task, listener->inaddr.sin_addr.s_addr, listener->is_inner) != 0) { diff --git a/src/sf_service.h b/src/sf_service.h index b94b88e..fa19e8b 100644 --- a/src/sf_service.h +++ b/src/sf_service.h @@ -41,7 +41,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, sf_set_body_length_callback set_body_length_func, sf_alloc_recv_buffer_callback alloc_recv_buffer_func, sf_send_done_callback send_done_callback, - sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func, + sf_deal_task_callback deal_func, TaskCleanUpCallback task_cleanup_func, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, const int proto_header_size, const int task_padding_size, const int task_arg_size, TaskInitCallback init_callback, @@ -81,10 +81,18 @@ static inline void sf_service_set_smart_polling_ex(SFContext *sf_context, { sf_context->smart_polling = *smart_polling; } - #define sf_service_set_smart_polling(smart_polling) \ sf_service_set_smart_polling_ex(&g_sf_context, smart_polling) +static inline void sf_service_set_connect_need_log_ex( + SFContext *sf_context, const bool need_log) +{ + sf_context->connect_need_log = need_log; +} +#define sf_service_set_connect_need_log(need_log) \ + sf_service_set_connect_need_log_ex(&g_sf_context, need_log) + + int sf_setup_signal_handler(); int sf_startup_schedule(pthread_t *schedule_tid); diff --git a/src/sf_types.h b/src/sf_types.h index 0253b3f..36ce1ed 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -43,10 +43,12 @@ typedef int (*sf_accept_done_callback)(struct fast_task_info *task, typedef int (*sf_set_body_length_callback)(struct fast_task_info *task); typedef char *(*sf_alloc_recv_buffer_callback)(struct fast_task_info *task, const int buff_size, bool *new_alloc); -typedef int (*sf_deal_task_func)(struct fast_task_info *task, const int stage); +typedef int (*sf_deal_task_callback)(struct fast_task_info *task, const int stage); typedef int (*sf_recv_timeout_callback)(struct fast_task_info *task); typedef int (*sf_send_done_callback)(struct fast_task_info *task, const int length); +typedef void (*sf_connect_done_callback)(struct fast_task_info *task, + const int err_no); /* calback for release iovec buffer */ typedef void (*sf_release_buffer_callback)(struct fast_task_info *task); @@ -73,7 +75,7 @@ typedef void (*sf_close_server_callback)(struct sf_listener *listener); typedef struct fast_task_info * (*sf_accept_connection_callback)( struct sf_listener *listener); typedef int (*sf_async_connect_server_callback)(struct fast_task_info *task); -typedef int (*sf_connect_server_done_callback)(struct fast_task_info *task); +typedef int (*sf_async_connect_check_callback)(struct fast_task_info *task); typedef void (*sf_close_connection_callback)(struct fast_task_info *task); typedef ssize_t (*sf_send_data_callback)(struct fast_task_info *task, @@ -114,7 +116,7 @@ typedef struct sf_network_handler { /* for client side */ sf_async_connect_server_callback async_connect_server; - sf_connect_server_done_callback connect_server_done; + sf_async_connect_check_callback async_connect_check; /* server and client both */ sf_close_connection_callback close_connection; @@ -123,6 +125,18 @@ typedef struct sf_network_handler { sf_recv_data_callback recv_data; } SFNetworkHandler; +typedef struct sf_nio_callbacks { + TaskCleanUpCallback task_cleanup; + sf_deal_task_callback deal_task; + sf_set_body_length_callback set_body_length; + sf_alloc_recv_buffer_callback alloc_recv_buffer; + sf_accept_done_callback accept_done; + sf_connect_done_callback connect_done; + sf_send_done_callback send_done; + sf_recv_timeout_callback task_timeout; + sf_release_buffer_callback release_buffer; +} SFNIOCallbacks; + typedef struct sf_context { char name[64]; struct nio_thread_data *thread_data; @@ -140,15 +154,10 @@ typedef struct sf_context { int header_size; bool remove_from_ready_list; bool realloc_task_buffer; + bool connect_need_log; //for client connect FCSmartPollingConfig smart_polling; - sf_deal_task_func deal_task; - sf_set_body_length_callback set_body_length; - sf_alloc_recv_buffer_callback alloc_recv_buffer; - sf_accept_done_callback accept_done_func; - sf_send_done_callback send_done_callback; - TaskCleanUpCallback task_cleanup_func; - sf_recv_timeout_callback timeout_callback; - sf_release_buffer_callback release_buffer_callback; + + SFNIOCallbacks callbacks; } SFContext; typedef struct {