From aaeb421e6d28dbdf47fa15c4d9465add71b6b49c Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Sun, 6 Sep 2020 18:54:46 +0800 Subject: [PATCH] support connect for client mode --- src/sf_define.h | 9 +++--- src/sf_nio.c | 83 ++++++++++++++++++++++++++++++++++++++++++------ src/sf_service.c | 11 ++++++- src/sf_service.h | 2 ++ 4 files changed, 91 insertions(+), 14 deletions(-) diff --git a/src/sf_define.h b/src/sf_define.h index 84d6dfb..bfdc15c 100644 --- a/src/sf_define.h +++ b/src/sf_define.h @@ -11,10 +11,11 @@ #define SF_DEF_MAX_BUFF_SIZE (64 * 1024) #define SF_NIO_STAGE_INIT 0 //set ioevent -#define SF_NIO_STAGE_RECV 1 //recv -#define SF_NIO_STAGE_SEND 2 //send -#define SF_NIO_STAGE_FORWARDED 3 //deal the forwarded request -#define SF_NIO_STAGE_CONTINUE 4 //notify the thread continue deal +#define SF_NIO_STAGE_CONNECT 1 //do connect +#define SF_NIO_STAGE_RECV 2 //do recv +#define SF_NIO_STAGE_SEND 3 //do send +#define SF_NIO_STAGE_FORWARDED 4 //deal the forwarded request +#define SF_NIO_STAGE_CONTINUE 5 //notify the thread continue deal #define SF_NIO_STAGE_CLOSE 9 //cleanup the task #ifdef __cplusplus diff --git a/src/sf_nio.c b/src/sf_nio.c index 0fb04c6..ba6e2e3 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -135,17 +135,17 @@ int sf_set_read_event(struct fast_task_info *task) return 0; } -static int sf_ioevent_add(struct fast_task_info *task) +static inline int sf_ioevent_add(struct fast_task_info *task, + IOEventCallback callback, const int timeout) { int result; result = ioevent_set(task, task->thread_data, task->event.fd, - IOEVENT_READ, (IOEventCallback)sf_client_sock_read, - g_sf_global_vars.network_timeout); + IOEVENT_READ, callback, timeout); return result > 0 ? -1 * result : result; } -static int sf_nio_init(struct fast_task_info *task) +static inline int sf_nio_init(struct fast_task_info *task) { int current_connections; @@ -155,7 +155,66 @@ static int sf_nio_init(struct fast_task_info *task) g_sf_global_vars.connection_stat.max_count = current_connections; } - return sf_ioevent_add(task); + return sf_ioevent_add(task, (IOEventCallback)sf_client_sock_read, + task->network_timeout); +} + +static int sf_client_sock_connect(int sock, short event, void *arg) +{ + int result; + socklen_t len; + struct fast_task_info *task; + + task = (struct fast_task_info *)arg; + if (event & IOEVENT_TIMEOUT) { + result = ETIMEDOUT; + } else { + len = sizeof(result); + if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &result, &len) < 0) { + result = errno != 0 ? errno : EACCES; + } + } + + if (result != 0) { + logError("file: "__FILE__", line: %d, " + "connect to server %s:%d fail, errno: %d, " + "error info: %s", __LINE__, task->server_ip, + task->port, result, STRERROR(result)); + iovent_add_to_deleted_list(task); + return -1; + } + + task->nio_stage = SF_NIO_STAGE_RECV; + task->event.callback = (IOEventCallback)sf_client_sock_read; + return 0; +} + +static int sf_connect_server(struct fast_task_info *task) +{ + int result; + + if ((task->event.fd=socketCreateEx2(AF_UNSPEC, task->server_ip, + O_NONBLOCK, NULL, &result)) < 0) + { + return result > 0 ? -1 * result : result; + } + + result = asyncconnectserverbyip(task->event.fd, + task->server_ip, task->port); + if (result == 0) { + task->nio_stage = SF_NIO_STAGE_RECV; + return sf_ioevent_add(task, (IOEventCallback) + sf_client_sock_read, task->network_timeout); + } else if (result == EINPROGRESS) { + return sf_ioevent_add(task, (IOEventCallback) + sf_client_sock_connect, task->connect_timeout); + } else { + logError("file: "__FILE__", line: %d, " + "connect to server %s:%d fail, errno: %d, " + "error info: %s", __LINE__, task->server_ip, + task->port, result, STRERROR(result)); + return result > 0 ? -1 * result : result; + } } static int sf_nio_deal_task(struct fast_task_info *task) @@ -166,6 +225,9 @@ static int sf_nio_deal_task(struct fast_task_info *task) task->nio_stage = SF_NIO_STAGE_RECV; result = sf_nio_init(task); break; + case SF_NIO_STAGE_CONNECT: + result = sf_connect_server(task); + break; case SF_NIO_STAGE_RECV: if ((result=sf_set_read_event(task)) == 0) { @@ -180,7 +242,10 @@ static int sf_nio_deal_task(struct fast_task_info *task) result = SF_CTX->deal_task(task); break; case SF_NIO_STAGE_FORWARDED: //forward by other thread - if ((result=sf_ioevent_add(task)) == 0) { + if ((result=sf_ioevent_add(task, (IOEventCallback) + sf_client_sock_read, + task->network_timeout)) == 0) + { result = SF_CTX->deal_task(task); } break; @@ -304,7 +369,7 @@ int sf_client_sock_read(int sock, short event, void *arg) } task->event.timer.expires = g_current_time + - g_sf_global_vars.network_timeout; + task->network_timeout; fast_timer_add(&task->thread_data->timer, &task->event.timer); } @@ -342,7 +407,7 @@ int sf_client_sock_read(int sock, short event, void *arg) while (1) { fast_timer_modify(&task->thread_data->timer, &task->event.timer, g_current_time + - g_sf_global_vars.network_timeout); + task->network_timeout); if (task->length == 0) { //recv header recv_bytes = SF_CTX->header_size - task->offset; } @@ -515,7 +580,7 @@ int sf_client_sock_write(int sock, short event, void *arg) while (1) { fast_timer_modify(&task->thread_data->timer, &task->event.timer, g_current_time + - g_sf_global_vars.network_timeout); + task->network_timeout); bytes = write(sock, task->data + task->offset, task->length - task->offset); diff --git a/src/sf_service.c b/src/sf_service.c index 31fc448..ccf610a 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -46,6 +46,14 @@ struct accept_thread_context { int server_sock; }; + +int sf_init_task(struct fast_task_info *task) +{ + task->connect_timeout = SF_G_CONNECT_TIMEOUT; //for client side + task->network_timeout = SF_G_NETWORK_TIMEOUT; + return 0; +} + static void *worker_thread_entrance(void *arg); static int sf_init_free_queues(const int task_arg_size, @@ -82,7 +90,8 @@ static int sf_init_free_queues(const int task_arg_size, if ((result=free_queue_init_ex2(g_sf_global_vars.max_connections, init_connections, alloc_conn_once, g_sf_global_vars. min_buff_size, g_sf_global_vars.max_buff_size, - task_arg_size, init_callback)) != 0) + task_arg_size, init_callback != NULL ? + init_callback : sf_init_task)) != 0) { return result; } diff --git a/src/sf_service.h b/src/sf_service.h index b36950d..0c4266c 100644 --- a/src/sf_service.h +++ b/src/sf_service.h @@ -83,6 +83,8 @@ struct nio_thread_data *sf_get_random_thread_data_ex(SFContext *sf_context); void sf_set_sig_quit_handler(sf_sig_quit_handler quit_handler); +int sf_init_task(struct fast_task_info *task); + #ifdef __cplusplus } #endif