add parameter comm_type when load from config

support_rdma
YuQing 2023-09-07 09:38:21 +08:00
parent b3334d2ad5
commit dedc023235
9 changed files with 136 additions and 24 deletions

View File

@ -171,13 +171,13 @@ void client_channel_destroy()
}
static struct fast_task_info *alloc_channel_task(IdempotencyClientChannel
*channel, const uint32_t hash_code, const FCNetworkType network_type,
*channel, const uint32_t hash_code, const FCCommunicationType comm_type,
const char *server_ip, const uint16_t port, int *err_no)
{
struct fast_task_info *task;
SFNetworkHandler *handler;
if (network_type == fc_network_type_sock) {
if (comm_type == fc_comm_type_sock) {
handler = g_sf_context.handlers + SF_SOCKET_NETWORK_HANDLER_INDEX;
} else {
handler = g_sf_context.handlers + SF_RDMACM_NETWORK_HANDLER_INDEX;
@ -232,7 +232,7 @@ int idempotency_client_channel_check_reconnect(
}
struct idempotency_client_channel *idempotency_client_channel_get(
const FCNetworkType network_type, const char *server_ip,
const FCCommunicationType comm_type, const char *server_ip,
const uint16_t server_port, const int timeout, int *err_no)
{
int r;
@ -284,7 +284,7 @@ struct idempotency_client_channel *idempotency_client_channel_get(
}
channel->task = alloc_channel_task(channel, hash_code,
network_type, server_ip, server_port, err_no);
comm_type, server_ip, server_port, err_no);
if (channel->task == NULL) {
fast_mblock_free_object(&channel_context.
channel_allocator, channel);

View File

@ -41,7 +41,7 @@ void idempotency_client_channel_config_to_string_ex(
char *output, const int size, const bool add_comma);
struct idempotency_client_channel *idempotency_client_channel_get(
const FCNetworkType network_type, const char *server_ip,
const FCCommunicationType comm_type, const char *server_ip,
const uint16_t server_port, const int timeout, int *err_no);
static inline uint64_t idempotency_client_channel_next_seq_id(

View File

@ -46,7 +46,7 @@ SFGlobalVariables g_sf_global_vars = {
};
SFContext g_sf_context = {{'\0'}, NULL, 0,
{{true, fc_network_type_sock}, {false, fc_network_type_rdma}},
{{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}},
1, DEFAULT_WORK_THREADS, {'\0'}, {'\0'}, 0, true, true, NULL,
NULL, NULL, NULL, NULL, sf_task_finish_clean_up, NULL
};
@ -441,6 +441,7 @@ static int load_rdma_apis(SFNetworkHandler *handler)
LOAD_API(handler, get_connection_size);
LOAD_API(handler, init_connection);
LOAD_API(handler, alloc_pd);
LOAD_API(handler, create_server);
LOAD_API(handler, close_server);
LOAD_API(handler, accept_connection);
@ -462,7 +463,7 @@ static int init_network_handler(SFNetworkHandler *handler,
handler->inner.is_inner = true;
handler->outer.is_inner = false;
if (handler->type == fc_network_type_sock) {
if (handler->comm_type == fc_comm_type_sock) {
handler->inner.sock = -1;
handler->outer.sock = -1;
handler->create_server = sf_socket_create_server;
@ -497,9 +498,22 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
sock_handler = sf_context->handlers + SF_SOCKET_NETWORK_HANDLER_INDEX;
rdma_handler = sf_context->handlers + SF_RDMACM_NETWORK_HANDLER_INDEX;
sock_handler->type = fc_network_type_sock;
rdma_handler->type = fc_network_type_rdma;
sock_handler->comm_type = fc_comm_type_sock;
rdma_handler->comm_type = fc_comm_type_rdma;
if (config->comm_type == fc_comm_type_sock) {
sock_handler->enabled = true;
rdma_handler->enabled = false;
} else if (config->comm_type == fc_comm_type_rdma) {
sock_handler->enabled = false;
rdma_handler->enabled = true;
} else if (config->comm_type == fc_comm_type_both) {
sock_handler->enabled = true;
rdma_handler->enabled = true;
}
for (i=0; i<SF_NETWORK_HANDLER_COUNT; i++) {
if (!sf_context->handlers[i].enabled) {
continue;
}
if ((result=init_network_handler(sf_context->handlers + i,
sf_context)) != 0)
{
@ -584,6 +598,34 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
return 0;
}
int sf_alloc_rdma_pd(SFContext *sf_context,
FCAddressPtrArray *address_array)
{
SFNetworkHandler *handler;
char *ip_addrs[FC_MAX_SERVER_IP_COUNT];
char **ip_addr;
FCAddressInfo **addr;
FCAddressInfo **end;
handler = sf_context->handlers + SF_RDMACM_NETWORK_HANDLER_INDEX;
if (!handler->enabled) {
return 0;
}
end = address_array->addrs + address_array->count;
for (addr=address_array->addrs, ip_addr=ip_addrs; addr<end; addr++) {
*ip_addr = (*addr)->conn.ip_addr;
}
if ((handler->pd=handler->alloc_pd((const char **)ip_addrs,
address_array->count)) != NULL)
{
return 0;
} else {
return ENODEV;
}
}
void sf_context_config_to_string(const SFContext *sf_context,
char *output, const int size)
{

View File

@ -66,6 +66,7 @@ typedef struct sf_context_ini_config {
int default_inner_port;
int default_outer_port;
int default_work_threads;
FCCommunicationType comm_type;
const char *max_pkg_size_item_name;
} SFContextIniConfig;
@ -151,23 +152,25 @@ extern SFContext g_sf_context;
#define SF_FCHOWN_TO_RUNBY_RETURN_ON_ERROR(fd, path) \
SF_FCHOWN_RETURN_ON_ERROR(fd, path, geteuid(), getegid())
#define SF_SET_CONTEXT_INI_CONFIG_EX(config, filename, pIniContext, \
section_name, def_inner_port, def_outer_port, def_work_threads, \
max_pkg_size_item_nm) \
#define SF_SET_CONTEXT_INI_CONFIG_EX(config, the_comm_type, filename, \
pIniContext, section_name, def_inner_port, def_outer_port, \
def_work_threads, max_pkg_size_item_nm) \
do { \
FAST_INI_SET_FULL_CTX_EX(config.ini_ctx, filename, \
section_name, pIniContext); \
config.comm_type = the_comm_type; \
config.default_inner_port = def_inner_port; \
config.default_outer_port = def_outer_port; \
config.default_work_threads = def_work_threads; \
config.max_pkg_size_item_name = max_pkg_size_item_nm; \
} while (0)
#define SF_SET_CONTEXT_INI_CONFIG(config, filename, pIniContext, \
section_name, def_inner_port, def_outer_port, def_work_threads) \
SF_SET_CONTEXT_INI_CONFIG_EX(config, filename, pIniContext, \
section_name, def_inner_port, def_outer_port, def_work_threads, \
"max_pkg_size")
#define SF_SET_CONTEXT_INI_CONFIG(config, the_comm_type, \
filename, pIniContext, section_name, def_inner_port, \
def_outer_port, def_work_threads) \
SF_SET_CONTEXT_INI_CONFIG_EX(config, the_comm_type, filename, \
pIniContext, section_name, def_inner_port, def_outer_port, \
def_work_threads, "max_pkg_size")
int sf_load_global_config_ex(const char *server_name,
IniFullContext *ini_ctx, const bool load_network_params,
@ -190,6 +193,7 @@ int sf_load_config_ex(const char *server_name, SFContextIniConfig *config,
const int task_buffer_extra_size, const bool need_set_run_by);
static inline int sf_load_config(const char *server_name,
const FCCommunicationType comm_type,
const char *filename, IniContext *pIniContext,
const char *section_name, const int default_inner_port,
const int default_outer_port, const int task_buffer_extra_size)
@ -197,7 +201,7 @@ static inline int sf_load_config(const char *server_name,
const bool need_set_run_by = true;
SFContextIniConfig config;
SF_SET_CONTEXT_INI_CONFIG(config, filename, pIniContext,
SF_SET_CONTEXT_INI_CONFIG(config, comm_type, filename, pIniContext,
section_name, default_inner_port, default_outer_port,
DEFAULT_WORK_THREADS);
return sf_load_config_ex(server_name, &config,
@ -208,18 +212,22 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
SFContextIniConfig *config);
static inline int sf_load_context_from_config(SFContext *sf_context,
const FCCommunicationType comm_type,
const char *filename, IniContext *pIniContext,
const char *section_name, const int default_inner_port,
const int default_outer_port)
{
SFContextIniConfig config;
SF_SET_CONTEXT_INI_CONFIG(config, filename, pIniContext,
SF_SET_CONTEXT_INI_CONFIG(config, comm_type, filename, pIniContext,
section_name, default_inner_port, default_outer_port,
DEFAULT_WORK_THREADS);
return sf_load_context_from_config_ex(sf_context, &config);
}
int sf_alloc_rdma_pd(SFContext *sf_context,
FCAddressPtrArray *address_array);
int sf_load_log_config(IniFullContext *ini_ctx, LogContext *log_ctx,
SFLogConfig *log_cfg);

View File

@ -38,7 +38,6 @@
#include "fastcommon/fast_task_queue.h"
#include "fastcommon/ioevent_loop.h"
#include "fastcommon/fc_atomic.h"
#include "sf_global.h"
#include "sf_service.h"
#include "sf_nio.h"
@ -212,6 +211,9 @@ static int sf_client_sock_connect(int sock, short event, void *arg)
result = ETIMEDOUT;
} else {
result = task->handler->connect_server_done(task);
if (result == EINPROGRESS) {
return 0;
}
}
if (result != 0) {

View File

@ -25,6 +25,7 @@
#include "fastcommon/ioevent_loop.h"
#include "sf_define.h"
#include "sf_types.h"
#include "sf_global.h"
#define SF_CTX (task->handler->ctx)

View File

@ -545,7 +545,7 @@ static void *accept_thread_entrance(SFListener *listener)
{
char thread_name[32];
snprintf(thread_name, sizeof(thread_name), "%s-%s-listen",
listener->handler->type == fc_network_type_sock ?
listener->handler->comm_type == fc_comm_type_sock ?
"sock" : "rdma", listener->handler->ctx->name);
prctl(PR_SET_NAME, thread_name);
}

View File

@ -168,6 +168,60 @@ static inline void sf_release_task(struct fast_task_info *task)
}
}
static inline SFNetworkHandler *sf_get_first_network_handler_ex(
SFContext *sf_context)
{
SFNetworkHandler *handler;
SFNetworkHandler *end;
end = sf_context->handlers + SF_NETWORK_HANDLER_COUNT;
for (handler=sf_context->handlers; handler<end; handler++) {
if (handler->enabled) {
return handler;
}
}
return NULL;
}
#define sf_get_first_network_handler() \
sf_get_first_network_handler_ex(&g_sf_context)
static inline SFNetworkHandler *sf_get_rdma_network_handler(
SFContext *sf_context)
{
SFNetworkHandler *handler;
handler = sf_context->handlers + SF_RDMACM_NETWORK_HANDLER_INDEX;
return (handler->enabled ? handler : NULL);
}
static inline SFNetworkHandler *sf_get_rdma_network_handler2(
SFContext *sf_context1, SFContext *sf_context2)
{
SFNetworkHandler *handler;
if ((handler=sf_get_rdma_network_handler(sf_context1)) != NULL) {
return handler;
}
return sf_get_rdma_network_handler(sf_context2);
}
static inline SFNetworkHandler *sf_get_rdma_network_handler3(
SFContext *sf_context1, SFContext *sf_context2,
SFContext *sf_context3)
{
SFNetworkHandler *handler;
if ((handler=sf_get_rdma_network_handler(sf_context1)) != NULL) {
return handler;
}
if ((handler=sf_get_rdma_network_handler(sf_context2)) != NULL) {
return handler;
}
return sf_get_rdma_network_handler(sf_context3);
}
#ifdef __cplusplus
}
#endif

View File

@ -59,10 +59,15 @@ typedef enum {
sf_comm_action_finish = 'f'
} SFCommAction;
struct ibv_pd;
struct sf_listener;
typedef int (*sf_get_connection_size_callback)();
typedef int (*sf_init_connection_callback)(struct fast_task_info *task, void *arg);
typedef int (*sf_init_connection_callback)(
struct fast_task_info *task, void *arg);
typedef struct ibv_pd *(*sf_alloc_pd_callback)(
const char **ip_addrs, const int count);
typedef int (*sf_create_server_callback)(struct sf_listener
*listener, int af, const char *bind_addr);
typedef void (*sf_close_server_callback)(struct sf_listener *listener);
@ -91,10 +96,9 @@ typedef struct sf_listener {
} SFListener;
struct sf_context;
struct ibv_pd;
typedef struct sf_network_handler {
bool enabled;
FCNetworkType type;
FCCommunicationType comm_type;
struct sf_context *ctx;
struct ibv_pd *pd;
@ -104,6 +108,7 @@ typedef struct sf_network_handler {
/* for server side */
sf_get_connection_size_callback get_connection_size;
sf_init_connection_callback init_connection;
sf_alloc_pd_callback alloc_pd;
sf_create_server_callback create_server;
sf_close_server_callback close_server;
sf_accept_connection_callback accept_connection;