connect by ip and connection pool support ipv6

pull/10/head
yuqing 2016-05-18 10:40:36 +08:00
parent 094f3900c1
commit 3eb39e834c
5 changed files with 120 additions and 52 deletions

View File

@ -1,7 +1,8 @@
Version 1.28 2016-05-17
Version 1.28 2016-05-18
* id generator support extra bits
* change inet_aton to inet_pton
* connect by ip and connection pool support ipv6
Version 1.27 2016-04-15
* add function fd_set_cloexec

View File

@ -15,8 +15,9 @@
#include "sched_thread.h"
#include "connection_pool.h"
int conn_pool_init(ConnectionPool *cp, int connect_timeout, \
const int max_count_per_entry, const int max_idle_time)
int conn_pool_init_ex(ConnectionPool *cp, int connect_timeout, \
const int max_count_per_entry, const int max_idle_time,
const int socket_domain)
{
int result;
@ -27,6 +28,7 @@ int conn_pool_init(ConnectionPool *cp, int connect_timeout, \
cp->connect_timeout = connect_timeout;
cp->max_count_per_entry = max_count_per_entry;
cp->max_idle_time = max_idle_time;
cp->socket_domain = socket_domain;
return hash_init(&(cp->hash_array), simple_hash, 1024, 0.75);
}
@ -79,13 +81,23 @@ int conn_pool_connect_server(ConnectionInfo *pConnection, \
const int connect_timeout)
{
int result;
int domain;
if (pConnection->sock >= 0)
{
close(pConnection->sock);
}
pConnection->sock = socket(AF_INET, SOCK_STREAM, 0);
if (pConnection->socket_domain == AF_INET ||
pConnection->socket_domain == AF_INET6)
{
domain = pConnection->socket_domain;
}
else
{
domain = is_ipv6_addr(pConnection->ip_addr) ? AF_INET6 : AF_INET;
}
pConnection->sock = socket(domain, SOCK_STREAM, 0);
if(pConnection->sock < 0)
{
logError("file: "__FILE__", line: %d, " \
@ -118,26 +130,15 @@ int conn_pool_connect_server(ConnectionInfo *pConnection, \
return 0;
}
static int conn_pool_get_key(const ConnectionInfo *conn, char *key, int *key_len)
static inline void conn_pool_get_key(const ConnectionInfo *conn, char *key, int *key_len)
{
struct in_addr sin_addr;
if (inet_pton(AF_INET, conn->ip_addr, &sin_addr) == 0)
{
*key_len = 0;
return EINVAL;
}
int2buff(sin_addr.s_addr, key);
*key_len = 4 + sprintf(key + 4, "%d", conn->port);
return 0;
*key_len = sprintf(key, "%s_%d", conn->ip_addr, conn->port);
}
ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp,
const ConnectionInfo *conn, int *err_no)
{
char key[32];
char key[INET6_ADDRSTRLEN + 8];
int key_len;
int bytes;
char *p;
@ -146,11 +147,7 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp,
ConnectionInfo *ci;
time_t current_time;
*err_no = conn_pool_get_key(conn, key, &key_len);
if (*err_no != 0)
{
return NULL;
}
conn_pool_get_key(conn, key, &key_len);
pthread_mutex_lock(&cp->lock);
cm = (ConnectionManager *)hash_find(&cp->hash_array, key, key_len);
@ -223,6 +220,7 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp,
pthread_mutex_unlock(&cm->lock);
memcpy(node->conn, conn, sizeof(ConnectionInfo));
node->conn->socket_domain = cp->socket_domain;
node->conn->sock = -1;
*err_no = conn_pool_connect_server(node->conn, \
cp->connect_timeout);
@ -284,17 +282,12 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp,
int conn_pool_close_connection_ex(ConnectionPool *cp, ConnectionInfo *conn,
const bool bForce)
{
char key[32];
int result;
char key[INET6_ADDRSTRLEN + 8];
int key_len;
ConnectionManager *cm;
ConnectionNode *node;
result = conn_pool_get_key(conn, key, &key_len);
if (result != 0)
{
return result;
}
conn_pool_get_key(conn, key, &key_len);
pthread_mutex_lock(&cp->lock);
cm = (ConnectionManager *)hash_find(&cp->hash_array, key, key_len);

View File

@ -27,7 +27,8 @@ typedef struct
{
int sock;
int port;
char ip_addr[IP_ADDRESS_SIZE];
char ip_addr[INET6_ADDRSTRLEN];
int socket_domain; //socket domain, AF_INET, AF_INET6 or PF_UNSPEC for auto dedect
} ConnectionInfo;
struct tagConnectionManager;
@ -57,8 +58,23 @@ typedef struct tagConnectionPool {
unit: second
*/
int max_idle_time;
int socket_domain; //socket domain
} ConnectionPool;
/**
* init ex function
* parameters:
* cp: the ConnectionPool
* connect_timeout: the connect timeout in seconds
* max_count_per_entry: max connection count per host:port
* max_idle_time: reconnect the server after max idle time in seconds
* socket_domain: the socket domain
* return 0 for success, != 0 for error
*/
int conn_pool_init_ex(ConnectionPool *cp, int connect_timeout,
const int max_count_per_entry, const int max_idle_time,
const int socket_domain);
/**
* init function
* parameters:
@ -68,8 +84,13 @@ typedef struct tagConnectionPool {
* max_idle_time: reconnect the server after max idle time in seconds
* return 0 for success, != 0 for error
*/
int conn_pool_init(ConnectionPool *cp, int connect_timeout, \
const int max_count_per_entry, const int max_idle_time);
static inline int conn_pool_init(ConnectionPool *cp, int connect_timeout,
const int max_count_per_entry, const int max_idle_time)
{
const int socket_domain = AF_INET;
return conn_pool_init_ex(cp, connect_timeout, max_count_per_entry,
max_idle_time, socket_domain);
}
/**
* destroy function

View File

@ -458,21 +458,55 @@ int tcpsenddata_nb(int sock, void* data, const int size, const int timeout)
return 0;
}
int setsockaddrbyip(const char *ip, const short port, struct sockaddr_in *addr,
struct sockaddr_in6 *addr6, void **output, int *size)
{
int domain;
void *dest;
if (is_ipv6_addr(ip))
{
*output = addr6;
*size = sizeof(*addr6);
dest = &addr6->sin6_addr;
domain = AF_INET6;
addr6->sin6_family = PF_INET6;
addr6->sin6_port = htons(port);
}
else //ipv4
{
*output = addr;
*size = sizeof(*addr);
dest = &addr->sin_addr;
domain = AF_INET;
addr->sin_family = PF_INET;
addr->sin_port = htons(port);
}
if (inet_pton(domain, ip, dest) == 0)
{
return EINVAL;
}
return 0;
}
int connectserverbyip(int sock, const char *server_ip, const short server_port)
{
int result;
int result;
struct sockaddr_in addr;
struct sockaddr_in6 addr6;
void *dest;
int size;
addr.sin_family = PF_INET;
addr.sin_port = htons(server_port);
result = inet_pton(AF_INET, server_ip, &addr.sin_addr);
if (result == 0 )
{
return EINVAL;
}
if ((result=setsockaddrbyip(server_ip, server_port, &addr, &addr6,
&dest, &size)) != 0)
{
return result;
}
result = connect(sock, (const struct sockaddr*)&addr, sizeof(addr));
if (result < 0)
if (connect(sock, (const struct sockaddr*)dest, size) < 0)
{
return errno != 0 ? errno : EINTR;
}
@ -498,14 +532,15 @@ int connectserverbyip_nb_ex(int sock, const char *server_ip, \
#endif
struct sockaddr_in addr;
struct sockaddr_in6 addr6;
void *dest;
int size;
addr.sin_family = PF_INET;
addr.sin_port = htons(server_port);
result = inet_pton(AF_INET, server_ip, &addr.sin_addr);
if (result == 0 )
{
return EINVAL;
}
if ((result=setsockaddrbyip(server_ip, server_port, &addr, &addr6,
&dest, &size)) != 0)
{
return result;
}
if (auto_detect)
{
@ -537,8 +572,7 @@ int connectserverbyip_nb_ex(int sock, const char *server_ip, \
do
{
if (connect(sock, (const struct sockaddr*)&addr, \
sizeof(addr)) < 0)
if (connect(sock, (const struct sockaddr*)dest, size) < 0)
{
result = errno != 0 ? errno : EINPROGRESS;
if (result != EINPROGRESS)

View File

@ -12,6 +12,7 @@
#define _SOCKETOPT_H_
#include <net/if.h>
#include <string.h>
#include "common_define.h"
#define FAST_WRITE_BUFF_SIZE 256 * 1024
@ -346,6 +347,24 @@ int gethostaddrs(char **if_alias_prefixes, const int prefix_count, \
*/
int getifconfigs(FastIFConfig *if_configs, const int max_count, int *count);
/** set socket address by ip
* parameters:
* ip: the ip address
* port: the port
* addr: ipv4 addr
* addr6: ipv6 addr
* output: return addr pointer
* size: return the size of addr
* return: error no, 0 success, != 0 fail
*/
int setsockaddrbyip(const char *ip, const short port, struct sockaddr_in *addr,
struct sockaddr_in6 *addr6, void **output, int *size);
static inline bool is_ipv6_addr(const char *ip)
{
return (*ip == ':' || strchr(ip, ':') != NULL); //ipv6
}
#ifdef __cplusplus
}
#endif