diff --git a/src/idempotency/client/client_channel.c b/src/idempotency/client/client_channel.c index d21fd22..ae752de 100644 --- a/src/idempotency/client/client_channel.c +++ b/src/idempotency/client/client_channel.c @@ -171,13 +171,13 @@ void client_channel_destroy() } static struct fast_task_info *alloc_channel_task(IdempotencyClientChannel - *channel, const uint32_t hash_code, const FCNetworkType network_type, + *channel, const uint32_t hash_code, const FCCommunicationType comm_type, const char *server_ip, const uint16_t port, int *err_no) { struct fast_task_info *task; SFNetworkHandler *handler; - if (network_type == fc_network_type_sock) { + if (comm_type == fc_comm_type_sock) { handler = g_sf_context.handlers + SF_SOCKET_NETWORK_HANDLER_INDEX; } else { handler = g_sf_context.handlers + SF_RDMACM_NETWORK_HANDLER_INDEX; @@ -232,7 +232,7 @@ int idempotency_client_channel_check_reconnect( } struct idempotency_client_channel *idempotency_client_channel_get( - const FCNetworkType network_type, const char *server_ip, + const FCCommunicationType comm_type, const char *server_ip, const uint16_t server_port, const int timeout, int *err_no) { int r; @@ -284,7 +284,7 @@ struct idempotency_client_channel *idempotency_client_channel_get( } channel->task = alloc_channel_task(channel, hash_code, - network_type, server_ip, server_port, err_no); + comm_type, server_ip, server_port, err_no); if (channel->task == NULL) { fast_mblock_free_object(&channel_context. channel_allocator, channel); diff --git a/src/idempotency/client/client_channel.h b/src/idempotency/client/client_channel.h index 6c2798b..ba31309 100644 --- a/src/idempotency/client/client_channel.h +++ b/src/idempotency/client/client_channel.h @@ -41,7 +41,7 @@ void idempotency_client_channel_config_to_string_ex( char *output, const int size, const bool add_comma); struct idempotency_client_channel *idempotency_client_channel_get( - const FCNetworkType network_type, const char *server_ip, + const FCCommunicationType comm_type, const char *server_ip, const uint16_t server_port, const int timeout, int *err_no); static inline uint64_t idempotency_client_channel_next_seq_id( diff --git a/src/sf_global.c b/src/sf_global.c index b13ef9a..0508621 100644 --- a/src/sf_global.c +++ b/src/sf_global.c @@ -46,7 +46,7 @@ SFGlobalVariables g_sf_global_vars = { }; SFContext g_sf_context = {{'\0'}, NULL, 0, - {{true, fc_network_type_sock}, {false, fc_network_type_rdma}}, + {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}, 1, DEFAULT_WORK_THREADS, {'\0'}, {'\0'}, 0, true, true, NULL, NULL, NULL, NULL, NULL, sf_task_finish_clean_up, NULL }; @@ -441,6 +441,7 @@ static int load_rdma_apis(SFNetworkHandler *handler) LOAD_API(handler, get_connection_size); LOAD_API(handler, init_connection); + LOAD_API(handler, alloc_pd); LOAD_API(handler, create_server); LOAD_API(handler, close_server); LOAD_API(handler, accept_connection); @@ -462,7 +463,7 @@ static int init_network_handler(SFNetworkHandler *handler, handler->inner.is_inner = true; handler->outer.is_inner = false; - if (handler->type == fc_network_type_sock) { + if (handler->comm_type == fc_comm_type_sock) { handler->inner.sock = -1; handler->outer.sock = -1; handler->create_server = sf_socket_create_server; @@ -497,9 +498,22 @@ int sf_load_context_from_config_ex(SFContext *sf_context, sock_handler = sf_context->handlers + SF_SOCKET_NETWORK_HANDLER_INDEX; rdma_handler = sf_context->handlers + SF_RDMACM_NETWORK_HANDLER_INDEX; - sock_handler->type = fc_network_type_sock; - rdma_handler->type = fc_network_type_rdma; + sock_handler->comm_type = fc_comm_type_sock; + rdma_handler->comm_type = fc_comm_type_rdma; + if (config->comm_type == fc_comm_type_sock) { + sock_handler->enabled = true; + rdma_handler->enabled = false; + } else if (config->comm_type == fc_comm_type_rdma) { + sock_handler->enabled = false; + rdma_handler->enabled = true; + } else if (config->comm_type == fc_comm_type_both) { + sock_handler->enabled = true; + rdma_handler->enabled = true; + } for (i=0; ihandlers[i].enabled) { + continue; + } if ((result=init_network_handler(sf_context->handlers + i, sf_context)) != 0) { @@ -584,6 +598,34 @@ int sf_load_context_from_config_ex(SFContext *sf_context, return 0; } +int sf_alloc_rdma_pd(SFContext *sf_context, + FCAddressPtrArray *address_array) +{ + SFNetworkHandler *handler; + char *ip_addrs[FC_MAX_SERVER_IP_COUNT]; + char **ip_addr; + FCAddressInfo **addr; + FCAddressInfo **end; + + handler = sf_context->handlers + SF_RDMACM_NETWORK_HANDLER_INDEX; + if (!handler->enabled) { + return 0; + } + + end = address_array->addrs + address_array->count; + for (addr=address_array->addrs, ip_addr=ip_addrs; addrconn.ip_addr; + } + + if ((handler->pd=handler->alloc_pd((const char **)ip_addrs, + address_array->count)) != NULL) + { + return 0; + } else { + return ENODEV; + } +} + void sf_context_config_to_string(const SFContext *sf_context, char *output, const int size) { diff --git a/src/sf_global.h b/src/sf_global.h index c41068c..90450a0 100644 --- a/src/sf_global.h +++ b/src/sf_global.h @@ -66,6 +66,7 @@ typedef struct sf_context_ini_config { int default_inner_port; int default_outer_port; int default_work_threads; + FCCommunicationType comm_type; const char *max_pkg_size_item_name; } SFContextIniConfig; @@ -151,23 +152,25 @@ extern SFContext g_sf_context; #define SF_FCHOWN_TO_RUNBY_RETURN_ON_ERROR(fd, path) \ SF_FCHOWN_RETURN_ON_ERROR(fd, path, geteuid(), getegid()) -#define SF_SET_CONTEXT_INI_CONFIG_EX(config, filename, pIniContext, \ - section_name, def_inner_port, def_outer_port, def_work_threads, \ - max_pkg_size_item_nm) \ +#define SF_SET_CONTEXT_INI_CONFIG_EX(config, the_comm_type, filename, \ + pIniContext, section_name, def_inner_port, def_outer_port, \ + def_work_threads, max_pkg_size_item_nm) \ do { \ FAST_INI_SET_FULL_CTX_EX(config.ini_ctx, filename, \ section_name, pIniContext); \ + config.comm_type = the_comm_type; \ config.default_inner_port = def_inner_port; \ config.default_outer_port = def_outer_port; \ config.default_work_threads = def_work_threads; \ config.max_pkg_size_item_name = max_pkg_size_item_nm; \ } while (0) -#define SF_SET_CONTEXT_INI_CONFIG(config, filename, pIniContext, \ - section_name, def_inner_port, def_outer_port, def_work_threads) \ - SF_SET_CONTEXT_INI_CONFIG_EX(config, filename, pIniContext, \ - section_name, def_inner_port, def_outer_port, def_work_threads, \ - "max_pkg_size") +#define SF_SET_CONTEXT_INI_CONFIG(config, the_comm_type, \ + filename, pIniContext, section_name, def_inner_port, \ + def_outer_port, def_work_threads) \ + SF_SET_CONTEXT_INI_CONFIG_EX(config, the_comm_type, filename, \ + pIniContext, section_name, def_inner_port, def_outer_port, \ + def_work_threads, "max_pkg_size") int sf_load_global_config_ex(const char *server_name, IniFullContext *ini_ctx, const bool load_network_params, @@ -190,6 +193,7 @@ int sf_load_config_ex(const char *server_name, SFContextIniConfig *config, const int task_buffer_extra_size, const bool need_set_run_by); static inline int sf_load_config(const char *server_name, + const FCCommunicationType comm_type, const char *filename, IniContext *pIniContext, const char *section_name, const int default_inner_port, const int default_outer_port, const int task_buffer_extra_size) @@ -197,7 +201,7 @@ static inline int sf_load_config(const char *server_name, const bool need_set_run_by = true; SFContextIniConfig config; - SF_SET_CONTEXT_INI_CONFIG(config, filename, pIniContext, + SF_SET_CONTEXT_INI_CONFIG(config, comm_type, filename, pIniContext, section_name, default_inner_port, default_outer_port, DEFAULT_WORK_THREADS); return sf_load_config_ex(server_name, &config, @@ -208,18 +212,22 @@ int sf_load_context_from_config_ex(SFContext *sf_context, SFContextIniConfig *config); static inline int sf_load_context_from_config(SFContext *sf_context, + const FCCommunicationType comm_type, const char *filename, IniContext *pIniContext, const char *section_name, const int default_inner_port, const int default_outer_port) { SFContextIniConfig config; - SF_SET_CONTEXT_INI_CONFIG(config, filename, pIniContext, + SF_SET_CONTEXT_INI_CONFIG(config, comm_type, filename, pIniContext, section_name, default_inner_port, default_outer_port, DEFAULT_WORK_THREADS); return sf_load_context_from_config_ex(sf_context, &config); } +int sf_alloc_rdma_pd(SFContext *sf_context, + FCAddressPtrArray *address_array); + int sf_load_log_config(IniFullContext *ini_ctx, LogContext *log_ctx, SFLogConfig *log_cfg); diff --git a/src/sf_nio.c b/src/sf_nio.c index 9d0c98a..da3087d 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -38,7 +38,6 @@ #include "fastcommon/fast_task_queue.h" #include "fastcommon/ioevent_loop.h" #include "fastcommon/fc_atomic.h" -#include "sf_global.h" #include "sf_service.h" #include "sf_nio.h" @@ -212,6 +211,9 @@ static int sf_client_sock_connect(int sock, short event, void *arg) result = ETIMEDOUT; } else { result = task->handler->connect_server_done(task); + if (result == EINPROGRESS) { + return 0; + } } if (result != 0) { diff --git a/src/sf_nio.h b/src/sf_nio.h index 026d4f4..3ce0438 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -25,6 +25,7 @@ #include "fastcommon/ioevent_loop.h" #include "sf_define.h" #include "sf_types.h" +#include "sf_global.h" #define SF_CTX (task->handler->ctx) diff --git a/src/sf_service.c b/src/sf_service.c index 9f0e5f6..da91311 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -545,7 +545,7 @@ static void *accept_thread_entrance(SFListener *listener) { char thread_name[32]; snprintf(thread_name, sizeof(thread_name), "%s-%s-listen", - listener->handler->type == fc_network_type_sock ? + listener->handler->comm_type == fc_comm_type_sock ? "sock" : "rdma", listener->handler->ctx->name); prctl(PR_SET_NAME, thread_name); } diff --git a/src/sf_service.h b/src/sf_service.h index e5d7134..912a864 100644 --- a/src/sf_service.h +++ b/src/sf_service.h @@ -168,6 +168,60 @@ static inline void sf_release_task(struct fast_task_info *task) } } +static inline SFNetworkHandler *sf_get_first_network_handler_ex( + SFContext *sf_context) +{ + SFNetworkHandler *handler; + SFNetworkHandler *end; + + end = sf_context->handlers + SF_NETWORK_HANDLER_COUNT; + for (handler=sf_context->handlers; handlerenabled) { + return handler; + } + } + + return NULL; +} +#define sf_get_first_network_handler() \ + sf_get_first_network_handler_ex(&g_sf_context) + + +static inline SFNetworkHandler *sf_get_rdma_network_handler( + SFContext *sf_context) +{ + SFNetworkHandler *handler; + + handler = sf_context->handlers + SF_RDMACM_NETWORK_HANDLER_INDEX; + return (handler->enabled ? handler : NULL); +} + +static inline SFNetworkHandler *sf_get_rdma_network_handler2( + SFContext *sf_context1, SFContext *sf_context2) +{ + SFNetworkHandler *handler; + + if ((handler=sf_get_rdma_network_handler(sf_context1)) != NULL) { + return handler; + } + return sf_get_rdma_network_handler(sf_context2); +} + +static inline SFNetworkHandler *sf_get_rdma_network_handler3( + SFContext *sf_context1, SFContext *sf_context2, + SFContext *sf_context3) +{ + SFNetworkHandler *handler; + + if ((handler=sf_get_rdma_network_handler(sf_context1)) != NULL) { + return handler; + } + if ((handler=sf_get_rdma_network_handler(sf_context2)) != NULL) { + return handler; + } + return sf_get_rdma_network_handler(sf_context3); +} + #ifdef __cplusplus } #endif diff --git a/src/sf_types.h b/src/sf_types.h index a88b317..4973631 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -59,10 +59,15 @@ typedef enum { sf_comm_action_finish = 'f' } SFCommAction; +struct ibv_pd; struct sf_listener; typedef int (*sf_get_connection_size_callback)(); -typedef int (*sf_init_connection_callback)(struct fast_task_info *task, void *arg); +typedef int (*sf_init_connection_callback)( + struct fast_task_info *task, void *arg); +typedef struct ibv_pd *(*sf_alloc_pd_callback)( + const char **ip_addrs, const int count); + typedef int (*sf_create_server_callback)(struct sf_listener *listener, int af, const char *bind_addr); typedef void (*sf_close_server_callback)(struct sf_listener *listener); @@ -91,10 +96,9 @@ typedef struct sf_listener { } SFListener; struct sf_context; -struct ibv_pd; typedef struct sf_network_handler { bool enabled; - FCNetworkType type; + FCCommunicationType comm_type; struct sf_context *ctx; struct ibv_pd *pd; @@ -104,6 +108,7 @@ typedef struct sf_network_handler { /* for server side */ sf_get_connection_size_callback get_connection_size; sf_init_connection_callback init_connection; + sf_alloc_pd_callback alloc_pd; sf_create_server_callback create_server; sf_close_server_callback close_server; sf_accept_connection_callback accept_connection;