From a749b84ce29098f8688826652dbd94fac1b3d62b Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Sun, 6 Sep 2020 18:49:15 +0800 Subject: [PATCH] sockopt.[hc]: add function asyncconnectserverbyip --- HISTORY | 3 +- src/connection_pool.c | 87 +++++++++++++++++++++---------------------- src/connection_pool.h | 15 ++++++++ src/fast_task_queue.h | 11 ++++-- src/sockopt.c | 29 ++++++++++++--- src/sockopt.h | 21 +++++++---- 6 files changed, 106 insertions(+), 60 deletions(-) diff --git a/HISTORY b/HISTORY index f4779ce..f208f0f 100644 --- a/HISTORY +++ b/HISTORY @@ -1,5 +1,5 @@ -Version 1.44 2020-09-01 +Version 1.44 2020-09-06 * add test file src/tests/test_pthread_lock.c * add uniq_skiplist.[hc] * add function split_string_ex @@ -36,6 +36,7 @@ Version 1.44 2020-09-01 * add files: thread_pool.[hc] * shared_func.[hc]: add function fc_path_contains * fast_mblock.[hc]: support alloc elements limit + * sockopt.[hc]: add function asyncconnectserverbyip Version 1.43 2019-12-25 * replace function call system to getExecResult, diff --git a/src/connection_pool.c b/src/connection_pool.c index a062e60..694894d 100644 --- a/src/connection_pool.c +++ b/src/connection_pool.c @@ -106,74 +106,73 @@ void conn_pool_disconnect_server(ConnectionInfo *pConnection) } } -int conn_pool_connect_server_ex(ConnectionInfo *pConnection, +int conn_pool_connect_server_ex(ConnectionInfo *conn, const int connect_timeout, const char *bind_ipaddr, const bool log_connect_error) { int result; - int domain; - if (pConnection->sock >= 0) + if (conn->sock >= 0) { - close(pConnection->sock); + close(conn->sock); } - if (pConnection->socket_domain == AF_INET || - pConnection->socket_domain == AF_INET6) + if ((conn->sock=socketCreateEx2(conn->socket_domain, conn->ip_addr, + O_NONBLOCK, bind_ipaddr, &result)) < 0) { - domain = pConnection->socket_domain; - } - else - { - domain = is_ipv6_addr(pConnection->ip_addr) ? AF_INET6 : AF_INET; - } - pConnection->sock = socket(domain, SOCK_STREAM, 0); - if(pConnection->sock < 0) - { - logError("file: "__FILE__", line: %d, " - "socket create fail, errno: %d, " - "error info: %s", __LINE__, errno, STRERROR(errno)); - return errno != 0 ? errno : EPERM; - } - - if (bind_ipaddr != NULL && *bind_ipaddr != '\0') - { - if ((result=socketBind2(domain, pConnection->sock, bind_ipaddr, 0)) != 0) - { - close(pConnection->sock); - pConnection->sock = -1; - return result; - } + return result; } - SET_SOCKOPT_NOSIGPIPE(pConnection->sock); - if ((result=tcpsetnonblockopt(pConnection->sock)) != 0) - { - close(pConnection->sock); - pConnection->sock = -1; - return result; - } - - if ((result=connectserverbyip_nb(pConnection->sock, - pConnection->ip_addr, pConnection->port, - connect_timeout)) != 0) + if ((result=connectserverbyip_nb(conn->sock, conn->ip_addr, + conn->port, connect_timeout)) != 0) { if (log_connect_error) { logError("file: "__FILE__", line: %d, " "connect to server %s:%d fail, errno: %d, " - "error info: %s", __LINE__, pConnection->ip_addr, - pConnection->port, result, STRERROR(result)); + "error info: %s", __LINE__, conn->ip_addr, + conn->port, result, STRERROR(result)); } - close(pConnection->sock); - pConnection->sock = -1; + close(conn->sock); + conn->sock = -1; return result; } return 0; } +int conn_pool_async_connect_server_ex(ConnectionInfo *conn, + const char *bind_ipaddr) +{ + int result; + + if (conn->sock >= 0) + { + close(conn->sock); + } + + if ((conn->sock=socketCreateEx2(conn->socket_domain, + conn->ip_addr, O_NONBLOCK, bind_ipaddr, + &result)) < 0) + { + return result; + } + + result = asyncconnectserverbyip(conn->sock, conn->ip_addr, conn->port); + if (!(result == 0 || result == EINPROGRESS)) + { + logError("file: "__FILE__", line: %d, " + "connect to server %s:%d fail, errno: %d, " + "error info: %s", __LINE__, conn->ip_addr, + conn->port, result, STRERROR(result)); + close(conn->sock); + conn->sock = -1; + } + + return result; +} + static inline void conn_pool_get_key(const ConnectionInfo *conn, char *key, int *key_len) { *key_len = sprintf(key, "%s_%d", conn->ip_addr, conn->port); diff --git a/src/connection_pool.h b/src/connection_pool.h index b21d561..251dfce 100644 --- a/src/connection_pool.h +++ b/src/connection_pool.h @@ -239,6 +239,21 @@ static inline int conn_pool_connect_server_anyway(ConnectionInfo *pConnection, connect_timeout, bind_ipaddr, true); } +/** +* async connect to the server +* parameters: +* pConnection: the connection +* bind_ipaddr: the ip address to bind, NULL or empty for any +* NOTE: pConnection->sock will be closed when it >= 0 before connect +* return 0 or EINPROGRESS for success, others for error +*/ +int conn_pool_async_connect_server_ex(ConnectionInfo *conn, + const char *bind_ipaddr); + +#define conn_pool_async_connect_server(conn) \ + conn_pool_async_connect_server_ex(conn, NULL) + + /** * get connection count of the pool * parameters: diff --git a/src/fast_task_queue.h b/src/fast_task_queue.h index e71b552..b15ba5c 100644 --- a/src/fast_task_queue.h +++ b/src/fast_task_queue.h @@ -66,20 +66,25 @@ struct nio_thread_data struct fast_task_info { IOEventEntry event; //must first - char client_ip[IP_ADDRESS_SIZE]; + union { + char server_ip[IP_ADDRESS_SIZE]; + char client_ip[IP_ADDRESS_SIZE]; + }; void *arg; //extra argument pointer char *data; //buffer for write or recv int size; //alloc size int length; //data length int offset; //current offset - uint16_t port; //client port + uint16_t port; //peer port char nio_stage; //stage for network IO bool canceled; //if task canceled + int connect_timeout; //for client side + int network_timeout; int64_t req_count; //request count TaskFinishCallback finish_callback; struct nio_thread_data *thread_data; - struct fast_task_info *next; void *ctx; //context pointer for libserverframe nio + struct fast_task_info *next; }; struct fast_task_queue diff --git a/src/sockopt.c b/src/sockopt.c index 4a1b762..feed6f5 100644 --- a/src/sockopt.c +++ b/src/sockopt.c @@ -635,15 +635,34 @@ int connectserverbyip_nb_ex(int sock, const char *server_ip, \ return result; } +int asyncconnectserverbyip(int sock, const char *server_ip, + const short server_port) +{ + int result; + sockaddr_convert_t convert; + + if ((result=setsockaddrbyip(server_ip, server_port, &convert)) != 0) + { + return result; + } + + if (connect(sock, &convert.sa.addr, convert.len) == 0) { + return 0; + } + else + { + return errno != 0 ? errno : EINPROGRESS; + } +} + int socketCreateEx2(int af, const char *server_ip, - const int timeout, const int flags, - const char *bind_ipaddr, int *err_no) + const int flags, const char *bind_ipaddr, int *err_no) { int sock; - if (af == AF_UNSPEC) + if (!(af == AF_INET || af == AF_INET6)) { - af = is_ipv6_addr(server_ip) ? AF_INET6 : AF_INET; + af = is_ipv6_addr(server_ip) ? AF_INET6 : AF_INET; } sock = socket(af, SOCK_STREAM, 0); @@ -689,7 +708,7 @@ int socketClientEx2(int af, const char *server_ip, bool auto_detect; sock = socketCreateEx2(af, server_ip, - timeout, flags, bind_ipaddr, err_no); + flags, bind_ipaddr, err_no); if (sock < 0) { return sock; diff --git a/src/sockopt.h b/src/sockopt.h index c293450..6bc7731 100644 --- a/src/sockopt.h +++ b/src/sockopt.h @@ -208,6 +208,17 @@ int connectserverbyip_nb_ex(int sock, const char *server_ip, \ #define connectserverbyip_nb_auto(sock, server_ip, server_port, timeout) \ connectserverbyip_nb_ex(sock, server_ip, server_port, timeout, true) + +/** async connect to server + * parameters: + * sock: the non-block socket + * server_ip: ip address of the server + * server_port: port of the server + * return: error no, 0 or EINPROGRESS for success, others for fail +*/ +int asyncconnectserverbyip(int sock, const char *server_ip, + const short server_port); + /** accept client connect request * parameters: * sock: the server socket @@ -368,30 +379,26 @@ int socketServer2(int af, const char *bind_ipaddr, const int port, int *err_no); * parameters: * af: family, AF_UNSPEC (auto dectect), AF_INET or AF_INET6 * server_ip: ip address of the server to detect family when af == AF_UNSPEC - * timeout: connect timeout in seconds * flags: socket flags such as O_NONBLOCK for non-block socket * bind_ipaddr: the ip address to bind, NULL or empty for bind ANY * err_no: store the error no * return: >= 0 server socket, < 0 fail */ int socketCreateEx2(int af, const char *server_ip, - const int timeout, const int flags, - const char *bind_ipaddr, int *err_no); + const int flags, const char *bind_ipaddr, int *err_no); /** create socket (NOT connect to server yet) * parameters: * server_ip: ip address of the server to detect family - * timeout: connect timeout in seconds * flags: socket flags such as O_NONBLOCK for non-block socket * bind_ipaddr: the ip address to bind, NULL or empty for bind ANY * err_no: store the error no * return: >= 0 server socket, < 0 fail */ static inline int socketCreateExAuto(const char *server_ip, - const int timeout, const int flags, - const char *bind_ipaddr, int *err_no) + const int flags, const char *bind_ipaddr, int *err_no) { - return socketCreateEx2(AF_UNSPEC, server_ip, timeout, flags, + return socketCreateEx2(AF_UNSPEC, server_ip, flags, bind_ipaddr, err_no); }