diff --git a/HISTORY b/HISTORY index 87a9bc1..a89b36c 100644 --- a/HISTORY +++ b/HISTORY @@ -1,6 +1,13 @@ -Version 1.70 2023-08-27 +Version 1.70 2023-09-30 * get full mac address of infiniband NIC under Linux + * struct fast_task_info add field conn for RDMA connection + * server_id_func.[hc]: support communication type + * connection_pool.[hc] support callbacks for RDMA + * nio thread data support busy_polling_callback + * connection_pool.[hc] support thread local for performance + * struct fast_task_info support send and recv double buffers + * add functions: fc_queue_push_with_check and fc_queue_peek Version 1.69 2023-08-05 * bugfixed: array_allocator_alloc MUST init the array diff --git a/debian/changelog b/debian/changelog index ba7d190..a8d61bb 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,21 @@ +libfastcommon (1.0.70-3) unstable; urgency=medium + + * upgrade to 1.0.70-3 + + -- YuQing <384681@qq.com> Tue, 21 Nov 2023 14:35:29 +0000 + +libfastcommon (1.0.70-2) unstable; urgency=medium + + * upgrade to 1.0.70-2 + + -- YuQing <384681@qq.com> Mon, 20 Nov 2023 13:23:17 +0000 + +libfastcommon (1.0.70-1) unstable; urgency=medium + + * upgrade to 1.0.70-1 + + -- YuQing <384681@qq.com> Sun, 19 Nov 2023 14:45:34 +0000 + libfastcommon (1.0.69-1) unstable; urgency=medium * upgrade to 1.0.69-1 diff --git a/libfastcommon.spec b/libfastcommon.spec index a093c8b..c7cacdf 100644 --- a/libfastcommon.spec +++ b/libfastcommon.spec @@ -3,7 +3,7 @@ %define CommitVersion %(echo $COMMIT_VERSION) Name: libfastcommon -Version: 1.0.69 +Version: 1.0.70 Release: 1%{?dist} Summary: c common functions library extracted from my open source projects FastDFS License: LGPL diff --git a/src/connection_pool.c b/src/connection_pool.c index 4088ad9..366534d 100644 --- a/src/connection_pool.c +++ b/src/connection_pool.c @@ -16,30 +16,85 @@ #include #include #include +#include #include "logger.h" #include "sockopt.h" #include "shared_func.h" #include "sched_thread.h" +#include "server_id_func.h" #include "connection_pool.h" +ConnectionCallbacks g_connection_callbacks = { + false, {{conn_pool_connect_server_ex1, + conn_pool_disconnect_server, + conn_pool_is_connected}, + {NULL, NULL, NULL}}, {NULL} +}; + +static int node_init_for_socket(ConnectionNode *node, + ConnectionPool *cp) +{ + node->conn = (ConnectionInfo *)(node + 1); + return 0; +} + +static int node_init_for_rdma(ConnectionNode *node, + ConnectionPool *cp) +{ + node->conn = (ConnectionInfo *)(node + 1); + node->conn->arg1 = node->conn->args + cp->extra_data_size; + return G_RDMA_CONNECTION_CALLBACKS.init_connection(node->conn, + cp->extra_params.rdma.double_buffers, cp->extra_params. + rdma.buffer_size, cp->extra_params.rdma.pd); +} + +static void cp_tls_destroy(void *ptr) +{ + ConnectionThreadHashTable *htable; + ConnectionNode **pp; + ConnectionNode **end; + ConnectionNode *current; + ConnectionNode *node; + + htable = ptr; + end = htable->buckets + htable->cp->extra_params.tls.htable_capacity; + for (pp=htable->buckets; ppnext; + conn_pool_close_connection(htable->cp, current->conn); + } while (node != NULL); + } + + free(ptr); +} + int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout, const int max_count_per_entry, const int max_idle_time, const int socket_domain, const int htable_init_capacity, fc_connection_callback_func connect_done_func, void *connect_done_args, fc_connection_callback_func validate_func, void *validate_args, - const int extra_data_size) + const int extra_data_size, const ConnectionExtraParams *extra_params) { const int64_t alloc_elements_limit = 0; int result; int init_capacity; + int extra_connection_size; + fast_mblock_object_init_func obj_init_func; if ((result=init_pthread_lock(&cp->lock)) != 0) { return result; } - cp->connect_timeout = connect_timeout; + cp->connect_timeout_ms = connect_timeout * 1000; cp->max_count_per_entry = max_count_per_entry; cp->max_idle_time = max_idle_time; + cp->extra_data_size = extra_data_size; cp->socket_domain = socket_domain; cp->connect_done_callback.func = connect_done_func; cp->connect_done_callback.args = connect_done_args; @@ -54,14 +109,40 @@ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout, return result; } + if (extra_params != NULL) { + extra_connection_size = G_RDMA_CONNECTION_CALLBACKS. + get_connection_size(); + obj_init_func = (fast_mblock_object_init_func)node_init_for_rdma; + cp->extra_params = *extra_params; + } else { + extra_connection_size = 0; + cp->extra_params.tls.enabled = false; + cp->extra_params.tls.htable_capacity = 0; + cp->extra_params.rdma.buffer_size = 0; + cp->extra_params.rdma.pd = NULL; + obj_init_func = (fast_mblock_object_init_func)node_init_for_socket; + } if ((result=fast_mblock_init_ex1(&cp->node_allocator, "cpool-node", sizeof(ConnectionNode) + sizeof(ConnectionInfo) + - extra_data_size, init_capacity, alloc_elements_limit, - NULL, NULL, true)) != 0) + extra_data_size + extra_connection_size, init_capacity, + alloc_elements_limit, obj_init_func, cp, true)) != 0) { return result; } + logDebug("cp: %p, tls.enabled: %d, htable_capacity: %d", + cp, cp->extra_params.tls.enabled, + cp->extra_params.tls.htable_capacity); + + if (cp->extra_params.tls.enabled) { + if ((result=pthread_key_create(&cp->tls_key, cp_tls_destroy)) != 0) { + logError("file: "__FILE__", line: %d, " + "pthread_key_create fail, errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + return result; + } + } + return fc_hash_init(&(cp->hash_array), fc_simple_hash, init_capacity, 0.75); } @@ -84,7 +165,8 @@ static int coon_pool_close_connections(const int index, deleted = node; node = node->next; - conn_pool_disconnect_server(deleted->conn); + G_COMMON_CONNECTION_CALLBACKS[deleted->conn->comm_type]. + close_connection(deleted->conn); fast_mblock_free_object(&cp->node_allocator, deleted); } @@ -104,17 +186,22 @@ void conn_pool_destroy(ConnectionPool *cp) pthread_mutex_destroy(&cp->lock); } -void conn_pool_disconnect_server(ConnectionInfo *pConnection) +void conn_pool_disconnect_server(ConnectionInfo *conn) { - if (pConnection->sock >= 0) - { - close(pConnection->sock); - pConnection->sock = -1; - } + if (conn->sock >= 0) + { + close(conn->sock); + conn->sock = -1; + } +} + +bool conn_pool_is_connected(ConnectionInfo *conn) +{ + return (conn->sock >= 0); } int conn_pool_connect_server_ex1(ConnectionInfo *conn, - const char *service_name, const int connect_timeout, + const char *service_name, const int connect_timeout_ms, const char *bind_ipaddr, const bool log_connect_error) { int result; @@ -131,7 +218,7 @@ int conn_pool_connect_server_ex1(ConnectionInfo *conn, } if ((result=connectserverbyip_nb(conn->sock, conn->ip_addr, - conn->port, connect_timeout)) != 0) + conn->port, connect_timeout_ms / 1000)) != 0) { if (log_connect_error) { @@ -184,23 +271,21 @@ int conn_pool_async_connect_server_ex(ConnectionInfo *conn, static inline void conn_pool_get_key(const ConnectionInfo *conn, char *key, int *key_len) { - *key_len = sprintf(key, "%s_%u", conn->ip_addr, conn->port); + *key_len = sprintf(key, "%s-%u", conn->ip_addr, conn->port); } -ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp, - const ConnectionInfo *conn, const char *service_name, int *err_no) +static ConnectionInfo *get_connection(ConnectionPool *cp, + const ConnectionInfo *conn, const string_t *key, + const char *service_name, int *err_no) { - char key[INET6_ADDRSTRLEN + 8]; - int key_len; ConnectionManager *cm; ConnectionNode *node; ConnectionInfo *ci; time_t current_time; - conn_pool_get_key(conn, key, &key_len); - pthread_mutex_lock(&cp->lock); - cm = (ConnectionManager *)fc_hash_find(&cp->hash_array, key, key_len); + cm = (ConnectionManager *)fc_hash_find( + &cp->hash_array, key->str, key->len); if (cm == NULL) { cm = (ConnectionManager *)fast_mblock_alloc_object( @@ -223,7 +308,7 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp, pthread_mutex_unlock(&cp->lock); return NULL; } - fc_hash_insert(&cp->hash_array, key, key_len, cm); + fc_hash_insert(&cp->hash_array, key->str, key->len, cm); } pthread_mutex_unlock(&cp->lock); @@ -258,7 +343,6 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp, return NULL; } - node->conn = (ConnectionInfo *)(node + 1); node->manager = cm; node->next = NULL; node->atime = 0; @@ -266,12 +350,15 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp, cm->total_count++; pthread_mutex_unlock(&cm->lock); - memcpy(node->conn, conn, sizeof(ConnectionInfo)); + memcpy(node->conn->ip_addr, conn->ip_addr, sizeof(conn->ip_addr)); + node->conn->port = conn->port; + node->conn->comm_type = conn->comm_type; node->conn->socket_domain = cp->socket_domain; node->conn->sock = -1; node->conn->validate_flag = false; - *err_no = conn_pool_connect_server_ex1(node->conn, - service_name, cp->connect_timeout, NULL, true); + *err_no = G_COMMON_CONNECTION_CALLBACKS[conn->comm_type]. + make_connection(node->conn, service_name, + cp->connect_timeout_ms, NULL, true); if (*err_no == 0 && cp->connect_done_callback.func != NULL) { *err_no = cp->connect_done_callback.func(node->conn, @@ -279,11 +366,8 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp, } if (*err_no != 0) { - if (node->conn->sock >= 0) - { - close(node->conn->sock); - node->conn->sock = -1; - } + G_COMMON_CONNECTION_CALLBACKS[conn->comm_type]. + close_connection(node->conn); pthread_mutex_lock(&cm->lock); cm->total_count--; //rollback fast_mblock_free_object(&cp->node_allocator, node); @@ -340,17 +424,16 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp, { cm->total_count--; - logDebug("file: "__FILE__", line: %d, " \ - "server %s:%u, connection: %d idle " \ - "time: %d exceeds max idle time: %d, "\ - "total_count: %d, free_count: %d", \ - __LINE__, conn->ip_addr, conn->port, \ - ci->sock, \ - (int)(current_time - node->atime), \ - cp->max_idle_time, cm->total_count, \ - cm->free_count); + logDebug("file: "__FILE__", line: %d, " + "server %s:%u, connection: %d idle " + "time: %d exceeds max idle time: %d, " + "total_count: %d, free_count: %d", __LINE__, + conn->ip_addr, conn->port, ci->sock, (int) + (current_time - node->atime), cp->max_idle_time, + cm->total_count, cm->free_count); - conn_pool_disconnect_server(ci); + G_COMMON_CONNECTION_CALLBACKS[ci->comm_type]. + close_connection(ci); fast_mblock_free_object(&cp->node_allocator, node); continue; } @@ -367,18 +450,84 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp, } } -int conn_pool_close_connection_ex(ConnectionPool *cp, ConnectionInfo *conn, - const bool bForce) +ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp, + const ConnectionInfo *conn, const char *service_name, int *err_no) +{ + string_t key; + int bytes; + uint32_t hash_code; + ConnectionNode **bucket; + ConnectionNode *node; + ConnectionInfo *ci; + char key_buff[INET6_ADDRSTRLEN + 8]; + ConnectionThreadHashTable *htable; + + key.str = key_buff; + conn_pool_get_key(conn, key.str, &key.len); + if (!cp->extra_params.tls.enabled) { + return get_connection(cp, conn, &key, service_name, err_no); + } + + htable = pthread_getspecific(cp->tls_key); + if (htable == NULL) { + bytes = sizeof(ConnectionThreadHashTable) + sizeof(ConnectionNode *) * + cp->extra_params.tls.htable_capacity; + htable = fc_malloc(bytes); + memset(htable, 0, bytes); + + htable->buckets = (ConnectionNode **)(htable + 1); + htable->cp = cp; + if ((*err_no=pthread_setspecific(cp->tls_key, htable)) != 0) { + logError("file: "__FILE__", line: %d, " + "pthread_setspecific fail, errno: %d, error info: %s", + __LINE__, *err_no, STRERROR(*err_no)); + return NULL; + } + } + + hash_code = fc_simple_hash(key.str, key.len); + bucket = htable->buckets + hash_code % cp-> + extra_params.tls.htable_capacity; + if (*bucket == NULL) { + node = NULL; + } else if (FC_CONNECTION_SERVER_EQUAL1(*conn, *(*bucket)->conn)) { + node = *bucket; + } else { + node = (*bucket)->next; + while (node != NULL) { + if (FC_CONNECTION_SERVER_EQUAL1(*conn, *node->conn)) { + break; + } + node = node->next; + } + } + + if (node != NULL) { + *err_no = 0; + return node->conn; + } else { + if ((ci=get_connection(cp, conn, &key, service_name, + err_no)) == NULL) + { + return NULL; + } + + node = (ConnectionNode *)((char *)ci - sizeof(ConnectionNode)); + node->next = *bucket; + *bucket = node; + *err_no = 0; + return ci; + } +} + +static int close_connection(ConnectionPool *cp, ConnectionInfo *conn, + const string_t *key, const bool bForce) { - char key[INET6_ADDRSTRLEN + 8]; - int key_len; ConnectionManager *cm; ConnectionNode *node; - conn_pool_get_key(conn, key, &key_len); - pthread_mutex_lock(&cp->lock); - cm = (ConnectionManager *)fc_hash_find(&cp->hash_array, key, key_len); + cm = (ConnectionManager *)fc_hash_find(&cp->hash_array, key->str, key->len); pthread_mutex_unlock(&cp->lock); if (cm == NULL) { @@ -388,7 +537,7 @@ int conn_pool_close_connection_ex(ConnectionPool *cp, ConnectionInfo *conn, return ENOENT; } - node = (ConnectionNode *)(((char *)conn) - sizeof(ConnectionNode)); + node = (ConnectionNode *)((char *)conn - sizeof(ConnectionNode)); if (node->manager != cm) { logError("file: "__FILE__", line: %d, " \ @@ -408,7 +557,8 @@ int conn_pool_close_connection_ex(ConnectionPool *cp, ConnectionInfo *conn, __LINE__, conn->ip_addr, conn->port, conn->sock, cm->total_count, cm->free_count); - conn_pool_disconnect_server(conn); + G_COMMON_CONNECTION_CALLBACKS[conn->comm_type]. + close_connection(conn); fast_mblock_free_object(&cp->node_allocator, node); node = cm->head; @@ -436,6 +586,64 @@ int conn_pool_close_connection_ex(ConnectionPool *cp, ConnectionInfo *conn, return 0; } +int conn_pool_close_connection_ex(ConnectionPool *cp, + ConnectionInfo *conn, const bool bForce) +{ + string_t key; + uint32_t hash_code; + ConnectionNode **bucket; + ConnectionNode *previous; + ConnectionNode *node; + char key_buff[INET6_ADDRSTRLEN + 8]; + ConnectionThreadHashTable *htable; + + key.str = key_buff; + conn_pool_get_key(conn, key.str, &key.len); + if (!cp->extra_params.tls.enabled) { + return close_connection(cp, conn, &key, bForce); + } + + if (!bForce) { + return 0; + } + + htable = pthread_getspecific(cp->tls_key); + if (htable == NULL) { + return close_connection(cp, conn, &key, bForce); + } + + hash_code = fc_simple_hash(key.str, key.len); + bucket = htable->buckets + hash_code % cp-> + extra_params.tls.htable_capacity; + if (*bucket == NULL) { + node = NULL; + previous = NULL; + } else if ((*bucket)->conn == conn) { + node = *bucket; + previous = NULL; + } else { + previous = *bucket; + node = (*bucket)->next; + while (node != NULL) { + if (node->conn == conn) { + break; + } + previous = node; + node = node->next; + } + } + + if (node != NULL) { + if (previous == NULL) { + *bucket = node->next; + } else { + previous->next = node->next; + } + } + + return close_connection(cp, conn, &key, bForce); +} + static int _conn_count_walk(const int index, const HashData *data, void *args) { int *count; @@ -513,6 +721,7 @@ int conn_pool_parse_server_info(const char *pServerStr, pServerInfo->socket_domain = AF_UNSPEC; pServerInfo->sock = -1; + pServerInfo->comm_type = fc_comm_type_sock; return 0; } @@ -532,3 +741,157 @@ int conn_pool_load_server_info(IniContext *pIniContext, const char *filename, return conn_pool_parse_server_info(pServerStr, pServerInfo, default_port); } + +#define API_PREFIX_NAME "fast_rdma_client_" + +#define LOAD_API(callbacks, fname) \ + do { \ + callbacks.fname = dlsym(dlhandle, API_PREFIX_NAME#fname); \ + if (callbacks.fname == NULL) { \ + logError("file: "__FILE__", line: %d, " \ + "dlsym api %s fail, error info: %s", \ + __LINE__, API_PREFIX_NAME#fname, dlerror()); \ + return ENOENT; \ + } \ + } while (0) + +int conn_pool_global_init_for_rdma() +{ + const char *library = "libfastrdma.so"; + void *dlhandle; + + if (g_connection_callbacks.inited) { + return 0; + } + + dlhandle = dlopen(library, RTLD_LAZY); + if (dlhandle == NULL) { + logError("file: "__FILE__", line: %d, " + "dlopen %s fail, error info: %s", + __LINE__, library, dlerror()); + return EFAULT; + } + + LOAD_API(G_COMMON_CONNECTION_CALLBACKS[fc_comm_type_rdma], + make_connection); + LOAD_API(G_COMMON_CONNECTION_CALLBACKS[fc_comm_type_rdma], + close_connection); + LOAD_API(G_COMMON_CONNECTION_CALLBACKS[fc_comm_type_rdma], + is_connected); + + LOAD_API(G_RDMA_CONNECTION_CALLBACKS, set_busy_polling); + LOAD_API(G_RDMA_CONNECTION_CALLBACKS, alloc_pd); + LOAD_API(G_RDMA_CONNECTION_CALLBACKS, get_connection_size); + LOAD_API(G_RDMA_CONNECTION_CALLBACKS, init_connection); + LOAD_API(G_RDMA_CONNECTION_CALLBACKS, make_connection); + LOAD_API(G_RDMA_CONNECTION_CALLBACKS, close_connection); + LOAD_API(G_RDMA_CONNECTION_CALLBACKS, destroy_connection); + LOAD_API(G_RDMA_CONNECTION_CALLBACKS, is_connected); + LOAD_API(G_RDMA_CONNECTION_CALLBACKS, send_done); + LOAD_API(G_RDMA_CONNECTION_CALLBACKS, get_recv_buffer); + LOAD_API(G_RDMA_CONNECTION_CALLBACKS, request_by_buf1); + LOAD_API(G_RDMA_CONNECTION_CALLBACKS, request_by_buf2); + LOAD_API(G_RDMA_CONNECTION_CALLBACKS, request_by_iov); + LOAD_API(G_RDMA_CONNECTION_CALLBACKS, request_by_mix); + LOAD_API(G_RDMA_CONNECTION_CALLBACKS, send_by_buf1); + LOAD_API(G_RDMA_CONNECTION_CALLBACKS, recv_data); + LOAD_API(G_RDMA_CONNECTION_CALLBACKS, post_recv); + + g_connection_callbacks.inited = true; + return 0; +} + +ConnectionInfo *conn_pool_alloc_connection_ex( + const FCCommunicationType comm_type, + const int extra_data_size, + const ConnectionExtraParams *extra_params, + int *err_no) +{ + ConnectionInfo *conn; + int bytes; + + if (comm_type == fc_comm_type_rdma) { + bytes = sizeof(ConnectionInfo) + extra_data_size + + G_RDMA_CONNECTION_CALLBACKS.get_connection_size(); + } else { + bytes = sizeof(ConnectionInfo) + extra_data_size; + } + if ((conn=fc_malloc(bytes)) == NULL) { + *err_no = ENOMEM; + return NULL; + } + memset(conn, 0, bytes); + + if (comm_type == fc_comm_type_rdma) { + conn->arg1 = conn->args + extra_data_size; + if ((*err_no=G_RDMA_CONNECTION_CALLBACKS.init_connection( + conn, extra_params->rdma.double_buffers, + extra_params->rdma.buffer_size, + extra_params->rdma.pd)) != 0) + { + free(conn); + return NULL; + } + } else { + *err_no = 0; + } + + conn->comm_type = comm_type; + conn->sock = -1; + return conn; +} + +int conn_pool_set_rdma_extra_params_ex(ConnectionExtraParams *extra_params, + struct fc_server_config *server_cfg, const int server_group_index, + const bool double_buffers) +{ + const int padding_size = 1024; + FCServerGroupInfo *server_group; + FCServerInfo *first_server; + int result; + + if ((server_group=fc_server_get_group_by_index(server_cfg, + server_group_index)) == NULL) + { + return ENOENT; + } + + switch (server_cfg->connection_thread_local) { + case fc_connection_thread_local_auto: + if (server_group->comm_type == fc_comm_type_sock) { + extra_params->tls.enabled = false; + } else { + extra_params->tls.enabled = (FC_SID_SERVER_COUNT( + *server_cfg) <= 64); + } + break; + case fc_connection_thread_local_yes: + extra_params->tls.enabled = true; + break; + default: + extra_params->tls.enabled = false; + break; + } + + if (extra_params->tls.enabled) { + extra_params->tls.htable_capacity = fc_ceil_prime( + FC_SID_SERVER_COUNT(*server_cfg)); + } else { + extra_params->tls.htable_capacity = 0; + } + + if (server_group->comm_type == fc_comm_type_sock) { + extra_params->rdma.double_buffers = false; + extra_params->rdma.buffer_size = 0; + extra_params->rdma.pd = NULL; + return 0; + } else { + first_server = FC_SID_SERVERS(*server_cfg); + extra_params->rdma.double_buffers = double_buffers; + extra_params->rdma.buffer_size = server_cfg->buffer_size + padding_size; + extra_params->rdma.pd = fc_alloc_rdma_pd(G_RDMA_CONNECTION_CALLBACKS. + alloc_pd, &first_server->group_addrs[server_group_index]. + address_array, &result); + return result; + } +} diff --git a/src/connection_pool.h b/src/connection_pool.h index b6ec8f5..32d7df0 100644 --- a/src/connection_pool.h +++ b/src/connection_pool.h @@ -40,16 +40,104 @@ extern "C" { (strcmp((conn1).ip_addr, (conn2).ip_addr) == 0 && \ (conn1).port == (conn2).port) -typedef struct -{ - int sock; - uint16_t port; +typedef enum { + fc_comm_type_sock = 0, + fc_comm_type_rdma, + fc_comm_type_both +} FCCommunicationType; + +typedef struct { + int sock; + uint16_t port; short socket_domain; //socket domain, AF_INET, AF_INET6 or AF_UNSPEC for auto dedect + FCCommunicationType comm_type; bool validate_flag; //for connection pool - char ip_addr[IP_ADDRESS_SIZE]; + char ip_addr[IP_ADDRESS_SIZE]; + void *arg1; //for RDMA char args[0]; //for extra data } ConnectionInfo; +struct fc_server_config; +struct ibv_pd; +typedef void (*fc_set_busy_polling_callback)(const bool busy_polling); +typedef struct ibv_pd *(*fc_alloc_pd_callback)(const char **ip_addrs, + const int count, const int port); +typedef int (*fc_get_connection_size_callback)(); +typedef int (*fc_init_connection_callback)(ConnectionInfo *conn, + const bool double_buffers, const int buffer_size, void *arg); +typedef int (*fc_make_connection_callback)(ConnectionInfo *conn, + const char *service_name, const int timeout_ms, + const char *bind_ipaddr, const bool log_connect_error); +typedef bool (*fc_is_connected_callback)(ConnectionInfo *conn); +typedef bool (*fc_send_done_callback)(ConnectionInfo *conn); +typedef void (*fc_close_connection_callback)(ConnectionInfo *conn); +typedef void (*fc_destroy_connection_callback)(ConnectionInfo *conn); + +typedef BufferInfo *(*fc_rdma_get_recv_buffer_callback)(ConnectionInfo *conn); +typedef int (*fc_rdma_request_by_buf1_callback)(ConnectionInfo *conn, + const char *data, const int length, const int timeout_ms); +typedef int (*fc_rdma_request_by_buf2_callback)(ConnectionInfo *conn, + const char *data1, const int length1, const char *data2, + const int length2, const int timeout_ms); +typedef int (*fc_rdma_request_by_iov_callback)(ConnectionInfo *conn, + const struct iovec *iov, const int iovcnt, + const int timeout_ms); +typedef int (*fc_rdma_request_by_mix_callback)(ConnectionInfo *conn, + const char *data, const int length, const struct iovec *iov, + const int iovcnt, const int timeout_ms); +typedef int (*fc_rdma_send_by_buf1_callback)(ConnectionInfo *conn, + const char *data, const int length); +typedef int (*fc_rdma_recv_data_callback)(ConnectionInfo *conn, + const bool call_post_recv, const int timeout_ms); +typedef int (*fc_rdma_post_recv_callback)(ConnectionInfo *conn); + +typedef struct { + fc_make_connection_callback make_connection; + fc_close_connection_callback close_connection; + fc_is_connected_callback is_connected; +} CommonConnectionCallbacks; + +typedef struct { + fc_set_busy_polling_callback set_busy_polling; + fc_alloc_pd_callback alloc_pd; + fc_get_connection_size_callback get_connection_size; + fc_init_connection_callback init_connection; + fc_make_connection_callback make_connection; + fc_close_connection_callback close_connection; + fc_destroy_connection_callback destroy_connection; + fc_is_connected_callback is_connected; + fc_send_done_callback send_done; + + fc_rdma_get_recv_buffer_callback get_recv_buffer; + fc_rdma_request_by_buf1_callback request_by_buf1; + fc_rdma_request_by_buf2_callback request_by_buf2; + fc_rdma_request_by_iov_callback request_by_iov; + fc_rdma_request_by_mix_callback request_by_mix; + + fc_rdma_send_by_buf1_callback send_by_buf1; + fc_rdma_recv_data_callback recv_data; + fc_rdma_post_recv_callback post_recv; +} RDMAConnectionCallbacks; + +typedef struct { + bool inited; + CommonConnectionCallbacks common_callbacks[2]; + RDMAConnectionCallbacks rdma_callbacks; +} ConnectionCallbacks; + +typedef struct { + struct { + bool enabled; + int htable_capacity; + } tls; //for thread local + + struct { + bool double_buffers; + int buffer_size; + struct ibv_pd *pd; + } rdma; +} ConnectionExtraParams; + typedef int (*fc_connection_callback_func)(ConnectionInfo *conn, void *args); struct tagConnectionManager; @@ -68,10 +156,17 @@ typedef struct tagConnectionManager { pthread_mutex_t lock; } ConnectionManager; +struct tagConnectionPool; + +typedef struct { + ConnectionNode **buckets; + struct tagConnectionPool *cp; +} ConnectionThreadHashTable; + typedef struct tagConnectionPool { - HashArray hash_array; //key is ip:port, value is ConnectionManager + HashArray hash_array; //key is ip-port, value is ConnectionManager pthread_mutex_t lock; - int connect_timeout; + int connect_timeout_ms; int max_count_per_entry; //0 means no limit /* @@ -93,8 +188,19 @@ typedef struct tagConnectionPool { fc_connection_callback_func func; void *args; } validate_callback; + + int extra_data_size; + ConnectionExtraParams extra_params; + pthread_key_t tls_key; //for ConnectionThreadHashTable } ConnectionPool; +extern ConnectionCallbacks g_connection_callbacks; + +int conn_pool_global_init_for_rdma(); + +#define G_COMMON_CONNECTION_CALLBACKS g_connection_callbacks.common_callbacks +#define G_RDMA_CONNECTION_CALLBACKS g_connection_callbacks.rdma_callbacks + /** * init ex function * parameters: @@ -109,6 +215,7 @@ typedef struct tagConnectionPool { * validate_func: the validate connection callback * validate_args: the args for validate connection callback * extra_data_size: the extra data size of connection +* extra_params: for RDMA * return 0 for success, != 0 for error */ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout, @@ -116,7 +223,7 @@ int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout, const int socket_domain, const int htable_init_capacity, fc_connection_callback_func connect_done_func, void *connect_done_args, fc_connection_callback_func validate_func, void *validate_args, - const int extra_data_size); + const int extra_data_size, const ConnectionExtraParams *extra_params); /** * init ex function @@ -134,9 +241,10 @@ static inline int conn_pool_init_ex(ConnectionPool *cp, int connect_timeout, { const int htable_init_capacity = 0; const int extra_data_size = 0; + const ConnectionExtraParams *extra_params = NULL; return conn_pool_init_ex1(cp, connect_timeout, max_count_per_entry, max_idle_time, socket_domain, htable_init_capacity, - NULL, NULL, NULL, NULL, extra_data_size); + NULL, NULL, NULL, NULL, extra_data_size, extra_params); } /** @@ -154,9 +262,10 @@ static inline int conn_pool_init(ConnectionPool *cp, int connect_timeout, const int socket_domain = AF_UNSPEC; const int htable_init_capacity = 0; const int extra_data_size = 0; + const ConnectionExtraParams *extra_params = NULL; return conn_pool_init_ex1(cp, connect_timeout, max_count_per_entry, max_idle_time, socket_domain, htable_init_capacity, - NULL, NULL, NULL, NULL, extra_data_size); + NULL, NULL, NULL, NULL, extra_data_size, extra_params); } /** @@ -193,82 +302,84 @@ ConnectionInfo *conn_pool_get_connection_ex(ConnectionPool *cp, * bForce: set true to close the socket, else only push back to connection pool * return 0 for success, != 0 for error */ -int conn_pool_close_connection_ex(ConnectionPool *cp, ConnectionInfo *conn, - const bool bForce); +int conn_pool_close_connection_ex(ConnectionPool *cp, + ConnectionInfo *conn, const bool bForce); /** * disconnect from the server * parameters: -* pConnection: the connection +* conn: the connection * return 0 for success, != 0 for error */ -void conn_pool_disconnect_server(ConnectionInfo *pConnection); +void conn_pool_disconnect_server(ConnectionInfo *conn); + +bool conn_pool_is_connected(ConnectionInfo *conn); /** * connect to the server * parameters: * pConnection: the connection * service_name: the service name to log -* connect_timeout: the connect timeout in seconds +* connect_timeout_ms: the connect timeout in milliseconds * bind_ipaddr: the ip address to bind, NULL or empty for any * log_connect_error: if log error info when connect fail * NOTE: pConnection->sock will be closed when it >= 0 before connect * return 0 for success, != 0 for error */ int conn_pool_connect_server_ex1(ConnectionInfo *conn, - const char *service_name, const int connect_timeout, + const char *service_name, const int connect_timeout_ms, const char *bind_ipaddr, const bool log_connect_error); /** * connect to the server * parameters: * pConnection: the connection -* connect_timeout: the connect timeout in seconds +* connect_timeout_ms: the connect timeout in milliseconds * bind_ipaddr: the ip address to bind, NULL or empty for any * log_connect_error: if log error info when connect fail * NOTE: pConnection->sock will be closed when it >= 0 before connect * return 0 for success, != 0 for error */ static inline int conn_pool_connect_server_ex(ConnectionInfo *pConnection, - const int connect_timeout, const char *bind_ipaddr, + const int connect_timeout_ms, const char *bind_ipaddr, const bool log_connect_error) { const char *service_name = NULL; return conn_pool_connect_server_ex1(pConnection, service_name, - connect_timeout, bind_ipaddr, log_connect_error); + connect_timeout_ms, bind_ipaddr, log_connect_error); } /** * connect to the server * parameters: * pConnection: the connection -* connect_timeout: the connect timeout in seconds +* connect_timeout_ms: the connect timeout in seconds * NOTE: pConnection->sock will be closed when it >= 0 before connect * return 0 for success, != 0 for error */ static inline int conn_pool_connect_server(ConnectionInfo *pConnection, - const int connect_timeout) + const int connect_timeout_ms) { const char *service_name = NULL; const char *bind_ipaddr = NULL; return conn_pool_connect_server_ex1(pConnection, service_name, - connect_timeout, bind_ipaddr, true); + connect_timeout_ms, bind_ipaddr, true); } /** * connect to the server * parameters: * pConnection: the connection -* connect_timeout: the connect timeout in seconds +* connect_timeout_ms: the connect timeout in seconds * return 0 for success, != 0 for error */ static inline int conn_pool_connect_server_anyway(ConnectionInfo *pConnection, - const int connect_timeout) + const int connect_timeout_ms) { const char *service_name = NULL; const char *bind_ipaddr = NULL; pConnection->sock = -1; return conn_pool_connect_server_ex1(pConnection, service_name, - connect_timeout, bind_ipaddr, true); + connect_timeout_ms, bind_ipaddr, true); } /** @@ -347,6 +458,55 @@ static inline int conn_pool_compare_ip_and_port(const char *ip1, return port1 - port2; } +ConnectionInfo *conn_pool_alloc_connection_ex( + const FCCommunicationType comm_type, + const int extra_data_size, + const ConnectionExtraParams *extra_params, + int *err_no); + +static inline ConnectionInfo *conn_pool_alloc_connection( + const FCCommunicationType comm_type, + const ConnectionExtraParams *extra_params, + int *err_no) +{ + const int extra_data_size = 0; + return conn_pool_alloc_connection_ex(comm_type, + extra_data_size, extra_params, err_no); +} + +static inline void conn_pool_free_connection(ConnectionInfo *conn) +{ + free(conn); +} + +int conn_pool_set_rdma_extra_params_ex(ConnectionExtraParams *extra_params, + struct fc_server_config *server_cfg, const int server_group_index, + const bool double_buffers); + +static inline int conn_pool_set_rdma_extra_params( + ConnectionExtraParams *extra_params, + struct fc_server_config *server_cfg, + const int server_group_index) +{ + const bool double_buffers = false; + return conn_pool_set_rdma_extra_params_ex(extra_params, + server_cfg, server_group_index, double_buffers); +} + +static inline const char *fc_comm_type_str(const FCCommunicationType type) +{ + switch (type) { + case fc_comm_type_sock: + return "socket"; + case fc_comm_type_rdma: + return "rdma"; + case fc_comm_type_both: + return "both"; + default: + return "unkown"; + } +} + #ifdef __cplusplus } #endif diff --git a/src/fast_task_queue.c b/src/fast_task_queue.c index 9baf905..86d9326 100644 --- a/src/fast_task_queue.c +++ b/src/fast_task_queue.c @@ -25,652 +25,188 @@ #include "fc_memory.h" #include "fast_task_queue.h" -static struct fast_task_queue g_free_queue; - -struct mpool_node { - struct fast_task_info *blocks; - struct fast_task_info *last_block; //last block - struct mpool_node *next; -}; - -struct mpool_chain { - struct mpool_node *head; - struct mpool_node *tail; -}; - -static struct mpool_chain g_mpool = {NULL, NULL}; - -int task_queue_init(struct fast_task_queue *pQueue) +static int task_alloc_init(struct fast_task_info *task, + struct fast_task_queue *queue) { - int result; - - if ((result=init_pthread_lock(&(pQueue->lock))) != 0) - { - logError("file: "__FILE__", line: %d, " \ - "init_pthread_lock fail, errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - return result; - } - - pQueue->head = NULL; - pQueue->tail = NULL; - - return 0; -} - -static void free_mpool(struct mpool_node *mpool, char *end) -{ - char *pt; - for (pt=(char *)mpool->blocks; pt < end; pt += g_free_queue.block_size) - { - free(((struct fast_task_info *)pt)->data); - } - - free(mpool->blocks); - free(mpool); -} - -static struct mpool_node *malloc_mpool(const int total_alloc_size) -{ - struct fast_task_info *pTask; - char *p; - char *pCharEnd; - struct mpool_node *mpool; - - mpool = (struct mpool_node *)fc_malloc(sizeof(struct mpool_node)); - if (mpool == NULL) - { - return NULL; - } - - mpool->next = NULL; - mpool->blocks = (struct fast_task_info *)fc_malloc(total_alloc_size); - if (mpool->blocks == NULL) - { - free(mpool); - return NULL; - } - memset(mpool->blocks, 0, total_alloc_size); - - pCharEnd = ((char *)mpool->blocks) + total_alloc_size; - for (p=(char *)mpool->blocks; psize = g_free_queue.min_buff_size; - - pTask->arg = p + ALIGNED_TASK_INFO_SIZE; - if (g_free_queue.malloc_whole_block) - { - pTask->data = (char *)pTask->arg + \ - g_free_queue.arg_size; - } - else - { - pTask->data = (char *)fc_malloc(pTask->size); - if (pTask->data == NULL) - { - free_mpool(mpool, p); - return NULL; - } - } - - if (g_free_queue.init_callback != NULL) - { - if (g_free_queue.init_callback(pTask) != 0) - { - free_mpool(mpool, p); - return NULL; - } - } - } - - mpool->last_block = (struct fast_task_info *) - (pCharEnd - g_free_queue.block_size); - for (p=(char *)mpool->blocks; p<(char *)mpool->last_block; - p += g_free_queue.block_size) - { - pTask = (struct fast_task_info *)p; - pTask->next = (struct fast_task_info *)(p + g_free_queue.block_size); - } - mpool->last_block->next = NULL; - - return mpool; -} - -int free_queue_init_ex2(const int max_connections, const int init_connections, - const int alloc_task_once, const int min_buff_size, - const int max_buff_size, const int arg_size, - TaskInitCallback init_callback) -{ -#define MAX_DATA_SIZE (256 * 1024 * 1024) - int64_t total_size; - struct mpool_node *mpool; - int alloc_size; - int alloc_once; - int result; - int loop_count; - int aligned_min_size; - int aligned_max_size; - int aligned_arg_size; - rlim_t max_data_size; - - if ((result=init_pthread_lock(&(g_free_queue.lock))) != 0) - { - logError("file: "__FILE__", line: %d, " \ - "init_pthread_lock fail, errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - return result; - } - - aligned_min_size = MEM_ALIGN(min_buff_size); - aligned_max_size = MEM_ALIGN(max_buff_size); - aligned_arg_size = MEM_ALIGN(arg_size); - g_free_queue.block_size = ALIGNED_TASK_INFO_SIZE + aligned_arg_size; - alloc_size = g_free_queue.block_size * init_connections; - if (aligned_max_size > aligned_min_size) - { - total_size = alloc_size; - g_free_queue.malloc_whole_block = false; - max_data_size = 0; - } - else - { - struct rlimit rlimit_data; - - if (getrlimit(RLIMIT_DATA, &rlimit_data) < 0) - { - logError("file: "__FILE__", line: %d, " \ - "call getrlimit fail, " \ - "errno: %d, error info: %s", \ - __LINE__, errno, STRERROR(errno)); - return errno != 0 ? errno : EPERM; - } - if (rlimit_data.rlim_cur == RLIM_INFINITY) - { - max_data_size = MAX_DATA_SIZE; - } - else - { - max_data_size = rlimit_data.rlim_cur; - if (max_data_size > MAX_DATA_SIZE) - { - max_data_size = MAX_DATA_SIZE; - } - } - - if (max_data_size >= (int64_t)(g_free_queue.block_size + - aligned_min_size) * (int64_t)init_connections) - { - total_size = alloc_size + (int64_t)aligned_min_size * - init_connections; - g_free_queue.malloc_whole_block = true; - g_free_queue.block_size += aligned_min_size; - } - else - { - total_size = alloc_size; - g_free_queue.malloc_whole_block = false; - max_data_size = 0; - } - } - - g_free_queue.max_connections = max_connections; - g_free_queue.alloc_connections = init_connections; - if (alloc_task_once <= 0) - { - g_free_queue.alloc_task_once = 256; - alloc_once = MAX_DATA_SIZE / g_free_queue.block_size; - if (g_free_queue.alloc_task_once > alloc_once) - { - g_free_queue.alloc_task_once = alloc_once > 0 ? alloc_once : 1; - } - } - else - { - g_free_queue.alloc_task_once = alloc_task_once; - } - g_free_queue.min_buff_size = aligned_min_size; - g_free_queue.max_buff_size = aligned_max_size; - g_free_queue.arg_size = aligned_arg_size; - g_free_queue.init_callback = init_callback; - - logDebug("file: "__FILE__", line: %d, " - "max_connections: %d, init_connections: %d, alloc_task_once: %d, " - "min_buff_size: %d, max_buff_size: %d, block_size: %d, " - "arg_size: %d, max_data_size: %d, total_size: %"PRId64, - __LINE__, max_connections, init_connections, - g_free_queue.alloc_task_once, aligned_min_size, aligned_max_size, - g_free_queue.block_size, aligned_arg_size, (int)max_data_size, total_size); - - if ((!g_free_queue.malloc_whole_block) || (total_size <= max_data_size)) - { - loop_count = 1; - mpool = malloc_mpool(total_size); - if (mpool == NULL) - { - return errno != 0 ? errno : ENOMEM; - } - g_mpool.head = mpool; - g_mpool.tail = mpool; - } - else - { - int remain_count; - int alloc_count; - int current_alloc_size; - - loop_count = 0; - remain_count = init_connections; - alloc_once = max_data_size / g_free_queue.block_size; - while (remain_count > 0) - { - alloc_count = (remain_count > alloc_once) ? - alloc_once : remain_count; - current_alloc_size = g_free_queue.block_size * alloc_count; - mpool = malloc_mpool(current_alloc_size); - if (mpool == NULL) - { - free_queue_destroy(); - return errno != 0 ? errno : ENOMEM; - } - - if (g_mpool.tail == NULL) - { - g_mpool.head = mpool; - } - else - { - g_mpool.tail->next = mpool; - g_mpool.tail->last_block->next = mpool->blocks; //link previous mpool to current - } - g_mpool.tail = mpool; - - remain_count -= alloc_count; - loop_count++; - } - - logDebug("file: "__FILE__", line: %d, " \ - "alloc_once: %d", __LINE__, alloc_once); - } - - logDebug("file: "__FILE__", line: %d, " \ - "malloc task info as whole: %d, malloc loop count: %d", \ - __LINE__, g_free_queue.malloc_whole_block, loop_count); - - if (g_mpool.head != NULL) - { - g_free_queue.head = g_mpool.head->blocks; - g_free_queue.tail = g_mpool.tail->last_block; - } - - return 0; -} - -void free_queue_destroy() -{ - struct mpool_node *mpool; - struct mpool_node *mp; - - if (g_mpool.head == NULL) - { - return; - } - - if (!g_free_queue.malloc_whole_block) - { - char *p; - char *pCharEnd; - struct fast_task_info *pTask; - - mpool = g_mpool.head; - while (mpool != NULL) - { - pCharEnd = (char *)mpool->last_block + g_free_queue.block_size; - for (p=(char *)mpool->blocks; pdata != NULL) - { - free(pTask->data); - pTask->data = NULL; - } - } - mpool = mpool->next; - } - } - - mpool = g_mpool.head; - while (mpool != NULL) - { - mp = mpool; - mpool = mpool->next; - - free(mp->blocks); - free(mp); - } - g_mpool.head = g_mpool.tail = NULL; - - pthread_mutex_destroy(&(g_free_queue.lock)); -} - -static int free_queue_realloc() -{ - struct mpool_node *mpool; - struct fast_task_info *head; - struct fast_task_info *tail; - int remain_count; - int alloc_count; - int current_alloc_size; - - head = tail = NULL; - remain_count = g_free_queue.max_connections - - g_free_queue.alloc_connections; - alloc_count = (remain_count > g_free_queue.alloc_task_once) ? - g_free_queue.alloc_task_once : remain_count; - if (alloc_count > 0) - { - current_alloc_size = g_free_queue.block_size * alloc_count; - mpool = malloc_mpool(current_alloc_size); - if (mpool == NULL) - { + task->arg = (char *)task + ALIGNED_TASK_INFO_SIZE + queue->padding_size; + task->send.ptr = &task->send.holder; + task->send.ptr->size = queue->min_buff_size; + if (queue->malloc_whole_block) { + task->send.ptr->data = (char *)task->arg + queue->arg_size; + } else { + task->send.ptr->data = (char *)fc_malloc(task->send.ptr->size); + if (task->send.ptr->data == NULL) { return ENOMEM; - } - - if (g_mpool.tail == NULL) - { - g_mpool.head = mpool; - } - else - { - g_mpool.tail->next = mpool; - } - g_mpool.tail = mpool; - - head = mpool->blocks; - tail = mpool->last_block; - - remain_count -= alloc_count; - } - else { - return ENOSPC; + } } - if (g_free_queue.head == NULL) - { - g_free_queue.head = head; + if (queue->double_buffers) { + task->recv.ptr = &task->recv.holder; + task->recv.ptr->size = queue->min_buff_size; + task->recv.ptr->data = (char *)fc_malloc(task->recv.ptr->size); + if (task->recv.ptr->data == NULL) { + return ENOMEM; + } + } else { + task->recv.ptr = &task->send.holder; } - if (g_free_queue.tail != NULL) - { - g_free_queue.tail->next = head; + + task->free_queue = queue; + if (queue->init_callback != NULL) { + return queue->init_callback(task); } - g_free_queue.tail = tail; - - g_free_queue.alloc_connections += alloc_count; - - logDebug("file: "__FILE__", line: %d, " - "alloc_connections: %d, realloc %d elements", __LINE__, - g_free_queue.alloc_connections, alloc_count); - return 0; } -struct fast_task_info *free_queue_pop() +int free_queue_init_ex2(struct fast_task_queue *queue, const char *name, + const bool double_buffers, const int max_connections, + const int alloc_task_once, const int min_buff_size, + const int max_buff_size, const int padding_size, + const int arg_size, TaskInitCallback init_callback) { - struct fast_task_info *pTask; - int i; +#define MAX_DATA_SIZE (256 * 1024 * 1024) + int alloc_once; + int aligned_min_size; + int aligned_max_size; + int aligned_padding_size; + int aligned_arg_size; + rlim_t max_data_size; + char aname[64]; - if ((pTask=task_queue_pop(&g_free_queue)) != NULL) - { - return pTask; + aligned_min_size = MEM_ALIGN(min_buff_size); + aligned_max_size = MEM_ALIGN(max_buff_size); + aligned_padding_size = MEM_ALIGN(padding_size); + aligned_arg_size = MEM_ALIGN(arg_size); + queue->block_size = ALIGNED_TASK_INFO_SIZE + + aligned_padding_size + aligned_arg_size; + if (alloc_task_once <= 0) { + alloc_once = FC_MIN(MAX_DATA_SIZE / queue->block_size, 256); + if (alloc_once == 0) { + alloc_once = 1; + } + } else { + alloc_once = alloc_task_once; } - if (g_free_queue.alloc_connections >= g_free_queue.max_connections) - { - return NULL; - } + if (aligned_max_size > aligned_min_size) { + queue->malloc_whole_block = false; + max_data_size = 0; + } else { + struct rlimit rlimit_data; - for (i=0; i<10; i++) - { - pthread_mutex_lock(&g_free_queue.lock); - if (g_free_queue.alloc_connections >= g_free_queue.max_connections) - { - if (g_free_queue.head == NULL) - { - pthread_mutex_unlock(&g_free_queue.lock); - return NULL; + if (getrlimit(RLIMIT_DATA, &rlimit_data) < 0) { + logError("file: "__FILE__", line: %d, " + "call getrlimit fail, " + "errno: %d, error info: %s", + __LINE__, errno, STRERROR(errno)); + return errno != 0 ? errno : EPERM; + } + if (rlimit_data.rlim_cur == RLIM_INFINITY) { + max_data_size = MAX_DATA_SIZE; + } else { + max_data_size = rlimit_data.rlim_cur; + if (max_data_size > MAX_DATA_SIZE) { + max_data_size = MAX_DATA_SIZE; } } - else - { - if (g_free_queue.head == NULL && free_queue_realloc() != 0) - { - pthread_mutex_unlock(&g_free_queue.lock); - return NULL; - } - } - pthread_mutex_unlock(&g_free_queue.lock); - if ((pTask=task_queue_pop(&g_free_queue)) != NULL) + if (max_data_size >= (int64_t)(queue->block_size + + aligned_min_size) * (int64_t)alloc_once) { - return pTask; + queue->malloc_whole_block = true; + queue->block_size += aligned_min_size; + } else { + queue->malloc_whole_block = false; + max_data_size = 0; } } - return NULL; + queue->double_buffers = double_buffers; + queue->min_buff_size = aligned_min_size; + queue->max_buff_size = aligned_max_size; + queue->padding_size = aligned_padding_size; + queue->arg_size = aligned_arg_size; + queue->init_callback = init_callback; + queue->release_callback = NULL; + + /* + logInfo("file: "__FILE__", line: %d, [%s] double_buffers: %d, " + "max_connections: %d, alloc_once: %d, malloc_whole_block: %d, " + "min_buff_size: %d, max_buff_size: %d, block_size: %d, " + "padding_size: %d, arg_size: %d, max_data_size: %"PRId64, + __LINE__, name, double_buffers, max_connections, alloc_once, + queue->malloc_whole_block, aligned_min_size, aligned_max_size, + queue->block_size, aligned_padding_size, aligned_arg_size, + (int64_t)max_data_size); + */ + + snprintf(aname, sizeof(aname), "%s-task", name); + return fast_mblock_init_ex1(&queue->allocator, aname, + queue->block_size, alloc_once, max_connections, + (fast_mblock_object_init_func)task_alloc_init, + queue, true); } -static int _realloc_buffer(struct fast_task_info *pTask, const int new_size, - const bool copy_data) +void free_queue_destroy(struct fast_task_queue *queue) +{ + fast_mblock_destroy(&queue->allocator); +} + +static int _realloc_buffer(struct fast_net_buffer *buffer, + const int new_size, const bool copy_data) { char *new_buff; new_buff = (char *)fc_malloc(new_size); - if (new_buff == NULL) - { + if (new_buff == NULL) { return ENOMEM; } - else - { - if (copy_data && pTask->offset > 0) { - memcpy(new_buff, pTask->data, pTask->offset); - } - free(pTask->data); - pTask->size = new_size; - pTask->data = new_buff; - return 0; + + if (copy_data && buffer->offset > 0) { + memcpy(new_buff, buffer->data, buffer->offset); } + free(buffer->data); + buffer->size = new_size; + buffer->data = new_buff; + return 0; } -int free_queue_push(struct fast_task_info *pTask) +void free_queue_push(struct fast_task_info *task) { - int result; + if (task->free_queue->release_callback != NULL) { + task->free_queue->release_callback(task); + } - *(pTask->client_ip) = '\0'; - pTask->length = 0; - pTask->offset = 0; - pTask->req_count = 0; + *(task->client_ip) = '\0'; + task->send.ptr->length = 0; + task->send.ptr->offset = 0; + task->req_count = 0; + if (task->send.ptr->size > task->free_queue->min_buff_size) {//need thrink + _realloc_buffer(task->send.ptr, task->free_queue->min_buff_size, false); + } - if (pTask->size > g_free_queue.min_buff_size) //need thrink - { - _realloc_buffer(pTask, g_free_queue.min_buff_size, false); - } + if (task->free_queue->double_buffers) { + task->recv.ptr->length = 0; + task->recv.ptr->offset = 0; + if (task->recv.ptr->size > task->free_queue->min_buff_size) { + _realloc_buffer(task->recv.ptr, task->free_queue-> + min_buff_size, false); + } + } - if ((result=pthread_mutex_lock(&g_free_queue.lock)) != 0) - { - logError("file: "__FILE__", line: %d, " \ - "call pthread_mutex_lock fail, " \ - "errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - } - - pTask->next = g_free_queue.head; - g_free_queue.head = pTask; - if (g_free_queue.tail == NULL) - { - g_free_queue.tail = pTask; - } - - if ((result=pthread_mutex_unlock(&g_free_queue.lock)) != 0) - { - logError("file: "__FILE__", line: %d, " \ - "call pthread_mutex_unlock fail, " \ - "errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - } - - return result; + fast_mblock_free_object(&task->free_queue->allocator, task); } -int free_queue_count() -{ - return task_queue_count(&g_free_queue); -} - -int free_queue_alloc_connections() -{ - return g_free_queue.alloc_connections; -} - -int free_queue_set_buffer_size(struct fast_task_info *pTask, - const int expect_size) -{ - return task_queue_set_buffer_size(&g_free_queue, pTask, expect_size); -} - -int free_queue_realloc_buffer(struct fast_task_info *pTask, - const int expect_size) -{ - return task_queue_realloc_buffer(&g_free_queue, pTask, expect_size); -} - -int free_queue_set_max_buffer_size(struct fast_task_info *pTask) -{ - return task_queue_set_buffer_size(&g_free_queue, pTask, - g_free_queue.max_buff_size); -} - -int free_queue_realloc_max_buffer(struct fast_task_info *pTask) -{ - return task_queue_realloc_buffer(&g_free_queue, pTask, - g_free_queue.max_buff_size); -} -int task_queue_push(struct fast_task_queue *pQueue, \ - struct fast_task_info *pTask) -{ - int result; - - if ((result=pthread_mutex_lock(&(pQueue->lock))) != 0) - { - logError("file: "__FILE__", line: %d, " \ - "call pthread_mutex_lock fail, " \ - "errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - return result; - } - - pTask->next = NULL; - if (pQueue->tail == NULL) - { - pQueue->head = pTask; - } - else - { - pQueue->tail->next = pTask; - } - pQueue->tail = pTask; - - if ((result=pthread_mutex_unlock(&(pQueue->lock))) != 0) - { - logError("file: "__FILE__", line: %d, " \ - "call pthread_mutex_unlock fail, " \ - "errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - } - - return 0; -} - -struct fast_task_info *task_queue_pop(struct fast_task_queue *pQueue) -{ - struct fast_task_info *pTask; - int result; - - if ((result=pthread_mutex_lock(&(pQueue->lock))) != 0) - { - logError("file: "__FILE__", line: %d, " \ - "call pthread_mutex_lock fail, " \ - "errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - return NULL; - } - - pTask = pQueue->head; - if (pTask != NULL) - { - pQueue->head = pTask->next; - if (pQueue->head == NULL) - { - pQueue->tail = NULL; - } - } - - if ((result=pthread_mutex_unlock(&(pQueue->lock))) != 0) - { - logError("file: "__FILE__", line: %d, " \ - "call pthread_mutex_unlock fail, " \ - "errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - } - - return pTask; -} - -int task_queue_count(struct fast_task_queue *pQueue) -{ - struct fast_task_info *pTask; - int count; - int result; - - if ((result=pthread_mutex_lock(&(pQueue->lock))) != 0) - { - logError("file: "__FILE__", line: %d, " \ - "call pthread_mutex_lock fail, " \ - "errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - return 0; - } - - count = 0; - pTask = pQueue->head; - while (pTask != NULL) - { - pTask = pTask->next; - count++; - } - - if ((result=pthread_mutex_unlock(&(pQueue->lock))) != 0) - { - logError("file: "__FILE__", line: %d, " \ - "call pthread_mutex_unlock fail, " \ - "errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - } - - return count; -} - -int task_queue_get_new_buffer_size(const int min_buff_size, +int free_queue_get_new_buffer_size(const int min_buff_size, const int max_buff_size, const int expect_size, int *new_size) { - if (min_buff_size == max_buff_size) - { + if (min_buff_size == max_buff_size) { logError("file: "__FILE__", line: %d, " "can't change buffer size because NOT supported", __LINE__); return EOPNOTSUPP; } - if (expect_size > max_buff_size) - { + if (expect_size > max_buff_size) { logError("file: "__FILE__", line: %d, " "can't change buffer size because expect buffer size: %d " "exceeds max buffer size: %d", __LINE__, expect_size, @@ -679,14 +215,11 @@ int task_queue_get_new_buffer_size(const int min_buff_size, } *new_size = min_buff_size; - if (expect_size > min_buff_size) - { - while (*new_size < expect_size) - { + if (expect_size > min_buff_size) { + while (*new_size < expect_size) { *new_size *= 2; } - if (*new_size > max_buff_size) - { + if (*new_size > max_buff_size) { *new_size = max_buff_size; } } @@ -694,41 +227,43 @@ int task_queue_get_new_buffer_size(const int min_buff_size, return 0; } -#define _get_new_buffer_size(pQueue, expect_size, new_size) \ - task_queue_get_new_buffer_size(pQueue->min_buff_size, \ - pQueue->max_buff_size, expect_size, new_size) +#define _get_new_buffer_size(queue, expect_size, new_size) \ + free_queue_get_new_buffer_size(queue->min_buff_size, \ + queue->max_buff_size, expect_size, new_size) -int task_queue_set_buffer_size(struct fast_task_queue *pQueue, - struct fast_task_info *pTask, const int expect_size) +int free_queue_set_buffer_size(struct fast_task_info *task, + struct fast_net_buffer *buffer, const int expect_size) { int result; int new_size; - if ((result=_get_new_buffer_size(pQueue, expect_size, &new_size)) != 0) { + if ((result=_get_new_buffer_size(task->free_queue, + expect_size, &new_size)) != 0) + { return result; } - if (pTask->size == new_size) //do NOT need change buffer size - { + if (buffer->size == new_size) { //do NOT need change buffer size return 0; } - return _realloc_buffer(pTask, new_size, false); + return _realloc_buffer(buffer, new_size, false); } -int task_queue_realloc_buffer(struct fast_task_queue *pQueue, - struct fast_task_info *pTask, const int expect_size) +int free_queue_realloc_buffer(struct fast_task_info *task, + struct fast_net_buffer *buffer, const int expect_size) { int result; int new_size; - if (pTask->size >= expect_size) //do NOT need change buffer size - { + if (buffer->size >= expect_size) { //do NOT need change buffer size return 0; } - if ((result=_get_new_buffer_size(pQueue, expect_size, &new_size)) != 0) { + if ((result=_get_new_buffer_size(task->free_queue, + expect_size, &new_size)) != 0) + { return result; } - return _realloc_buffer(pTask, new_size, true); + return _realloc_buffer(buffer, new_size, true); } diff --git a/src/fast_task_queue.h b/src/fast_task_queue.h index bebeafe..3e0f6f5 100644 --- a/src/fast_task_queue.h +++ b/src/fast_task_queue.h @@ -23,8 +23,10 @@ #include #include #include "common_define.h" +#include "fc_list.h" #include "ioevent.h" #include "fast_timer.h" +#include "fast_mblock.h" #define FC_NOTIFY_READ_FD(tdata) (tdata)->pipe_fds[0] #define FC_NOTIFY_WRITE_FD(tdata) (tdata)->pipe_fds[1] @@ -35,13 +37,15 @@ struct nio_thread_data; struct fast_task_info; typedef int (*ThreadLoopCallback) (struct nio_thread_data *pThreadData); -typedef int (*TaskFinishCallback) (struct fast_task_info *pTask); -typedef void (*TaskCleanUpCallback) (struct fast_task_info *pTask); -typedef int (*TaskInitCallback)(struct fast_task_info *pTask); +typedef int (*TaskFinishCallback) (struct fast_task_info *task); +typedef void (*TaskCleanUpCallback) (struct fast_task_info *task); +typedef int (*TaskInitCallback)(struct fast_task_info *task); +typedef void (*TaskReleaseCallback)(struct fast_task_info *task); typedef void (*IOEventCallback) (int sock, short event, void *arg); typedef int (*TaskContinueCallback)(struct fast_task_info *task); +struct sf_network_handler; struct fast_task_info; typedef struct ioevent_entry @@ -58,6 +62,7 @@ struct nio_thread_data int pipe_fds[2]; //for notify struct fast_task_info *deleted_list; //tasks for cleanup ThreadLoopCallback thread_loop_callback; + ThreadLoopCallback busy_polling_callback; void *arg; //extra argument pointer struct { struct fast_task_info *head; @@ -69,6 +74,9 @@ struct nio_thread_data bool enabled; volatile int64_t counter; } notify; //for thread notify + + int timeout_ms; //for restore + struct fc_list_head polling_queue; //for RDMA busy polling }; struct ioevent_notify_entry @@ -77,15 +85,29 @@ struct ioevent_notify_entry struct nio_thread_data *thread_data; }; +struct fast_net_buffer +{ + int size; //alloc size + int length; //data length + int offset; //current offset + char *data; //buffer for write or read +}; + +struct fast_net_buffer_wrapper +{ + struct fast_net_buffer holder; + struct fast_net_buffer *ptr; +}; + +struct fast_task_queue; struct fast_task_info { - IOEventEntry event; //must first + IOEventEntry event; //must first union { char server_ip[IP_ADDRESS_SIZE]; char client_ip[IP_ADDRESS_SIZE]; }; - void *arg; //extra argument pointer - char *data; //buffer for write or read + void *arg; //extra argument pointer char *recv_body; //for extra (dynamic) recv buffer struct { @@ -93,9 +115,9 @@ struct fast_task_info int count; } iovec_array; //for writev - int size; //alloc size - int length; //data length - int offset; //current offset + struct fast_net_buffer_wrapper send; //send buffer + struct fast_net_buffer_wrapper recv; //recv buffer + uint16_t port; //peer port struct { uint8_t current; @@ -105,83 +127,163 @@ struct fast_task_info volatile int8_t canceled; //if task canceled short connect_timeout; //for client side short network_timeout; - int64_t req_count; //request count + int pending_send_count; + int64_t req_count; //request count + struct { + int64_t last_req_count; + uint32_t last_calc_time; + uint16_t continuous_count; + bool in_queue; + struct fc_list_head dlink; //for polling queue + } polling; //for RDMA busy polling TaskContinueCallback continue_callback; //for continue stage - TaskFinishCallback finish_callback; - struct nio_thread_data *thread_data; - void *ctx; //context pointer for libserverframe nio - struct fast_task_info *next; //for free queue and deleted list - struct fast_task_info *notify_next; //for nio notify queue + TaskFinishCallback finish_callback; + struct nio_thread_data *thread_data; + struct sf_network_handler *handler; //network handler for libserverframe nio + struct fast_task_info *next; //for free queue and deleted list + struct fast_task_info *notify_next; //for nio notify queue + struct fast_task_queue *free_queue; //task allocator + char conn[0]; //for RDMA connection }; struct fast_task_queue { - struct fast_task_info *head; - struct fast_task_info *tail; - pthread_mutex_t lock; - int max_connections; - int alloc_connections; - int alloc_task_once; - int min_buff_size; - int max_buff_size; - int arg_size; - int block_size; - bool malloc_whole_block; + int min_buff_size; + int max_buff_size; + int padding_size; //for last field: conn[0] + int arg_size; //for arg pointer + int block_size; + bool malloc_whole_block; + bool double_buffers; //if send buffer and recv buffer are independent + struct fast_mblock_man allocator; TaskInitCallback init_callback; + TaskReleaseCallback release_callback; }; #ifdef __cplusplus extern "C" { #endif -int free_queue_init_ex2(const int max_connections, const int init_connections, +int free_queue_init_ex2(struct fast_task_queue *queue, const char *name, + const bool double_buffers, const int max_connections, const int alloc_task_once, const int min_buff_size, - const int max_buff_size, const int arg_size, - TaskInitCallback init_callback); + const int max_buff_size, const int padding_size, + const int arg_size, TaskInitCallback init_callback); -static inline int free_queue_init_ex(const int max_connections, - const int init_connections, const int alloc_task_once, - const int min_buff_size, const int max_buff_size, const int arg_size) +static inline int free_queue_init_ex(struct fast_task_queue *queue, + const char *name, const bool double_buffers, + const int max_connections, const int alloc_task_once, + const int min_buff_size, const int max_buff_size, + const int arg_size) { - return free_queue_init_ex2(max_connections, init_connections, - alloc_task_once, min_buff_size, max_buff_size, arg_size, NULL); + const int padding_size = 0; + return free_queue_init_ex2(queue, name, double_buffers, max_connections, + alloc_task_once, min_buff_size, max_buff_size, padding_size, + arg_size, NULL); } -static inline int free_queue_init(const int max_connections, - const int min_buff_size, const int max_buff_size, const int arg_size) +static inline void free_queue_set_release_callback( + struct fast_task_queue *queue, + TaskReleaseCallback callback) { - return free_queue_init_ex2(max_connections, max_connections, - 0, min_buff_size, max_buff_size, arg_size, NULL); + queue->release_callback = callback; } -void free_queue_destroy(); +void free_queue_destroy(struct fast_task_queue *queue); -int free_queue_push(struct fast_task_info *pTask); -struct fast_task_info *free_queue_pop(); -int free_queue_count(); -int free_queue_alloc_connections(); -int free_queue_set_buffer_size(struct fast_task_info *pTask, - const int expect_size); -int free_queue_realloc_buffer(struct fast_task_info *pTask, - const int expect_size); +static inline struct fast_task_info *free_queue_pop( + struct fast_task_queue *queue) +{ + return fast_mblock_alloc_object(&queue->allocator); +} -int free_queue_set_max_buffer_size(struct fast_task_info *pTask); +void free_queue_push(struct fast_task_info *task); -int free_queue_realloc_max_buffer(struct fast_task_info *pTask); +static inline int free_queue_count(struct fast_task_queue *queue) +{ + return queue->allocator.info.element_total_count - + queue->allocator.info.element_used_count; +} -int task_queue_init(struct fast_task_queue *pQueue); -int task_queue_push(struct fast_task_queue *pQueue, \ - struct fast_task_info *pTask); -struct fast_task_info *task_queue_pop(struct fast_task_queue *pQueue); -int task_queue_count(struct fast_task_queue *pQueue); -int task_queue_set_buffer_size(struct fast_task_queue *pQueue, - struct fast_task_info *pTask, const int expect_size); -int task_queue_realloc_buffer(struct fast_task_queue *pQueue, - struct fast_task_info *pTask, const int expect_size); +static inline int free_queue_alloc_connections(struct fast_task_queue *queue) +{ + return queue->allocator.info.element_total_count; +} -int task_queue_get_new_buffer_size(const int min_buff_size, +int free_queue_get_new_buffer_size(const int min_buff_size, const int max_buff_size, const int expect_size, int *new_size); +int free_queue_set_buffer_size(struct fast_task_info *task, + struct fast_net_buffer *buffer, const int expect_size); + +static inline int free_queue_set_max_buffer_size( + struct fast_task_info *task, + struct fast_net_buffer *buffer) +{ + return free_queue_set_buffer_size(task, buffer, + task->free_queue->max_buff_size); +} + +int free_queue_realloc_buffer(struct fast_task_info *task, + struct fast_net_buffer *buffer, const int expect_size); + +static inline int free_queue_realloc_max_buffer( + struct fast_task_info *task, + struct fast_net_buffer *buffer) +{ + return free_queue_realloc_buffer(task, buffer, + task->free_queue->max_buff_size); +} + +/* send and recv buffer operations */ +static inline int free_queue_set_send_buffer_size( + struct fast_task_info *task, const int expect_size) +{ + return free_queue_set_buffer_size(task, task->send.ptr, expect_size); +} + +static inline int free_queue_set_recv_buffer_size( + struct fast_task_info *task, const int expect_size) +{ + return free_queue_set_buffer_size(task, task->recv.ptr, expect_size); +} + +static inline int free_queue_set_send_max_buffer_size( + struct fast_task_info *task) +{ + return free_queue_set_max_buffer_size(task, task->send.ptr); +} + +static inline int free_queue_set_recv_max_buffer_size( + struct fast_task_info *task) +{ + return free_queue_set_max_buffer_size(task, task->recv.ptr); +} + +static inline int free_queue_realloc_send_buffer( + struct fast_task_info *task, const int expect_size) +{ + return free_queue_realloc_buffer(task, task->send.ptr, expect_size); +} + +static inline int free_queue_realloc_recv_buffer( + struct fast_task_info *task, const int expect_size) +{ + return free_queue_realloc_buffer(task, task->recv.ptr, expect_size); +} + +static inline int free_queue_realloc_send_max_buffer( + struct fast_task_info *task) +{ + return free_queue_realloc_max_buffer(task, task->send.ptr); +} + +static inline int free_queue_realloc_recv_max_buffer( + struct fast_task_info *task) +{ + return free_queue_realloc_max_buffer(task, task->recv.ptr); +} + #ifdef __cplusplus } #endif diff --git a/src/fc_queue.c b/src/fc_queue.c index c8f30c4..2a40c45 100644 --- a/src/fc_queue.c +++ b/src/fc_queue.c @@ -50,10 +50,53 @@ void fc_queue_push_ex(struct fc_queue *queue, void *data, bool *notify) *notify = false; } queue->tail = data; - PTHREAD_MUTEX_UNLOCK(&queue->lcp.lock); } +static inline bool fc_queue_exists(struct fc_queue *queue, void *data) +{ + void *current; + if (queue->head == NULL) { + return false; + } + + current = queue->head; + do { + if (current == data) { + return true; + } + current = FC_QUEUE_NEXT_PTR(queue, current); + } while (current != NULL); + + return false; +} + +int fc_queue_push_with_check_ex(struct fc_queue *queue, + void *data, bool *notify) +{ + int result; + + PTHREAD_MUTEX_LOCK(&queue->lcp.lock); + if (fc_queue_exists(queue, data)) { + result = EEXIST; + *notify = false; + } else { + result = 0; + FC_QUEUE_NEXT_PTR(queue, data) = NULL; + if (queue->tail == NULL) { + queue->head = data; + *notify = true; + } else { + FC_QUEUE_NEXT_PTR(queue, queue->tail) = data; + *notify = false; + } + queue->tail = data; + } + PTHREAD_MUTEX_UNLOCK(&queue->lcp.lock); + + return result; +} + void *fc_queue_pop_ex(struct fc_queue *queue, const bool blocked) { void *data; diff --git a/src/fc_queue.h b/src/fc_queue.h index 1533435..7765811 100644 --- a/src/fc_queue.h +++ b/src/fc_queue.h @@ -68,6 +68,8 @@ static inline void fc_queue_terminate_all( //notify by the caller void fc_queue_push_ex(struct fc_queue *queue, void *data, bool *notify); +int fc_queue_push_with_check_ex(struct fc_queue *queue, + void *data, bool *notify); static inline void fc_queue_push(struct fc_queue *queue, void *data) { @@ -79,6 +81,19 @@ static inline void fc_queue_push(struct fc_queue *queue, void *data) } } +static inline int fc_queue_push_with_check(struct fc_queue *queue, void *data) +{ + int result; + bool notify; + + result = fc_queue_push_with_check_ex(queue, data, ¬ify); + if (notify) { + pthread_cond_signal(&(queue->lcp.cond)); + } + + return result; +} + static inline void fc_queue_push_silence(struct fc_queue *queue, void *data) { bool notify; @@ -171,6 +186,16 @@ static inline int fc_queue_count(struct fc_queue *queue) return count; } +static inline void *fc_queue_peek(struct fc_queue *queue) +{ + void *data; + + pthread_mutex_lock(&queue->lcp.lock); + data = queue->head; + pthread_mutex_unlock(&queue->lcp.lock); + return data; +} + void *fc_queue_timedpop(struct fc_queue *queue, const int timeout, const int time_unit); diff --git a/src/ioevent_loop.c b/src/ioevent_loop.c index 83fe22d..6feb4f2 100644 --- a/src/ioevent_loop.c +++ b/src/ioevent_loop.c @@ -91,7 +91,7 @@ static void deal_timeouts(FastTimerEntry *head) } } -int ioevent_loop(struct nio_thread_data *pThreadData, +int ioevent_loop(struct nio_thread_data *thread_data, IOEventCallback recv_notify_callback, TaskCleanUpCallback clean_up_callback, volatile bool *continue_flag) { @@ -102,15 +102,17 @@ int ioevent_loop(struct nio_thread_data *pThreadData, time_t last_check_time; int save_extra_events; int count; + uint32_t sched_counter; + bool sched_pull; memset(&ev_notify, 0, sizeof(ev_notify)); - ev_notify.event.fd = FC_NOTIFY_READ_FD(pThreadData); + ev_notify.event.fd = FC_NOTIFY_READ_FD(thread_data); ev_notify.event.callback = recv_notify_callback; - ev_notify.thread_data = pThreadData; + ev_notify.thread_data = thread_data; - save_extra_events = pThreadData->ev_puller.extra_events; - pThreadData->ev_puller.extra_events = 0; //disable edge trigger temporarily - if (ioevent_attach(&pThreadData->ev_puller, ev_notify. + save_extra_events = thread_data->ev_puller.extra_events; + thread_data->ev_puller.extra_events = 0; //disable edge trigger temporarily + if (ioevent_attach(&thread_data->ev_puller, ev_notify. event.fd, IOEVENT_READ, &ev_notify) != 0) { result = errno != 0 ? errno : ENOMEM; @@ -119,39 +121,67 @@ int ioevent_loop(struct nio_thread_data *pThreadData, __LINE__, result, STRERROR(result)); return result; } - pThreadData->ev_puller.extra_events = save_extra_events; //restore + thread_data->ev_puller.extra_events = save_extra_events; //restore - pThreadData->deleted_list = NULL; + sched_counter = 0; + thread_data->deleted_list = NULL; last_check_time = g_current_time; while (*continue_flag) { - pThreadData->ev_puller.iterator.count = ioevent_poll( - &pThreadData->ev_puller); - if (pThreadData->ev_puller.iterator.count > 0) - { - deal_ioevents(&pThreadData->ev_puller); - } - else if (pThreadData->ev_puller.iterator.count < 0) - { - result = errno != 0 ? errno : EINVAL; - if (result != EINTR) - { - logError("file: "__FILE__", line: %d, " \ - "ioevent_poll fail, " \ - "errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - return result; - } - } +#ifdef OS_LINUX + if (thread_data->ev_puller.timeout == 0) { + sched_pull = (sched_counter++ & 8) != 0; + } else { + sched_pull = true; + } +#else + sched_pull = true; +#endif - if (pThreadData->deleted_list != NULL) + if (sched_pull) + { + thread_data->ev_puller.iterator.count = ioevent_poll( + &thread_data->ev_puller); + if (thread_data->ev_puller.iterator.count > 0) + { + deal_ioevents(&thread_data->ev_puller); + } + else if (thread_data->ev_puller.iterator.count < 0) + { + result = errno != 0 ? errno : EINVAL; + if (result != EINTR) + { + logError("file: "__FILE__", line: %d, " \ + "ioevent_poll fail, " \ + "errno: %d, error info: %s", \ + __LINE__, result, STRERROR(result)); + return result; + } + } + } + + if (thread_data->busy_polling_callback != NULL) + { + thread_data->busy_polling_callback(thread_data); + } + + if (thread_data->deleted_list != NULL) { count = 0; - while (pThreadData->deleted_list != NULL) + while (thread_data->deleted_list != NULL) { - task = pThreadData->deleted_list; - pThreadData->deleted_list = task->next; + task = thread_data->deleted_list; + thread_data->deleted_list = task->next; + if (task->polling.in_queue) + { + fc_list_del_init(&task->polling.dlink); + task->polling.in_queue = false; + if (fc_list_empty(&task->thread_data->polling_queue)) { + ioevent_set_timeout(&task->thread_data->ev_puller, + task->thread_data->timeout_ms); + } + } clean_up_callback(task); count++; } @@ -162,31 +192,31 @@ int ioevent_loop(struct nio_thread_data *pThreadData, { last_check_time = g_current_time; count = fast_timer_timeouts_get( - &pThreadData->timer, g_current_time, &head); + &thread_data->timer, g_current_time, &head); if (count > 0) { deal_timeouts(&head); } } - if (pThreadData->notify.enabled) + if (thread_data->notify.enabled) { int64_t n; - if ((n=__sync_fetch_and_add(&pThreadData->notify.counter, 0)) != 0) + if ((n=__sync_fetch_and_add(&thread_data->notify.counter, 0)) != 0) { - __sync_fetch_and_sub(&pThreadData->notify.counter, n); + __sync_fetch_and_sub(&thread_data->notify.counter, n); /* logInfo("file: "__FILE__", line: %d, " "n ==== %"PRId64", now: %"PRId64, __LINE__, n, __sync_fetch_and_add( - &pThreadData->notify.counter, 0)); + &thread_data->notify.counter, 0)); */ } } - if (pThreadData->thread_loop_callback != NULL) + if (thread_data->thread_loop_callback != NULL) { - pThreadData->thread_loop_callback(pThreadData); + thread_data->thread_loop_callback(thread_data); } } @@ -201,14 +231,13 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread, task->thread_data = pThread; task->event.fd = sock; task->event.callback = callback; - if (ioevent_attach(&pThread->ev_puller, - sock, event, task) < 0) + if (ioevent_attach(&pThread->ev_puller, sock, event, task) < 0) { result = errno != 0 ? errno : ENOENT; - logError("file: "__FILE__", line: %d, " \ - "ioevent_attach fail, " \ - "errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); + logError("file: "__FILE__", line: %d, " + "ioevent_attach fail, fd: %d, " + "errno: %d, error info: %s", + __LINE__, sock, result, STRERROR(result)); return result; } @@ -216,3 +245,19 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread, fast_timer_add(&pThread->timer, &task->event.timer); return 0; } + +int ioevent_reset(struct fast_task_info *task, int new_fd, short event) +{ + if (task->event.fd == new_fd) + { + return 0; + } + + if (task->event.fd >= 0) + { + ioevent_detach(&task->thread_data->ev_puller, task->event.fd); + } + + task->event.fd = new_fd; + return ioevent_attach(&task->thread_data->ev_puller, new_fd, event, task); +} diff --git a/src/ioevent_loop.h b/src/ioevent_loop.h index 66f8ab9..cce2cb6 100644 --- a/src/ioevent_loop.h +++ b/src/ioevent_loop.h @@ -22,7 +22,7 @@ extern "C" { #endif -int ioevent_loop(struct nio_thread_data *pThreadData, +int ioevent_loop(struct nio_thread_data *thread_data, IOEventCallback recv_notify_callback, TaskCleanUpCallback clean_up_callback, volatile bool *continue_flag); @@ -32,6 +32,8 @@ int ioevent_remove(IOEventPoller *ioevent, void *data); int ioevent_set(struct fast_task_info *pTask, struct nio_thread_data *pThread, int sock, short event, IOEventCallback callback, const int timeout); +int ioevent_reset(struct fast_task_info *task, int new_fd, short event); + static inline bool ioevent_is_canceled(struct fast_task_info *task) { return __sync_fetch_and_add(&task->canceled, 0) != 0; diff --git a/src/logger.c b/src/logger.c index 4541c61..62299c9 100644 --- a/src/logger.c +++ b/src/logger.c @@ -225,28 +225,41 @@ int log_reopen_ex(LogContext *pContext) int log_set_prefix_ex(LogContext *pContext, const char *base_path, const char *filename_prefix) { - int result; + int result; + char log_filename[MAX_PATH_SIZE]; - if ((result=check_and_mk_log_dir(base_path)) != 0) - { - return result; - } + if ((result=check_and_mk_log_dir(base_path)) != 0) + { + return result; + } - snprintf(pContext->log_filename, MAX_PATH_SIZE, - "%s/logs/%s.log", base_path, filename_prefix); - - return log_open(pContext); + snprintf(log_filename, MAX_PATH_SIZE, "%s/logs/%s.log", + base_path, filename_prefix); + return log_set_filename_ex(pContext, log_filename); } int log_set_filename_ex(LogContext *pContext, const char *log_filename) { - if (log_filename == NULL) { - fprintf(stderr, "file: "__FILE__", line: %d, " \ - "log_filename is NULL!\n", __LINE__); + if (log_filename == NULL || *log_filename == '\0') + { + fprintf(stderr, "file: "__FILE__", line: %d, " + "log_filename is NULL or empty!\n", __LINE__); return EINVAL; } - snprintf(pContext->log_filename, MAX_PATH_SIZE, "%s", log_filename); - return log_open(pContext); + + if (*(pContext->log_filename) == '\0') + { + snprintf(pContext->log_filename, MAX_PATH_SIZE, "%s", log_filename); + return log_open(pContext); + } + + if (strcmp(log_filename, pContext->log_filename) == 0) + { + return 0; + } + + snprintf(pContext->log_filename, MAX_PATH_SIZE, "%s", log_filename); + return log_reopen_ex(pContext); } void log_set_cache_ex(LogContext *pContext, const bool bLogCache) diff --git a/src/server_id_func.c b/src/server_id_func.c index 2d3766b..eeecdf3 100644 --- a/src/server_id_func.c +++ b/src/server_id_func.c @@ -314,35 +314,96 @@ static inline void fc_server_set_ip_prefix(FCServerGroupInfo *ginfo, } } -static int fc_server_load_one_group(FCServerConfig *ctx, - const char *config_filename, IniContext *ini_context, - const int group_count, const char *section_name) +static inline int fc_server_set_comm_type(FCCommunicationType *comm_type, + const char *config_filename, const char *section_name, + const char *comm_type_str, const FCCommunicationType default_comm_type) { + if (comm_type_str == NULL) { + *comm_type = default_comm_type; + return 0; + } else if (strcasecmp(comm_type_str, "socket") == 0) { + *comm_type = fc_comm_type_sock; + return 0; + } else if (strcasecmp(comm_type_str, "rdma") == 0) { + *comm_type = fc_comm_type_rdma; + return 0; + } else { + logError("file: "__FILE__", line: %d, " + "config filename: %s, section: %s, " + "invalid communication: %s!", __LINE__, + config_filename, section_name, comm_type_str); + return EINVAL; + } +} + +static int load_comm_type_and_smart_polling(IniFullContext *ini_ctx, + FCCommunicationType *comm_type, FCSmartPollingConfig *smart_polling, + const FCCommunicationType default_comm_type, + const FCSmartPollingConfig *default_smart_polling) +{ + int result; + char *comm_type_str; + + comm_type_str = iniGetStrValue(ini_ctx->section_name, + "communication", ini_ctx->context); + if (comm_type_str == NULL) { + comm_type_str = iniGetStrValue(ini_ctx->section_name, + "comm_type", ini_ctx->context); + } + if ((result=fc_server_set_comm_type(comm_type, ini_ctx->filename, + ini_ctx->section_name, comm_type_str, + default_comm_type)) != 0) + { + return result; + } + + if (*comm_type == fc_comm_type_sock) { + smart_polling->enabled = false; + smart_polling->switch_on_iops = 0; + smart_polling->switch_on_count = 0; + } else { + smart_polling->enabled = iniGetBoolValue(ini_ctx->section_name, + "smart_polling", ini_ctx->context, + default_smart_polling->enabled); + smart_polling->switch_on_iops = iniGetIntValue(ini_ctx->section_name, + "polling_switch_on_iops", ini_ctx->context, + default_smart_polling->switch_on_iops); + smart_polling->switch_on_count = iniGetIntValue(ini_ctx->section_name, + "polling_switch_on_count", ini_ctx->context, + default_smart_polling->switch_on_count); + } + return 0; +} + +static int fc_server_load_one_group(FCServerConfig *ctx, + IniFullContext *ini_ctx, const int group_count) +{ + int result; FCServerGroupInfo *group; char new_name[FAST_INI_ITEM_NAME_SIZE]; char *port_str; char *net_type; char *ip_prefix; - strcpy(new_name, section_name); + strcpy(new_name, ini_ctx->section_name); group = ctx->group_array.groups + ctx->group_array.count; fc_server_set_group_ptr_name(group, new_name + GROUP_SECTION_PREFIX_LEN); - if (group->group_name.len == 0) { logError("file: "__FILE__", line: %d, " "config filename: %s, section: %s, no group name!", - __LINE__, config_filename, section_name); + __LINE__, ini_ctx->filename, ini_ctx->section_name); return EINVAL; } - port_str = iniGetStrValue(section_name, SERVER_ITEM_PORT_STR, ini_context); + port_str = iniGetStrValue(ini_ctx->section_name, + SERVER_ITEM_PORT_STR, ini_ctx->context); if (port_str == NULL) { if (group_count == 1) { group->port = ctx->default_port; } else { logError("file: "__FILE__", line: %d, " "config filename: %s, section: %s, no item: %s!", - __LINE__, config_filename, section_name, + __LINE__, ini_ctx->filename, ini_ctx->section_name, SERVER_ITEM_PORT_STR); return ENOENT; } @@ -352,24 +413,33 @@ static int fc_server_load_one_group(FCServerConfig *ctx, if (group->port <= 0 || (endptr != NULL && *endptr != '\0')) { logError("file: "__FILE__", line: %d, " "config filename: %s, section: %s, item: %s, " - "invalid port: %s", __LINE__, config_filename, - section_name, SERVER_ITEM_PORT_STR, port_str); + "invalid port: %s", __LINE__, ini_ctx->filename, + ini_ctx->section_name, SERVER_ITEM_PORT_STR, port_str); return EINVAL; } } - net_type = iniGetStrValue(section_name, "net_type", ini_context); + net_type = iniGetStrValue(ini_ctx->section_name, + "net_type", ini_ctx->context); group->filter.net_type = fc_get_net_type_by_name(net_type); if (group->filter.net_type == FC_NET_TYPE_NONE) { logError("file: "__FILE__", line: %d, " "config filename: %s, section: %s, invalid net_type: %s", - __LINE__, config_filename, group->group_name.str, net_type); + __LINE__, ini_ctx->filename, group->group_name.str, net_type); return EINVAL; } - ip_prefix = iniGetStrValue(section_name, "ip_prefix", ini_context); + ip_prefix = iniGetStrValue(ini_ctx->section_name, + "ip_prefix", ini_ctx->context); fc_server_set_ip_prefix(group, ip_prefix); + if ((result=load_comm_type_and_smart_polling(ini_ctx, + &group->comm_type, &group->smart_polling, + ctx->comm_type, &ctx->smart_polling)) != 0) + { + return result; + } + ctx->group_array.count++; return 0; } @@ -429,7 +499,7 @@ static void fc_server_sort_groups(FCServerConfig *ctx) } static int fc_server_load_groups(FCServerConfig *ctx, - const char *config_filename, IniContext *ini_context) + IniFullContext *ini_ctx) { int result; int count; @@ -437,13 +507,13 @@ static int fc_server_load_groups(FCServerConfig *ctx, IniSectionInfo *section; IniSectionInfo *end; - if ((result=iniGetSectionNamesByPrefix(ini_context, + if ((result=iniGetSectionNamesByPrefix(ini_ctx->context, GROUP_SECTION_PREFIX_STR, sections, FC_MAX_GROUP_COUNT, &count)) != 0) { logError("file: "__FILE__", line: %d, " "config filename: %s, get sections by prefix %s fail, " - "errno: %d, error info: %s", __LINE__, config_filename, + "errno: %d, error info: %s", __LINE__, ini_ctx->filename, GROUP_SECTION_PREFIX_STR, result, STRERROR(result)); return result; } @@ -452,15 +522,14 @@ static int fc_server_load_groups(FCServerConfig *ctx, ctx->group_array.count = 1; fc_server_set_group_ptr_name(ctx->group_array.groups + 0, ""); ctx->group_array.groups[0].port = iniGetIntValue(NULL, "port", - ini_context, ctx->default_port); + ini_ctx->context, ctx->default_port); return 0; } end = sections + count; for (section=sections; sectionsection_name)) != 0) - { + ini_ctx->section_name = section->section_name; + if ((result=fc_server_load_one_group(ctx, ini_ctx, count)) != 0) { return result; } } @@ -794,6 +863,7 @@ static int fc_server_load_group_server(FCServerConfig *ctx, return result; } + address.conn.comm_type = group->comm_type; if ((result=fc_server_set_group_server_address(server, group_addr, &address)) != 0) { @@ -835,9 +905,16 @@ static int fc_server_set_host(FCServerConfig *ctx, FCServerInfo *server, if (addr->conn.port == 0) { addr_holder = *addr; addr_holder.conn.port = FC_SERVER_GROUP_PORT(group); + addr_holder.conn.comm_type = group->comm_type; new_addr = &addr_holder; } else { - new_addr = addr; + if (addr->conn.comm_type == group->comm_type) { + new_addr = addr; + } else { + addr_holder = *addr; + addr_holder.conn.comm_type = group->comm_type; + new_addr = &addr_holder; + } } if ((result=fc_server_set_group_server_address(server, @@ -1167,17 +1244,77 @@ static int fc_server_load_servers(FCServerConfig *ctx, return result; } +static void load_connection_thread_local(FCServerConfig *ctx, + IniContext *ini_context, const char *config_filename) +{ + char *connection_thread_local; + + connection_thread_local = iniGetStrValue(NULL, + "connection_thread_local", ini_context); + if (connection_thread_local == NULL || *connection_thread_local == '\0') { + ctx->connection_thread_local = fc_connection_thread_local_auto; + } else if (strcasecmp(connection_thread_local, "auto") == 0) { + ctx->connection_thread_local = fc_connection_thread_local_auto; + } else if (strcasecmp(connection_thread_local, "yes") == 0) { + ctx->connection_thread_local = fc_connection_thread_local_yes; + } else if (strcasecmp(connection_thread_local, "no") == 0) { + ctx->connection_thread_local = fc_connection_thread_local_no; + } else { + logWarning("file: "__FILE__", line: %d, " + "config file: %s, invalid connection_thread_local: %s, " + "set to auto!", __LINE__, config_filename, + connection_thread_local); + ctx->connection_thread_local = fc_connection_thread_local_auto; + } +} + static int fc_server_load_data(FCServerConfig *ctx, IniContext *ini_context, const char *config_filename) { int result; + int buffer_size; + bool have_rdma; + IniFullContext full_ini_ctx; + FCSmartPollingConfig default_smart_polling; + FCServerGroupInfo *group; + FCServerGroupInfo *end; - if ((result=fc_server_load_groups(ctx, config_filename, - ini_context)) != 0) + FAST_INI_SET_FULL_CTX_EX(full_ini_ctx, + config_filename, NULL, ini_context); + default_smart_polling.enabled = true; + default_smart_polling.switch_on_iops = 10240; + default_smart_polling.switch_on_count = 3; + if ((result=load_comm_type_and_smart_polling(&full_ini_ctx, + &ctx->comm_type, &ctx->smart_polling, + fc_comm_type_sock, &default_smart_polling)) != 0) { return result; } + if ((result=fc_server_load_groups(ctx, &full_ini_ctx)) != 0) { + return result; + } + + have_rdma = false; + end = ctx->group_array.groups + ctx->group_array.count; + for (group=ctx->group_array.groups; groupcomm_type != fc_comm_type_sock) { + have_rdma = true; + break; + } + } + + if (have_rdma) { + full_ini_ctx.section_name = NULL; + buffer_size = iniGetByteValue(NULL, "buffer_size", + ini_context, 256 * 1024); + ctx->buffer_size = iniCheckAndCorrectIntValue(&full_ini_ctx, + "buffer_size", buffer_size, 8 * 1024, 8 * 1024 * 1024); + } else { + ctx->buffer_size = 0; + } + load_connection_thread_local(ctx, ini_context, config_filename); + if ((result=fc_server_load_servers(ctx, config_filename, ini_context)) != 0) { @@ -1340,13 +1477,27 @@ static int fc_groups_to_string(FCServerConfig *ctx, FastBuffer *buffer) fast_buffer_append(buffer, "[%s%.*s]\n" - "port = %d\n" - "net_type = %s\n" - "ip_prefix = %.*s\n\n", + "port = %d\n", GROUP_SECTION_PREFIX_STR, group->group_name.len, group->group_name.str, - group->port, net_type_caption, - group->filter.ip_prefix.len, + group->port); + + if (group->comm_type != fc_comm_type_sock) { + fast_buffer_append(buffer, + "communication = %s\n" + "smart_polling = %d\n" + "polling_switch_on_iops = %d\n" + "polling_switch_on_count = %d\n", + fc_comm_type_str(group->comm_type), + group->smart_polling.enabled, + group->smart_polling.switch_on_iops, + group->smart_polling.switch_on_count); + } + + fast_buffer_append(buffer, + "net_type = %s\n" + "ip_prefix = %.*s\n\n", + net_type_caption, group->filter.ip_prefix.len, group->filter.ip_prefix.str); } return 0; @@ -1423,6 +1574,14 @@ int fc_server_to_config_string(FCServerConfig *ctx, FastBuffer *buffer) { int result; + if (ctx->buffer_size > 0) { + if ((result=fast_buffer_check(buffer, 1024)) != 0) { + return result; + } + fast_buffer_append(buffer, "buffer_size = %d KB", + ctx->buffer_size / 1024); + } + fc_server_clear_server_port(&ctx->group_array); if ((result=fc_groups_to_string(ctx, buffer)) != 0) { return result; @@ -1435,13 +1594,27 @@ static void fc_server_log_groups(FCServerConfig *ctx) { FCServerGroupInfo *group; FCServerGroupInfo *end; + char buff[1024]; + char *p; end = ctx->group_array.groups + ctx->group_array.count; for (group=ctx->group_array.groups; groupgroup_name.len, group->group_name.str, group->port, + p = buff + sprintf(buff, "group_name: %.*s, port: %d", + group->group_name.len, group->group_name.str, + group->port); + if (group->comm_type != fc_comm_type_sock) { + p += sprintf(p, ", communication: %s, smart_polling: %d, " + "polling_switch_on_iops: %d, polling_switch_on_count: %d", + fc_comm_type_str(group->comm_type), + group->smart_polling.enabled, + group->smart_polling.switch_on_iops, + group->smart_polling.switch_on_count); + } + p += sprintf(p, ", net_type: %s, ip_prefix: %.*s", get_net_type_caption(group->filter.net_type), group->filter.ip_prefix.len, group->filter.ip_prefix.str); + + log_it1(LOG_INFO, buff, p - buff); } } @@ -1491,67 +1664,20 @@ static void fc_server_log_servers(FCServerConfig *ctx) void fc_server_to_log(FCServerConfig *ctx) { + char buff[256]; + char *p; + + p = buff + sprintf(buff, "connection_thread_local: %s", + fc_connection_thread_local_str(ctx->connection_thread_local)); + if (ctx->buffer_size > 0) { + p += sprintf(p, ", buffer_size: %d KB", ctx->buffer_size / 1024); + } + log_it1(LOG_INFO, buff, p - buff); + fc_server_log_groups(ctx); fc_server_log_servers(ctx); } -ConnectionInfo *fc_server_check_connect_ex(FCAddressPtrArray *addr_array, - const char *service_name, const int connect_timeout, - const char *bind_ipaddr, const bool log_connect_error, int *err_no) -{ - FCAddressInfo **current; - FCAddressInfo **addr; - FCAddressInfo **end; - - if (addr_array->count <= 0) { - *err_no = ENOENT; - return NULL; - } - - current = addr_array->addrs + addr_array->index; - if ((*current)->conn.sock >= 0) { - return &(*current)->conn; - } - - if ((*err_no=conn_pool_connect_server_ex1(&(*current)->conn, - service_name, connect_timeout, bind_ipaddr, - log_connect_error)) == 0) - { - return &(*current)->conn; - } - - if (addr_array->count == 1) { - return NULL; - } - - end = addr_array->addrs + addr_array->count; - for (addr=addr_array->addrs; addrconn, - service_name, connect_timeout, bind_ipaddr, - log_connect_error)) == 0) - { - addr_array->index = addr - addr_array->addrs; - return &(*addr)->conn; - } - } - - return NULL; -} - -void fc_server_disconnect(FCAddressPtrArray *addr_array) -{ - FCAddressInfo **current; - - current = addr_array->addrs + addr_array->index; - if ((*current)->conn.sock >= 0) { - close((*current)->conn.sock); - (*current)->conn.sock = -1; - } -} - int fc_server_make_connection_ex(FCAddressPtrArray *addr_array, ConnectionInfo *conn, const char *service_name, const int connect_timeout, const char *bind_ipaddr, @@ -1567,10 +1693,11 @@ int fc_server_make_connection_ex(FCAddressPtrArray *addr_array, } current = addr_array->addrs + addr_array->index; - *conn = (*current)->conn; - conn->sock = -1; - if ((result=conn_pool_connect_server_ex1(conn, - service_name, connect_timeout, + conn_pool_set_server_info(conn, (*current)->conn.ip_addr, + (*current)->conn.port); + conn->comm_type = (*current)->conn.comm_type; + if ((result=G_COMMON_CONNECTION_CALLBACKS[conn->comm_type]. + make_connection(conn, service_name, connect_timeout * 1000, bind_ipaddr, log_connect_error)) == 0) { return 0; @@ -1586,10 +1713,10 @@ int fc_server_make_connection_ex(FCAddressPtrArray *addr_array, continue; } - *conn = (*addr)->conn; - conn->sock = -1; - if ((result=conn_pool_connect_server_ex1(conn, - service_name, connect_timeout, + conn_pool_set_server_info(conn, (*addr)->conn.ip_addr, + (*addr)->conn.port); + if ((result=G_COMMON_CONNECTION_CALLBACKS[conn->comm_type]. + make_connection(conn, service_name, connect_timeout * 1000, bind_ipaddr, log_connect_error)) == 0) { addr_array->index = addr - addr_array->addrs; @@ -1624,3 +1751,34 @@ const FCAddressInfo *fc_server_get_address_by_peer( return *(addr_array->addrs); } + +struct ibv_pd *fc_alloc_rdma_pd(fc_alloc_pd_callback alloc_pd, + FCAddressPtrArray *address_array, int *result) +{ + char *ip_addrs[FC_MAX_SERVER_IP_COUNT]; + char **ip_addr; + FCAddressInfo **addr; + FCAddressInfo **end; + struct ibv_pd *pd; + int port; + + if (address_array->count == 0) { + port = 0; + } else { + port = address_array->addrs[0]->conn.port; + } + + end = address_array->addrs + address_array->count; + for (addr=address_array->addrs, ip_addr=ip_addrs; addrconn.ip_addr; + } + + if ((pd=alloc_pd((const char **)ip_addrs, address_array-> + count, port)) != NULL) + { + *result = 0; + } else { + *result = ENODEV; + } + return pd; +} diff --git a/src/server_id_func.h b/src/server_id_func.h index 52a32bc..6002371 100644 --- a/src/server_id_func.h +++ b/src/server_id_func.h @@ -52,11 +52,20 @@ typedef struct { FCAddressInfo **addrs; } FCAddressPtrArray; +typedef struct +{ + bool enabled; + int switch_on_iops; + int switch_on_count; +} FCSmartPollingConfig; + typedef struct { string_t group_name; int port; //default port int server_port; //port in server section + FCCommunicationType comm_type; + FCSmartPollingConfig smart_polling; struct { int net_type; string_t ip_prefix; @@ -111,11 +120,21 @@ typedef struct FCServerMap *maps; } FCServerMapArray; -typedef struct +typedef enum { + fc_connection_thread_local_auto, + fc_connection_thread_local_yes, + fc_connection_thread_local_no +} FCServerConnThreadLocal; + +typedef struct fc_server_config { int default_port; int min_hosts_each_group; bool share_between_groups; //if an address shared between different groups + int buffer_size; //for RDMA + FCCommunicationType comm_type; + FCSmartPollingConfig smart_polling; + FCServerConnThreadLocal connection_thread_local; FCServerGroupArray group_array; struct { FCServerInfoArray by_id; //sorted by server id @@ -140,6 +159,16 @@ static inline FCServerInfo *fc_server_get_by_ip_port(FCServerConfig *ctx, FCServerGroupInfo *fc_server_get_group_by_name(FCServerConfig *ctx, const string_t *group_name); +static inline FCServerGroupInfo *fc_server_get_group_by_index( + FCServerConfig *ctx, const int index) +{ + if (index < 0 || index >= ctx->group_array.count) { + return NULL; + } + + return ctx->group_array.groups + index; +} + static inline int fc_server_get_group_index_ex(FCServerConfig *ctx, const string_t *group_name) { @@ -211,17 +240,6 @@ int fc_server_to_config_string(FCServerConfig *ctx, FastBuffer *buffer); void fc_server_to_log(FCServerConfig *ctx); -ConnectionInfo *fc_server_check_connect_ex(FCAddressPtrArray *addr_array, - const char *service_name, const int connect_timeout, - const char *bind_ipaddr, const bool log_connect_error, int *err_no); - -#define fc_server_check_connect(addr_array, service_name, \ - connect_timeout, err_no) \ - fc_server_check_connect_ex(addr_array, service_name, \ - connect_timeout, NULL, true, err_no) - -void fc_server_disconnect(FCAddressPtrArray *addr_array); - const FCAddressInfo *fc_server_get_address_by_peer( FCAddressPtrArray *addr_array, const char *peer_ip); @@ -235,6 +253,35 @@ int fc_server_make_connection_ex(FCAddressPtrArray *addr_array, fc_server_make_connection_ex(addr_array, conn, \ service_name, connect_timeout, NULL, true) +static inline void fc_server_close_connection(ConnectionInfo *conn) +{ + G_COMMON_CONNECTION_CALLBACKS[conn->comm_type].close_connection(conn); +} + +static inline void fc_server_destroy_connection(ConnectionInfo *conn) +{ + fc_server_close_connection(conn); + conn_pool_free_connection(conn); +} + +struct ibv_pd *fc_alloc_rdma_pd(fc_alloc_pd_callback alloc_pd, + FCAddressPtrArray *address_array, int *result); + +static inline const char *fc_connection_thread_local_str( + const FCServerConnThreadLocal value) +{ + switch (value) { + case fc_connection_thread_local_auto: + return "auto"; + case fc_connection_thread_local_yes: + return "yes"; + case fc_connection_thread_local_no: + return "no"; + default: + return "unkown"; + } +} + #ifdef __cplusplus } #endif