support address family IPv4, IPv6 and both

use_iouring
YuQing 2023-11-29 18:52:02 +08:00
parent 5618afabbb
commit a969a0df07
9 changed files with 505 additions and 289 deletions

View File

@ -175,12 +175,19 @@ static struct fast_task_info *alloc_channel_task(IdempotencyClientChannel
const char *server_ip, const uint16_t port, int *err_no) const char *server_ip, const uint16_t port, int *err_no)
{ {
struct fast_task_info *task; struct fast_task_info *task;
SFAddressFamilyHandler *fh;
SFNetworkHandler *handler; SFNetworkHandler *handler;
if (comm_type == fc_comm_type_sock) { if (is_ipv6_addr(server_ip)) {
handler = g_sf_context.handlers + SF_SOCKET_NETWORK_HANDLER_INDEX; fh = g_sf_context.handlers + SF_IPV6_ADDRESS_FAMILY_INDEX;
} else { } else {
handler = g_sf_context.handlers + SF_RDMACM_NETWORK_HANDLER_INDEX; fh = g_sf_context.handlers + SF_IPV4_ADDRESS_FAMILY_INDEX;
}
if (comm_type == fc_comm_type_sock) {
handler = fh->handlers + SF_SOCKET_NETWORK_HANDLER_INDEX;
} else {
handler = fh->handlers + SF_RDMACM_NETWORK_HANDLER_INDEX;
} }
if ((task=sf_alloc_init_task(handler, -1)) == NULL) { if ((task=sf_alloc_init_task(handler, -1)) == NULL) {
*err_no = ENOMEM; *err_no = ENOMEM;

View File

@ -521,7 +521,6 @@ int sf_connection_manager_init_ex(SFConnectionManager *cm,
connect_done_callback, void *args, FCServerConfig *server_cfg, connect_done_callback, void *args, FCServerConfig *server_cfg,
const bool bg_thread_enabled) const bool bg_thread_enabled)
{ {
const int socket_domain = AF_UNSPEC;
struct { struct {
ConnectionExtraParams holder; ConnectionExtraParams holder;
ConnectionExtraParams *ptr; ConnectionExtraParams *ptr;
@ -553,8 +552,8 @@ int sf_connection_manager_init_ex(SFConnectionManager *cm,
extra_params.ptr = &extra_params.holder; extra_params.ptr = &extra_params.holder;
} }
if ((result=conn_pool_init_ex1(&cm->cpool, common_cfg->connect_timeout, if ((result=conn_pool_init_ex1(&cm->cpool, common_cfg->connect_timeout,
max_count_per_entry, max_idle_time, socket_domain, max_count_per_entry, max_idle_time, htable_init_capacity,
htable_init_capacity, connect_done_callback, args, connect_done_callback, args,
sf_cm_validate_connection_callback, cm, sf_cm_validate_connection_callback, cm,
sizeof(SFConnectionParameters), sizeof(SFConnectionParameters),
extra_params.ptr)) != 0) extra_params.ptr)) != 0)

View File

@ -30,6 +30,7 @@
#include "fastcommon/common_define.h" #include "fastcommon/common_define.h"
#include "fastcommon/shared_func.h" #include "fastcommon/shared_func.h"
#include "fastcommon/process_ctrl.h" #include "fastcommon/process_ctrl.h"
#include "fastcommon/local_ip_func.h"
#include "fastcommon/logger.h" #include "fastcommon/logger.h"
#include "sf_nio.h" #include "sf_nio.h"
#include "sf_service.h" #include "sf_service.h"
@ -46,22 +47,13 @@ SFGlobalVariables g_sf_global_vars = {
{0, 0}, NULL, {NULL, 0} {0, 0}, NULL, {NULL, 0}
}; };
SFContext g_sf_context = {{'\0'}, NULL, 0, SFContext g_sf_context = {{'\0'}, NULL, 0, sf_address_family_auto,
{{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}, {{AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}},
1, DEFAULT_WORK_THREADS, {'\0'}, {'\0'}, 0, true, true, true, {AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}}},
{false, 0, 0}, {sf_task_finish_clean_up} 1, DEFAULT_WORK_THREADS, 0, true, true, true, {false, 0, 0},
{sf_task_finish_clean_up}
}; };
static inline void set_config_str_value(const char *value,
char *dest, const int dest_size)
{
if (value == NULL) {
*dest = '\0';
} else {
snprintf(dest, dest_size, "%s", value);
}
}
static int load_network_parameters(IniFullContext *ini_ctx, static int load_network_parameters(IniFullContext *ini_ctx,
const char *max_pkg_size_item_nm, const int fixed_buff_size, const char *max_pkg_size_item_nm, const int fixed_buff_size,
const int task_buffer_extra_size) const int task_buffer_extra_size)
@ -472,9 +464,9 @@ static int load_rdma_apis(SFNetworkHandler *handler)
} }
static int init_network_handler(SFNetworkHandler *handler, static int init_network_handler(SFNetworkHandler *handler,
SFContext *sf_context) SFAddressFamilyHandler *fh)
{ {
handler->ctx = sf_context; handler->fh = fh;
handler->inner.handler = handler; handler->inner.handler = handler;
handler->outer.handler = handler; handler->outer.handler = handler;
handler->inner.is_inner = true; handler->inner.is_inner = true;
@ -501,83 +493,46 @@ static int init_network_handler(SFNetworkHandler *handler,
} }
} }
int sf_load_context_from_config_ex(SFContext *sf_context, static void set_bind_address(const char *bind_addr, char *ipv4_bind_addr,
char *ipv6_bind_addr, const int addr_size)
{
char new_bind_addr[2 * IP_ADDRESS_SIZE];
char *cols[2];
char *ip_addr;
int count;
int len;
int i;
if (bind_addr == NULL || *bind_addr == '\0') {
*ipv4_bind_addr = *ipv6_bind_addr = '\0';
return;
}
snprintf(new_bind_addr, sizeof(new_bind_addr), "%s", bind_addr);
count = splitEx(new_bind_addr, ',', cols, 2);
for (i=0; i<count; i++) {
ip_addr = cols[i];
if (is_ipv6_addr(ip_addr)) {
len = strlen(ip_addr);
if (*ip_addr == '[' && *(ip_addr + (len - 1)) == ']') {
++ip_addr;
len -= 2;
}
snprintf(ipv6_bind_addr, addr_size, "%.*s", len, ip_addr);
} else {
snprintf(ipv4_bind_addr, addr_size, "%s", ip_addr);
}
}
}
static int load_bind_address(SFContext *sf_context,
SFContextIniConfig *config) SFContextIniConfig *config)
{ {
SFNetworkHandler *sock_handler;
SFNetworkHandler *rdma_handler;
char *inner_port;
char *outer_port;
char *inner_bind_addr; char *inner_bind_addr;
char *outer_bind_addr; char *outer_bind_addr;
char *bind_addr; char *bind_addr;
int port; SFAddressFamilyHandler *ipv4_handler;
int i; SFAddressFamilyHandler *ipv6_handler;
int result;
sock_handler = sf_context->handlers + SF_SOCKET_NETWORK_HANDLER_INDEX;
rdma_handler = sf_context->handlers + SF_RDMACM_NETWORK_HANDLER_INDEX;
sock_handler->comm_type = fc_comm_type_sock;
rdma_handler->comm_type = fc_comm_type_rdma;
if (config->comm_type == fc_comm_type_sock) {
sock_handler->enabled = true;
rdma_handler->enabled = false;
} else if (config->comm_type == fc_comm_type_rdma) {
sock_handler->enabled = false;
rdma_handler->enabled = true;
} else if (config->comm_type == fc_comm_type_both) {
sock_handler->enabled = true;
rdma_handler->enabled = true;
}
for (i=0; i<SF_NETWORK_HANDLER_COUNT; i++) {
if (!sf_context->handlers[i].enabled) {
continue;
}
if ((result=init_network_handler(sf_context->handlers + i,
sf_context)) != 0)
{
return result;
}
}
inner_port = iniGetStrValue(config->ini_ctx.section_name,
"inner_port", config->ini_ctx.context);
outer_port = iniGetStrValue(config->ini_ctx.section_name,
"outer_port", config->ini_ctx.context);
if (inner_port == NULL && outer_port == NULL) {
port = iniGetIntValue(config->ini_ctx.section_name,
"port", config->ini_ctx.context, 0);
if (port > 0) {
sock_handler->inner.port = sock_handler->outer.port = port;
}
} else {
if (inner_port != NULL) {
sock_handler->inner.port = strtol(inner_port, NULL, 10);
}
if (outer_port != NULL) {
sock_handler->outer.port = strtol(outer_port, NULL, 10);
}
}
if (sock_handler->inner.port <= 0) {
sock_handler->inner.port = config->default_inner_port;
}
if (sock_handler->outer.port <= 0) {
sock_handler->outer.port = config->default_outer_port;
}
if (sock_handler->inner.port == sock_handler->outer.port) {
sock_handler->inner.enabled = true;
sock_handler->outer.enabled = false;
} else {
sock_handler->inner.enabled = true;
sock_handler->outer.enabled = true;
}
rdma_handler->inner.port = sock_handler->inner.port;
rdma_handler->inner.enabled = sock_handler->inner.enabled;
rdma_handler->outer.port = sock_handler->outer.port;
rdma_handler->outer.enabled = sock_handler->outer.enabled;
inner_bind_addr = iniGetStrValue(config->ini_ctx.section_name, inner_bind_addr = iniGetStrValue(config->ini_ctx.section_name,
"inner_bind_addr", config->ini_ctx.context); "inner_bind_addr", config->ini_ctx.context);
@ -590,10 +545,192 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
inner_bind_addr = outer_bind_addr = bind_addr; inner_bind_addr = outer_bind_addr = bind_addr;
} }
} }
set_config_str_value(inner_bind_addr, sf_context->inner_bind_addr,
sizeof(sf_context->inner_bind_addr)); ipv4_handler = sf_context->handlers + SF_IPV4_ADDRESS_FAMILY_INDEX;
set_config_str_value(outer_bind_addr, sf_context->outer_bind_addr, ipv6_handler = sf_context->handlers + SF_IPV6_ADDRESS_FAMILY_INDEX;
sizeof(sf_context->outer_bind_addr)); set_bind_address(inner_bind_addr, ipv4_handler->inner_bind_addr,
ipv6_handler->inner_bind_addr,
sizeof(ipv4_handler->inner_bind_addr));
set_bind_address(outer_bind_addr, ipv4_handler->outer_bind_addr,
ipv6_handler->outer_bind_addr,
sizeof(ipv4_handler->outer_bind_addr));
return 0;
}
static int load_address_family(SFContext *sf_context,
SFContextIniConfig *config)
{
char *address_family_str;
SFAddressFamily address_family;
SFAddressFamilyHandler *ipv4_handler;
SFAddressFamilyHandler *ipv6_handler;
bool ipv4_bound;
bool ipv6_bound;
address_family_str = iniGetStrValue(config->ini_ctx.section_name,
"address_family", config->ini_ctx.context);
if (address_family_str == NULL) {
sf_context->address_family = sf_address_family_auto;
} else if (strcasecmp(address_family_str, "auto") == 0) {
sf_context->address_family = sf_address_family_auto;
} else if (strcasecmp(address_family_str, "IPv4") == 0) {
sf_context->address_family = sf_address_family_ipv4;
} else if (strcasecmp(address_family_str, "IPv6") == 0) {
sf_context->address_family = sf_address_family_ipv6;
} else if (strcasecmp(address_family_str, "both") == 0) {
sf_context->address_family = sf_address_family_both;
} else {
logError("file: "__FILE__", line: %d, "
"config file: %s, section: %s, address_family: %s "
"is invalid!", __LINE__, config->ini_ctx.filename,
config->ini_ctx.section_name, address_family_str);
return EINVAL;
}
ipv4_handler = sf_context->handlers + SF_IPV4_ADDRESS_FAMILY_INDEX;
ipv6_handler = sf_context->handlers + SF_IPV6_ADDRESS_FAMILY_INDEX;
if (sf_context->address_family == sf_address_family_auto) {
ipv4_bound = (*ipv4_handler->inner_bind_addr != '\0' ||
*ipv4_handler->outer_bind_addr != '\0');
ipv6_bound = (*ipv6_handler->inner_bind_addr != '\0' ||
*ipv6_handler->outer_bind_addr != '\0');
if (ipv4_bound) {
if (ipv6_bound) {
address_family = sf_address_family_both;
} else {
address_family = sf_address_family_ipv4;
}
} else {
if (ipv6_bound) {
address_family = sf_address_family_ipv6;
} else {
int ipv4_count;
int ipv6_count;
stat_local_host_ip(&ipv4_count, &ipv6_count);
if (ipv4_count > 0) {
address_family = sf_address_family_ipv4;
} else {
address_family = sf_address_family_ipv6;
}
}
}
} else {
address_family = sf_context->address_family;
}
switch (address_family) {
case sf_address_family_ipv4:
ipv4_handler->af = AF_INET;
ipv6_handler->af = AF_UNSPEC;
break;
case sf_address_family_ipv6:
ipv4_handler->af = AF_UNSPEC;
ipv6_handler->af = AF_INET6;
break;
case sf_address_family_both:
ipv4_handler->af = AF_INET;
ipv6_handler->af = AF_INET6;
break;
default:
break;
}
return 0;
}
int sf_load_context_from_config_ex(SFContext *sf_context,
SFContextIniConfig *config)
{
SFAddressFamilyHandler *fh;
SFNetworkHandler *sock_handler;
SFNetworkHandler *rdma_handler;
SFNetworkHandler *handler;
SFNetworkHandler *end;
char *inner_port_str;
char *outer_port_str;
int inner_port;
int outer_port;
int port;
int i;
int result;
inner_port_str = iniGetStrValue(config->ini_ctx.section_name,
"inner_port", config->ini_ctx.context);
outer_port_str = iniGetStrValue(config->ini_ctx.section_name,
"outer_port", config->ini_ctx.context);
if (inner_port_str == NULL && outer_port_str == NULL) {
port = iniGetIntValue(config->ini_ctx.section_name,
"port", config->ini_ctx.context, 0);
if (port > 0) {
inner_port = outer_port = port;
} else {
inner_port = outer_port = 0;
}
} else {
if (inner_port_str != NULL) {
inner_port = strtol(inner_port_str, NULL, 10);
} else {
inner_port = 0;
}
if (outer_port_str != NULL) {
outer_port = strtol(outer_port_str, NULL, 10);
} else {
outer_port = 0;
}
}
if (inner_port <= 0) {
inner_port = config->default_inner_port;
}
if (outer_port <= 0) {
outer_port = config->default_outer_port;
}
for (i=0; i<SF_ADDRESS_FAMILY_COUNT; i++) {
fh = sf_context->handlers + i;
fh->ctx = sf_context;
sock_handler = fh->handlers + SF_SOCKET_NETWORK_HANDLER_INDEX;
rdma_handler = fh->handlers + SF_RDMACM_NETWORK_HANDLER_INDEX;
sock_handler->comm_type = fc_comm_type_sock;
rdma_handler->comm_type = fc_comm_type_rdma;
if (config->comm_type == fc_comm_type_sock) {
sock_handler->enabled = true;
rdma_handler->enabled = false;
} else if (config->comm_type == fc_comm_type_rdma) {
sock_handler->enabled = false;
rdma_handler->enabled = true;
} else if (config->comm_type == fc_comm_type_both) {
sock_handler->enabled = true;
rdma_handler->enabled = true;
}
end = fh->handlers + SF_NETWORK_HANDLER_COUNT;
for (handler=fh->handlers; handler<end; handler++) {
if (!handler->enabled) {
continue;
}
if ((result=init_network_handler(handler, fh)) != 0) {
return result;
}
}
sock_handler->inner.port = inner_port;
sock_handler->outer.port = outer_port;
if (sock_handler->inner.port == sock_handler->outer.port) {
sock_handler->inner.enabled = true;
sock_handler->outer.enabled = false;
} else {
sock_handler->inner.enabled = true;
sock_handler->outer.enabled = true;
}
rdma_handler->inner.port = sock_handler->inner.port;
rdma_handler->inner.enabled = sock_handler->inner.enabled;
rdma_handler->outer.port = sock_handler->outer.port;
rdma_handler->outer.enabled = sock_handler->outer.enabled;
}
sf_context->accept_threads = iniGetIntValue( sf_context->accept_threads = iniGetIntValue(
config->ini_ctx.section_name, config->ini_ctx.section_name,
@ -619,51 +756,123 @@ int sf_load_context_from_config_ex(SFContext *sf_context,
return EINVAL; return EINVAL;
} }
if ((result=load_bind_address(sf_context, config)) != 0) {
return result;
}
if ((result=load_address_family(sf_context, config)) != 0) {
return result;
}
return 0; return 0;
} }
int sf_alloc_rdma_pd(SFContext *sf_context, int sf_alloc_rdma_pd(SFContext *sf_context,
FCAddressPtrArray *address_array) FCAddressPtrArray *address_array)
{ {
SFAddressFamilyHandler *fh;
SFNetworkHandler *handler; SFNetworkHandler *handler;
int i;
int result; int result;
handler = sf_context->handlers + SF_RDMACM_NETWORK_HANDLER_INDEX; for (i=0; i<SF_ADDRESS_FAMILY_COUNT; i++) {
if (!handler->enabled) { fh = sf_context->handlers + i;
return 0; if (fh->af == AF_UNSPEC) {
continue;
}
handler = fh->handlers + SF_RDMACM_NETWORK_HANDLER_INDEX;
if (handler->enabled) {
if ((handler->pd=fc_alloc_rdma_pd(handler->alloc_pd,
address_array, &result)) == NULL)
{
return result;
}
}
} }
handler->pd = fc_alloc_rdma_pd(handler->alloc_pd, return 0;
address_array, &result); }
return result;
static void combine_bind_addr(char *bind_addr, const char *ip_addr)
{
char *p;
if (*bind_addr == '\0') {
p = bind_addr;
} else {
p = bind_addr + strlen(bind_addr);
*p++ = ',';
}
sprintf(p, "%s", ip_addr);
}
static const char *get_address_family_caption(
const SFAddressFamily address_family)
{
switch (address_family) {
case sf_address_family_auto:
return "auto";
case sf_address_family_ipv4:
return "IPv4";
case sf_address_family_ipv6:
return "IPv6";
case sf_address_family_both:
return "both";
default:
return "unkown";
}
} }
void sf_context_config_to_string(const SFContext *sf_context, void sf_context_config_to_string(const SFContext *sf_context,
char *output, const int size) char *output, const int size)
{ {
const SFAddressFamilyHandler *fh;
const SFNetworkHandler *sock_handler; const SFNetworkHandler *sock_handler;
char inner_bind_addr[2 * IP_ADDRESS_SIZE + 2];
char outer_bind_addr[2 * IP_ADDRESS_SIZE + 2];
int i;
int len; int len;
sock_handler = sf_context->handlers + SF_SOCKET_NETWORK_HANDLER_INDEX; *inner_bind_addr = '\0';
*outer_bind_addr = '\0';
sock_handler = NULL;
for (i=0; i<SF_ADDRESS_FAMILY_COUNT; i++) {
fh = sf_context->handlers + i;
if (fh->af == AF_UNSPEC) {
continue;
}
if (*(fh->inner_bind_addr) != '\0') {
combine_bind_addr(inner_bind_addr, fh->inner_bind_addr);
}
if (*(fh->outer_bind_addr) != '\0') {
combine_bind_addr(outer_bind_addr, fh->outer_bind_addr);
}
sock_handler = fh->handlers + SF_SOCKET_NETWORK_HANDLER_INDEX;
}
len = 0; len = 0;
if ((sock_handler->inner.port == sock_handler->outer.port) && if ((sock_handler->inner.port == sock_handler->outer.port) &&
(strcmp(sf_context->inner_bind_addr, (strcmp(inner_bind_addr, outer_bind_addr) == 0))
sf_context->outer_bind_addr) == 0))
{ {
len += snprintf(output + len, size - len, len += snprintf(output + len, size - len,
"port=%u, bind_addr=%s", "port=%u, bind_addr=%s",
sock_handler->inner.port, sock_handler->inner.port,
sf_context->inner_bind_addr); inner_bind_addr);
} else { } else {
len += snprintf(output + len, size - len, len += snprintf(output + len, size - len,
"inner_port=%u, inner_bind_addr=%s, " "inner_port=%u, inner_bind_addr=%s, "
"outer_port=%u, outer_bind_addr=%s", "outer_port=%u, outer_bind_addr=%s",
sock_handler->inner.port, sf_context->inner_bind_addr, sock_handler->inner.port, inner_bind_addr,
sock_handler->outer.port, sf_context->outer_bind_addr); sock_handler->outer.port, outer_bind_addr);
} }
len += snprintf(output + len, size - len, len += snprintf(output + len, size - len,
", accept_threads=%d, work_threads=%d", ", address_family=%s, accept_threads=%d, work_threads=%d",
get_address_family_caption(sf_context->address_family),
sf_context->accept_threads, sf_context->work_threads); sf_context->accept_threads, sf_context->work_threads);
} }

View File

@ -90,12 +90,24 @@ extern SFContext g_sf_context;
#define SF_G_THREAD_STACK_SIZE g_sf_global_vars.thread_stack_size #define SF_G_THREAD_STACK_SIZE g_sf_global_vars.thread_stack_size
#define SF_G_UP_TIME g_sf_global_vars.up_time #define SF_G_UP_TIME g_sf_global_vars.up_time
#define SF_G_SOCK_HANDLER (g_sf_context.handlers + \ #define SF_G_SOCK_HANDLER (g_sf_context.handlers \
[SF_IPV4_ADDRESS_FAMILY_INDEX].handlers + \
SF_SOCKET_NETWORK_HANDLER_INDEX) SF_SOCKET_NETWORK_HANDLER_INDEX)
#define SF_G_OUTER_PORT SF_G_SOCK_HANDLER->outer.port #define SF_G_OUTER_PORT SF_G_SOCK_HANDLER->outer.port
#define SF_G_INNER_PORT SF_G_SOCK_HANDLER->inner.port #define SF_G_INNER_PORT SF_G_SOCK_HANDLER->inner.port
#define SF_G_OUTER_BIND_ADDR g_sf_context.outer_bind_addr #define SF_G_OUTER_BIND_ADDR4 g_sf_context.handlers \
#define SF_G_INNER_BIND_ADDR g_sf_context.inner_bind_addr [SF_IPV4_ADDRESS_FAMILY_INDEX].outer_bind_addr
#define SF_G_INNER_BIND_ADDR4 g_sf_context.handlers \
[SF_IPV4_ADDRESS_FAMILY_INDEX].inner_bind_addr
#define SF_G_OUTER_BIND_ADDR6 g_sf_context.handlers \
[SF_IPV6_ADDRESS_FAMILY_INDEX].outer_bind_addr
#define SF_G_INNER_BIND_ADDR6 g_sf_context.handlers \
[SF_IPV6_ADDRESS_FAMILY_INDEX].inner_bind_addr
#define SF_G_IPV4_ENABLED (g_sf_context.handlers \
[SF_IPV4_ADDRESS_FAMILY_INDEX].af == AF_INET)
#define SF_G_IPV6_ENABLED (g_sf_context.handlers \
[SF_IPV6_ADDRESS_FAMILY_INDEX].af == AF_INET6)
#define SF_G_ACCEPT_THREADS g_sf_context.accept_threads #define SF_G_ACCEPT_THREADS g_sf_context.accept_threads
#define SF_G_WORK_THREADS g_sf_context.work_threads #define SF_G_WORK_THREADS g_sf_context.work_threads
@ -115,6 +127,11 @@ extern SFContext g_sf_context;
#define SF_ALIVE_THREAD_COUNT(sf_context) sf_context.thread_count #define SF_ALIVE_THREAD_COUNT(sf_context) sf_context.thread_count
#define SF_THREAD_INDEX(sf_context, tdata) (int)(tdata - sf_context.thread_data) #define SF_THREAD_INDEX(sf_context, tdata) (int)(tdata - sf_context.thread_data)
#define SF_IPV4_ENABLED(sf_context) (sf_context.handlers \
[SF_IPV4_ADDRESS_FAMILY_INDEX].af == AF_INET)
#define SF_IPV6_ENABLED(sf_context) (sf_context.handlers \
[SF_IPV6_ADDRESS_FAMILY_INDEX].af == AF_INET6)
#define SF_CHOWN_RETURN_ON_ERROR(path, current_uid, current_gid) \ #define SF_CHOWN_RETURN_ON_ERROR(path, current_uid, current_gid) \
do { \ do { \
if (g_sf_global_vars.run_by.inited && !(g_sf_global_vars. \ if (g_sf_global_vars.run_by.inited && !(g_sf_global_vars. \

View File

@ -27,7 +27,7 @@
#include "sf_types.h" #include "sf_types.h"
#include "sf_global.h" #include "sf_global.h"
#define SF_CTX (task->handler->ctx) #define SF_CTX (task->handler->fh->ctx)
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {

View File

@ -35,8 +35,8 @@ int sf_proto_set_body_length(struct fast_task_info *task)
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"%s peer %s:%u, magic "SF_PROTO_MAGIC_FORMAT" is invalid, " "%s peer %s:%u, magic "SF_PROTO_MAGIC_FORMAT" is invalid, "
"expect: "SF_PROTO_MAGIC_FORMAT", cmd: %d, body length: %d", "expect: "SF_PROTO_MAGIC_FORMAT", cmd: %d, body length: %d",
__LINE__, (task->handler != NULL ? task->handler->ctx->name : __LINE__, (task->handler != NULL ? task->handler->fh->ctx->
""), task->client_ip, task->port, name : ""), task->client_ip, task->port,
SF_PROTO_MAGIC_PARAMS(header->magic), SF_PROTO_MAGIC_PARAMS(header->magic),
SF_PROTO_MAGIC_EXPECT_PARAMS, header->cmd, SF_PROTO_MAGIC_EXPECT_PARAMS, header->cmd,
buff2int(header->body_len)); buff2int(header->body_len));

View File

@ -115,6 +115,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
int result; int result;
int bytes; int bytes;
int extra_events; int extra_events;
int i;
struct worker_thread_context *thread_contexts; struct worker_thread_context *thread_contexts;
struct worker_thread_context *thread_ctx; struct worker_thread_context *thread_ctx;
struct nio_thread_data *thread_data; struct nio_thread_data *thread_data;
@ -132,8 +133,10 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
send_done_callback, deal_func, task_cleanup_func, send_done_callback, deal_func, task_cleanup_func,
timeout_callback, release_buffer_callback); timeout_callback, release_buffer_callback);
if (explicit_post_recv) { if (explicit_post_recv) {
sf_context->handlers[SF_RDMACM_NETWORK_HANDLER_INDEX]. for (i=0; i<SF_ADDRESS_FAMILY_COUNT; i++) {
explicit_post_recv = true; sf_context->handlers[i].handlers[SF_RDMACM_NETWORK_HANDLER_INDEX].
explicit_post_recv = true;
}
} }
if ((result=sf_init_free_queue(&sf_context->free_queue, if ((result=sf_init_free_queue(&sf_context->free_queue,
@ -350,28 +353,8 @@ int sf_socket_create_server(SFListener *listener,
{ {
int result; int result;
if (af == AF_UNSPEC) { listener->sock = socketServer2(af, bind_addr,
if (bind_addr == NULL || *bind_addr == '\0') { listener->port, &result);
// 如果当前服务不存在IPv4地址但是存在IPv6地址则自动绑定IPv6地址
if (!checkHostHasIPv4Addr() && checkHostHasIPv6Addr()) {
listener->sock = socketServerIPv6(bind_addr,
listener->port, &result);
} else {
listener->sock = socketServer(bind_addr,
listener->port, &result);
}
} else if (is_ipv6_addr(bind_addr)) {
listener->sock = socketServerIPv6(bind_addr,
listener->port, &result);
} else {
listener->sock = socketServer(bind_addr,
listener->port, &result);
}
} else {
listener->sock = socketServer2(af, bind_addr,
listener->port, &result);
}
if (listener->sock < 0) { if (listener->sock < 0) {
return result; return result;
} }
@ -386,80 +369,90 @@ int sf_socket_create_server(SFListener *listener,
int sf_socket_server_ex(SFContext *sf_context) int sf_socket_server_ex(SFContext *sf_context)
{ {
int result; int result;
int af = AF_UNSPEC; int i;
bool dual_ports; bool dual_ports;
const char *bind_addr; const char *bind_addr;
SFAddressFamilyHandler *fh;
SFNetworkHandler *handler; SFNetworkHandler *handler;
SFNetworkHandler *end; SFNetworkHandler *end;
end = sf_context->handlers + SF_NETWORK_HANDLER_COUNT; for (i=0; i<SF_ADDRESS_FAMILY_COUNT; i++) {
for (handler=sf_context->handlers; handler<end; handler++) { fh = sf_context->handlers + i;
if (!handler->enabled) { if (fh->af == AF_UNSPEC) {
continue; continue;
} }
handler->inner.enabled = false; end = fh->handlers + SF_NETWORK_HANDLER_COUNT;
handler->outer.enabled = false; for (handler=fh->handlers; handler<end; handler++) {
if (handler->outer.port == handler->inner.port) { if (!handler->enabled) {
if (*sf_context->outer_bind_addr == '\0' || continue;
*sf_context->inner_bind_addr == '\0') { }
bind_addr = "";
if ((result=handler->create_server(&handler-> handler->inner.enabled = false;
outer, af, bind_addr)) != 0) handler->outer.enabled = false;
if (handler->outer.port == handler->inner.port) {
if (*fh->outer_bind_addr == '\0' ||
*fh->inner_bind_addr == '\0')
{ {
return result; bind_addr = "";
}
handler->outer.enabled = true;
dual_ports = false;
} else if (strcmp(sf_context->outer_bind_addr,
sf_context->inner_bind_addr) == 0) {
bind_addr = sf_context->outer_bind_addr;
if (is_private_ip(bind_addr)) {
if ((result=handler->create_server(&handler-> if ((result=handler->create_server(&handler->
inner, af, bind_addr)) != 0) outer, fh->af, bind_addr)) != 0)
{
return result;
}
handler->inner.enabled = true;
} else {
if ((result=handler->create_server(&handler->
outer, af, bind_addr)) != 0)
{ {
return result; return result;
} }
handler->outer.enabled = true; handler->outer.enabled = true;
dual_ports = false;
} else if (strcmp(fh->outer_bind_addr,
fh->inner_bind_addr) == 0)
{
bind_addr = fh->outer_bind_addr;
if (is_private_ip(bind_addr)) {
if ((result=handler->create_server(&handler->
inner, fh->af, bind_addr)) != 0)
{
return result;
}
handler->inner.enabled = true;
} else {
if ((result=handler->create_server(&handler->
outer, fh->af, bind_addr)) != 0)
{
return result;
}
handler->outer.enabled = true;
}
dual_ports = false;
} else {
dual_ports = true;
} }
dual_ports = false;
} else { } else {
dual_ports = true; dual_ports = true;
} }
} else {
dual_ports = true;
}
if (dual_ports) { if (dual_ports) {
if ((result=handler->create_server(&handler->outer, af, if ((result=handler->create_server(&handler->outer,
sf_context->outer_bind_addr)) != 0) fh->af, fh->outer_bind_addr)) != 0)
{ {
return result; return result;
}
if ((result=handler->create_server(&handler->inner,
fh->af, fh->inner_bind_addr)) != 0)
{
return result;
}
handler->inner.enabled = true;
handler->outer.enabled = true;
} }
if ((result=handler->create_server(&handler->inner, af, /*
sf_context->inner_bind_addr)) != 0) logInfo("%p [%d] inner {port: %d, enabled: %d}, "
{ "outer {port: %d, enabled: %d}", sf_context,
return result; (int)(handler-sf_context->handlers),
} handler->inner.port, handler->inner.enabled,
handler->inner.enabled = true; handler->outer.port, handler->outer.enabled);
handler->outer.enabled = true; */
} }
/*
logInfo("%p [%d] inner {port: %d, enabled: %d}, "
"outer {port: %d, enabled: %d}", sf_context,
(int)(handler-sf_context->handlers),
handler->inner.port, handler->inner.enabled,
handler->outer.port, handler->outer.enabled);
*/
} }
return 0; return 0;
@ -518,19 +511,27 @@ void sf_socket_close_connection(struct fast_task_info *task)
void sf_socket_close_ex(SFContext *sf_context) void sf_socket_close_ex(SFContext *sf_context)
{ {
int i;
SFNetworkHandler *handler; SFNetworkHandler *handler;
SFNetworkHandler *end; SFNetworkHandler *end;
end = sf_context->handlers + SF_NETWORK_HANDLER_COUNT; for (i=0; i<SF_ADDRESS_FAMILY_COUNT; i++) {
for (handler=sf_context->handlers; handler<end; handler++) { if (sf_context->handlers[i].af == AF_UNSPEC) {
if (!handler->enabled) {
continue; continue;
} }
if (handler->outer.enabled) {
handler->close_server(&handler->outer); end = sf_context->handlers[i].handlers + SF_NETWORK_HANDLER_COUNT;
} for (handler=sf_context->handlers[i].handlers; handler<end; handler++) {
if (handler->inner.enabled) { if (!handler->enabled) {
handler->close_server(&handler->inner); continue;
}
if (handler->outer.enabled) {
handler->close_server(&handler->outer);
}
if (handler->inner.enabled) {
handler->close_server(&handler->inner);
}
} }
} }
} }
@ -544,10 +545,10 @@ static void accept_run(SFListener *listener)
continue; continue;
} }
task->thread_data = listener->handler->ctx->thread_data + task->thread_data = listener->handler->fh->ctx->thread_data +
task->event.fd % listener->handler->ctx->work_threads; task->event.fd % listener->handler->fh->ctx->work_threads;
if (listener->handler->ctx->callbacks.accept_done != NULL) { if (listener->handler->fh->ctx->callbacks.accept_done != NULL) {
if (listener->handler->ctx->callbacks.accept_done(task, if (listener->handler->fh->ctx->callbacks.accept_done(task,
listener->inaddr.sin_addr.s_addr, listener->inaddr.sin_addr.s_addr,
listener->is_inner) != 0) listener->is_inner) != 0)
{ {
@ -571,7 +572,7 @@ static void *accept_thread_entrance(SFListener *listener)
char thread_name[32]; char thread_name[32];
snprintf(thread_name, sizeof(thread_name), "%s-%s-listen", snprintf(thread_name, sizeof(thread_name), "%s-%s-listen",
listener->handler->comm_type == fc_comm_type_sock ? listener->handler->comm_type == fc_comm_type_sock ?
"sock" : "rdma", listener->handler->ctx->name); "sock" : "rdma", listener->handler->fh->ctx->name);
prctl(PR_SET_NAME, thread_name); prctl(PR_SET_NAME, thread_name);
} }
#endif #endif
@ -618,25 +619,35 @@ int _accept_loop(SFListener *listener, const int accept_threads)
int sf_accept_loop_ex(SFContext *sf_context, const bool blocked) int sf_accept_loop_ex(SFContext *sf_context, const bool blocked)
{ {
int i;
SFNetworkHandler *handler; SFNetworkHandler *handler;
SFNetworkHandler *hend; SFNetworkHandler *hend;
SFListener *listeners[SF_NETWORK_HANDLER_COUNT * 2]; SFListener *listeners[SF_ADDRESS_FAMILY_COUNT *
SF_NETWORK_HANDLER_COUNT * 2];
SFListener **listener; SFListener **listener;
SFListener **last; SFListener **last;
SFListener **lend; SFListener **lend;
listener = listeners; listener = listeners;
hend = sf_context->handlers + SF_NETWORK_HANDLER_COUNT; for (i=0; i<SF_ADDRESS_FAMILY_COUNT; i++) {
for (handler=sf_context->handlers; handler<hend; handler++) { if (sf_context->handlers[i].af == AF_UNSPEC) {
if (!handler->enabled) {
continue; continue;
} }
if (handler->inner.enabled) { hend = sf_context->handlers[i].handlers + SF_NETWORK_HANDLER_COUNT;
*listener++ = &handler->inner; for (handler=sf_context->handlers[i].handlers;
} handler<hend; handler++)
if (handler->outer.enabled) { {
*listener++ = &handler->outer; if (!handler->enabled) {
continue;
}
if (handler->inner.enabled) {
*listener++ = &handler->inner;
}
if (handler->outer.enabled) {
*listener++ = &handler->outer;
}
} }
} }
@ -862,59 +873,3 @@ void sf_set_sig_quit_handler(sf_sig_quit_handler quit_handler)
{ {
sig_quit_handler = quit_handler; sig_quit_handler = quit_handler;
} }
// 判断当前服务器是否存在IPv4地址
bool checkHostHasIPv4Addr()
{
struct ifaddrs *ifaddr, *ifa;
bool hasIPv4;
if (getifaddrs(&ifaddr) == -1) {
return false;
}
hasIPv4 = false;
for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) {
if (ifa->ifa_addr == NULL) {
continue;
}
if (strcmp(ifa->ifa_name, "lo") == 0) { // 排除lo接口
continue;
}
if (ifa->ifa_addr->sa_family == AF_INET) {
hasIPv4 = true;
break;
}
}
freeifaddrs(ifaddr);
return hasIPv4;
}
// 判断当前服务器是否存在IPv6地址
bool checkHostHasIPv6Addr()
{
struct ifaddrs *ifaddr, *ifa;
bool hasIPv6;
if (getifaddrs(&ifaddr) == -1) {
return false;
}
hasIPv6 = false;
for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) {
if (ifa->ifa_addr == NULL) {
continue;
}
if (strcmp(ifa->ifa_name, "lo") == 0) { // 排除lo接口
continue;
}
if (ifa->ifa_addr->sa_family == AF_INET6) {
hasIPv6 = true;
break;
}
}
freeifaddrs(ifaddr);
return hasIPv6;
}

View File

@ -156,7 +156,7 @@ static inline struct fast_task_info *sf_alloc_init_task_ex(
{ {
struct fast_task_info *task; struct fast_task_info *task;
task = free_queue_pop(&handler->ctx->free_queue); task = free_queue_pop(&handler->fh->ctx->free_queue);
if (task == NULL) { if (task == NULL) {
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"malloc task buff failed, you should " "malloc task buff failed, you should "
@ -190,22 +190,23 @@ static inline void sf_release_task(struct fast_task_info *task)
} }
} }
// 判断当前服务器是否存在IPv4地址
bool checkHostHasIPv4Addr();
// 判断当前服务器是否存在IPv6地址
bool checkHostHasIPv6Addr();
static inline SFNetworkHandler *sf_get_first_network_handler_ex( static inline SFNetworkHandler *sf_get_first_network_handler_ex(
SFContext *sf_context) SFContext *sf_context)
{ {
int i;
SFNetworkHandler *handler; SFNetworkHandler *handler;
SFNetworkHandler *end; SFNetworkHandler *end;
end = sf_context->handlers + SF_NETWORK_HANDLER_COUNT; for (i=0; i<SF_ADDRESS_FAMILY_COUNT; i++) {
for (handler=sf_context->handlers; handler<end; handler++) { if (sf_context->handlers[i].af == AF_UNSPEC) {
if (handler->enabled) { continue;
return handler; }
end = sf_context->handlers[i].handlers + SF_NETWORK_HANDLER_COUNT;
for (handler=sf_context->handlers[i].handlers; handler<end; handler++) {
if (handler->enabled) {
return handler;
}
} }
} }
@ -218,10 +219,20 @@ static inline SFNetworkHandler *sf_get_first_network_handler_ex(
static inline SFNetworkHandler *sf_get_rdma_network_handler( static inline SFNetworkHandler *sf_get_rdma_network_handler(
SFContext *sf_context) SFContext *sf_context)
{ {
int i;
SFNetworkHandler *handler; SFNetworkHandler *handler;
handler = sf_context->handlers + SF_RDMACM_NETWORK_HANDLER_INDEX; for (i=0; i<SF_ADDRESS_FAMILY_COUNT; i++) {
return (handler->enabled ? handler : NULL); if (sf_context->handlers[i].af != AF_UNSPEC) {
handler = sf_context->handlers[i].handlers +
SF_RDMACM_NETWORK_HANDLER_INDEX;
if (handler->enabled) {
return handler;
}
}
}
return NULL;
} }
static inline SFNetworkHandler *sf_get_rdma_network_handler2( static inline SFNetworkHandler *sf_get_rdma_network_handler2(

View File

@ -34,6 +34,10 @@
#define SF_SERVER_TASK_TYPE_CHANNEL_HOLDER 101 //for request idempotency #define SF_SERVER_TASK_TYPE_CHANNEL_HOLDER 101 //for request idempotency
#define SF_SERVER_TASK_TYPE_CHANNEL_USER 102 //for request idempotency #define SF_SERVER_TASK_TYPE_CHANNEL_USER 102 //for request idempotency
#define SF_ADDRESS_FAMILY_COUNT 2
#define SF_IPV4_ADDRESS_FAMILY_INDEX 0
#define SF_IPV6_ADDRESS_FAMILY_INDEX 1
#define SF_NETWORK_HANDLER_COUNT 2 #define SF_NETWORK_HANDLER_COUNT 2
#define SF_SOCKET_NETWORK_HANDLER_INDEX 0 #define SF_SOCKET_NETWORK_HANDLER_INDEX 0
#define SF_RDMACM_NETWORK_HANDLER_INDEX 1 #define SF_RDMACM_NETWORK_HANDLER_INDEX 1
@ -61,6 +65,13 @@ typedef enum {
sf_comm_action_finish = 'f' sf_comm_action_finish = 'f'
} SFCommAction; } SFCommAction;
typedef enum {
sf_address_family_auto = 0,
sf_address_family_ipv4 = 1,
sf_address_family_ipv6 = 2,
sf_address_family_both = 3
} SFAddressFamily;
struct ibv_pd; struct ibv_pd;
struct sf_listener; struct sf_listener;
@ -98,11 +109,12 @@ typedef struct sf_listener {
} SFListener; } SFListener;
struct sf_context; struct sf_context;
struct sf_address_family_handler;
typedef struct sf_network_handler { typedef struct sf_network_handler {
bool enabled; bool enabled;
bool explicit_post_recv; bool explicit_post_recv;
FCCommunicationType comm_type; FCCommunicationType comm_type;
struct sf_context *ctx; struct sf_address_family_handler *fh;
struct ibv_pd *pd; struct ibv_pd *pd;
SFListener inner; SFListener inner;
@ -140,20 +152,26 @@ typedef struct sf_nio_callbacks {
sf_release_buffer_callback release_buffer; sf_release_buffer_callback release_buffer;
} SFNIOCallbacks; } SFNIOCallbacks;
typedef struct sf_address_family_handler {
int af; //AF_UNSPEC for disabled
SFNetworkHandler handlers[SF_NETWORK_HANDLER_COUNT];
char inner_bind_addr[IP_ADDRESS_SIZE];
char outer_bind_addr[IP_ADDRESS_SIZE];
struct sf_context *ctx;
} SFAddressFamilyHandler;
typedef struct sf_context { typedef struct sf_context {
char name[64]; char name[64];
struct nio_thread_data *thread_data; struct nio_thread_data *thread_data;
volatile int thread_count; volatile int thread_count;
//int rdma_port_offset; //int rdma_port_offset;
SFNetworkHandler handlers[SF_NETWORK_HANDLER_COUNT]; SFAddressFamily address_family;
SFAddressFamilyHandler handlers[SF_ADDRESS_FAMILY_COUNT];
int accept_threads; int accept_threads;
int work_threads; int work_threads;
char inner_bind_addr[IP_ADDRESS_SIZE];
char outer_bind_addr[IP_ADDRESS_SIZE];
int header_size; int header_size;
bool remove_from_ready_list; bool remove_from_ready_list;
bool realloc_task_buffer; bool realloc_task_buffer;