diff --git a/src/sf_global.c b/src/sf_global.c index ae251a5..7e33872 100644 --- a/src/sf_global.c +++ b/src/sf_global.c @@ -43,10 +43,10 @@ SFGlobalVariables g_sf_global_vars = { {0, 0}, NULL, {NULL, 0} }; -SFContext g_sf_context = { - {'\0'}, NULL, 0, -1, -1, 0, 0, 1, DEFAULT_WORK_THREADS, - {'\0'}, {'\0'}, 0, true, true, NULL, NULL, NULL, NULL, - NULL, sf_task_finish_clean_up, NULL +SFContext g_sf_context = {{'\0'}, NULL, 0, + {{false, sf_network_type_sock}, {false, sf_network_type_rdma}}, + 1, DEFAULT_WORK_THREADS, {'\0'}, {'\0'}, 0, true, true, NULL, + NULL, NULL, NULL, NULL, sf_task_finish_clean_up, NULL }; static inline void set_config_str_value(const char *value, @@ -411,17 +411,50 @@ int sf_load_config_ex(const char *server_name, SFContextIniConfig *config, return sf_load_context_from_config_ex(&g_sf_context, config); } +static int init_network_handler(SFNetworkHandler *handler, + SFContext *sf_context) +{ + handler->ctx = sf_context; + handler->inner.handler = handler; + handler->outer.handler = handler; + handler->inner.is_inner = true; + handler->outer.is_inner = false; + + if (handler->type == sf_network_type_sock) { + handler->inner.sock = -1; + handler->outer.sock = -1; + } else { + } + + return 0; +} + int sf_load_context_from_config_ex(SFContext *sf_context, SFContextIniConfig *config) { + SFNetworkHandler *sock_handler; + SFNetworkHandler *rdma_handler; char *inner_port; char *outer_port; char *inner_bind_addr; char *outer_bind_addr; char *bind_addr; int port; + int i; + int result; - sf_context->inner_port = sf_context->outer_port = 0; + memset(sf_context->handlers, 0, sizeof(sf_context->handlers)); + sock_handler = sf_context->handlers + SF_SOCKET_NETWORK_HANDLER_INDEX; + rdma_handler = sf_context->handlers + SF_RDMACM_NETWORK_HANDLER_INDEX; + sock_handler->type = sf_network_type_sock; + rdma_handler->type = sf_network_type_rdma; + for (i=0; ihandlers + i, + sf_context)) != 0) + { + return result; + } + } inner_port = iniGetStrValue(config->ini_ctx.section_name, "inner_port", config->ini_ctx.context); @@ -431,22 +464,30 @@ int sf_load_context_from_config_ex(SFContext *sf_context, port = iniGetIntValue(config->ini_ctx.section_name, "port", config->ini_ctx.context, 0); if (port > 0) { - sf_context->inner_port = sf_context->outer_port = port; + sock_handler->inner.port = sock_handler->outer.port = port; } } else { if (inner_port != NULL) { - sf_context->inner_port = atoi(inner_port); + sock_handler->inner.port = strtol(inner_port, NULL, 10); } if (outer_port != NULL) { - sf_context->outer_port = atoi(outer_port); + sock_handler->outer.port = strtol(outer_port, NULL, 10); } } - if (sf_context->inner_port <= 0) { - sf_context->inner_port = config->default_inner_port; + if (sock_handler->inner.port <= 0) { + sock_handler->inner.port = config->default_inner_port; } - if (sf_context->outer_port <= 0) { - sf_context->outer_port = config->default_outer_port; + if (sock_handler->outer.port <= 0) { + sock_handler->outer.port = config->default_outer_port; + } + + if (sock_handler->inner.port == sock_handler->outer.port) { + sock_handler->inner.enabled = false; + sock_handler->outer.enabled = true; + } else { + sock_handler->inner.enabled = true; + sock_handler->outer.enabled = true; } inner_bind_addr = iniGetStrValue(config->ini_ctx.section_name, @@ -495,23 +536,25 @@ int sf_load_context_from_config_ex(SFContext *sf_context, void sf_context_config_to_string(const SFContext *sf_context, char *output, const int size) { + const SFNetworkHandler *sock_handler; int len; + sock_handler = sf_context->handlers + SF_SOCKET_NETWORK_HANDLER_INDEX; len = 0; - if ((sf_context->inner_port == sf_context->outer_port) && + if ((sock_handler->inner.port == sock_handler->outer.port) && (strcmp(sf_context->inner_bind_addr, sf_context->outer_bind_addr) == 0)) { len += snprintf(output + len, size - len, "port=%u, bind_addr=%s", - sf_context->inner_port, + sock_handler->inner.port, sf_context->inner_bind_addr); } else { len += snprintf(output + len, size - len, "inner_port=%u, inner_bind_addr=%s, " "outer_port=%u, outer_bind_addr=%s", - sf_context->inner_port, sf_context->inner_bind_addr, - sf_context->outer_port, sf_context->outer_bind_addr); + sock_handler->inner.port, sf_context->inner_bind_addr, + sock_handler->outer.port, sf_context->outer_bind_addr); } len += snprintf(output + len, size - len, diff --git a/src/sf_service.c b/src/sf_service.c index e844ac1..0c34faf 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -57,12 +57,6 @@ struct worker_thread_context { struct nio_thread_data *thread_data; }; -struct accept_thread_context { - SFContext *sf_context; - int server_sock; -}; - - int sf_init_task(struct fast_task_info *task) { task->connect_timeout = SF_G_CONNECT_TIMEOUT; //for client side @@ -345,15 +339,16 @@ static void *worker_thread_entrance(void *arg) return NULL; } -static int _socket_server(const char *bind_addr, int port, int *sock) +int sf_create_socket_server(SFListener *listener, const char *bind_addr) { int result; - *sock = socketServer(bind_addr, port, &result); - if (*sock < 0) { + + listener->sock = socketServer(bind_addr, listener->port, &result); + if (listener->sock < 0) { return result; } - if ((result=tcpsetserveropt(*sock, SF_G_NETWORK_TIMEOUT)) != 0) { + if ((result=tcpsetserveropt(listener->sock, SF_G_NETWORK_TIMEOUT)) != 0) { return result; } @@ -363,57 +358,104 @@ static int _socket_server(const char *bind_addr, int port, int *sock) int sf_socket_server_ex(SFContext *sf_context) { int result; + bool dual_ports; const char *bind_addr; + SFNetworkHandler *handler; + SFNetworkHandler *end; - sf_context->inner_sock = sf_context->outer_sock = -1; - if (sf_context->outer_port == sf_context->inner_port) { - if (*sf_context->outer_bind_addr == '\0' || - *sf_context->inner_bind_addr == '\0') { - bind_addr = ""; - return _socket_server(bind_addr, sf_context->outer_port, - &sf_context->outer_sock); - } else if (strcmp(sf_context->outer_bind_addr, - sf_context->inner_bind_addr) == 0) { - bind_addr = sf_context->outer_bind_addr; - if (is_private_ip(bind_addr)) { - return _socket_server(bind_addr, sf_context-> - inner_port, &sf_context->inner_sock); - } else { - return _socket_server(bind_addr, sf_context-> - outer_port, &sf_context->outer_sock); - } + end = sf_context->handlers + SF_NETWORK_HANDLER_COUNT; + for (handler=sf_context->handlers; handlerenabled) { + continue; } - } - if ((result=_socket_server(sf_context->outer_bind_addr, - sf_context->outer_port, &sf_context->outer_sock)) != 0) - { - return result; - } + handler->inner.enabled = false; + handler->outer.enabled = false; + if (handler->outer.port == handler->inner.port) { + if (*sf_context->outer_bind_addr == '\0' || + *sf_context->inner_bind_addr == '\0') { + bind_addr = ""; + if ((result=handler->create_server(&handler-> + outer, bind_addr)) != 0) + { + return result; + } + handler->outer.enabled = true; + dual_ports = false; + } else if (strcmp(sf_context->outer_bind_addr, + sf_context->inner_bind_addr) == 0) { + bind_addr = sf_context->outer_bind_addr; + if (is_private_ip(bind_addr)) { + if ((result=handler->create_server(&handler-> + inner, bind_addr)) != 0) + { + return result; + } + handler->inner.enabled = true; + } else { + if ((result=handler->create_server(&handler-> + outer, bind_addr)) != 0) + { + return result; + } + handler->outer.enabled = true; + } + dual_ports = false; + } else { + dual_ports = true; + } + } else { + dual_ports = true; + } - if ((result=_socket_server(sf_context->inner_bind_addr, - sf_context->inner_port, &sf_context->inner_sock)) != 0) - { - return result; + if (dual_ports) { + if ((result=handler->create_server(&handler->outer, + sf_context->outer_bind_addr)) != 0) + { + return result; + } + + if ((result=handler->create_server(&handler->inner, + sf_context->inner_bind_addr)) != 0) + { + return result; + } + handler->inner.enabled = true; + handler->outer.enabled = true; + } } return 0; } -void sf_socket_close_ex(SFContext *sf_context) +void sf_close_socket_server(SFListener *listener) { - if (sf_context->inner_sock >= 0) { - close(sf_context->inner_sock); - sf_context->inner_sock = -1; - } - - if (sf_context->outer_sock >= 0) { - close(sf_context->outer_sock); - sf_context->outer_sock = -1; + if (listener->sock >= 0) { + close(listener->sock); + listener->sock = -1; } } -static void accept_run(struct accept_thread_context *accept_context) +void sf_socket_close_ex(SFContext *sf_context) +{ + SFNetworkHandler *handler; + SFNetworkHandler *end; + + end = sf_context->handlers + SF_NETWORK_HANDLER_COUNT; + for (handler=sf_context->handlers; handlerenabled) { + continue; + } + if (handler->outer.enabled) { + handler->close_server(&handler->outer); + } + if (handler->inner.enabled) { + handler->close_server(&handler->inner); + } + } +} + +static void accept_run(SFListener *listener) { int incomesock; int port; @@ -423,7 +465,7 @@ static void accept_run(struct accept_thread_context *accept_context) while (g_sf_global_vars.continue_flag) { sockaddr_len = sizeof(inaddr); - incomesock = accept(accept_context->server_sock, + incomesock = accept(listener->sock, (struct sockaddr*)&inaddr, &sockaddr_len); if (incomesock < 0) { //error if (!(errno == EINTR || errno == EAGAIN)) { @@ -441,8 +483,8 @@ static void accept_run(struct accept_thread_context *accept_context) } FC_SET_CLOEXEC(incomesock); - if ((task=sf_alloc_init_task(accept_context-> - sf_context, incomesock)) == NULL) + if ((task=sf_alloc_init_task(listener->handler->ctx, + incomesock)) == NULL) { close(incomesock); continue; @@ -451,13 +493,12 @@ static void accept_run(struct accept_thread_context *accept_context) getPeerIpAddPort(incomesock, task->client_ip, sizeof(task->client_ip), &port); task->port = port; - task->thread_data = accept_context->sf_context->thread_data + - incomesock % accept_context->sf_context->work_threads; - if (accept_context->sf_context->accept_done_func != NULL) { - if (accept_context->sf_context->accept_done_func(task, - inaddr.sin_addr.s_addr, - accept_context->server_sock == - accept_context->sf_context->inner_sock) != 0) + task->thread_data = listener->handler->ctx->thread_data + + incomesock % listener->handler->ctx->work_threads; + if (listener->handler->ctx->accept_done_func != NULL) { + if (listener->handler->ctx->accept_done_func( + task, inaddr.sin_addr.s_addr, + listener->is_inner) != 0) { close(incomesock); sf_release_task(task); @@ -472,24 +513,23 @@ static void accept_run(struct accept_thread_context *accept_context) } } -static void *accept_thread_entrance(struct accept_thread_context - *accept_context) +static void *accept_thread_entrance(SFListener *listener) { #ifdef OS_LINUX { char thread_name[32]; - snprintf(thread_name, sizeof(thread_name), "%s-listen", - accept_context->sf_context->name); + snprintf(thread_name, sizeof(thread_name), "%s-%s-listen", + listener->handler->type == sf_network_type_sock ? + "sock" : "rdma", listener->handler->ctx->name); prctl(PR_SET_NAME, thread_name); } #endif - accept_run(accept_context); + accept_run(listener); return NULL; } -void _accept_loop(struct accept_thread_context *accept_context, - const int accept_threads) +int _accept_loop(SFListener *listener, const int accept_threads) { pthread_t tid; pthread_attr_t thread_attr; @@ -497,7 +537,7 @@ void _accept_loop(struct accept_thread_context *accept_context, int i; if (accept_threads <= 0) { - return; + return 0; } if ((result=init_pthread_attr(&thread_attr, g_sf_global_vars. @@ -505,68 +545,73 @@ void _accept_loop(struct accept_thread_context *accept_context, { logWarning("file: "__FILE__", line: %d, " "init_pthread_attr fail!", __LINE__); + return result; } - else { - for (i=0; iouter_sock >= 0) { - count = 2; + listener = listeners; + hend = sf_context->handlers + SF_NETWORK_HANDLER_COUNT; + for (handler=sf_context->handlers; handlerenabled) { + continue; + } + + if (handler->inner.enabled) { + *listener++ = &handler->inner; + } + if (handler->outer.enabled) { + *listener++ = &handler->outer; + } + } + + if (listener == listeners) { + logError("file: "__FILE__", line: %d, " + "no listener!", __LINE__); + return ENOENT; + } + + last = listener - 1; + if (blocked) { + lend = listener - 1; } else { - count = 1; + lend = listener; } - bytes = sizeof(struct accept_thread_context) * count; - accept_contexts = (struct accept_thread_context *)fc_malloc(bytes); - if (accept_contexts == NULL) { - return; + for (listener=listeners; listeneraccept_threads); } - accept_contexts[0].sf_context = sf_context; - accept_contexts[0].server_sock = sf_context->inner_sock; - - if (sf_context->outer_sock >= 0) { - accept_contexts[1].sf_context = sf_context; - accept_contexts[1].server_sock = sf_context->outer_sock; - - if (sf_context->inner_sock >= 0) { - _accept_loop(accept_contexts, sf_context->accept_threads); - } - - if (block) { - _accept_loop(accept_contexts + 1, sf_context->accept_threads - 1); - accept_run(accept_contexts + 1); - } else { - _accept_loop(accept_contexts + 1, sf_context->accept_threads); - } - } else { - if (block) { - _accept_loop(accept_contexts, sf_context->accept_threads - 1); - accept_run(accept_contexts); - } else { - _accept_loop(accept_contexts, sf_context->accept_threads); - } + if (blocked) { + _accept_loop(*last, sf_context->accept_threads - 1); + accept_run(*last); } + + return 0; } #if defined(DEBUG_FLAG) diff --git a/src/sf_service.h b/src/sf_service.h index 8008485..b793963 100644 --- a/src/sf_service.h +++ b/src/sf_service.h @@ -83,13 +83,16 @@ int sf_add_slow_log_schedule(SFSlowLogContext *slowlog_ctx); void sf_set_current_time(); +int sf_create_socket_server(SFListener *listener, const char *bind_addr); +void sf_close_socket_server(SFListener *listener); + int sf_socket_server_ex(SFContext *sf_context); #define sf_socket_server() sf_socket_server_ex(&g_sf_context) void sf_socket_close_ex(SFContext *sf_context); #define sf_socket_close() sf_socket_close_ex(&g_sf_context) -void sf_accept_loop_ex(SFContext *sf_context, const bool block); +int sf_accept_loop_ex(SFContext *sf_context, const bool blocked); #define sf_accept_loop() sf_accept_loop_ex(&g_sf_context, true) diff --git a/src/sf_types.h b/src/sf_types.h index 30f8b76..1f96905 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -34,6 +34,10 @@ #define SF_SERVER_TASK_TYPE_CHANNEL_HOLDER 101 //for request idempotency #define SF_SERVER_TASK_TYPE_CHANNEL_USER 102 //for request idempotency +#define SF_NETWORK_HANDLER_COUNT 2 +#define SF_SOCKET_NETWORK_HANDLER_INDEX 0 +#define SF_RDMACM_NETWORK_HANDLER_INDEX 1 + typedef int (*sf_accept_done_callback)(struct fast_task_info *task, const in_addr_t client_addr, const bool bInnerPort); typedef int (*sf_set_body_length_callback)(struct fast_task_info *task); @@ -49,15 +53,71 @@ typedef void (*sf_release_buffer_callback)(struct fast_task_info *task); typedef int (*sf_error_handler_callback)(const int errnum); +struct sf_listener; +typedef int (*sf_create_server_callback)(struct sf_listener *listener, + const char *bind_addr); +typedef void (*sf_close_server_callback)(struct sf_listener *listener); +typedef struct fast_task_info * (*sf_accept_connection_callback)( + struct sf_listener *listener); +typedef int (*sf_async_connect_server_callback)(struct fast_task_info *task); +typedef int (*sf_connect_server_done_callback)(struct fast_task_info *task); +typedef void (*sf_close_connection_callback)(struct fast_task_info *task); + +typedef int (*sf_send_data_callback)(struct fast_task_info *task, + bool *send_done); +typedef int (*sf_recv_data_callback)(struct fast_task_info *task, + bool *recv_done); + +typedef enum { + sf_network_type_sock = 's', + sf_network_type_rdma = 'r' +} SFNetworkType; + +struct sf_network_handler; +typedef struct sf_listener { + struct sf_network_handler *handler; + int port; + bool enabled; + bool is_inner; + union { + int sock; //for socket + void *id; //for rdma_cm + }; +} SFListener; + +struct sf_context; +typedef struct sf_network_handler { + bool enabled; + SFNetworkType type; + struct sf_context *ctx; + + SFListener inner; + SFListener outer; + + /* for server side */ + sf_create_server_callback create_server; + sf_close_server_callback close_server; + sf_accept_connection_callback accept_connection; + + /* for client side */ + sf_async_connect_server_callback async_connect_server; + sf_connect_server_done_callback connect_server_done; + + /* server and client both */ + sf_close_connection_callback close_connection; + + sf_send_data_callback send_data; + sf_recv_data_callback recv_data; +} SFNetworkHandler; + typedef struct sf_context { char name[64]; struct nio_thread_data *thread_data; volatile int thread_count; - int outer_sock; - int inner_sock; - int outer_port; - int inner_port; + //int rdma_port_offset; + SFNetworkHandler handlers[SF_NETWORK_HANDLER_COUNT]; + int accept_threads; int work_threads;