diff --git a/src/idempotency/client/client_channel.c b/src/idempotency/client/client_channel.c index 822f008..6170d50 100644 --- a/src/idempotency/client/client_channel.c +++ b/src/idempotency/client/client_channel.c @@ -175,12 +175,19 @@ static struct fast_task_info *alloc_channel_task(IdempotencyClientChannel const char *server_ip, const uint16_t port, int *err_no) { struct fast_task_info *task; + SFAddressFamilyHandler *fh; SFNetworkHandler *handler; - if (comm_type == fc_comm_type_sock) { - handler = g_sf_context.handlers + SF_SOCKET_NETWORK_HANDLER_INDEX; + if (is_ipv6_addr(server_ip)) { + fh = g_sf_context.handlers + SF_IPV6_ADDRESS_FAMILY_INDEX; } else { - handler = g_sf_context.handlers + SF_RDMACM_NETWORK_HANDLER_INDEX; + fh = g_sf_context.handlers + SF_IPV4_ADDRESS_FAMILY_INDEX; + } + + if (comm_type == fc_comm_type_sock) { + handler = fh->handlers + SF_SOCKET_NETWORK_HANDLER_INDEX; + } else { + handler = fh->handlers + SF_RDMACM_NETWORK_HANDLER_INDEX; } if ((task=sf_alloc_init_task(handler, -1)) == NULL) { *err_no = ENOMEM; diff --git a/src/sf_connection_manager.c b/src/sf_connection_manager.c index e7e35d0..b275cfe 100644 --- a/src/sf_connection_manager.c +++ b/src/sf_connection_manager.c @@ -521,7 +521,6 @@ int sf_connection_manager_init_ex(SFConnectionManager *cm, connect_done_callback, void *args, FCServerConfig *server_cfg, const bool bg_thread_enabled) { - const int socket_domain = AF_UNSPEC; struct { ConnectionExtraParams holder; ConnectionExtraParams *ptr; @@ -553,8 +552,8 @@ int sf_connection_manager_init_ex(SFConnectionManager *cm, extra_params.ptr = &extra_params.holder; } if ((result=conn_pool_init_ex1(&cm->cpool, common_cfg->connect_timeout, - max_count_per_entry, max_idle_time, socket_domain, - htable_init_capacity, connect_done_callback, args, + max_count_per_entry, max_idle_time, htable_init_capacity, + connect_done_callback, args, sf_cm_validate_connection_callback, cm, sizeof(SFConnectionParameters), extra_params.ptr)) != 0) diff --git a/src/sf_global.c b/src/sf_global.c index 4754bd3..acc22b5 100644 --- a/src/sf_global.c +++ b/src/sf_global.c @@ -30,6 +30,7 @@ #include "fastcommon/common_define.h" #include "fastcommon/shared_func.h" #include "fastcommon/process_ctrl.h" +#include "fastcommon/local_ip_func.h" #include "fastcommon/logger.h" #include "sf_nio.h" #include "sf_service.h" @@ -46,22 +47,13 @@ SFGlobalVariables g_sf_global_vars = { {0, 0}, NULL, {NULL, 0} }; -SFContext g_sf_context = {{'\0'}, NULL, 0, - {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}, - 1, DEFAULT_WORK_THREADS, {'\0'}, {'\0'}, 0, true, true, true, - {false, 0, 0}, {sf_task_finish_clean_up} +SFContext g_sf_context = {{'\0'}, NULL, 0, sf_address_family_auto, + {{AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}}, + {AF_UNSPEC, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}}}, + 1, DEFAULT_WORK_THREADS, 0, true, true, true, {false, 0, 0}, + {sf_task_finish_clean_up} }; -static inline void set_config_str_value(const char *value, - char *dest, const int dest_size) -{ - if (value == NULL) { - *dest = '\0'; - } else { - snprintf(dest, dest_size, "%s", value); - } -} - static int load_network_parameters(IniFullContext *ini_ctx, const char *max_pkg_size_item_nm, const int fixed_buff_size, const int task_buffer_extra_size) @@ -472,9 +464,9 @@ static int load_rdma_apis(SFNetworkHandler *handler) } static int init_network_handler(SFNetworkHandler *handler, - SFContext *sf_context) + SFAddressFamilyHandler *fh) { - handler->ctx = sf_context; + handler->fh = fh; handler->inner.handler = handler; handler->outer.handler = handler; handler->inner.is_inner = true; @@ -501,83 +493,46 @@ static int init_network_handler(SFNetworkHandler *handler, } } -int sf_load_context_from_config_ex(SFContext *sf_context, +static void set_bind_address(const char *bind_addr, char *ipv4_bind_addr, + char *ipv6_bind_addr, const int addr_size) +{ + char new_bind_addr[2 * IP_ADDRESS_SIZE]; + char *cols[2]; + char *ip_addr; + int count; + int len; + int i; + + if (bind_addr == NULL || *bind_addr == '\0') { + *ipv4_bind_addr = *ipv6_bind_addr = '\0'; + return; + } + + snprintf(new_bind_addr, sizeof(new_bind_addr), "%s", bind_addr); + count = splitEx(new_bind_addr, ',', cols, 2); + for (i=0; ihandlers + SF_SOCKET_NETWORK_HANDLER_INDEX; - rdma_handler = sf_context->handlers + SF_RDMACM_NETWORK_HANDLER_INDEX; - sock_handler->comm_type = fc_comm_type_sock; - rdma_handler->comm_type = fc_comm_type_rdma; - if (config->comm_type == fc_comm_type_sock) { - sock_handler->enabled = true; - rdma_handler->enabled = false; - } else if (config->comm_type == fc_comm_type_rdma) { - sock_handler->enabled = false; - rdma_handler->enabled = true; - } else if (config->comm_type == fc_comm_type_both) { - sock_handler->enabled = true; - rdma_handler->enabled = true; - } - for (i=0; ihandlers[i].enabled) { - continue; - } - if ((result=init_network_handler(sf_context->handlers + i, - sf_context)) != 0) - { - return result; - } - } - - inner_port = iniGetStrValue(config->ini_ctx.section_name, - "inner_port", config->ini_ctx.context); - outer_port = iniGetStrValue(config->ini_ctx.section_name, - "outer_port", config->ini_ctx.context); - if (inner_port == NULL && outer_port == NULL) { - port = iniGetIntValue(config->ini_ctx.section_name, - "port", config->ini_ctx.context, 0); - if (port > 0) { - sock_handler->inner.port = sock_handler->outer.port = port; - } - } else { - if (inner_port != NULL) { - sock_handler->inner.port = strtol(inner_port, NULL, 10); - } - if (outer_port != NULL) { - sock_handler->outer.port = strtol(outer_port, NULL, 10); - } - } - - if (sock_handler->inner.port <= 0) { - sock_handler->inner.port = config->default_inner_port; - } - if (sock_handler->outer.port <= 0) { - sock_handler->outer.port = config->default_outer_port; - } - - if (sock_handler->inner.port == sock_handler->outer.port) { - sock_handler->inner.enabled = true; - sock_handler->outer.enabled = false; - } else { - sock_handler->inner.enabled = true; - sock_handler->outer.enabled = true; - } - - rdma_handler->inner.port = sock_handler->inner.port; - rdma_handler->inner.enabled = sock_handler->inner.enabled; - rdma_handler->outer.port = sock_handler->outer.port; - rdma_handler->outer.enabled = sock_handler->outer.enabled; + SFAddressFamilyHandler *ipv4_handler; + SFAddressFamilyHandler *ipv6_handler; inner_bind_addr = iniGetStrValue(config->ini_ctx.section_name, "inner_bind_addr", config->ini_ctx.context); @@ -590,10 +545,192 @@ int sf_load_context_from_config_ex(SFContext *sf_context, inner_bind_addr = outer_bind_addr = bind_addr; } } - set_config_str_value(inner_bind_addr, sf_context->inner_bind_addr, - sizeof(sf_context->inner_bind_addr)); - set_config_str_value(outer_bind_addr, sf_context->outer_bind_addr, - sizeof(sf_context->outer_bind_addr)); + + ipv4_handler = sf_context->handlers + SF_IPV4_ADDRESS_FAMILY_INDEX; + ipv6_handler = sf_context->handlers + SF_IPV6_ADDRESS_FAMILY_INDEX; + set_bind_address(inner_bind_addr, ipv4_handler->inner_bind_addr, + ipv6_handler->inner_bind_addr, + sizeof(ipv4_handler->inner_bind_addr)); + set_bind_address(outer_bind_addr, ipv4_handler->outer_bind_addr, + ipv6_handler->outer_bind_addr, + sizeof(ipv4_handler->outer_bind_addr)); + return 0; +} + +static int load_address_family(SFContext *sf_context, + SFContextIniConfig *config) +{ + char *address_family_str; + SFAddressFamily address_family; + SFAddressFamilyHandler *ipv4_handler; + SFAddressFamilyHandler *ipv6_handler; + bool ipv4_bound; + bool ipv6_bound; + + address_family_str = iniGetStrValue(config->ini_ctx.section_name, + "address_family", config->ini_ctx.context); + if (address_family_str == NULL) { + sf_context->address_family = sf_address_family_auto; + } else if (strcasecmp(address_family_str, "auto") == 0) { + sf_context->address_family = sf_address_family_auto; + } else if (strcasecmp(address_family_str, "IPv4") == 0) { + sf_context->address_family = sf_address_family_ipv4; + } else if (strcasecmp(address_family_str, "IPv6") == 0) { + sf_context->address_family = sf_address_family_ipv6; + } else if (strcasecmp(address_family_str, "both") == 0) { + sf_context->address_family = sf_address_family_both; + } else { + logError("file: "__FILE__", line: %d, " + "config file: %s, section: %s, address_family: %s " + "is invalid!", __LINE__, config->ini_ctx.filename, + config->ini_ctx.section_name, address_family_str); + return EINVAL; + } + + ipv4_handler = sf_context->handlers + SF_IPV4_ADDRESS_FAMILY_INDEX; + ipv6_handler = sf_context->handlers + SF_IPV6_ADDRESS_FAMILY_INDEX; + if (sf_context->address_family == sf_address_family_auto) { + ipv4_bound = (*ipv4_handler->inner_bind_addr != '\0' || + *ipv4_handler->outer_bind_addr != '\0'); + ipv6_bound = (*ipv6_handler->inner_bind_addr != '\0' || + *ipv6_handler->outer_bind_addr != '\0'); + if (ipv4_bound) { + if (ipv6_bound) { + address_family = sf_address_family_both; + } else { + address_family = sf_address_family_ipv4; + } + } else { + if (ipv6_bound) { + address_family = sf_address_family_ipv6; + } else { + int ipv4_count; + int ipv6_count; + stat_local_host_ip(&ipv4_count, &ipv6_count); + if (ipv4_count > 0) { + address_family = sf_address_family_ipv4; + } else { + address_family = sf_address_family_ipv6; + } + } + } + } else { + address_family = sf_context->address_family; + } + + switch (address_family) { + case sf_address_family_ipv4: + ipv4_handler->af = AF_INET; + ipv6_handler->af = AF_UNSPEC; + break; + case sf_address_family_ipv6: + ipv4_handler->af = AF_UNSPEC; + ipv6_handler->af = AF_INET6; + break; + case sf_address_family_both: + ipv4_handler->af = AF_INET; + ipv6_handler->af = AF_INET6; + break; + default: + break; + } + + return 0; +} + +int sf_load_context_from_config_ex(SFContext *sf_context, + SFContextIniConfig *config) +{ + SFAddressFamilyHandler *fh; + SFNetworkHandler *sock_handler; + SFNetworkHandler *rdma_handler; + SFNetworkHandler *handler; + SFNetworkHandler *end; + char *inner_port_str; + char *outer_port_str; + int inner_port; + int outer_port; + int port; + int i; + int result; + + inner_port_str = iniGetStrValue(config->ini_ctx.section_name, + "inner_port", config->ini_ctx.context); + outer_port_str = iniGetStrValue(config->ini_ctx.section_name, + "outer_port", config->ini_ctx.context); + if (inner_port_str == NULL && outer_port_str == NULL) { + port = iniGetIntValue(config->ini_ctx.section_name, + "port", config->ini_ctx.context, 0); + if (port > 0) { + inner_port = outer_port = port; + } else { + inner_port = outer_port = 0; + } + } else { + if (inner_port_str != NULL) { + inner_port = strtol(inner_port_str, NULL, 10); + } else { + inner_port = 0; + } + + if (outer_port_str != NULL) { + outer_port = strtol(outer_port_str, NULL, 10); + } else { + outer_port = 0; + } + } + + if (inner_port <= 0) { + inner_port = config->default_inner_port; + } + if (outer_port <= 0) { + outer_port = config->default_outer_port; + } + + for (i=0; ihandlers + i; + fh->ctx = sf_context; + sock_handler = fh->handlers + SF_SOCKET_NETWORK_HANDLER_INDEX; + rdma_handler = fh->handlers + SF_RDMACM_NETWORK_HANDLER_INDEX; + sock_handler->comm_type = fc_comm_type_sock; + rdma_handler->comm_type = fc_comm_type_rdma; + if (config->comm_type == fc_comm_type_sock) { + sock_handler->enabled = true; + rdma_handler->enabled = false; + } else if (config->comm_type == fc_comm_type_rdma) { + sock_handler->enabled = false; + rdma_handler->enabled = true; + } else if (config->comm_type == fc_comm_type_both) { + sock_handler->enabled = true; + rdma_handler->enabled = true; + } + + end = fh->handlers + SF_NETWORK_HANDLER_COUNT; + for (handler=fh->handlers; handlerenabled) { + continue; + } + if ((result=init_network_handler(handler, fh)) != 0) { + return result; + } + } + + sock_handler->inner.port = inner_port; + sock_handler->outer.port = outer_port; + if (sock_handler->inner.port == sock_handler->outer.port) { + sock_handler->inner.enabled = true; + sock_handler->outer.enabled = false; + } else { + sock_handler->inner.enabled = true; + sock_handler->outer.enabled = true; + } + + rdma_handler->inner.port = sock_handler->inner.port; + rdma_handler->inner.enabled = sock_handler->inner.enabled; + rdma_handler->outer.port = sock_handler->outer.port; + rdma_handler->outer.enabled = sock_handler->outer.enabled; + + } sf_context->accept_threads = iniGetIntValue( config->ini_ctx.section_name, @@ -619,51 +756,123 @@ int sf_load_context_from_config_ex(SFContext *sf_context, return EINVAL; } + if ((result=load_bind_address(sf_context, config)) != 0) { + return result; + } + + if ((result=load_address_family(sf_context, config)) != 0) { + return result; + } + return 0; } int sf_alloc_rdma_pd(SFContext *sf_context, FCAddressPtrArray *address_array) { + SFAddressFamilyHandler *fh; SFNetworkHandler *handler; + int i; int result; - handler = sf_context->handlers + SF_RDMACM_NETWORK_HANDLER_INDEX; - if (!handler->enabled) { - return 0; + for (i=0; ihandlers + i; + if (fh->af == AF_UNSPEC) { + continue; + } + + handler = fh->handlers + SF_RDMACM_NETWORK_HANDLER_INDEX; + if (handler->enabled) { + if ((handler->pd=fc_alloc_rdma_pd(handler->alloc_pd, + address_array, &result)) == NULL) + { + return result; + } + } } - handler->pd = fc_alloc_rdma_pd(handler->alloc_pd, - address_array, &result); - return result; + return 0; +} + +static void combine_bind_addr(char *bind_addr, const char *ip_addr) +{ + char *p; + + if (*bind_addr == '\0') { + p = bind_addr; + } else { + p = bind_addr + strlen(bind_addr); + *p++ = ','; + } + + sprintf(p, "%s", ip_addr); +} + +static const char *get_address_family_caption( + const SFAddressFamily address_family) +{ + switch (address_family) { + case sf_address_family_auto: + return "auto"; + case sf_address_family_ipv4: + return "IPv4"; + case sf_address_family_ipv6: + return "IPv6"; + case sf_address_family_both: + return "both"; + default: + return "unkown"; + } } void sf_context_config_to_string(const SFContext *sf_context, char *output, const int size) { + const SFAddressFamilyHandler *fh; const SFNetworkHandler *sock_handler; + char inner_bind_addr[2 * IP_ADDRESS_SIZE + 2]; + char outer_bind_addr[2 * IP_ADDRESS_SIZE + 2]; + int i; int len; - sock_handler = sf_context->handlers + SF_SOCKET_NETWORK_HANDLER_INDEX; + *inner_bind_addr = '\0'; + *outer_bind_addr = '\0'; + sock_handler = NULL; + for (i=0; ihandlers + i; + if (fh->af == AF_UNSPEC) { + continue; + } + + if (*(fh->inner_bind_addr) != '\0') { + combine_bind_addr(inner_bind_addr, fh->inner_bind_addr); + } + if (*(fh->outer_bind_addr) != '\0') { + combine_bind_addr(outer_bind_addr, fh->outer_bind_addr); + } + + sock_handler = fh->handlers + SF_SOCKET_NETWORK_HANDLER_INDEX; + } + len = 0; if ((sock_handler->inner.port == sock_handler->outer.port) && - (strcmp(sf_context->inner_bind_addr, - sf_context->outer_bind_addr) == 0)) + (strcmp(inner_bind_addr, outer_bind_addr) == 0)) { len += snprintf(output + len, size - len, "port=%u, bind_addr=%s", sock_handler->inner.port, - sf_context->inner_bind_addr); + inner_bind_addr); } else { len += snprintf(output + len, size - len, "inner_port=%u, inner_bind_addr=%s, " "outer_port=%u, outer_bind_addr=%s", - sock_handler->inner.port, sf_context->inner_bind_addr, - sock_handler->outer.port, sf_context->outer_bind_addr); + sock_handler->inner.port, inner_bind_addr, + sock_handler->outer.port, outer_bind_addr); } len += snprintf(output + len, size - len, - ", accept_threads=%d, work_threads=%d", + ", address_family=%s, accept_threads=%d, work_threads=%d", + get_address_family_caption(sf_context->address_family), sf_context->accept_threads, sf_context->work_threads); } diff --git a/src/sf_global.h b/src/sf_global.h index 00b17cd..693fb73 100644 --- a/src/sf_global.h +++ b/src/sf_global.h @@ -90,12 +90,24 @@ extern SFContext g_sf_context; #define SF_G_THREAD_STACK_SIZE g_sf_global_vars.thread_stack_size #define SF_G_UP_TIME g_sf_global_vars.up_time -#define SF_G_SOCK_HANDLER (g_sf_context.handlers + \ +#define SF_G_SOCK_HANDLER (g_sf_context.handlers \ + [SF_IPV4_ADDRESS_FAMILY_INDEX].handlers + \ SF_SOCKET_NETWORK_HANDLER_INDEX) #define SF_G_OUTER_PORT SF_G_SOCK_HANDLER->outer.port #define SF_G_INNER_PORT SF_G_SOCK_HANDLER->inner.port -#define SF_G_OUTER_BIND_ADDR g_sf_context.outer_bind_addr -#define SF_G_INNER_BIND_ADDR g_sf_context.inner_bind_addr +#define SF_G_OUTER_BIND_ADDR4 g_sf_context.handlers \ + [SF_IPV4_ADDRESS_FAMILY_INDEX].outer_bind_addr +#define SF_G_INNER_BIND_ADDR4 g_sf_context.handlers \ + [SF_IPV4_ADDRESS_FAMILY_INDEX].inner_bind_addr +#define SF_G_OUTER_BIND_ADDR6 g_sf_context.handlers \ + [SF_IPV6_ADDRESS_FAMILY_INDEX].outer_bind_addr +#define SF_G_INNER_BIND_ADDR6 g_sf_context.handlers \ + [SF_IPV6_ADDRESS_FAMILY_INDEX].inner_bind_addr + +#define SF_G_IPV4_ENABLED (g_sf_context.handlers \ + [SF_IPV4_ADDRESS_FAMILY_INDEX].af == AF_INET) +#define SF_G_IPV6_ENABLED (g_sf_context.handlers \ + [SF_IPV6_ADDRESS_FAMILY_INDEX].af == AF_INET6) #define SF_G_ACCEPT_THREADS g_sf_context.accept_threads #define SF_G_WORK_THREADS g_sf_context.work_threads @@ -115,6 +127,11 @@ extern SFContext g_sf_context; #define SF_ALIVE_THREAD_COUNT(sf_context) sf_context.thread_count #define SF_THREAD_INDEX(sf_context, tdata) (int)(tdata - sf_context.thread_data) +#define SF_IPV4_ENABLED(sf_context) (sf_context.handlers \ + [SF_IPV4_ADDRESS_FAMILY_INDEX].af == AF_INET) +#define SF_IPV6_ENABLED(sf_context) (sf_context.handlers \ + [SF_IPV6_ADDRESS_FAMILY_INDEX].af == AF_INET6) + #define SF_CHOWN_RETURN_ON_ERROR(path, current_uid, current_gid) \ do { \ if (g_sf_global_vars.run_by.inited && !(g_sf_global_vars. \ diff --git a/src/sf_nio.h b/src/sf_nio.h index 1841429..a989a2e 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -27,7 +27,7 @@ #include "sf_types.h" #include "sf_global.h" -#define SF_CTX (task->handler->ctx) +#define SF_CTX (task->handler->fh->ctx) #ifdef __cplusplus extern "C" { diff --git a/src/sf_proto.c b/src/sf_proto.c index 0350435..b2ab5cf 100644 --- a/src/sf_proto.c +++ b/src/sf_proto.c @@ -35,8 +35,8 @@ int sf_proto_set_body_length(struct fast_task_info *task) logError("file: "__FILE__", line: %d, " "%s peer %s:%u, magic "SF_PROTO_MAGIC_FORMAT" is invalid, " "expect: "SF_PROTO_MAGIC_FORMAT", cmd: %d, body length: %d", - __LINE__, (task->handler != NULL ? task->handler->ctx->name : - ""), task->client_ip, task->port, + __LINE__, (task->handler != NULL ? task->handler->fh->ctx-> + name : ""), task->client_ip, task->port, SF_PROTO_MAGIC_PARAMS(header->magic), SF_PROTO_MAGIC_EXPECT_PARAMS, header->cmd, buff2int(header->body_len)); diff --git a/src/sf_service.c b/src/sf_service.c index 184cc68..de48e7b 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -115,6 +115,7 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, int result; int bytes; int extra_events; + int i; struct worker_thread_context *thread_contexts; struct worker_thread_context *thread_ctx; struct nio_thread_data *thread_data; @@ -132,8 +133,10 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, send_done_callback, deal_func, task_cleanup_func, timeout_callback, release_buffer_callback); if (explicit_post_recv) { - sf_context->handlers[SF_RDMACM_NETWORK_HANDLER_INDEX]. - explicit_post_recv = true; + for (i=0; ihandlers[i].handlers[SF_RDMACM_NETWORK_HANDLER_INDEX]. + explicit_post_recv = true; + } } if ((result=sf_init_free_queue(&sf_context->free_queue, @@ -350,28 +353,8 @@ int sf_socket_create_server(SFListener *listener, { int result; - if (af == AF_UNSPEC) { - if (bind_addr == NULL || *bind_addr == '\0') { - // 如果当前服务不存在IPv4地址,但是存在IPv6地址,则自动绑定IPv6地址 - if (!checkHostHasIPv4Addr() && checkHostHasIPv6Addr()) { - listener->sock = socketServerIPv6(bind_addr, - listener->port, &result); - } else { - listener->sock = socketServer(bind_addr, - listener->port, &result); - } - } else if (is_ipv6_addr(bind_addr)) { - listener->sock = socketServerIPv6(bind_addr, - listener->port, &result); - } else { - listener->sock = socketServer(bind_addr, - listener->port, &result); - } - } else { - listener->sock = socketServer2(af, bind_addr, - listener->port, &result); - } - + listener->sock = socketServer2(af, bind_addr, + listener->port, &result); if (listener->sock < 0) { return result; } @@ -386,80 +369,90 @@ int sf_socket_create_server(SFListener *listener, int sf_socket_server_ex(SFContext *sf_context) { int result; - int af = AF_UNSPEC; + int i; bool dual_ports; const char *bind_addr; + SFAddressFamilyHandler *fh; SFNetworkHandler *handler; SFNetworkHandler *end; - end = sf_context->handlers + SF_NETWORK_HANDLER_COUNT; - for (handler=sf_context->handlers; handlerenabled) { + for (i=0; ihandlers + i; + if (fh->af == AF_UNSPEC) { continue; } - handler->inner.enabled = false; - handler->outer.enabled = false; - if (handler->outer.port == handler->inner.port) { - if (*sf_context->outer_bind_addr == '\0' || - *sf_context->inner_bind_addr == '\0') { - bind_addr = ""; - if ((result=handler->create_server(&handler-> - outer, af, bind_addr)) != 0) + end = fh->handlers + SF_NETWORK_HANDLER_COUNT; + for (handler=fh->handlers; handlerenabled) { + continue; + } + + handler->inner.enabled = false; + handler->outer.enabled = false; + if (handler->outer.port == handler->inner.port) { + if (*fh->outer_bind_addr == '\0' || + *fh->inner_bind_addr == '\0') { - return result; - } - handler->outer.enabled = true; - dual_ports = false; - } else if (strcmp(sf_context->outer_bind_addr, - sf_context->inner_bind_addr) == 0) { - bind_addr = sf_context->outer_bind_addr; - if (is_private_ip(bind_addr)) { + bind_addr = ""; if ((result=handler->create_server(&handler-> - inner, af, bind_addr)) != 0) - { - return result; - } - handler->inner.enabled = true; - } else { - if ((result=handler->create_server(&handler-> - outer, af, bind_addr)) != 0) + outer, fh->af, bind_addr)) != 0) { return result; } handler->outer.enabled = true; + dual_ports = false; + } else if (strcmp(fh->outer_bind_addr, + fh->inner_bind_addr) == 0) + { + bind_addr = fh->outer_bind_addr; + if (is_private_ip(bind_addr)) { + if ((result=handler->create_server(&handler-> + inner, fh->af, bind_addr)) != 0) + { + return result; + } + handler->inner.enabled = true; + } else { + if ((result=handler->create_server(&handler-> + outer, fh->af, bind_addr)) != 0) + { + return result; + } + handler->outer.enabled = true; + } + dual_ports = false; + } else { + dual_ports = true; } - dual_ports = false; } else { dual_ports = true; } - } else { - dual_ports = true; - } - if (dual_ports) { - if ((result=handler->create_server(&handler->outer, af, - sf_context->outer_bind_addr)) != 0) - { - return result; + if (dual_ports) { + if ((result=handler->create_server(&handler->outer, + fh->af, fh->outer_bind_addr)) != 0) + { + return result; + } + + if ((result=handler->create_server(&handler->inner, + fh->af, fh->inner_bind_addr)) != 0) + { + return result; + } + handler->inner.enabled = true; + handler->outer.enabled = true; } - if ((result=handler->create_server(&handler->inner, af, - sf_context->inner_bind_addr)) != 0) - { - return result; - } - handler->inner.enabled = true; - handler->outer.enabled = true; + /* + logInfo("%p [%d] inner {port: %d, enabled: %d}, " + "outer {port: %d, enabled: %d}", sf_context, + (int)(handler-sf_context->handlers), + handler->inner.port, handler->inner.enabled, + handler->outer.port, handler->outer.enabled); + */ } - - /* - logInfo("%p [%d] inner {port: %d, enabled: %d}, " - "outer {port: %d, enabled: %d}", sf_context, - (int)(handler-sf_context->handlers), - handler->inner.port, handler->inner.enabled, - handler->outer.port, handler->outer.enabled); - */ } return 0; @@ -518,19 +511,27 @@ void sf_socket_close_connection(struct fast_task_info *task) void sf_socket_close_ex(SFContext *sf_context) { + int i; SFNetworkHandler *handler; SFNetworkHandler *end; - end = sf_context->handlers + SF_NETWORK_HANDLER_COUNT; - for (handler=sf_context->handlers; handlerenabled) { + for (i=0; ihandlers[i].af == AF_UNSPEC) { continue; } - if (handler->outer.enabled) { - handler->close_server(&handler->outer); - } - if (handler->inner.enabled) { - handler->close_server(&handler->inner); + + end = sf_context->handlers[i].handlers + SF_NETWORK_HANDLER_COUNT; + for (handler=sf_context->handlers[i].handlers; handlerenabled) { + continue; + } + + if (handler->outer.enabled) { + handler->close_server(&handler->outer); + } + if (handler->inner.enabled) { + handler->close_server(&handler->inner); + } } } } @@ -544,10 +545,10 @@ static void accept_run(SFListener *listener) continue; } - task->thread_data = listener->handler->ctx->thread_data + - task->event.fd % listener->handler->ctx->work_threads; - if (listener->handler->ctx->callbacks.accept_done != NULL) { - if (listener->handler->ctx->callbacks.accept_done(task, + task->thread_data = listener->handler->fh->ctx->thread_data + + task->event.fd % listener->handler->fh->ctx->work_threads; + if (listener->handler->fh->ctx->callbacks.accept_done != NULL) { + if (listener->handler->fh->ctx->callbacks.accept_done(task, listener->inaddr.sin_addr.s_addr, listener->is_inner) != 0) { @@ -571,7 +572,7 @@ static void *accept_thread_entrance(SFListener *listener) char thread_name[32]; snprintf(thread_name, sizeof(thread_name), "%s-%s-listen", listener->handler->comm_type == fc_comm_type_sock ? - "sock" : "rdma", listener->handler->ctx->name); + "sock" : "rdma", listener->handler->fh->ctx->name); prctl(PR_SET_NAME, thread_name); } #endif @@ -618,25 +619,35 @@ int _accept_loop(SFListener *listener, const int accept_threads) int sf_accept_loop_ex(SFContext *sf_context, const bool blocked) { + int i; SFNetworkHandler *handler; SFNetworkHandler *hend; - SFListener *listeners[SF_NETWORK_HANDLER_COUNT * 2]; + SFListener *listeners[SF_ADDRESS_FAMILY_COUNT * + SF_NETWORK_HANDLER_COUNT * 2]; SFListener **listener; SFListener **last; SFListener **lend; listener = listeners; - hend = sf_context->handlers + SF_NETWORK_HANDLER_COUNT; - for (handler=sf_context->handlers; handlerenabled) { + for (i=0; ihandlers[i].af == AF_UNSPEC) { continue; } - if (handler->inner.enabled) { - *listener++ = &handler->inner; - } - if (handler->outer.enabled) { - *listener++ = &handler->outer; + hend = sf_context->handlers[i].handlers + SF_NETWORK_HANDLER_COUNT; + for (handler=sf_context->handlers[i].handlers; + handlerenabled) { + continue; + } + + if (handler->inner.enabled) { + *listener++ = &handler->inner; + } + if (handler->outer.enabled) { + *listener++ = &handler->outer; + } } } @@ -862,59 +873,3 @@ void sf_set_sig_quit_handler(sf_sig_quit_handler quit_handler) { sig_quit_handler = quit_handler; } - -// 判断当前服务器是否存在IPv4地址 -bool checkHostHasIPv4Addr() -{ - struct ifaddrs *ifaddr, *ifa; - bool hasIPv4; - - if (getifaddrs(&ifaddr) == -1) { - return false; - } - - hasIPv4 = false; - for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) { - if (ifa->ifa_addr == NULL) { - continue; - } - if (strcmp(ifa->ifa_name, "lo") == 0) { // 排除lo接口 - continue; - } - if (ifa->ifa_addr->sa_family == AF_INET) { - hasIPv4 = true; - break; - } - } - - freeifaddrs(ifaddr); - return hasIPv4; -} - -// 判断当前服务器是否存在IPv6地址 -bool checkHostHasIPv6Addr() -{ - struct ifaddrs *ifaddr, *ifa; - bool hasIPv6; - - if (getifaddrs(&ifaddr) == -1) { - return false; - } - - hasIPv6 = false; - for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) { - if (ifa->ifa_addr == NULL) { - continue; - } - if (strcmp(ifa->ifa_name, "lo") == 0) { // 排除lo接口 - continue; - } - if (ifa->ifa_addr->sa_family == AF_INET6) { - hasIPv6 = true; - break; - } - } - - freeifaddrs(ifaddr); - return hasIPv6; -} diff --git a/src/sf_service.h b/src/sf_service.h index dd2b428..1e885f3 100644 --- a/src/sf_service.h +++ b/src/sf_service.h @@ -156,7 +156,7 @@ static inline struct fast_task_info *sf_alloc_init_task_ex( { struct fast_task_info *task; - task = free_queue_pop(&handler->ctx->free_queue); + task = free_queue_pop(&handler->fh->ctx->free_queue); if (task == NULL) { logError("file: "__FILE__", line: %d, " "malloc task buff failed, you should " @@ -190,22 +190,23 @@ static inline void sf_release_task(struct fast_task_info *task) } } -// 判断当前服务器是否存在IPv4地址 -bool checkHostHasIPv4Addr(); - -// 判断当前服务器是否存在IPv6地址 -bool checkHostHasIPv6Addr(); - static inline SFNetworkHandler *sf_get_first_network_handler_ex( SFContext *sf_context) { + int i; SFNetworkHandler *handler; SFNetworkHandler *end; - end = sf_context->handlers + SF_NETWORK_HANDLER_COUNT; - for (handler=sf_context->handlers; handlerenabled) { - return handler; + for (i=0; ihandlers[i].af == AF_UNSPEC) { + continue; + } + + end = sf_context->handlers[i].handlers + SF_NETWORK_HANDLER_COUNT; + for (handler=sf_context->handlers[i].handlers; handlerenabled) { + return handler; + } } } @@ -218,10 +219,20 @@ static inline SFNetworkHandler *sf_get_first_network_handler_ex( static inline SFNetworkHandler *sf_get_rdma_network_handler( SFContext *sf_context) { + int i; SFNetworkHandler *handler; - handler = sf_context->handlers + SF_RDMACM_NETWORK_HANDLER_INDEX; - return (handler->enabled ? handler : NULL); + for (i=0; ihandlers[i].af != AF_UNSPEC) { + handler = sf_context->handlers[i].handlers + + SF_RDMACM_NETWORK_HANDLER_INDEX; + if (handler->enabled) { + return handler; + } + } + } + + return NULL; } static inline SFNetworkHandler *sf_get_rdma_network_handler2( diff --git a/src/sf_types.h b/src/sf_types.h index 32f427a..d27d9a5 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -34,6 +34,10 @@ #define SF_SERVER_TASK_TYPE_CHANNEL_HOLDER 101 //for request idempotency #define SF_SERVER_TASK_TYPE_CHANNEL_USER 102 //for request idempotency +#define SF_ADDRESS_FAMILY_COUNT 2 +#define SF_IPV4_ADDRESS_FAMILY_INDEX 0 +#define SF_IPV6_ADDRESS_FAMILY_INDEX 1 + #define SF_NETWORK_HANDLER_COUNT 2 #define SF_SOCKET_NETWORK_HANDLER_INDEX 0 #define SF_RDMACM_NETWORK_HANDLER_INDEX 1 @@ -61,6 +65,13 @@ typedef enum { sf_comm_action_finish = 'f' } SFCommAction; +typedef enum { + sf_address_family_auto = 0, + sf_address_family_ipv4 = 1, + sf_address_family_ipv6 = 2, + sf_address_family_both = 3 +} SFAddressFamily; + struct ibv_pd; struct sf_listener; @@ -98,11 +109,12 @@ typedef struct sf_listener { } SFListener; struct sf_context; +struct sf_address_family_handler; typedef struct sf_network_handler { bool enabled; bool explicit_post_recv; FCCommunicationType comm_type; - struct sf_context *ctx; + struct sf_address_family_handler *fh; struct ibv_pd *pd; SFListener inner; @@ -140,20 +152,26 @@ typedef struct sf_nio_callbacks { sf_release_buffer_callback release_buffer; } SFNIOCallbacks; +typedef struct sf_address_family_handler { + int af; //AF_UNSPEC for disabled + SFNetworkHandler handlers[SF_NETWORK_HANDLER_COUNT]; + char inner_bind_addr[IP_ADDRESS_SIZE]; + char outer_bind_addr[IP_ADDRESS_SIZE]; + struct sf_context *ctx; +} SFAddressFamilyHandler; + typedef struct sf_context { char name[64]; struct nio_thread_data *thread_data; volatile int thread_count; //int rdma_port_offset; - SFNetworkHandler handlers[SF_NETWORK_HANDLER_COUNT]; + SFAddressFamily address_family; + SFAddressFamilyHandler handlers[SF_ADDRESS_FAMILY_COUNT]; int accept_threads; int work_threads; - char inner_bind_addr[IP_ADDRESS_SIZE]; - char outer_bind_addr[IP_ADDRESS_SIZE]; - int header_size; bool remove_from_ready_list; bool realloc_task_buffer;