sockopt.[hc]: add function asyncconnectserverbyip

pull/37/head
YuQing 2020-09-06 18:49:15 +08:00
parent d9b914ea6e
commit a749b84ce2
6 changed files with 106 additions and 60 deletions

View File

@ -1,5 +1,5 @@
Version 1.44 2020-09-01
Version 1.44 2020-09-06
* add test file src/tests/test_pthread_lock.c
* add uniq_skiplist.[hc]
* add function split_string_ex
@ -36,6 +36,7 @@ Version 1.44 2020-09-01
* add files: thread_pool.[hc]
* shared_func.[hc]: add function fc_path_contains
* fast_mblock.[hc]: support alloc elements limit
* sockopt.[hc]: add function asyncconnectserverbyip
Version 1.43 2019-12-25
* replace function call system to getExecResult,

View File

@ -106,74 +106,73 @@ void conn_pool_disconnect_server(ConnectionInfo *pConnection)
}
}
int conn_pool_connect_server_ex(ConnectionInfo *pConnection,
int conn_pool_connect_server_ex(ConnectionInfo *conn,
const int connect_timeout, const char *bind_ipaddr,
const bool log_connect_error)
{
int result;
int domain;
if (pConnection->sock >= 0)
if (conn->sock >= 0)
{
close(pConnection->sock);
close(conn->sock);
}
if (pConnection->socket_domain == AF_INET ||
pConnection->socket_domain == AF_INET6)
if ((conn->sock=socketCreateEx2(conn->socket_domain, conn->ip_addr,
O_NONBLOCK, bind_ipaddr, &result)) < 0)
{
domain = pConnection->socket_domain;
}
else
{
domain = is_ipv6_addr(pConnection->ip_addr) ? AF_INET6 : AF_INET;
}
pConnection->sock = socket(domain, SOCK_STREAM, 0);
if(pConnection->sock < 0)
{
logError("file: "__FILE__", line: %d, "
"socket create fail, errno: %d, "
"error info: %s", __LINE__, errno, STRERROR(errno));
return errno != 0 ? errno : EPERM;
}
if (bind_ipaddr != NULL && *bind_ipaddr != '\0')
{
if ((result=socketBind2(domain, pConnection->sock, bind_ipaddr, 0)) != 0)
{
close(pConnection->sock);
pConnection->sock = -1;
return result;
}
return result;
}
SET_SOCKOPT_NOSIGPIPE(pConnection->sock);
if ((result=tcpsetnonblockopt(pConnection->sock)) != 0)
{
close(pConnection->sock);
pConnection->sock = -1;
return result;
}
if ((result=connectserverbyip_nb(pConnection->sock,
pConnection->ip_addr, pConnection->port,
connect_timeout)) != 0)
if ((result=connectserverbyip_nb(conn->sock, conn->ip_addr,
conn->port, connect_timeout)) != 0)
{
if (log_connect_error)
{
logError("file: "__FILE__", line: %d, "
"connect to server %s:%d fail, errno: %d, "
"error info: %s", __LINE__, pConnection->ip_addr,
pConnection->port, result, STRERROR(result));
"error info: %s", __LINE__, conn->ip_addr,
conn->port, result, STRERROR(result));
}
close(pConnection->sock);
pConnection->sock = -1;
close(conn->sock);
conn->sock = -1;
return result;
}
return 0;
}
int conn_pool_async_connect_server_ex(ConnectionInfo *conn,
const char *bind_ipaddr)
{
int result;
if (conn->sock >= 0)
{
close(conn->sock);
}
if ((conn->sock=socketCreateEx2(conn->socket_domain,
conn->ip_addr, O_NONBLOCK, bind_ipaddr,
&result)) < 0)
{
return result;
}
result = asyncconnectserverbyip(conn->sock, conn->ip_addr, conn->port);
if (!(result == 0 || result == EINPROGRESS))
{
logError("file: "__FILE__", line: %d, "
"connect to server %s:%d fail, errno: %d, "
"error info: %s", __LINE__, conn->ip_addr,
conn->port, result, STRERROR(result));
close(conn->sock);
conn->sock = -1;
}
return result;
}
static inline void conn_pool_get_key(const ConnectionInfo *conn, char *key, int *key_len)
{
*key_len = sprintf(key, "%s_%d", conn->ip_addr, conn->port);

View File

@ -239,6 +239,21 @@ static inline int conn_pool_connect_server_anyway(ConnectionInfo *pConnection,
connect_timeout, bind_ipaddr, true);
}
/**
* async connect to the server
* parameters:
* pConnection: the connection
* bind_ipaddr: the ip address to bind, NULL or empty for any
* NOTE: pConnection->sock will be closed when it >= 0 before connect
* return 0 or EINPROGRESS for success, others for error
*/
int conn_pool_async_connect_server_ex(ConnectionInfo *conn,
const char *bind_ipaddr);
#define conn_pool_async_connect_server(conn) \
conn_pool_async_connect_server_ex(conn, NULL)
/**
* get connection count of the pool
* parameters:

View File

@ -66,20 +66,25 @@ struct nio_thread_data
struct fast_task_info
{
IOEventEntry event; //must first
char client_ip[IP_ADDRESS_SIZE];
union {
char server_ip[IP_ADDRESS_SIZE];
char client_ip[IP_ADDRESS_SIZE];
};
void *arg; //extra argument pointer
char *data; //buffer for write or recv
int size; //alloc size
int length; //data length
int offset; //current offset
uint16_t port; //client port
uint16_t port; //peer port
char nio_stage; //stage for network IO
bool canceled; //if task canceled
int connect_timeout; //for client side
int network_timeout;
int64_t req_count; //request count
TaskFinishCallback finish_callback;
struct nio_thread_data *thread_data;
struct fast_task_info *next;
void *ctx; //context pointer for libserverframe nio
struct fast_task_info *next;
};
struct fast_task_queue

View File

@ -635,15 +635,34 @@ int connectserverbyip_nb_ex(int sock, const char *server_ip, \
return result;
}
int asyncconnectserverbyip(int sock, const char *server_ip,
const short server_port)
{
int result;
sockaddr_convert_t convert;
if ((result=setsockaddrbyip(server_ip, server_port, &convert)) != 0)
{
return result;
}
if (connect(sock, &convert.sa.addr, convert.len) == 0) {
return 0;
}
else
{
return errno != 0 ? errno : EINPROGRESS;
}
}
int socketCreateEx2(int af, const char *server_ip,
const int timeout, const int flags,
const char *bind_ipaddr, int *err_no)
const int flags, const char *bind_ipaddr, int *err_no)
{
int sock;
if (af == AF_UNSPEC)
if (!(af == AF_INET || af == AF_INET6))
{
af = is_ipv6_addr(server_ip) ? AF_INET6 : AF_INET;
af = is_ipv6_addr(server_ip) ? AF_INET6 : AF_INET;
}
sock = socket(af, SOCK_STREAM, 0);
@ -689,7 +708,7 @@ int socketClientEx2(int af, const char *server_ip,
bool auto_detect;
sock = socketCreateEx2(af, server_ip,
timeout, flags, bind_ipaddr, err_no);
flags, bind_ipaddr, err_no);
if (sock < 0)
{
return sock;

View File

@ -208,6 +208,17 @@ int connectserverbyip_nb_ex(int sock, const char *server_ip, \
#define connectserverbyip_nb_auto(sock, server_ip, server_port, timeout) \
connectserverbyip_nb_ex(sock, server_ip, server_port, timeout, true)
/** async connect to server
* parameters:
* sock: the non-block socket
* server_ip: ip address of the server
* server_port: port of the server
* return: error no, 0 or EINPROGRESS for success, others for fail
*/
int asyncconnectserverbyip(int sock, const char *server_ip,
const short server_port);
/** accept client connect request
* parameters:
* sock: the server socket
@ -368,30 +379,26 @@ int socketServer2(int af, const char *bind_ipaddr, const int port, int *err_no);
* parameters:
* af: family, AF_UNSPEC (auto dectect), AF_INET or AF_INET6
* server_ip: ip address of the server to detect family when af == AF_UNSPEC
* timeout: connect timeout in seconds
* flags: socket flags such as O_NONBLOCK for non-block socket
* bind_ipaddr: the ip address to bind, NULL or empty for bind ANY
* err_no: store the error no
* return: >= 0 server socket, < 0 fail
*/
int socketCreateEx2(int af, const char *server_ip,
const int timeout, const int flags,
const char *bind_ipaddr, int *err_no);
const int flags, const char *bind_ipaddr, int *err_no);
/** create socket (NOT connect to server yet)
* parameters:
* server_ip: ip address of the server to detect family
* timeout: connect timeout in seconds
* flags: socket flags such as O_NONBLOCK for non-block socket
* bind_ipaddr: the ip address to bind, NULL or empty for bind ANY
* err_no: store the error no
* return: >= 0 server socket, < 0 fail
*/
static inline int socketCreateExAuto(const char *server_ip,
const int timeout, const int flags,
const char *bind_ipaddr, int *err_no)
const int flags, const char *bind_ipaddr, int *err_no)
{
return socketCreateEx2(AF_UNSPEC, server_ip, timeout, flags,
return socketCreateEx2(AF_UNSPEC, server_ip, flags,
bind_ipaddr, err_no);
}