Merge branch 'master' into master

pull/47/head
YuQing 2023-11-23 08:53:07 +08:00 committed by GitHub
commit 7018f4e337
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1442 additions and 924 deletions

View File

@ -1,6 +1,13 @@
Version 1.70 2023-08-27
Version 1.70 2023-09-30
* get full mac address of infiniband NIC under Linux
* struct fast_task_info add field conn for RDMA connection
* server_id_func.[hc]: support communication type
* connection_pool.[hc] support callbacks for RDMA
* nio thread data support busy_polling_callback
* connection_pool.[hc] support thread local for performance
* struct fast_task_info support send and recv double buffers
* add functions: fc_queue_push_with_check and fc_queue_peek
Version 1.69 2023-08-05
* bugfixed: array_allocator_alloc MUST init the array

18
debian/changelog vendored
View File

@ -1,3 +1,21 @@
libfastcommon (1.0.70-3) unstable; urgency=medium
* upgrade to 1.0.70-3
-- YuQing <384681@qq.com> Tue, 21 Nov 2023 14:35:29 +0000
libfastcommon (1.0.70-2) unstable; urgency=medium
* upgrade to 1.0.70-2
-- YuQing <384681@qq.com> Mon, 20 Nov 2023 13:23:17 +0000
libfastcommon (1.0.70-1) unstable; urgency=medium
* upgrade to 1.0.70-1
-- YuQing <384681@qq.com> Sun, 19 Nov 2023 14:45:34 +0000
libfastcommon (1.0.69-1) unstable; urgency=medium
* upgrade to 1.0.69-1

View File

@ -3,7 +3,7 @@
%define CommitVersion %(echo $COMMIT_VERSION)
Name: libfastcommon
Version: 1.0.69
Version: 1.0.70
Release: 1%{?dist}
Summary: c common functions library extracted from my open source projects FastDFS
License: LGPL

View File

@ -16,30 +16,85 @@
#include <netdb.h>
#include <unistd.h>
#include <errno.h>
#include <dlfcn.h>
#include "logger.h"
#include "sockopt.h"
#include "shared_func.h"
#include "sched_thread.h"
#include "server_id_func.h"
#include "connection_pool.h"
ConnectionCallbacks g_connection_callbacks = {
false, {{conn_pool_connect_server_ex1,
conn_pool_disconnect_server,
conn_pool_is_connected},
{NULL, NULL, NULL}}, {NULL}
};
static int node_init_for_socket(ConnectionNode *node,
ConnectionPool *cp)
{
node->conn = (ConnectionInfo *)(node + 1);
return 0;
}
static int node_init_for_rdma(ConnectionNode *node,
ConnectionPool *cp)
{
node->conn = (ConnectionInfo *)(node + 1);
node->conn->arg1 = node->conn->args + cp->extra_data_size;
return G_RDMA_CONNECTION_CALLBACKS.init_connection(node->conn,
cp->extra_params.rdma.double_buffers, cp->extra_params.
rdma.buffer_size, cp->extra_params.rdma.pd);
}
static void cp_tls_destroy(void *ptr)
{
ConnectionThreadHashTable *htable;
ConnectionNode **pp;
ConnectionNode **end;
ConnectionNode *current;
ConnectionNode *node;
htable = ptr;
end = htable->buckets + htable->cp->extra_params.tls.htable_capacity;
for (pp=htable->buckets; pp<end; pp++) {
if (*pp == NULL) {
continue;
}
node = *pp;
do {
current = node;
node = node->next;
conn_pool_close_connection(htable->cp, current->conn);
} while (node != NULL);
}
free(ptr);
}
int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout,
const int max_count_per_entry, const int max_idle_time,
const int socket_domain, const int htable_init_capacity,
fc_connection_callback_func connect_done_func, void *connect_done_args,
fc_connection_callback_func validate_func, void *validate_args,
const int extra_data_size)
const int extra_data_size, const ConnectionExtraParams *extra_params)
{
const int64_t alloc_elements_limit = 0;
int result;
int init_capacity;
int extra_connection_size;
fast_mblock_object_init_func obj_init_func;
if ((result=init_pthread_lock(&cp->lock)) != 0)
{
return result;
}
cp->connect_timeout = connect_timeout;
cp->connect_timeout_ms = connect_timeout * 1000;
cp->max_count_per_entry = max_count_per_entry;
cp->max_idle_time = max_idle_time;
cp->extra_data_size = extra_data_size;
cp->socket_domain = socket_domain;
cp->connect_done_callback.func = connect_done_func;
cp->connect_done_callback.args = connect_done_args;
@ -54,14 +109,40 @@ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout,
return result;
}
if (extra_params != NULL) {
extra_connection_size = G_RDMA_CONNECTION_CALLBACKS.
get_connection_size();
obj_init_func = (fast_mblock_object_init_func)node_init_for_rdma;
cp->extra_params = *extra_params;
} else {
extra_connection_size = 0;
cp->extra_params.tls.enabled = false;
cp->extra_params.tls.htable_capacity = 0;
cp->extra_params.rdma.buffer_size = 0;
cp->extra_params.rdma.pd = NULL;
obj_init_func = (fast_mblock_object_init_func)node_init_for_socket;
}
if ((result=fast_mblock_init_ex1(&cp->node_allocator, "cpool-node",
sizeof(ConnectionNode) + sizeof(ConnectionInfo) +
extra_data_size, init_capacity, alloc_elements_limit,
NULL, NULL, true)) != 0)
extra_data_size + extra_connection_size, init_capacity,
alloc_elements_limit, obj_init_func, cp, true)) != 0)
{
return result;
}
logDebug("cp: %p, tls.enabled: %d, htable_capacity: %d",
cp, cp->extra_params.tls.enabled,
cp->extra_params.tls.htable_capacity);
if (cp->extra_params.tls.enabled) {
if ((result=pthread_key_create(&cp->tls_key, cp_tls_destroy)) != 0) {
logError("file: "__FILE__", line: %d, "
"pthread_key_create fail, errno: %d, error info: %s",
__LINE__, result, STRERROR(result));
return result;
}
}
return fc_hash_init(&(cp->hash_array), fc_simple_hash, init_capacity, 0.75);
}
@ -84,7 +165,8 @@ static int coon_pool_close_connections(const int index,
deleted = node;
node = node->next;
conn_pool_disconnect_server(deleted->conn);
G_COMMON_CONNECTION_CALLBACKS[deleted->conn->comm_type].
close_connection(deleted->conn);
fast_mblock_free_object(&cp->node_allocator, deleted);
}
@ -104,17 +186,22 @@ void conn_pool_destroy(ConnectionPool *cp)
pthread_mutex_destroy(&cp->lock);
}
void conn_pool_disconnect_server(ConnectionInfo *pConnection)
void conn_pool_disconnect_server(ConnectionInfo *conn)
{
if (pConnection->sock >= 0)
{
close(pConnection->sock);
pConnection->sock = -1;
}
if (conn->sock >= 0)
{
close(conn->sock);
conn->sock = -1;
}
}
bool conn_pool_is_connected(ConnectionInfo *conn)
{
return (conn->sock >= 0);
}
int conn_pool_connect_server_ex1(ConnectionInfo *conn,
const char *service_name, const int connect_timeout,
const char *service_name, const int connect_timeout_ms,
const char *bind_ipaddr, const bool log_connect_error)
{
int result;
@ -131,7 +218,7 @@ int conn_pool_connect_server_ex1(ConnectionInfo *conn,
}
if ((result=connectserverbyip_nb(conn->sock, conn->ip_addr,
conn->port, connect_timeout)) != 0)
conn->port, connect_timeout_ms / 1000)) != 0)
{
if (log_connect_error)
{
@ -184,23 +271,21 @@ int conn_pool_async_connect_server_ex(ConnectionInfo *conn,
static inline void conn_pool_get_key(const ConnectionInfo *conn,
char *key, int *key_len)
{
*key_len = sprintf(key, "%s_%u", conn->ip_addr, conn->port);
*key_len = sprintf(key, "%s-%u", conn->ip_addr, conn->port);
}
ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp,
const ConnectionInfo *conn, const char *service_name, int *err_no)
static ConnectionInfo *get_connection(ConnectionPool *cp,
const ConnectionInfo *conn, const string_t *key,
const char *service_name, int *err_no)
{
char key[INET6_ADDRSTRLEN + 8];
int key_len;
ConnectionManager *cm;
ConnectionNode *node;
ConnectionInfo *ci;
time_t current_time;
conn_pool_get_key(conn, key, &key_len);
pthread_mutex_lock(&cp->lock);
cm = (ConnectionManager *)fc_hash_find(&cp->hash_array, key, key_len);
cm = (ConnectionManager *)fc_hash_find(
&cp->hash_array, key->str, key->len);
if (cm == NULL)
{
cm = (ConnectionManager *)fast_mblock_alloc_object(
@ -223,7 +308,7 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp,
pthread_mutex_unlock(&cp->lock);
return NULL;
}
fc_hash_insert(&cp->hash_array, key, key_len, cm);
fc_hash_insert(&cp->hash_array, key->str, key->len, cm);
}
pthread_mutex_unlock(&cp->lock);
@ -258,7 +343,6 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp,
return NULL;
}
node->conn = (ConnectionInfo *)(node + 1);
node->manager = cm;
node->next = NULL;
node->atime = 0;
@ -266,12 +350,15 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp,
cm->total_count++;
pthread_mutex_unlock(&cm->lock);
memcpy(node->conn, conn, sizeof(ConnectionInfo));
memcpy(node->conn->ip_addr, conn->ip_addr, sizeof(conn->ip_addr));
node->conn->port = conn->port;
node->conn->comm_type = conn->comm_type;
node->conn->socket_domain = cp->socket_domain;
node->conn->sock = -1;
node->conn->validate_flag = false;
*err_no = conn_pool_connect_server_ex1(node->conn,
service_name, cp->connect_timeout, NULL, true);
*err_no = G_COMMON_CONNECTION_CALLBACKS[conn->comm_type].
make_connection(node->conn, service_name,
cp->connect_timeout_ms, NULL, true);
if (*err_no == 0 && cp->connect_done_callback.func != NULL)
{
*err_no = cp->connect_done_callback.func(node->conn,
@ -279,11 +366,8 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp,
}
if (*err_no != 0)
{
if (node->conn->sock >= 0)
{
close(node->conn->sock);
node->conn->sock = -1;
}
G_COMMON_CONNECTION_CALLBACKS[conn->comm_type].
close_connection(node->conn);
pthread_mutex_lock(&cm->lock);
cm->total_count--; //rollback
fast_mblock_free_object(&cp->node_allocator, node);
@ -340,17 +424,16 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp,
{
cm->total_count--;
logDebug("file: "__FILE__", line: %d, " \
"server %s:%u, connection: %d idle " \
"time: %d exceeds max idle time: %d, "\
"total_count: %d, free_count: %d", \
__LINE__, conn->ip_addr, conn->port, \
ci->sock, \
(int)(current_time - node->atime), \
cp->max_idle_time, cm->total_count, \
cm->free_count);
logDebug("file: "__FILE__", line: %d, "
"server %s:%u, connection: %d idle "
"time: %d exceeds max idle time: %d, "
"total_count: %d, free_count: %d", __LINE__,
conn->ip_addr, conn->port, ci->sock, (int)
(current_time - node->atime), cp->max_idle_time,
cm->total_count, cm->free_count);
conn_pool_disconnect_server(ci);
G_COMMON_CONNECTION_CALLBACKS[ci->comm_type].
close_connection(ci);
fast_mblock_free_object(&cp->node_allocator, node);
continue;
}
@ -367,18 +450,84 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp,
}
}
int conn_pool_close_connection_ex(ConnectionPool *cp, ConnectionInfo *conn,
const bool bForce)
ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp,
const ConnectionInfo *conn, const char *service_name, int *err_no)
{
string_t key;
int bytes;
uint32_t hash_code;
ConnectionNode **bucket;
ConnectionNode *node;
ConnectionInfo *ci;
char key_buff[INET6_ADDRSTRLEN + 8];
ConnectionThreadHashTable *htable;
key.str = key_buff;
conn_pool_get_key(conn, key.str, &key.len);
if (!cp->extra_params.tls.enabled) {
return get_connection(cp, conn, &key, service_name, err_no);
}
htable = pthread_getspecific(cp->tls_key);
if (htable == NULL) {
bytes = sizeof(ConnectionThreadHashTable) + sizeof(ConnectionNode *) *
cp->extra_params.tls.htable_capacity;
htable = fc_malloc(bytes);
memset(htable, 0, bytes);
htable->buckets = (ConnectionNode **)(htable + 1);
htable->cp = cp;
if ((*err_no=pthread_setspecific(cp->tls_key, htable)) != 0) {
logError("file: "__FILE__", line: %d, "
"pthread_setspecific fail, errno: %d, error info: %s",
__LINE__, *err_no, STRERROR(*err_no));
return NULL;
}
}
hash_code = fc_simple_hash(key.str, key.len);
bucket = htable->buckets + hash_code % cp->
extra_params.tls.htable_capacity;
if (*bucket == NULL) {
node = NULL;
} else if (FC_CONNECTION_SERVER_EQUAL1(*conn, *(*bucket)->conn)) {
node = *bucket;
} else {
node = (*bucket)->next;
while (node != NULL) {
if (FC_CONNECTION_SERVER_EQUAL1(*conn, *node->conn)) {
break;
}
node = node->next;
}
}
if (node != NULL) {
*err_no = 0;
return node->conn;
} else {
if ((ci=get_connection(cp, conn, &key, service_name,
err_no)) == NULL)
{
return NULL;
}
node = (ConnectionNode *)((char *)ci - sizeof(ConnectionNode));
node->next = *bucket;
*bucket = node;
*err_no = 0;
return ci;
}
}
static int close_connection(ConnectionPool *cp, ConnectionInfo *conn,
const string_t *key, const bool bForce)
{
char key[INET6_ADDRSTRLEN + 8];
int key_len;
ConnectionManager *cm;
ConnectionNode *node;
conn_pool_get_key(conn, key, &key_len);
pthread_mutex_lock(&cp->lock);
cm = (ConnectionManager *)fc_hash_find(&cp->hash_array, key, key_len);
cm = (ConnectionManager *)fc_hash_find(&cp->hash_array, key->str, key->len);
pthread_mutex_unlock(&cp->lock);
if (cm == NULL)
{
@ -388,7 +537,7 @@ int conn_pool_close_connection_ex(ConnectionPool *cp, ConnectionInfo *conn,
return ENOENT;
}
node = (ConnectionNode *)(((char *)conn) - sizeof(ConnectionNode));
node = (ConnectionNode *)((char *)conn - sizeof(ConnectionNode));
if (node->manager != cm)
{
logError("file: "__FILE__", line: %d, " \
@ -408,7 +557,8 @@ int conn_pool_close_connection_ex(ConnectionPool *cp, ConnectionInfo *conn,
__LINE__, conn->ip_addr, conn->port,
conn->sock, cm->total_count, cm->free_count);
conn_pool_disconnect_server(conn);
G_COMMON_CONNECTION_CALLBACKS[conn->comm_type].
close_connection(conn);
fast_mblock_free_object(&cp->node_allocator, node);
node = cm->head;
@ -436,6 +586,64 @@ int conn_pool_close_connection_ex(ConnectionPool *cp, ConnectionInfo *conn,
return 0;
}
int conn_pool_close_connection_ex(ConnectionPool *cp,
ConnectionInfo *conn, const bool bForce)
{
string_t key;
uint32_t hash_code;
ConnectionNode **bucket;
ConnectionNode *previous;
ConnectionNode *node;
char key_buff[INET6_ADDRSTRLEN + 8];
ConnectionThreadHashTable *htable;
key.str = key_buff;
conn_pool_get_key(conn, key.str, &key.len);
if (!cp->extra_params.tls.enabled) {
return close_connection(cp, conn, &key, bForce);
}
if (!bForce) {
return 0;
}
htable = pthread_getspecific(cp->tls_key);
if (htable == NULL) {
return close_connection(cp, conn, &key, bForce);
}
hash_code = fc_simple_hash(key.str, key.len);
bucket = htable->buckets + hash_code % cp->
extra_params.tls.htable_capacity;
if (*bucket == NULL) {
node = NULL;
previous = NULL;
} else if ((*bucket)->conn == conn) {
node = *bucket;
previous = NULL;
} else {
previous = *bucket;
node = (*bucket)->next;
while (node != NULL) {
if (node->conn == conn) {
break;
}
previous = node;
node = node->next;
}
}
if (node != NULL) {
if (previous == NULL) {
*bucket = node->next;
} else {
previous->next = node->next;
}
}
return close_connection(cp, conn, &key, bForce);
}
static int _conn_count_walk(const int index, const HashData *data, void *args)
{
int *count;
@ -513,6 +721,7 @@ int conn_pool_parse_server_info(const char *pServerStr,
pServerInfo->socket_domain = AF_UNSPEC;
pServerInfo->sock = -1;
pServerInfo->comm_type = fc_comm_type_sock;
return 0;
}
@ -532,3 +741,157 @@ int conn_pool_load_server_info(IniContext *pIniContext, const char *filename,
return conn_pool_parse_server_info(pServerStr, pServerInfo, default_port);
}
#define API_PREFIX_NAME "fast_rdma_client_"
#define LOAD_API(callbacks, fname) \
do { \
callbacks.fname = dlsym(dlhandle, API_PREFIX_NAME#fname); \
if (callbacks.fname == NULL) { \
logError("file: "__FILE__", line: %d, " \
"dlsym api %s fail, error info: %s", \
__LINE__, API_PREFIX_NAME#fname, dlerror()); \
return ENOENT; \
} \
} while (0)
int conn_pool_global_init_for_rdma()
{
const char *library = "libfastrdma.so";
void *dlhandle;
if (g_connection_callbacks.inited) {
return 0;
}
dlhandle = dlopen(library, RTLD_LAZY);
if (dlhandle == NULL) {
logError("file: "__FILE__", line: %d, "
"dlopen %s fail, error info: %s",
__LINE__, library, dlerror());
return EFAULT;
}
LOAD_API(G_COMMON_CONNECTION_CALLBACKS[fc_comm_type_rdma],
make_connection);
LOAD_API(G_COMMON_CONNECTION_CALLBACKS[fc_comm_type_rdma],
close_connection);
LOAD_API(G_COMMON_CONNECTION_CALLBACKS[fc_comm_type_rdma],
is_connected);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, set_busy_polling);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, alloc_pd);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, get_connection_size);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, init_connection);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, make_connection);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, close_connection);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, destroy_connection);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, is_connected);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, send_done);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, get_recv_buffer);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, request_by_buf1);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, request_by_buf2);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, request_by_iov);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, request_by_mix);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, send_by_buf1);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, recv_data);
LOAD_API(G_RDMA_CONNECTION_CALLBACKS, post_recv);
g_connection_callbacks.inited = true;
return 0;
}
ConnectionInfo *conn_pool_alloc_connection_ex(
const FCCommunicationType comm_type,
const int extra_data_size,
const ConnectionExtraParams *extra_params,
int *err_no)
{
ConnectionInfo *conn;
int bytes;
if (comm_type == fc_comm_type_rdma) {
bytes = sizeof(ConnectionInfo) + extra_data_size +
G_RDMA_CONNECTION_CALLBACKS.get_connection_size();
} else {
bytes = sizeof(ConnectionInfo) + extra_data_size;
}
if ((conn=fc_malloc(bytes)) == NULL) {
*err_no = ENOMEM;
return NULL;
}
memset(conn, 0, bytes);
if (comm_type == fc_comm_type_rdma) {
conn->arg1 = conn->args + extra_data_size;
if ((*err_no=G_RDMA_CONNECTION_CALLBACKS.init_connection(
conn, extra_params->rdma.double_buffers,
extra_params->rdma.buffer_size,
extra_params->rdma.pd)) != 0)
{
free(conn);
return NULL;
}
} else {
*err_no = 0;
}
conn->comm_type = comm_type;
conn->sock = -1;
return conn;
}
int conn_pool_set_rdma_extra_params_ex(ConnectionExtraParams *extra_params,
struct fc_server_config *server_cfg, const int server_group_index,
const bool double_buffers)
{
const int padding_size = 1024;
FCServerGroupInfo *server_group;
FCServerInfo *first_server;
int result;
if ((server_group=fc_server_get_group_by_index(server_cfg,
server_group_index)) == NULL)
{
return ENOENT;
}
switch (server_cfg->connection_thread_local) {
case fc_connection_thread_local_auto:
if (server_group->comm_type == fc_comm_type_sock) {
extra_params->tls.enabled = false;
} else {
extra_params->tls.enabled = (FC_SID_SERVER_COUNT(
*server_cfg) <= 64);
}
break;
case fc_connection_thread_local_yes:
extra_params->tls.enabled = true;
break;
default:
extra_params->tls.enabled = false;
break;
}
if (extra_params->tls.enabled) {
extra_params->tls.htable_capacity = fc_ceil_prime(
FC_SID_SERVER_COUNT(*server_cfg));
} else {
extra_params->tls.htable_capacity = 0;
}
if (server_group->comm_type == fc_comm_type_sock) {
extra_params->rdma.double_buffers = false;
extra_params->rdma.buffer_size = 0;
extra_params->rdma.pd = NULL;
return 0;
} else {
first_server = FC_SID_SERVERS(*server_cfg);
extra_params->rdma.double_buffers = double_buffers;
extra_params->rdma.buffer_size = server_cfg->buffer_size + padding_size;
extra_params->rdma.pd = fc_alloc_rdma_pd(G_RDMA_CONNECTION_CALLBACKS.
alloc_pd, &first_server->group_addrs[server_group_index].
address_array, &result);
return result;
}
}

View File

@ -40,16 +40,104 @@ extern "C" {
(strcmp((conn1).ip_addr, (conn2).ip_addr) == 0 && \
(conn1).port == (conn2).port)
typedef struct
{
int sock;
uint16_t port;
typedef enum {
fc_comm_type_sock = 0,
fc_comm_type_rdma,
fc_comm_type_both
} FCCommunicationType;
typedef struct {
int sock;
uint16_t port;
short socket_domain; //socket domain, AF_INET, AF_INET6 or AF_UNSPEC for auto dedect
FCCommunicationType comm_type;
bool validate_flag; //for connection pool
char ip_addr[IP_ADDRESS_SIZE];
char ip_addr[IP_ADDRESS_SIZE];
void *arg1; //for RDMA
char args[0]; //for extra data
} ConnectionInfo;
struct fc_server_config;
struct ibv_pd;
typedef void (*fc_set_busy_polling_callback)(const bool busy_polling);
typedef struct ibv_pd *(*fc_alloc_pd_callback)(const char **ip_addrs,
const int count, const int port);
typedef int (*fc_get_connection_size_callback)();
typedef int (*fc_init_connection_callback)(ConnectionInfo *conn,
const bool double_buffers, const int buffer_size, void *arg);
typedef int (*fc_make_connection_callback)(ConnectionInfo *conn,
const char *service_name, const int timeout_ms,
const char *bind_ipaddr, const bool log_connect_error);
typedef bool (*fc_is_connected_callback)(ConnectionInfo *conn);
typedef bool (*fc_send_done_callback)(ConnectionInfo *conn);
typedef void (*fc_close_connection_callback)(ConnectionInfo *conn);
typedef void (*fc_destroy_connection_callback)(ConnectionInfo *conn);
typedef BufferInfo *(*fc_rdma_get_recv_buffer_callback)(ConnectionInfo *conn);
typedef int (*fc_rdma_request_by_buf1_callback)(ConnectionInfo *conn,
const char *data, const int length, const int timeout_ms);
typedef int (*fc_rdma_request_by_buf2_callback)(ConnectionInfo *conn,
const char *data1, const int length1, const char *data2,
const int length2, const int timeout_ms);
typedef int (*fc_rdma_request_by_iov_callback)(ConnectionInfo *conn,
const struct iovec *iov, const int iovcnt,
const int timeout_ms);
typedef int (*fc_rdma_request_by_mix_callback)(ConnectionInfo *conn,
const char *data, const int length, const struct iovec *iov,
const int iovcnt, const int timeout_ms);
typedef int (*fc_rdma_send_by_buf1_callback)(ConnectionInfo *conn,
const char *data, const int length);
typedef int (*fc_rdma_recv_data_callback)(ConnectionInfo *conn,
const bool call_post_recv, const int timeout_ms);
typedef int (*fc_rdma_post_recv_callback)(ConnectionInfo *conn);
typedef struct {
fc_make_connection_callback make_connection;
fc_close_connection_callback close_connection;
fc_is_connected_callback is_connected;
} CommonConnectionCallbacks;
typedef struct {
fc_set_busy_polling_callback set_busy_polling;
fc_alloc_pd_callback alloc_pd;
fc_get_connection_size_callback get_connection_size;
fc_init_connection_callback init_connection;
fc_make_connection_callback make_connection;
fc_close_connection_callback close_connection;
fc_destroy_connection_callback destroy_connection;
fc_is_connected_callback is_connected;
fc_send_done_callback send_done;
fc_rdma_get_recv_buffer_callback get_recv_buffer;
fc_rdma_request_by_buf1_callback request_by_buf1;
fc_rdma_request_by_buf2_callback request_by_buf2;
fc_rdma_request_by_iov_callback request_by_iov;
fc_rdma_request_by_mix_callback request_by_mix;
fc_rdma_send_by_buf1_callback send_by_buf1;
fc_rdma_recv_data_callback recv_data;
fc_rdma_post_recv_callback post_recv;
} RDMAConnectionCallbacks;
typedef struct {
bool inited;
CommonConnectionCallbacks common_callbacks[2];
RDMAConnectionCallbacks rdma_callbacks;
} ConnectionCallbacks;
typedef struct {
struct {
bool enabled;
int htable_capacity;
} tls; //for thread local
struct {
bool double_buffers;
int buffer_size;
struct ibv_pd *pd;
} rdma;
} ConnectionExtraParams;
typedef int (*fc_connection_callback_func)(ConnectionInfo *conn, void *args);
struct tagConnectionManager;
@ -68,10 +156,17 @@ typedef struct tagConnectionManager {
pthread_mutex_t lock;
} ConnectionManager;
struct tagConnectionPool;
typedef struct {
ConnectionNode **buckets;
struct tagConnectionPool *cp;
} ConnectionThreadHashTable;
typedef struct tagConnectionPool {
HashArray hash_array; //key is ip:port, value is ConnectionManager
HashArray hash_array; //key is ip-port, value is ConnectionManager
pthread_mutex_t lock;
int connect_timeout;
int connect_timeout_ms;
int max_count_per_entry; //0 means no limit
/*
@ -93,8 +188,19 @@ typedef struct tagConnectionPool {
fc_connection_callback_func func;
void *args;
} validate_callback;
int extra_data_size;
ConnectionExtraParams extra_params;
pthread_key_t tls_key; //for ConnectionThreadHashTable
} ConnectionPool;
extern ConnectionCallbacks g_connection_callbacks;
int conn_pool_global_init_for_rdma();
#define G_COMMON_CONNECTION_CALLBACKS g_connection_callbacks.common_callbacks
#define G_RDMA_CONNECTION_CALLBACKS g_connection_callbacks.rdma_callbacks
/**
* init ex function
* parameters:
@ -109,6 +215,7 @@ typedef struct tagConnectionPool {
* validate_func: the validate connection callback
* validate_args: the args for validate connection callback
* extra_data_size: the extra data size of connection
* extra_params: for RDMA
* return 0 for success, != 0 for error
*/
int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout,
@ -116,7 +223,7 @@ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout,
const int socket_domain, const int htable_init_capacity,
fc_connection_callback_func connect_done_func, void *connect_done_args,
fc_connection_callback_func validate_func, void *validate_args,
const int extra_data_size);
const int extra_data_size, const ConnectionExtraParams *extra_params);
/**
* init ex function
@ -134,9 +241,10 @@ static inline int conn_pool_init_ex(ConnectionPool *cp, int connect_timeout,
{
const int htable_init_capacity = 0;
const int extra_data_size = 0;
const ConnectionExtraParams *extra_params = NULL;
return conn_pool_init_ex1(cp, connect_timeout, max_count_per_entry,
max_idle_time, socket_domain, htable_init_capacity,
NULL, NULL, NULL, NULL, extra_data_size);
NULL, NULL, NULL, NULL, extra_data_size, extra_params);
}
/**
@ -154,9 +262,10 @@ static inline int conn_pool_init(ConnectionPool *cp, int connect_timeout,
const int socket_domain = AF_UNSPEC;
const int htable_init_capacity = 0;
const int extra_data_size = 0;
const ConnectionExtraParams *extra_params = NULL;
return conn_pool_init_ex1(cp, connect_timeout, max_count_per_entry,
max_idle_time, socket_domain, htable_init_capacity,
NULL, NULL, NULL, NULL, extra_data_size);
NULL, NULL, NULL, NULL, extra_data_size, extra_params);
}
/**
@ -193,82 +302,84 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp,
* bForce: set true to close the socket, else only push back to connection pool
* return 0 for success, != 0 for error
*/
int conn_pool_close_connection_ex(ConnectionPool *cp, ConnectionInfo *conn,
const bool bForce);
int conn_pool_close_connection_ex(ConnectionPool *cp,
ConnectionInfo *conn, const bool bForce);
/**
* disconnect from the server
* parameters:
* pConnection: the connection
* conn: the connection
* return 0 for success, != 0 for error
*/
void conn_pool_disconnect_server(ConnectionInfo *pConnection);
void conn_pool_disconnect_server(ConnectionInfo *conn);
bool conn_pool_is_connected(ConnectionInfo *conn);
/**
* connect to the server
* parameters:
* pConnection: the connection
* service_name: the service name to log
* connect_timeout: the connect timeout in seconds
* connect_timeout_ms: the connect timeout in milliseconds
* bind_ipaddr: the ip address to bind, NULL or empty for any
* log_connect_error: if log error info when connect fail
* NOTE: pConnection->sock will be closed when it >= 0 before connect
* return 0 for success, != 0 for error
*/
int conn_pool_connect_server_ex1(ConnectionInfo *conn,
const char *service_name, const int connect_timeout,
const char *service_name, const int connect_timeout_ms,
const char *bind_ipaddr, const bool log_connect_error);
/**
* connect to the server
* parameters:
* pConnection: the connection
* connect_timeout: the connect timeout in seconds
* connect_timeout_ms: the connect timeout in milliseconds
* bind_ipaddr: the ip address to bind, NULL or empty for any
* log_connect_error: if log error info when connect fail
* NOTE: pConnection->sock will be closed when it >= 0 before connect
* return 0 for success, != 0 for error
*/
static inline int conn_pool_connect_server_ex(ConnectionInfo *pConnection,
const int connect_timeout, const char *bind_ipaddr,
const int connect_timeout_ms, const char *bind_ipaddr,
const bool log_connect_error)
{
const char *service_name = NULL;
return conn_pool_connect_server_ex1(pConnection, service_name,
connect_timeout, bind_ipaddr, log_connect_error);
connect_timeout_ms, bind_ipaddr, log_connect_error);
}
/**
* connect to the server
* parameters:
* pConnection: the connection
* connect_timeout: the connect timeout in seconds
* connect_timeout_ms: the connect timeout in seconds
* NOTE: pConnection->sock will be closed when it >= 0 before connect
* return 0 for success, != 0 for error
*/
static inline int conn_pool_connect_server(ConnectionInfo *pConnection,
const int connect_timeout)
const int connect_timeout_ms)
{
const char *service_name = NULL;
const char *bind_ipaddr = NULL;
return conn_pool_connect_server_ex1(pConnection, service_name,
connect_timeout, bind_ipaddr, true);
connect_timeout_ms, bind_ipaddr, true);
}
/**
* connect to the server
* parameters:
* pConnection: the connection
* connect_timeout: the connect timeout in seconds
* connect_timeout_ms: the connect timeout in seconds
* return 0 for success, != 0 for error
*/
static inline int conn_pool_connect_server_anyway(ConnectionInfo *pConnection,
const int connect_timeout)
const int connect_timeout_ms)
{
const char *service_name = NULL;
const char *bind_ipaddr = NULL;
pConnection->sock = -1;
return conn_pool_connect_server_ex1(pConnection, service_name,
connect_timeout, bind_ipaddr, true);
connect_timeout_ms, bind_ipaddr, true);
}
/**
@ -347,6 +458,55 @@ static inline int conn_pool_compare_ip_and_port(const char *ip1,
return port1 - port2;
}
ConnectionInfo *conn_pool_alloc_connection_ex(
const FCCommunicationType comm_type,
const int extra_data_size,
const ConnectionExtraParams *extra_params,
int *err_no);
static inline ConnectionInfo *conn_pool_alloc_connection(
const FCCommunicationType comm_type,
const ConnectionExtraParams *extra_params,
int *err_no)
{
const int extra_data_size = 0;
return conn_pool_alloc_connection_ex(comm_type,
extra_data_size, extra_params, err_no);
}
static inline void conn_pool_free_connection(ConnectionInfo *conn)
{
free(conn);
}
int conn_pool_set_rdma_extra_params_ex(ConnectionExtraParams *extra_params,
struct fc_server_config *server_cfg, const int server_group_index,
const bool double_buffers);
static inline int conn_pool_set_rdma_extra_params(
ConnectionExtraParams *extra_params,
struct fc_server_config *server_cfg,
const int server_group_index)
{
const bool double_buffers = false;
return conn_pool_set_rdma_extra_params_ex(extra_params,
server_cfg, server_group_index, double_buffers);
}
static inline const char *fc_comm_type_str(const FCCommunicationType type)
{
switch (type) {
case fc_comm_type_sock:
return "socket";
case fc_comm_type_rdma:
return "rdma";
case fc_comm_type_both:
return "both";
default:
return "unkown";
}
}
#ifdef __cplusplus
}
#endif

View File

@ -25,652 +25,188 @@
#include "fc_memory.h"
#include "fast_task_queue.h"
static struct fast_task_queue g_free_queue;
struct mpool_node {
struct fast_task_info *blocks;
struct fast_task_info *last_block; //last block
struct mpool_node *next;
};
struct mpool_chain {
struct mpool_node *head;
struct mpool_node *tail;
};
static struct mpool_chain g_mpool = {NULL, NULL};
int task_queue_init(struct fast_task_queue *pQueue)
static int task_alloc_init(struct fast_task_info *task,
struct fast_task_queue *queue)
{
int result;
if ((result=init_pthread_lock(&(pQueue->lock))) != 0)
{
logError("file: "__FILE__", line: %d, " \
"init_pthread_lock fail, errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
return result;
}
pQueue->head = NULL;
pQueue->tail = NULL;
return 0;
}
static void free_mpool(struct mpool_node *mpool, char *end)
{
char *pt;
for (pt=(char *)mpool->blocks; pt < end; pt += g_free_queue.block_size)
{
free(((struct fast_task_info *)pt)->data);
}
free(mpool->blocks);
free(mpool);
}
static struct mpool_node *malloc_mpool(const int total_alloc_size)
{
struct fast_task_info *pTask;
char *p;
char *pCharEnd;
struct mpool_node *mpool;
mpool = (struct mpool_node *)fc_malloc(sizeof(struct mpool_node));
if (mpool == NULL)
{
return NULL;
}
mpool->next = NULL;
mpool->blocks = (struct fast_task_info *)fc_malloc(total_alloc_size);
if (mpool->blocks == NULL)
{
free(mpool);
return NULL;
}
memset(mpool->blocks, 0, total_alloc_size);
pCharEnd = ((char *)mpool->blocks) + total_alloc_size;
for (p=(char *)mpool->blocks; p<pCharEnd; p += g_free_queue.block_size)
{
pTask = (struct fast_task_info *)p;
pTask->size = g_free_queue.min_buff_size;
pTask->arg = p + ALIGNED_TASK_INFO_SIZE;
if (g_free_queue.malloc_whole_block)
{
pTask->data = (char *)pTask->arg + \
g_free_queue.arg_size;
}
else
{
pTask->data = (char *)fc_malloc(pTask->size);
if (pTask->data == NULL)
{
free_mpool(mpool, p);
return NULL;
}
}
if (g_free_queue.init_callback != NULL)
{
if (g_free_queue.init_callback(pTask) != 0)
{
free_mpool(mpool, p);
return NULL;
}
}
}
mpool->last_block = (struct fast_task_info *)
(pCharEnd - g_free_queue.block_size);
for (p=(char *)mpool->blocks; p<(char *)mpool->last_block;
p += g_free_queue.block_size)
{
pTask = (struct fast_task_info *)p;
pTask->next = (struct fast_task_info *)(p + g_free_queue.block_size);
}
mpool->last_block->next = NULL;
return mpool;
}
int free_queue_init_ex2(const int max_connections, const int init_connections,
const int alloc_task_once, const int min_buff_size,
const int max_buff_size, const int arg_size,
TaskInitCallback init_callback)
{
#define MAX_DATA_SIZE (256 * 1024 * 1024)
int64_t total_size;
struct mpool_node *mpool;
int alloc_size;
int alloc_once;
int result;
int loop_count;
int aligned_min_size;
int aligned_max_size;
int aligned_arg_size;
rlim_t max_data_size;
if ((result=init_pthread_lock(&(g_free_queue.lock))) != 0)
{
logError("file: "__FILE__", line: %d, " \
"init_pthread_lock fail, errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
return result;
}
aligned_min_size = MEM_ALIGN(min_buff_size);
aligned_max_size = MEM_ALIGN(max_buff_size);
aligned_arg_size = MEM_ALIGN(arg_size);
g_free_queue.block_size = ALIGNED_TASK_INFO_SIZE + aligned_arg_size;
alloc_size = g_free_queue.block_size * init_connections;
if (aligned_max_size > aligned_min_size)
{
total_size = alloc_size;
g_free_queue.malloc_whole_block = false;
max_data_size = 0;
}
else
{
struct rlimit rlimit_data;
if (getrlimit(RLIMIT_DATA, &rlimit_data) < 0)
{
logError("file: "__FILE__", line: %d, " \
"call getrlimit fail, " \
"errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
return errno != 0 ? errno : EPERM;
}
if (rlimit_data.rlim_cur == RLIM_INFINITY)
{
max_data_size = MAX_DATA_SIZE;
}
else
{
max_data_size = rlimit_data.rlim_cur;
if (max_data_size > MAX_DATA_SIZE)
{
max_data_size = MAX_DATA_SIZE;
}
}
if (max_data_size >= (int64_t)(g_free_queue.block_size +
aligned_min_size) * (int64_t)init_connections)
{
total_size = alloc_size + (int64_t)aligned_min_size *
init_connections;
g_free_queue.malloc_whole_block = true;
g_free_queue.block_size += aligned_min_size;
}
else
{
total_size = alloc_size;
g_free_queue.malloc_whole_block = false;
max_data_size = 0;
}
}
g_free_queue.max_connections = max_connections;
g_free_queue.alloc_connections = init_connections;
if (alloc_task_once <= 0)
{
g_free_queue.alloc_task_once = 256;
alloc_once = MAX_DATA_SIZE / g_free_queue.block_size;
if (g_free_queue.alloc_task_once > alloc_once)
{
g_free_queue.alloc_task_once = alloc_once > 0 ? alloc_once : 1;
}
}
else
{
g_free_queue.alloc_task_once = alloc_task_once;
}
g_free_queue.min_buff_size = aligned_min_size;
g_free_queue.max_buff_size = aligned_max_size;
g_free_queue.arg_size = aligned_arg_size;
g_free_queue.init_callback = init_callback;
logDebug("file: "__FILE__", line: %d, "
"max_connections: %d, init_connections: %d, alloc_task_once: %d, "
"min_buff_size: %d, max_buff_size: %d, block_size: %d, "
"arg_size: %d, max_data_size: %d, total_size: %"PRId64,
__LINE__, max_connections, init_connections,
g_free_queue.alloc_task_once, aligned_min_size, aligned_max_size,
g_free_queue.block_size, aligned_arg_size, (int)max_data_size, total_size);
if ((!g_free_queue.malloc_whole_block) || (total_size <= max_data_size))
{
loop_count = 1;
mpool = malloc_mpool(total_size);
if (mpool == NULL)
{
return errno != 0 ? errno : ENOMEM;
}
g_mpool.head = mpool;
g_mpool.tail = mpool;
}
else
{
int remain_count;
int alloc_count;
int current_alloc_size;
loop_count = 0;
remain_count = init_connections;
alloc_once = max_data_size / g_free_queue.block_size;
while (remain_count > 0)
{
alloc_count = (remain_count > alloc_once) ?
alloc_once : remain_count;
current_alloc_size = g_free_queue.block_size * alloc_count;
mpool = malloc_mpool(current_alloc_size);
if (mpool == NULL)
{
free_queue_destroy();
return errno != 0 ? errno : ENOMEM;
}
if (g_mpool.tail == NULL)
{
g_mpool.head = mpool;
}
else
{
g_mpool.tail->next = mpool;
g_mpool.tail->last_block->next = mpool->blocks; //link previous mpool to current
}
g_mpool.tail = mpool;
remain_count -= alloc_count;
loop_count++;
}
logDebug("file: "__FILE__", line: %d, " \
"alloc_once: %d", __LINE__, alloc_once);
}
logDebug("file: "__FILE__", line: %d, " \
"malloc task info as whole: %d, malloc loop count: %d", \
__LINE__, g_free_queue.malloc_whole_block, loop_count);
if (g_mpool.head != NULL)
{
g_free_queue.head = g_mpool.head->blocks;
g_free_queue.tail = g_mpool.tail->last_block;
}
return 0;
}
void free_queue_destroy()
{
struct mpool_node *mpool;
struct mpool_node *mp;
if (g_mpool.head == NULL)
{
return;
}
if (!g_free_queue.malloc_whole_block)
{
char *p;
char *pCharEnd;
struct fast_task_info *pTask;
mpool = g_mpool.head;
while (mpool != NULL)
{
pCharEnd = (char *)mpool->last_block + g_free_queue.block_size;
for (p=(char *)mpool->blocks; p<pCharEnd; p += g_free_queue.block_size)
{
pTask = (struct fast_task_info *)p;
if (pTask->data != NULL)
{
free(pTask->data);
pTask->data = NULL;
}
}
mpool = mpool->next;
}
}
mpool = g_mpool.head;
while (mpool != NULL)
{
mp = mpool;
mpool = mpool->next;
free(mp->blocks);
free(mp);
}
g_mpool.head = g_mpool.tail = NULL;
pthread_mutex_destroy(&(g_free_queue.lock));
}
static int free_queue_realloc()
{
struct mpool_node *mpool;
struct fast_task_info *head;
struct fast_task_info *tail;
int remain_count;
int alloc_count;
int current_alloc_size;
head = tail = NULL;
remain_count = g_free_queue.max_connections -
g_free_queue.alloc_connections;
alloc_count = (remain_count > g_free_queue.alloc_task_once) ?
g_free_queue.alloc_task_once : remain_count;
if (alloc_count > 0)
{
current_alloc_size = g_free_queue.block_size * alloc_count;
mpool = malloc_mpool(current_alloc_size);
if (mpool == NULL)
{
task->arg = (char *)task + ALIGNED_TASK_INFO_SIZE + queue->padding_size;
task->send.ptr = &task->send.holder;
task->send.ptr->size = queue->min_buff_size;
if (queue->malloc_whole_block) {
task->send.ptr->data = (char *)task->arg + queue->arg_size;
} else {
task->send.ptr->data = (char *)fc_malloc(task->send.ptr->size);
if (task->send.ptr->data == NULL) {
return ENOMEM;
}
if (g_mpool.tail == NULL)
{
g_mpool.head = mpool;
}
else
{
g_mpool.tail->next = mpool;
}
g_mpool.tail = mpool;
head = mpool->blocks;
tail = mpool->last_block;
remain_count -= alloc_count;
}
else {
return ENOSPC;
}
}
if (g_free_queue.head == NULL)
{
g_free_queue.head = head;
if (queue->double_buffers) {
task->recv.ptr = &task->recv.holder;
task->recv.ptr->size = queue->min_buff_size;
task->recv.ptr->data = (char *)fc_malloc(task->recv.ptr->size);
if (task->recv.ptr->data == NULL) {
return ENOMEM;
}
} else {
task->recv.ptr = &task->send.holder;
}
if (g_free_queue.tail != NULL)
{
g_free_queue.tail->next = head;
task->free_queue = queue;
if (queue->init_callback != NULL) {
return queue->init_callback(task);
}
g_free_queue.tail = tail;
g_free_queue.alloc_connections += alloc_count;
logDebug("file: "__FILE__", line: %d, "
"alloc_connections: %d, realloc %d elements", __LINE__,
g_free_queue.alloc_connections, alloc_count);
return 0;
}
struct fast_task_info *free_queue_pop()
int free_queue_init_ex2(struct fast_task_queue *queue, const char *name,
const bool double_buffers, const int max_connections,
const int alloc_task_once, const int min_buff_size,
const int max_buff_size, const int padding_size,
const int arg_size, TaskInitCallback init_callback)
{
struct fast_task_info *pTask;
int i;
#define MAX_DATA_SIZE (256 * 1024 * 1024)
int alloc_once;
int aligned_min_size;
int aligned_max_size;
int aligned_padding_size;
int aligned_arg_size;
rlim_t max_data_size;
char aname[64];
if ((pTask=task_queue_pop(&g_free_queue)) != NULL)
{
return pTask;
aligned_min_size = MEM_ALIGN(min_buff_size);
aligned_max_size = MEM_ALIGN(max_buff_size);
aligned_padding_size = MEM_ALIGN(padding_size);
aligned_arg_size = MEM_ALIGN(arg_size);
queue->block_size = ALIGNED_TASK_INFO_SIZE +
aligned_padding_size + aligned_arg_size;
if (alloc_task_once <= 0) {
alloc_once = FC_MIN(MAX_DATA_SIZE / queue->block_size, 256);
if (alloc_once == 0) {
alloc_once = 1;
}
} else {
alloc_once = alloc_task_once;
}
if (g_free_queue.alloc_connections >= g_free_queue.max_connections)
{
return NULL;
}
if (aligned_max_size > aligned_min_size) {
queue->malloc_whole_block = false;
max_data_size = 0;
} else {
struct rlimit rlimit_data;
for (i=0; i<10; i++)
{
pthread_mutex_lock(&g_free_queue.lock);
if (g_free_queue.alloc_connections >= g_free_queue.max_connections)
{
if (g_free_queue.head == NULL)
{
pthread_mutex_unlock(&g_free_queue.lock);
return NULL;
if (getrlimit(RLIMIT_DATA, &rlimit_data) < 0) {
logError("file: "__FILE__", line: %d, "
"call getrlimit fail, "
"errno: %d, error info: %s",
__LINE__, errno, STRERROR(errno));
return errno != 0 ? errno : EPERM;
}
if (rlimit_data.rlim_cur == RLIM_INFINITY) {
max_data_size = MAX_DATA_SIZE;
} else {
max_data_size = rlimit_data.rlim_cur;
if (max_data_size > MAX_DATA_SIZE) {
max_data_size = MAX_DATA_SIZE;
}
}
else
{
if (g_free_queue.head == NULL && free_queue_realloc() != 0)
{
pthread_mutex_unlock(&g_free_queue.lock);
return NULL;
}
}
pthread_mutex_unlock(&g_free_queue.lock);
if ((pTask=task_queue_pop(&g_free_queue)) != NULL)
if (max_data_size >= (int64_t)(queue->block_size +
aligned_min_size) * (int64_t)alloc_once)
{
return pTask;
queue->malloc_whole_block = true;
queue->block_size += aligned_min_size;
} else {
queue->malloc_whole_block = false;
max_data_size = 0;
}
}
return NULL;
queue->double_buffers = double_buffers;
queue->min_buff_size = aligned_min_size;
queue->max_buff_size = aligned_max_size;
queue->padding_size = aligned_padding_size;
queue->arg_size = aligned_arg_size;
queue->init_callback = init_callback;
queue->release_callback = NULL;
/*
logInfo("file: "__FILE__", line: %d, [%s] double_buffers: %d, "
"max_connections: %d, alloc_once: %d, malloc_whole_block: %d, "
"min_buff_size: %d, max_buff_size: %d, block_size: %d, "
"padding_size: %d, arg_size: %d, max_data_size: %"PRId64,
__LINE__, name, double_buffers, max_connections, alloc_once,
queue->malloc_whole_block, aligned_min_size, aligned_max_size,
queue->block_size, aligned_padding_size, aligned_arg_size,
(int64_t)max_data_size);
*/
snprintf(aname, sizeof(aname), "%s-task", name);
return fast_mblock_init_ex1(&queue->allocator, aname,
queue->block_size, alloc_once, max_connections,
(fast_mblock_object_init_func)task_alloc_init,
queue, true);
}
static int _realloc_buffer(struct fast_task_info *pTask, const int new_size,
const bool copy_data)
void free_queue_destroy(struct fast_task_queue *queue)
{
fast_mblock_destroy(&queue->allocator);
}
static int _realloc_buffer(struct fast_net_buffer *buffer,
const int new_size, const bool copy_data)
{
char *new_buff;
new_buff = (char *)fc_malloc(new_size);
if (new_buff == NULL)
{
if (new_buff == NULL) {
return ENOMEM;
}
else
{
if (copy_data && pTask->offset > 0) {
memcpy(new_buff, pTask->data, pTask->offset);
}
free(pTask->data);
pTask->size = new_size;
pTask->data = new_buff;
return 0;
if (copy_data && buffer->offset > 0) {
memcpy(new_buff, buffer->data, buffer->offset);
}
free(buffer->data);
buffer->size = new_size;
buffer->data = new_buff;
return 0;
}
int free_queue_push(struct fast_task_info *pTask)
void free_queue_push(struct fast_task_info *task)
{
int result;
if (task->free_queue->release_callback != NULL) {
task->free_queue->release_callback(task);
}
*(pTask->client_ip) = '\0';
pTask->length = 0;
pTask->offset = 0;
pTask->req_count = 0;
*(task->client_ip) = '\0';
task->send.ptr->length = 0;
task->send.ptr->offset = 0;
task->req_count = 0;
if (task->send.ptr->size > task->free_queue->min_buff_size) {//need thrink
_realloc_buffer(task->send.ptr, task->free_queue->min_buff_size, false);
}
if (pTask->size > g_free_queue.min_buff_size) //need thrink
{
_realloc_buffer(pTask, g_free_queue.min_buff_size, false);
}
if (task->free_queue->double_buffers) {
task->recv.ptr->length = 0;
task->recv.ptr->offset = 0;
if (task->recv.ptr->size > task->free_queue->min_buff_size) {
_realloc_buffer(task->recv.ptr, task->free_queue->
min_buff_size, false);
}
}
if ((result=pthread_mutex_lock(&g_free_queue.lock)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_lock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
}
pTask->next = g_free_queue.head;
g_free_queue.head = pTask;
if (g_free_queue.tail == NULL)
{
g_free_queue.tail = pTask;
}
if ((result=pthread_mutex_unlock(&g_free_queue.lock)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_unlock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
}
return result;
fast_mblock_free_object(&task->free_queue->allocator, task);
}
int free_queue_count()
{
return task_queue_count(&g_free_queue);
}
int free_queue_alloc_connections()
{
return g_free_queue.alloc_connections;
}
int free_queue_set_buffer_size(struct fast_task_info *pTask,
const int expect_size)
{
return task_queue_set_buffer_size(&g_free_queue, pTask, expect_size);
}
int free_queue_realloc_buffer(struct fast_task_info *pTask,
const int expect_size)
{
return task_queue_realloc_buffer(&g_free_queue, pTask, expect_size);
}
int free_queue_set_max_buffer_size(struct fast_task_info *pTask)
{
return task_queue_set_buffer_size(&g_free_queue, pTask,
g_free_queue.max_buff_size);
}
int free_queue_realloc_max_buffer(struct fast_task_info *pTask)
{
return task_queue_realloc_buffer(&g_free_queue, pTask,
g_free_queue.max_buff_size);
}
int task_queue_push(struct fast_task_queue *pQueue, \
struct fast_task_info *pTask)
{
int result;
if ((result=pthread_mutex_lock(&(pQueue->lock))) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_lock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
return result;
}
pTask->next = NULL;
if (pQueue->tail == NULL)
{
pQueue->head = pTask;
}
else
{
pQueue->tail->next = pTask;
}
pQueue->tail = pTask;
if ((result=pthread_mutex_unlock(&(pQueue->lock))) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_unlock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
}
return 0;
}
struct fast_task_info *task_queue_pop(struct fast_task_queue *pQueue)
{
struct fast_task_info *pTask;
int result;
if ((result=pthread_mutex_lock(&(pQueue->lock))) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_lock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
return NULL;
}
pTask = pQueue->head;
if (pTask != NULL)
{
pQueue->head = pTask->next;
if (pQueue->head == NULL)
{
pQueue->tail = NULL;
}
}
if ((result=pthread_mutex_unlock(&(pQueue->lock))) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_unlock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
}
return pTask;
}
int task_queue_count(struct fast_task_queue *pQueue)
{
struct fast_task_info *pTask;
int count;
int result;
if ((result=pthread_mutex_lock(&(pQueue->lock))) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_lock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
return 0;
}
count = 0;
pTask = pQueue->head;
while (pTask != NULL)
{
pTask = pTask->next;
count++;
}
if ((result=pthread_mutex_unlock(&(pQueue->lock))) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_unlock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
}
return count;
}
int task_queue_get_new_buffer_size(const int min_buff_size,
int free_queue_get_new_buffer_size(const int min_buff_size,
const int max_buff_size, const int expect_size, int *new_size)
{
if (min_buff_size == max_buff_size)
{
if (min_buff_size == max_buff_size) {
logError("file: "__FILE__", line: %d, "
"can't change buffer size because NOT supported", __LINE__);
return EOPNOTSUPP;
}
if (expect_size > max_buff_size)
{
if (expect_size > max_buff_size) {
logError("file: "__FILE__", line: %d, "
"can't change buffer size because expect buffer size: %d "
"exceeds max buffer size: %d", __LINE__, expect_size,
@ -679,14 +215,11 @@ int task_queue_get_new_buffer_size(const int min_buff_size,
}
*new_size = min_buff_size;
if (expect_size > min_buff_size)
{
while (*new_size < expect_size)
{
if (expect_size > min_buff_size) {
while (*new_size < expect_size) {
*new_size *= 2;
}
if (*new_size > max_buff_size)
{
if (*new_size > max_buff_size) {
*new_size = max_buff_size;
}
}
@ -694,41 +227,43 @@ int task_queue_get_new_buffer_size(const int min_buff_size,
return 0;
}
#define _get_new_buffer_size(pQueue, expect_size, new_size) \
task_queue_get_new_buffer_size(pQueue->min_buff_size, \
pQueue->max_buff_size, expect_size, new_size)
#define _get_new_buffer_size(queue, expect_size, new_size) \
free_queue_get_new_buffer_size(queue->min_buff_size, \
queue->max_buff_size, expect_size, new_size)
int task_queue_set_buffer_size(struct fast_task_queue *pQueue,
struct fast_task_info *pTask, const int expect_size)
int free_queue_set_buffer_size(struct fast_task_info *task,
struct fast_net_buffer *buffer, const int expect_size)
{
int result;
int new_size;
if ((result=_get_new_buffer_size(pQueue, expect_size, &new_size)) != 0) {
if ((result=_get_new_buffer_size(task->free_queue,
expect_size, &new_size)) != 0)
{
return result;
}
if (pTask->size == new_size) //do NOT need change buffer size
{
if (buffer->size == new_size) { //do NOT need change buffer size
return 0;
}
return _realloc_buffer(pTask, new_size, false);
return _realloc_buffer(buffer, new_size, false);
}
int task_queue_realloc_buffer(struct fast_task_queue *pQueue,
struct fast_task_info *pTask, const int expect_size)
int free_queue_realloc_buffer(struct fast_task_info *task,
struct fast_net_buffer *buffer, const int expect_size)
{
int result;
int new_size;
if (pTask->size >= expect_size) //do NOT need change buffer size
{
if (buffer->size >= expect_size) { //do NOT need change buffer size
return 0;
}
if ((result=_get_new_buffer_size(pQueue, expect_size, &new_size)) != 0) {
if ((result=_get_new_buffer_size(task->free_queue,
expect_size, &new_size)) != 0)
{
return result;
}
return _realloc_buffer(pTask, new_size, true);
return _realloc_buffer(buffer, new_size, true);
}

View File

@ -23,8 +23,10 @@
#include <string.h>
#include <pthread.h>
#include "common_define.h"
#include "fc_list.h"
#include "ioevent.h"
#include "fast_timer.h"
#include "fast_mblock.h"
#define FC_NOTIFY_READ_FD(tdata) (tdata)->pipe_fds[0]
#define FC_NOTIFY_WRITE_FD(tdata) (tdata)->pipe_fds[1]
@ -35,13 +37,15 @@ struct nio_thread_data;
struct fast_task_info;
typedef int (*ThreadLoopCallback) (struct nio_thread_data *pThreadData);
typedef int (*TaskFinishCallback) (struct fast_task_info *pTask);
typedef void (*TaskCleanUpCallback) (struct fast_task_info *pTask);
typedef int (*TaskInitCallback)(struct fast_task_info *pTask);
typedef int (*TaskFinishCallback) (struct fast_task_info *task);
typedef void (*TaskCleanUpCallback) (struct fast_task_info *task);
typedef int (*TaskInitCallback)(struct fast_task_info *task);
typedef void (*TaskReleaseCallback)(struct fast_task_info *task);
typedef void (*IOEventCallback) (int sock, short event, void *arg);
typedef int (*TaskContinueCallback)(struct fast_task_info *task);
struct sf_network_handler;
struct fast_task_info;
typedef struct ioevent_entry
@ -58,6 +62,7 @@ struct nio_thread_data
int pipe_fds[2]; //for notify
struct fast_task_info *deleted_list; //tasks for cleanup
ThreadLoopCallback thread_loop_callback;
ThreadLoopCallback busy_polling_callback;
void *arg; //extra argument pointer
struct {
struct fast_task_info *head;
@ -69,6 +74,9 @@ struct nio_thread_data
bool enabled;
volatile int64_t counter;
} notify; //for thread notify
int timeout_ms; //for restore
struct fc_list_head polling_queue; //for RDMA busy polling
};
struct ioevent_notify_entry
@ -77,15 +85,29 @@ struct ioevent_notify_entry
struct nio_thread_data *thread_data;
};
struct fast_net_buffer
{
int size; //alloc size
int length; //data length
int offset; //current offset
char *data; //buffer for write or read
};
struct fast_net_buffer_wrapper
{
struct fast_net_buffer holder;
struct fast_net_buffer *ptr;
};
struct fast_task_queue;
struct fast_task_info
{
IOEventEntry event; //must first
IOEventEntry event; //must first
union {
char server_ip[IP_ADDRESS_SIZE];
char client_ip[IP_ADDRESS_SIZE];
};
void *arg; //extra argument pointer
char *data; //buffer for write or read
void *arg; //extra argument pointer
char *recv_body; //for extra (dynamic) recv buffer
struct {
@ -93,9 +115,9 @@ struct fast_task_info
int count;
} iovec_array; //for writev
int size; //alloc size
int length; //data length
int offset; //current offset
struct fast_net_buffer_wrapper send; //send buffer
struct fast_net_buffer_wrapper recv; //recv buffer
uint16_t port; //peer port
struct {
uint8_t current;
@ -105,83 +127,163 @@ struct fast_task_info
volatile int8_t canceled; //if task canceled
short connect_timeout; //for client side
short network_timeout;
int64_t req_count; //request count
int pending_send_count;
int64_t req_count; //request count
struct {
int64_t last_req_count;
uint32_t last_calc_time;
uint16_t continuous_count;
bool in_queue;
struct fc_list_head dlink; //for polling queue
} polling; //for RDMA busy polling
TaskContinueCallback continue_callback; //for continue stage
TaskFinishCallback finish_callback;
struct nio_thread_data *thread_data;
void *ctx; //context pointer for libserverframe nio
struct fast_task_info *next; //for free queue and deleted list
struct fast_task_info *notify_next; //for nio notify queue
TaskFinishCallback finish_callback;
struct nio_thread_data *thread_data;
struct sf_network_handler *handler; //network handler for libserverframe nio
struct fast_task_info *next; //for free queue and deleted list
struct fast_task_info *notify_next; //for nio notify queue
struct fast_task_queue *free_queue; //task allocator
char conn[0]; //for RDMA connection
};
struct fast_task_queue
{
struct fast_task_info *head;
struct fast_task_info *tail;
pthread_mutex_t lock;
int max_connections;
int alloc_connections;
int alloc_task_once;
int min_buff_size;
int max_buff_size;
int arg_size;
int block_size;
bool malloc_whole_block;
int min_buff_size;
int max_buff_size;
int padding_size; //for last field: conn[0]
int arg_size; //for arg pointer
int block_size;
bool malloc_whole_block;
bool double_buffers; //if send buffer and recv buffer are independent
struct fast_mblock_man allocator;
TaskInitCallback init_callback;
TaskReleaseCallback release_callback;
};
#ifdef __cplusplus
extern "C" {
#endif
int free_queue_init_ex2(const int max_connections, const int init_connections,
int free_queue_init_ex2(struct fast_task_queue *queue, const char *name,
const bool double_buffers, const int max_connections,
const int alloc_task_once, const int min_buff_size,
const int max_buff_size, const int arg_size,
TaskInitCallback init_callback);
const int max_buff_size, const int padding_size,
const int arg_size, TaskInitCallback init_callback);
static inline int free_queue_init_ex(const int max_connections,
const int init_connections, const int alloc_task_once,
const int min_buff_size, const int max_buff_size, const int arg_size)
static inline int free_queue_init_ex(struct fast_task_queue *queue,
const char *name, const bool double_buffers,
const int max_connections, const int alloc_task_once,
const int min_buff_size, const int max_buff_size,
const int arg_size)
{
return free_queue_init_ex2(max_connections, init_connections,
alloc_task_once, min_buff_size, max_buff_size, arg_size, NULL);
const int padding_size = 0;
return free_queue_init_ex2(queue, name, double_buffers, max_connections,
alloc_task_once, min_buff_size, max_buff_size, padding_size,
arg_size, NULL);
}
static inline int free_queue_init(const int max_connections,
const int min_buff_size, const int max_buff_size, const int arg_size)
static inline void free_queue_set_release_callback(
struct fast_task_queue *queue,
TaskReleaseCallback callback)
{
return free_queue_init_ex2(max_connections, max_connections,
0, min_buff_size, max_buff_size, arg_size, NULL);
queue->release_callback = callback;
}
void free_queue_destroy();
void free_queue_destroy(struct fast_task_queue *queue);
int free_queue_push(struct fast_task_info *pTask);
struct fast_task_info *free_queue_pop();
int free_queue_count();
int free_queue_alloc_connections();
int free_queue_set_buffer_size(struct fast_task_info *pTask,
const int expect_size);
int free_queue_realloc_buffer(struct fast_task_info *pTask,
const int expect_size);
static inline struct fast_task_info *free_queue_pop(
struct fast_task_queue *queue)
{
return fast_mblock_alloc_object(&queue->allocator);
}
int free_queue_set_max_buffer_size(struct fast_task_info *pTask);
void free_queue_push(struct fast_task_info *task);
int free_queue_realloc_max_buffer(struct fast_task_info *pTask);
static inline int free_queue_count(struct fast_task_queue *queue)
{
return queue->allocator.info.element_total_count -
queue->allocator.info.element_used_count;
}
int task_queue_init(struct fast_task_queue *pQueue);
int task_queue_push(struct fast_task_queue *pQueue, \
struct fast_task_info *pTask);
struct fast_task_info *task_queue_pop(struct fast_task_queue *pQueue);
int task_queue_count(struct fast_task_queue *pQueue);
int task_queue_set_buffer_size(struct fast_task_queue *pQueue,
struct fast_task_info *pTask, const int expect_size);
int task_queue_realloc_buffer(struct fast_task_queue *pQueue,
struct fast_task_info *pTask, const int expect_size);
static inline int free_queue_alloc_connections(struct fast_task_queue *queue)
{
return queue->allocator.info.element_total_count;
}
int task_queue_get_new_buffer_size(const int min_buff_size,
int free_queue_get_new_buffer_size(const int min_buff_size,
const int max_buff_size, const int expect_size, int *new_size);
int free_queue_set_buffer_size(struct fast_task_info *task,
struct fast_net_buffer *buffer, const int expect_size);
static inline int free_queue_set_max_buffer_size(
struct fast_task_info *task,
struct fast_net_buffer *buffer)
{
return free_queue_set_buffer_size(task, buffer,
task->free_queue->max_buff_size);
}
int free_queue_realloc_buffer(struct fast_task_info *task,
struct fast_net_buffer *buffer, const int expect_size);
static inline int free_queue_realloc_max_buffer(
struct fast_task_info *task,
struct fast_net_buffer *buffer)
{
return free_queue_realloc_buffer(task, buffer,
task->free_queue->max_buff_size);
}
/* send and recv buffer operations */
static inline int free_queue_set_send_buffer_size(
struct fast_task_info *task, const int expect_size)
{
return free_queue_set_buffer_size(task, task->send.ptr, expect_size);
}
static inline int free_queue_set_recv_buffer_size(
struct fast_task_info *task, const int expect_size)
{
return free_queue_set_buffer_size(task, task->recv.ptr, expect_size);
}
static inline int free_queue_set_send_max_buffer_size(
struct fast_task_info *task)
{
return free_queue_set_max_buffer_size(task, task->send.ptr);
}
static inline int free_queue_set_recv_max_buffer_size(
struct fast_task_info *task)
{
return free_queue_set_max_buffer_size(task, task->recv.ptr);
}
static inline int free_queue_realloc_send_buffer(
struct fast_task_info *task, const int expect_size)
{
return free_queue_realloc_buffer(task, task->send.ptr, expect_size);
}
static inline int free_queue_realloc_recv_buffer(
struct fast_task_info *task, const int expect_size)
{
return free_queue_realloc_buffer(task, task->recv.ptr, expect_size);
}
static inline int free_queue_realloc_send_max_buffer(
struct fast_task_info *task)
{
return free_queue_realloc_max_buffer(task, task->send.ptr);
}
static inline int free_queue_realloc_recv_max_buffer(
struct fast_task_info *task)
{
return free_queue_realloc_max_buffer(task, task->recv.ptr);
}
#ifdef __cplusplus
}
#endif

View File

@ -50,10 +50,53 @@ void fc_queue_push_ex(struct fc_queue *queue, void *data, bool *notify)
*notify = false;
}
queue->tail = data;
PTHREAD_MUTEX_UNLOCK(&queue->lcp.lock);
}
static inline bool fc_queue_exists(struct fc_queue *queue, void *data)
{
void *current;
if (queue->head == NULL) {
return false;
}
current = queue->head;
do {
if (current == data) {
return true;
}
current = FC_QUEUE_NEXT_PTR(queue, current);
} while (current != NULL);
return false;
}
int fc_queue_push_with_check_ex(struct fc_queue *queue,
void *data, bool *notify)
{
int result;
PTHREAD_MUTEX_LOCK(&queue->lcp.lock);
if (fc_queue_exists(queue, data)) {
result = EEXIST;
*notify = false;
} else {
result = 0;
FC_QUEUE_NEXT_PTR(queue, data) = NULL;
if (queue->tail == NULL) {
queue->head = data;
*notify = true;
} else {
FC_QUEUE_NEXT_PTR(queue, queue->tail) = data;
*notify = false;
}
queue->tail = data;
}
PTHREAD_MUTEX_UNLOCK(&queue->lcp.lock);
return result;
}
void *fc_queue_pop_ex(struct fc_queue *queue, const bool blocked)
{
void *data;

View File

@ -68,6 +68,8 @@ static inline void fc_queue_terminate_all(
//notify by the caller
void fc_queue_push_ex(struct fc_queue *queue, void *data, bool *notify);
int fc_queue_push_with_check_ex(struct fc_queue *queue,
void *data, bool *notify);
static inline void fc_queue_push(struct fc_queue *queue, void *data)
{
@ -79,6 +81,19 @@ static inline void fc_queue_push(struct fc_queue *queue, void *data)
}
}
static inline int fc_queue_push_with_check(struct fc_queue *queue, void *data)
{
int result;
bool notify;
result = fc_queue_push_with_check_ex(queue, data, &notify);
if (notify) {
pthread_cond_signal(&(queue->lcp.cond));
}
return result;
}
static inline void fc_queue_push_silence(struct fc_queue *queue, void *data)
{
bool notify;
@ -171,6 +186,16 @@ static inline int fc_queue_count(struct fc_queue *queue)
return count;
}
static inline void *fc_queue_peek(struct fc_queue *queue)
{
void *data;
pthread_mutex_lock(&queue->lcp.lock);
data = queue->head;
pthread_mutex_unlock(&queue->lcp.lock);
return data;
}
void *fc_queue_timedpop(struct fc_queue *queue,
const int timeout, const int time_unit);

View File

@ -91,7 +91,7 @@ static void deal_timeouts(FastTimerEntry *head)
}
}
int ioevent_loop(struct nio_thread_data *pThreadData,
int ioevent_loop(struct nio_thread_data *thread_data,
IOEventCallback recv_notify_callback, TaskCleanUpCallback
clean_up_callback, volatile bool *continue_flag)
{
@ -102,15 +102,17 @@ int ioevent_loop(struct nio_thread_data *pThreadData,
time_t last_check_time;
int save_extra_events;
int count;
uint32_t sched_counter;
bool sched_pull;
memset(&ev_notify, 0, sizeof(ev_notify));
ev_notify.event.fd = FC_NOTIFY_READ_FD(pThreadData);
ev_notify.event.fd = FC_NOTIFY_READ_FD(thread_data);
ev_notify.event.callback = recv_notify_callback;
ev_notify.thread_data = pThreadData;
ev_notify.thread_data = thread_data;
save_extra_events = pThreadData->ev_puller.extra_events;
pThreadData->ev_puller.extra_events = 0; //disable edge trigger temporarily
if (ioevent_attach(&pThreadData->ev_puller, ev_notify.
save_extra_events = thread_data->ev_puller.extra_events;
thread_data->ev_puller.extra_events = 0; //disable edge trigger temporarily
if (ioevent_attach(&thread_data->ev_puller, ev_notify.
event.fd, IOEVENT_READ, &ev_notify) != 0)
{
result = errno != 0 ? errno : ENOMEM;
@ -119,39 +121,67 @@ int ioevent_loop(struct nio_thread_data *pThreadData,
__LINE__, result, STRERROR(result));
return result;
}
pThreadData->ev_puller.extra_events = save_extra_events; //restore
thread_data->ev_puller.extra_events = save_extra_events; //restore
pThreadData->deleted_list = NULL;
sched_counter = 0;
thread_data->deleted_list = NULL;
last_check_time = g_current_time;
while (*continue_flag)
{
pThreadData->ev_puller.iterator.count = ioevent_poll(
&pThreadData->ev_puller);
if (pThreadData->ev_puller.iterator.count > 0)
{
deal_ioevents(&pThreadData->ev_puller);
}
else if (pThreadData->ev_puller.iterator.count < 0)
{
result = errno != 0 ? errno : EINVAL;
if (result != EINTR)
{
logError("file: "__FILE__", line: %d, " \
"ioevent_poll fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
return result;
}
}
#ifdef OS_LINUX
if (thread_data->ev_puller.timeout == 0) {
sched_pull = (sched_counter++ & 8) != 0;
} else {
sched_pull = true;
}
#else
sched_pull = true;
#endif
if (pThreadData->deleted_list != NULL)
if (sched_pull)
{
thread_data->ev_puller.iterator.count = ioevent_poll(
&thread_data->ev_puller);
if (thread_data->ev_puller.iterator.count > 0)
{
deal_ioevents(&thread_data->ev_puller);
}
else if (thread_data->ev_puller.iterator.count < 0)
{
result = errno != 0 ? errno : EINVAL;
if (result != EINTR)
{
logError("file: "__FILE__", line: %d, " \
"ioevent_poll fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
return result;
}
}
}
if (thread_data->busy_polling_callback != NULL)
{
thread_data->busy_polling_callback(thread_data);
}
if (thread_data->deleted_list != NULL)
{
count = 0;
while (pThreadData->deleted_list != NULL)
while (thread_data->deleted_list != NULL)
{
task = pThreadData->deleted_list;
pThreadData->deleted_list = task->next;
task = thread_data->deleted_list;
thread_data->deleted_list = task->next;
if (task->polling.in_queue)
{
fc_list_del_init(&task->polling.dlink);
task->polling.in_queue = false;
if (fc_list_empty(&task->thread_data->polling_queue)) {
ioevent_set_timeout(&task->thread_data->ev_puller,
task->thread_data->timeout_ms);
}
}
clean_up_callback(task);
count++;
}
@ -162,31 +192,31 @@ int ioevent_loop(struct nio_thread_data *pThreadData,
{
last_check_time = g_current_time;
count = fast_timer_timeouts_get(
&pThreadData->timer, g_current_time, &head);
&thread_data->timer, g_current_time, &head);
if (count > 0)
{
deal_timeouts(&head);
}
}
if (pThreadData->notify.enabled)
if (thread_data->notify.enabled)
{
int64_t n;
if ((n=__sync_fetch_and_add(&pThreadData->notify.counter, 0)) != 0)
if ((n=__sync_fetch_and_add(&thread_data->notify.counter, 0)) != 0)
{
__sync_fetch_and_sub(&pThreadData->notify.counter, n);
__sync_fetch_and_sub(&thread_data->notify.counter, n);
/*
logInfo("file: "__FILE__", line: %d, "
"n ==== %"PRId64", now: %"PRId64,
__LINE__, n, __sync_fetch_and_add(
&pThreadData->notify.counter, 0));
&thread_data->notify.counter, 0));
*/
}
}
if (pThreadData->thread_loop_callback != NULL)
if (thread_data->thread_loop_callback != NULL)
{
pThreadData->thread_loop_callback(pThreadData);
thread_data->thread_loop_callback(thread_data);
}
}
@ -201,14 +231,13 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread,
task->thread_data = pThread;
task->event.fd = sock;
task->event.callback = callback;
if (ioevent_attach(&pThread->ev_puller,
sock, event, task) < 0)
if (ioevent_attach(&pThread->ev_puller, sock, event, task) < 0)
{
result = errno != 0 ? errno : ENOENT;
logError("file: "__FILE__", line: %d, " \
"ioevent_attach fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
logError("file: "__FILE__", line: %d, "
"ioevent_attach fail, fd: %d, "
"errno: %d, error info: %s",
__LINE__, sock, result, STRERROR(result));
return result;
}
@ -216,3 +245,19 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread,
fast_timer_add(&pThread->timer, &task->event.timer);
return 0;
}
int ioevent_reset(struct fast_task_info *task, int new_fd, short event)
{
if (task->event.fd == new_fd)
{
return 0;
}
if (task->event.fd >= 0)
{
ioevent_detach(&task->thread_data->ev_puller, task->event.fd);
}
task->event.fd = new_fd;
return ioevent_attach(&task->thread_data->ev_puller, new_fd, event, task);
}

View File

@ -22,7 +22,7 @@
extern "C" {
#endif
int ioevent_loop(struct nio_thread_data *pThreadData,
int ioevent_loop(struct nio_thread_data *thread_data,
IOEventCallback recv_notify_callback, TaskCleanUpCallback
clean_up_callback, volatile bool *continue_flag);
@ -32,6 +32,8 @@ int ioevent_remove(IOEventPoller *ioevent, void *data);
int ioevent_set(struct fast_task_info *pTask, struct nio_thread_data *pThread,
int sock, short event, IOEventCallback callback, const int timeout);
int ioevent_reset(struct fast_task_info *task, int new_fd, short event);
static inline bool ioevent_is_canceled(struct fast_task_info *task)
{
return __sync_fetch_and_add(&task->canceled, 0) != 0;

View File

@ -225,28 +225,41 @@ int log_reopen_ex(LogContext *pContext)
int log_set_prefix_ex(LogContext *pContext, const char *base_path,
const char *filename_prefix)
{
int result;
int result;
char log_filename[MAX_PATH_SIZE];
if ((result=check_and_mk_log_dir(base_path)) != 0)
{
return result;
}
if ((result=check_and_mk_log_dir(base_path)) != 0)
{
return result;
}
snprintf(pContext->log_filename, MAX_PATH_SIZE,
"%s/logs/%s.log", base_path, filename_prefix);
return log_open(pContext);
snprintf(log_filename, MAX_PATH_SIZE, "%s/logs/%s.log",
base_path, filename_prefix);
return log_set_filename_ex(pContext, log_filename);
}
int log_set_filename_ex(LogContext *pContext, const char *log_filename)
{
if (log_filename == NULL) {
fprintf(stderr, "file: "__FILE__", line: %d, " \
"log_filename is NULL!\n", __LINE__);
if (log_filename == NULL || *log_filename == '\0')
{
fprintf(stderr, "file: "__FILE__", line: %d, "
"log_filename is NULL or empty!\n", __LINE__);
return EINVAL;
}
snprintf(pContext->log_filename, MAX_PATH_SIZE, "%s", log_filename);
return log_open(pContext);
if (*(pContext->log_filename) == '\0')
{
snprintf(pContext->log_filename, MAX_PATH_SIZE, "%s", log_filename);
return log_open(pContext);
}
if (strcmp(log_filename, pContext->log_filename) == 0)
{
return 0;
}
snprintf(pContext->log_filename, MAX_PATH_SIZE, "%s", log_filename);
return log_reopen_ex(pContext);
}
void log_set_cache_ex(LogContext *pContext, const bool bLogCache)

View File

@ -314,35 +314,96 @@ static inline void fc_server_set_ip_prefix(FCServerGroupInfo *ginfo,
}
}
static int fc_server_load_one_group(FCServerConfig *ctx,
const char *config_filename, IniContext *ini_context,
const int group_count, const char *section_name)
static inline int fc_server_set_comm_type(FCCommunicationType *comm_type,
const char *config_filename, const char *section_name,
const char *comm_type_str, const FCCommunicationType default_comm_type)
{
if (comm_type_str == NULL) {
*comm_type = default_comm_type;
return 0;
} else if (strcasecmp(comm_type_str, "socket") == 0) {
*comm_type = fc_comm_type_sock;
return 0;
} else if (strcasecmp(comm_type_str, "rdma") == 0) {
*comm_type = fc_comm_type_rdma;
return 0;
} else {
logError("file: "__FILE__", line: %d, "
"config filename: %s, section: %s, "
"invalid communication: %s!", __LINE__,
config_filename, section_name, comm_type_str);
return EINVAL;
}
}
static int load_comm_type_and_smart_polling(IniFullContext *ini_ctx,
FCCommunicationType *comm_type, FCSmartPollingConfig *smart_polling,
const FCCommunicationType default_comm_type,
const FCSmartPollingConfig *default_smart_polling)
{
int result;
char *comm_type_str;
comm_type_str = iniGetStrValue(ini_ctx->section_name,
"communication", ini_ctx->context);
if (comm_type_str == NULL) {
comm_type_str = iniGetStrValue(ini_ctx->section_name,
"comm_type", ini_ctx->context);
}
if ((result=fc_server_set_comm_type(comm_type, ini_ctx->filename,
ini_ctx->section_name, comm_type_str,
default_comm_type)) != 0)
{
return result;
}
if (*comm_type == fc_comm_type_sock) {
smart_polling->enabled = false;
smart_polling->switch_on_iops = 0;
smart_polling->switch_on_count = 0;
} else {
smart_polling->enabled = iniGetBoolValue(ini_ctx->section_name,
"smart_polling", ini_ctx->context,
default_smart_polling->enabled);
smart_polling->switch_on_iops = iniGetIntValue(ini_ctx->section_name,
"polling_switch_on_iops", ini_ctx->context,
default_smart_polling->switch_on_iops);
smart_polling->switch_on_count = iniGetIntValue(ini_ctx->section_name,
"polling_switch_on_count", ini_ctx->context,
default_smart_polling->switch_on_count);
}
return 0;
}
static int fc_server_load_one_group(FCServerConfig *ctx,
IniFullContext *ini_ctx, const int group_count)
{
int result;
FCServerGroupInfo *group;
char new_name[FAST_INI_ITEM_NAME_SIZE];
char *port_str;
char *net_type;
char *ip_prefix;
strcpy(new_name, section_name);
strcpy(new_name, ini_ctx->section_name);
group = ctx->group_array.groups + ctx->group_array.count;
fc_server_set_group_ptr_name(group, new_name + GROUP_SECTION_PREFIX_LEN);
if (group->group_name.len == 0) {
logError("file: "__FILE__", line: %d, "
"config filename: %s, section: %s, no group name!",
__LINE__, config_filename, section_name);
__LINE__, ini_ctx->filename, ini_ctx->section_name);
return EINVAL;
}
port_str = iniGetStrValue(section_name, SERVER_ITEM_PORT_STR, ini_context);
port_str = iniGetStrValue(ini_ctx->section_name,
SERVER_ITEM_PORT_STR, ini_ctx->context);
if (port_str == NULL) {
if (group_count == 1) {
group->port = ctx->default_port;
} else {
logError("file: "__FILE__", line: %d, "
"config filename: %s, section: %s, no item: %s!",
__LINE__, config_filename, section_name,
__LINE__, ini_ctx->filename, ini_ctx->section_name,
SERVER_ITEM_PORT_STR);
return ENOENT;
}
@ -352,24 +413,33 @@ static int fc_server_load_one_group(FCServerConfig *ctx,
if (group->port <= 0 || (endptr != NULL && *endptr != '\0')) {
logError("file: "__FILE__", line: %d, "
"config filename: %s, section: %s, item: %s, "
"invalid port: %s", __LINE__, config_filename,
section_name, SERVER_ITEM_PORT_STR, port_str);
"invalid port: %s", __LINE__, ini_ctx->filename,
ini_ctx->section_name, SERVER_ITEM_PORT_STR, port_str);
return EINVAL;
}
}
net_type = iniGetStrValue(section_name, "net_type", ini_context);
net_type = iniGetStrValue(ini_ctx->section_name,
"net_type", ini_ctx->context);
group->filter.net_type = fc_get_net_type_by_name(net_type);
if (group->filter.net_type == FC_NET_TYPE_NONE) {
logError("file: "__FILE__", line: %d, "
"config filename: %s, section: %s, invalid net_type: %s",
__LINE__, config_filename, group->group_name.str, net_type);
__LINE__, ini_ctx->filename, group->group_name.str, net_type);
return EINVAL;
}
ip_prefix = iniGetStrValue(section_name, "ip_prefix", ini_context);
ip_prefix = iniGetStrValue(ini_ctx->section_name,
"ip_prefix", ini_ctx->context);
fc_server_set_ip_prefix(group, ip_prefix);
if ((result=load_comm_type_and_smart_polling(ini_ctx,
&group->comm_type, &group->smart_polling,
ctx->comm_type, &ctx->smart_polling)) != 0)
{
return result;
}
ctx->group_array.count++;
return 0;
}
@ -429,7 +499,7 @@ static void fc_server_sort_groups(FCServerConfig *ctx)
}
static int fc_server_load_groups(FCServerConfig *ctx,
const char *config_filename, IniContext *ini_context)
IniFullContext *ini_ctx)
{
int result;
int count;
@ -437,13 +507,13 @@ static int fc_server_load_groups(FCServerConfig *ctx,
IniSectionInfo *section;
IniSectionInfo *end;
if ((result=iniGetSectionNamesByPrefix(ini_context,
if ((result=iniGetSectionNamesByPrefix(ini_ctx->context,
GROUP_SECTION_PREFIX_STR, sections,
FC_MAX_GROUP_COUNT, &count)) != 0)
{
logError("file: "__FILE__", line: %d, "
"config filename: %s, get sections by prefix %s fail, "
"errno: %d, error info: %s", __LINE__, config_filename,
"errno: %d, error info: %s", __LINE__, ini_ctx->filename,
GROUP_SECTION_PREFIX_STR, result, STRERROR(result));
return result;
}
@ -452,15 +522,14 @@ static int fc_server_load_groups(FCServerConfig *ctx,
ctx->group_array.count = 1;
fc_server_set_group_ptr_name(ctx->group_array.groups + 0, "");
ctx->group_array.groups[0].port = iniGetIntValue(NULL, "port",
ini_context, ctx->default_port);
ini_ctx->context, ctx->default_port);
return 0;
}
end = sections + count;
for (section=sections; section<end; section++) {
if ((result=fc_server_load_one_group(ctx, config_filename,
ini_context, count, section->section_name)) != 0)
{
ini_ctx->section_name = section->section_name;
if ((result=fc_server_load_one_group(ctx, ini_ctx, count)) != 0) {
return result;
}
}
@ -794,6 +863,7 @@ static int fc_server_load_group_server(FCServerConfig *ctx,
return result;
}
address.conn.comm_type = group->comm_type;
if ((result=fc_server_set_group_server_address(server,
group_addr, &address)) != 0)
{
@ -835,9 +905,16 @@ static int fc_server_set_host(FCServerConfig *ctx, FCServerInfo *server,
if (addr->conn.port == 0) {
addr_holder = *addr;
addr_holder.conn.port = FC_SERVER_GROUP_PORT(group);
addr_holder.conn.comm_type = group->comm_type;
new_addr = &addr_holder;
} else {
new_addr = addr;
if (addr->conn.comm_type == group->comm_type) {
new_addr = addr;
} else {
addr_holder = *addr;
addr_holder.conn.comm_type = group->comm_type;
new_addr = &addr_holder;
}
}
if ((result=fc_server_set_group_server_address(server,
@ -1167,17 +1244,77 @@ static int fc_server_load_servers(FCServerConfig *ctx,
return result;
}
static void load_connection_thread_local(FCServerConfig *ctx,
IniContext *ini_context, const char *config_filename)
{
char *connection_thread_local;
connection_thread_local = iniGetStrValue(NULL,
"connection_thread_local", ini_context);
if (connection_thread_local == NULL || *connection_thread_local == '\0') {
ctx->connection_thread_local = fc_connection_thread_local_auto;
} else if (strcasecmp(connection_thread_local, "auto") == 0) {
ctx->connection_thread_local = fc_connection_thread_local_auto;
} else if (strcasecmp(connection_thread_local, "yes") == 0) {
ctx->connection_thread_local = fc_connection_thread_local_yes;
} else if (strcasecmp(connection_thread_local, "no") == 0) {
ctx->connection_thread_local = fc_connection_thread_local_no;
} else {
logWarning("file: "__FILE__", line: %d, "
"config file: %s, invalid connection_thread_local: %s, "
"set to auto!", __LINE__, config_filename,
connection_thread_local);
ctx->connection_thread_local = fc_connection_thread_local_auto;
}
}
static int fc_server_load_data(FCServerConfig *ctx,
IniContext *ini_context, const char *config_filename)
{
int result;
int buffer_size;
bool have_rdma;
IniFullContext full_ini_ctx;
FCSmartPollingConfig default_smart_polling;
FCServerGroupInfo *group;
FCServerGroupInfo *end;
if ((result=fc_server_load_groups(ctx, config_filename,
ini_context)) != 0)
FAST_INI_SET_FULL_CTX_EX(full_ini_ctx,
config_filename, NULL, ini_context);
default_smart_polling.enabled = true;
default_smart_polling.switch_on_iops = 10240;
default_smart_polling.switch_on_count = 3;
if ((result=load_comm_type_and_smart_polling(&full_ini_ctx,
&ctx->comm_type, &ctx->smart_polling,
fc_comm_type_sock, &default_smart_polling)) != 0)
{
return result;
}
if ((result=fc_server_load_groups(ctx, &full_ini_ctx)) != 0) {
return result;
}
have_rdma = false;
end = ctx->group_array.groups + ctx->group_array.count;
for (group=ctx->group_array.groups; group<end; group++) {
if (group->comm_type != fc_comm_type_sock) {
have_rdma = true;
break;
}
}
if (have_rdma) {
full_ini_ctx.section_name = NULL;
buffer_size = iniGetByteValue(NULL, "buffer_size",
ini_context, 256 * 1024);
ctx->buffer_size = iniCheckAndCorrectIntValue(&full_ini_ctx,
"buffer_size", buffer_size, 8 * 1024, 8 * 1024 * 1024);
} else {
ctx->buffer_size = 0;
}
load_connection_thread_local(ctx, ini_context, config_filename);
if ((result=fc_server_load_servers(ctx, config_filename,
ini_context)) != 0)
{
@ -1340,13 +1477,27 @@ static int fc_groups_to_string(FCServerConfig *ctx, FastBuffer *buffer)
fast_buffer_append(buffer,
"[%s%.*s]\n"
"port = %d\n"
"net_type = %s\n"
"ip_prefix = %.*s\n\n",
"port = %d\n",
GROUP_SECTION_PREFIX_STR,
group->group_name.len, group->group_name.str,
group->port, net_type_caption,
group->filter.ip_prefix.len,
group->port);
if (group->comm_type != fc_comm_type_sock) {
fast_buffer_append(buffer,
"communication = %s\n"
"smart_polling = %d\n"
"polling_switch_on_iops = %d\n"
"polling_switch_on_count = %d\n",
fc_comm_type_str(group->comm_type),
group->smart_polling.enabled,
group->smart_polling.switch_on_iops,
group->smart_polling.switch_on_count);
}
fast_buffer_append(buffer,
"net_type = %s\n"
"ip_prefix = %.*s\n\n",
net_type_caption, group->filter.ip_prefix.len,
group->filter.ip_prefix.str);
}
return 0;
@ -1423,6 +1574,14 @@ int fc_server_to_config_string(FCServerConfig *ctx, FastBuffer *buffer)
{
int result;
if (ctx->buffer_size > 0) {
if ((result=fast_buffer_check(buffer, 1024)) != 0) {
return result;
}
fast_buffer_append(buffer, "buffer_size = %d KB",
ctx->buffer_size / 1024);
}
fc_server_clear_server_port(&ctx->group_array);
if ((result=fc_groups_to_string(ctx, buffer)) != 0) {
return result;
@ -1435,13 +1594,27 @@ static void fc_server_log_groups(FCServerConfig *ctx)
{
FCServerGroupInfo *group;
FCServerGroupInfo *end;
char buff[1024];
char *p;
end = ctx->group_array.groups + ctx->group_array.count;
for (group=ctx->group_array.groups; group<end; group++) {
logInfo("group_name: %.*s, port: %d, net_type: %s, ip_prefix: %.*s",
group->group_name.len, group->group_name.str, group->port,
p = buff + sprintf(buff, "group_name: %.*s, port: %d",
group->group_name.len, group->group_name.str,
group->port);
if (group->comm_type != fc_comm_type_sock) {
p += sprintf(p, ", communication: %s, smart_polling: %d, "
"polling_switch_on_iops: %d, polling_switch_on_count: %d",
fc_comm_type_str(group->comm_type),
group->smart_polling.enabled,
group->smart_polling.switch_on_iops,
group->smart_polling.switch_on_count);
}
p += sprintf(p, ", net_type: %s, ip_prefix: %.*s",
get_net_type_caption(group->filter.net_type),
group->filter.ip_prefix.len, group->filter.ip_prefix.str);
log_it1(LOG_INFO, buff, p - buff);
}
}
@ -1491,67 +1664,20 @@ static void fc_server_log_servers(FCServerConfig *ctx)
void fc_server_to_log(FCServerConfig *ctx)
{
char buff[256];
char *p;
p = buff + sprintf(buff, "connection_thread_local: %s",
fc_connection_thread_local_str(ctx->connection_thread_local));
if (ctx->buffer_size > 0) {
p += sprintf(p, ", buffer_size: %d KB", ctx->buffer_size / 1024);
}
log_it1(LOG_INFO, buff, p - buff);
fc_server_log_groups(ctx);
fc_server_log_servers(ctx);
}
ConnectionInfo *fc_server_check_connect_ex(FCAddressPtrArray *addr_array,
const char *service_name, const int connect_timeout,
const char *bind_ipaddr, const bool log_connect_error, int *err_no)
{
FCAddressInfo **current;
FCAddressInfo **addr;
FCAddressInfo **end;
if (addr_array->count <= 0) {
*err_no = ENOENT;
return NULL;
}
current = addr_array->addrs + addr_array->index;
if ((*current)->conn.sock >= 0) {
return &(*current)->conn;
}
if ((*err_no=conn_pool_connect_server_ex1(&(*current)->conn,
service_name, connect_timeout, bind_ipaddr,
log_connect_error)) == 0)
{
return &(*current)->conn;
}
if (addr_array->count == 1) {
return NULL;
}
end = addr_array->addrs + addr_array->count;
for (addr=addr_array->addrs; addr<end; addr++) {
if (addr == current) {
continue;
}
if ((*err_no=conn_pool_connect_server_ex1(&(*addr)->conn,
service_name, connect_timeout, bind_ipaddr,
log_connect_error)) == 0)
{
addr_array->index = addr - addr_array->addrs;
return &(*addr)->conn;
}
}
return NULL;
}
void fc_server_disconnect(FCAddressPtrArray *addr_array)
{
FCAddressInfo **current;
current = addr_array->addrs + addr_array->index;
if ((*current)->conn.sock >= 0) {
close((*current)->conn.sock);
(*current)->conn.sock = -1;
}
}
int fc_server_make_connection_ex(FCAddressPtrArray *addr_array,
ConnectionInfo *conn, const char *service_name,
const int connect_timeout, const char *bind_ipaddr,
@ -1567,10 +1693,11 @@ int fc_server_make_connection_ex(FCAddressPtrArray *addr_array,
}
current = addr_array->addrs + addr_array->index;
*conn = (*current)->conn;
conn->sock = -1;
if ((result=conn_pool_connect_server_ex1(conn,
service_name, connect_timeout,
conn_pool_set_server_info(conn, (*current)->conn.ip_addr,
(*current)->conn.port);
conn->comm_type = (*current)->conn.comm_type;
if ((result=G_COMMON_CONNECTION_CALLBACKS[conn->comm_type].
make_connection(conn, service_name, connect_timeout * 1000,
bind_ipaddr, log_connect_error)) == 0)
{
return 0;
@ -1586,10 +1713,10 @@ int fc_server_make_connection_ex(FCAddressPtrArray *addr_array,
continue;
}
*conn = (*addr)->conn;
conn->sock = -1;
if ((result=conn_pool_connect_server_ex1(conn,
service_name, connect_timeout,
conn_pool_set_server_info(conn, (*addr)->conn.ip_addr,
(*addr)->conn.port);
if ((result=G_COMMON_CONNECTION_CALLBACKS[conn->comm_type].
make_connection(conn, service_name, connect_timeout * 1000,
bind_ipaddr, log_connect_error)) == 0)
{
addr_array->index = addr - addr_array->addrs;
@ -1624,3 +1751,34 @@ const FCAddressInfo *fc_server_get_address_by_peer(
return *(addr_array->addrs);
}
struct ibv_pd *fc_alloc_rdma_pd(fc_alloc_pd_callback alloc_pd,
FCAddressPtrArray *address_array, int *result)
{
char *ip_addrs[FC_MAX_SERVER_IP_COUNT];
char **ip_addr;
FCAddressInfo **addr;
FCAddressInfo **end;
struct ibv_pd *pd;
int port;
if (address_array->count == 0) {
port = 0;
} else {
port = address_array->addrs[0]->conn.port;
}
end = address_array->addrs + address_array->count;
for (addr=address_array->addrs, ip_addr=ip_addrs; addr<end; addr++) {
*ip_addr = (*addr)->conn.ip_addr;
}
if ((pd=alloc_pd((const char **)ip_addrs, address_array->
count, port)) != NULL)
{
*result = 0;
} else {
*result = ENODEV;
}
return pd;
}

View File

@ -52,11 +52,20 @@ typedef struct {
FCAddressInfo **addrs;
} FCAddressPtrArray;
typedef struct
{
bool enabled;
int switch_on_iops;
int switch_on_count;
} FCSmartPollingConfig;
typedef struct
{
string_t group_name;
int port; //default port
int server_port; //port in server section
FCCommunicationType comm_type;
FCSmartPollingConfig smart_polling;
struct {
int net_type;
string_t ip_prefix;
@ -111,11 +120,21 @@ typedef struct
FCServerMap *maps;
} FCServerMapArray;
typedef struct
typedef enum {
fc_connection_thread_local_auto,
fc_connection_thread_local_yes,
fc_connection_thread_local_no
} FCServerConnThreadLocal;
typedef struct fc_server_config
{
int default_port;
int min_hosts_each_group;
bool share_between_groups; //if an address shared between different groups
int buffer_size; //for RDMA
FCCommunicationType comm_type;
FCSmartPollingConfig smart_polling;
FCServerConnThreadLocal connection_thread_local;
FCServerGroupArray group_array;
struct {
FCServerInfoArray by_id; //sorted by server id
@ -140,6 +159,16 @@ static inline FCServerInfo *fc_server_get_by_ip_port(FCServerConfig *ctx,
FCServerGroupInfo *fc_server_get_group_by_name(FCServerConfig *ctx,
const string_t *group_name);
static inline FCServerGroupInfo *fc_server_get_group_by_index(
FCServerConfig *ctx, const int index)
{
if (index < 0 || index >= ctx->group_array.count) {
return NULL;
}
return ctx->group_array.groups + index;
}
static inline int fc_server_get_group_index_ex(FCServerConfig *ctx,
const string_t *group_name)
{
@ -211,17 +240,6 @@ int fc_server_to_config_string(FCServerConfig *ctx, FastBuffer *buffer);
void fc_server_to_log(FCServerConfig *ctx);
ConnectionInfo *fc_server_check_connect_ex(FCAddressPtrArray *addr_array,
const char *service_name, const int connect_timeout,
const char *bind_ipaddr, const bool log_connect_error, int *err_no);
#define fc_server_check_connect(addr_array, service_name, \
connect_timeout, err_no) \
fc_server_check_connect_ex(addr_array, service_name, \
connect_timeout, NULL, true, err_no)
void fc_server_disconnect(FCAddressPtrArray *addr_array);
const FCAddressInfo *fc_server_get_address_by_peer(
FCAddressPtrArray *addr_array, const char *peer_ip);
@ -235,6 +253,35 @@ int fc_server_make_connection_ex(FCAddressPtrArray *addr_array,
fc_server_make_connection_ex(addr_array, conn, \
service_name, connect_timeout, NULL, true)
static inline void fc_server_close_connection(ConnectionInfo *conn)
{
G_COMMON_CONNECTION_CALLBACKS[conn->comm_type].close_connection(conn);
}
static inline void fc_server_destroy_connection(ConnectionInfo *conn)
{
fc_server_close_connection(conn);
conn_pool_free_connection(conn);
}
struct ibv_pd *fc_alloc_rdma_pd(fc_alloc_pd_callback alloc_pd,
FCAddressPtrArray *address_array, int *result);
static inline const char *fc_connection_thread_local_str(
const FCServerConnThreadLocal value)
{
switch (value) {
case fc_connection_thread_local_auto:
return "auto";
case fc_connection_thread_local_yes:
return "yes";
case fc_connection_thread_local_no:
return "no";
default:
return "unkown";
}
}
#ifdef __cplusplus
}
#endif