From de6def01e4afdd832618f6db102aa537ac764c75 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Fri, 6 Mar 2020 22:04:34 +0800 Subject: [PATCH] support multi instance --- src/sf_define.h | 3 +- src/sf_global.c | 143 ++++++++++++------------- src/sf_global.h | 25 ++--- src/sf_nio.c | 100 ++++++++--------- src/sf_nio.h | 48 +++++++-- src/sf_service.c | 273 ++++++++++++++++++++++++++++++----------------- src/sf_service.h | 26 ++++- src/sf_types.h | 25 ++++- 8 files changed, 385 insertions(+), 258 deletions(-) diff --git a/src/sf_define.h b/src/sf_define.h index 5539b50..84d6dfb 100644 --- a/src/sf_define.h +++ b/src/sf_define.h @@ -14,7 +14,8 @@ #define SF_NIO_STAGE_RECV 1 //recv #define SF_NIO_STAGE_SEND 2 //send #define SF_NIO_STAGE_FORWARDED 3 //deal the forwarded request -#define SF_NIO_STAGE_CLOSE 4 //cleanup the task +#define SF_NIO_STAGE_CONTINUE 4 //notify the thread continue deal +#define SF_NIO_STAGE_CLOSE 9 //cleanup the task #ifdef __cplusplus extern "C" { diff --git a/src/sf_global.c b/src/sf_global.c index bb6c1cf..69a6c86 100644 --- a/src/sf_global.c +++ b/src/sf_global.c @@ -15,56 +15,40 @@ #include "fastcommon/shared_func.h" #include "fastcommon/logger.h" #include "sf_define.h" +#include "sf_nio.h" #include "sf_global.h" SFGlobalVariables g_sf_global_vars = { DEFAULT_CONNECT_TIMEOUT, DEFAULT_NETWORK_TIMEOUT, - {'/', 't', 'm', 'p', '\0'}, NULL, true, - 0, 0, DEFAULT_MAX_CONNECTONS, 1, - DEFAULT_WORK_THREADS, SF_DEF_THREAD_STACK_SIZE, + {'/', 't', 'm', 'p', '\0'}, true, + SF_DEF_THREAD_STACK_SIZE, DEFAULT_MAX_CONNECTONS, SF_DEF_MAX_PACKAGE_SIZE, SF_DEF_MIN_BUFF_SIZE, SF_DEF_MAX_BUFF_SIZE, - SYNC_LOG_BUFF_DEF_INTERVAL, 0, 0, 0, - {'\0'}, {'\0'}, false, 0, - {'\0'}, {'\0'}, {0, 0} + SYNC_LOG_BUFF_DEF_INTERVAL, 0, 0, 0, {'\0'}, {'\0'}, false, 0, {0, 0} }; -static int sf_get_config_int_value(IniContext *pIniContext, - const char *item_prefix_name, const char *item_affix_name, - const int default_value) -{ - int value; - char item_name[FAST_INI_ITEM_NAME_SIZE]; - - snprintf(item_name, sizeof(item_name), "%s_%s", - item_prefix_name, item_affix_name); - value = iniGetIntValue(NULL, item_name, pIniContext, default_value); - if (value <= 0) { - value = default_value; - } - return value; -} +SFContext g_sf_context = { + NULL, 0, -1, -1, 0, 0, 1, DEFAULT_WORK_THREADS, + {'\0'}, {'\0'}, 0, true, NULL, NULL, sf_task_finish_clean_up, + NULL +}; static void sf_get_config_str_value(IniContext *pIniContext, - const char *item_prefix_name, const char *item_affix_name, + const char *section_name, const char *item_name, char *dest, const int dest_size) { - char item_name[FAST_INI_ITEM_NAME_SIZE]; char *value; - snprintf(item_name, sizeof(item_name), "%s_%s", - item_prefix_name, item_affix_name); - value = iniGetStrValue(NULL, item_name, pIniContext); + value = iniGetStrValue(section_name, item_name, pIniContext); if (value == NULL) { *dest = '\0'; - } - else { + } else { snprintf(dest, dest_size, "%s", value); } } int sf_load_config_ex(const char *server_name, const char *filename, - IniContext *pIniContext, const SFCustomConfig *inner_cfg, - const SFCustomConfig *outer_cfg) + IniContext *pIniContext, const char *section_name, + const int default_inner_port, const int default_outer_port) { char *pBasePath; char *pRunByGroup; @@ -115,44 +99,15 @@ int sf_load_config_ex(const char *server_name, const char *filename, g_sf_global_vars.network_timeout = DEFAULT_NETWORK_TIMEOUT; } - g_sf_global_vars.inner_port = sf_get_config_int_value(pIniContext, - inner_cfg->item_prefix_name, "port", inner_cfg->default_port); - g_sf_global_vars.outer_port = sf_get_config_int_value(pIniContext, - outer_cfg->item_prefix_name, "port", outer_cfg->default_port); - - sf_get_config_str_value(pIniContext, inner_cfg->item_prefix_name, - "bind_addr", g_sf_global_vars.inner_bind_addr, - sizeof(g_sf_global_vars.inner_bind_addr)); - - sf_get_config_str_value(pIniContext, outer_cfg->item_prefix_name, - "bind_addr", g_sf_global_vars.outer_bind_addr, - sizeof(g_sf_global_vars.outer_bind_addr)); - g_sf_global_vars.max_connections = iniGetIntValue(NULL, "max_connections", pIniContext, DEFAULT_MAX_CONNECTONS); if (g_sf_global_vars.max_connections <= 0) { g_sf_global_vars.max_connections = DEFAULT_MAX_CONNECTONS; } - g_sf_global_vars.accept_threads = iniGetIntValue(NULL, "accept_threads", - pIniContext, 1); - if (g_sf_global_vars.accept_threads <= 0) { - logError("file: "__FILE__", line: %d, " - "item \"accept_threads\" is invalid, " - "value: %d <= 0!", __LINE__, g_sf_global_vars.accept_threads); - return EINVAL; - } - - g_sf_global_vars.work_threads = iniGetIntValue(NULL, "work_threads", - pIniContext, DEFAULT_WORK_THREADS); - if (g_sf_global_vars.work_threads <= 0) { - logError("file: "__FILE__", line: %d, " - "item \"work_threads\" is invalid, " - "value: %d <= 0!", __LINE__, g_sf_global_vars.work_threads); - return EINVAL; - } - - if ((result=set_rlimit(RLIMIT_NOFILE, g_sf_global_vars.max_connections)) != 0) { + if ((result=set_rlimit(RLIMIT_NOFILE, g_sf_global_vars. + max_connections)) != 0) + { return result; } @@ -291,20 +246,58 @@ int sf_load_config_ex(const char *server_name, const char *filename, return result; } - return 0; + return sf_load_context_from_config(&g_sf_context, filename, pIniContext, + section_name, default_inner_port, default_outer_port); } int sf_load_config(const char *server_name, const char *filename, IniContext *pIniContext, const int default_inner_port, const int default_outer_port) { - SFCustomConfig inner_cfg; - SFCustomConfig outer_cfg; + return sf_load_config_ex(server_name, filename, pIniContext, "", + default_inner_port, default_outer_port); +} - SF_SET_CUSTOM_CONFIG(inner_cfg, "inner", default_inner_port); - SF_SET_CUSTOM_CONFIG(outer_cfg, "outer", default_outer_port); - return sf_load_config_ex(server_name, filename, pIniContext, - &inner_cfg, &outer_cfg); +int sf_load_context_from_config(SFContext *sf_context, + const char *filename, IniContext *pIniContext, + const char *section_name, const int default_inner_port, + const int default_outer_port) +{ + sf_context->inner_port = iniGetIntValue(section_name, + "inner_port", pIniContext, default_inner_port); + sf_context->outer_port = iniGetIntValue(section_name, + "outer_port", pIniContext, default_outer_port); + + sf_get_config_str_value(pIniContext, section_name, + "inner_bind_addr", sf_context->inner_bind_addr, + sizeof(sf_context->inner_bind_addr)); + sf_get_config_str_value(pIniContext, section_name, + "outer_bind_addr", sf_context->outer_bind_addr, + sizeof(sf_context->outer_bind_addr)); + + sf_context->accept_threads = iniGetIntValue(section_name, + "accept_threads", pIniContext, 1); + if (sf_context->accept_threads <= 0) { + logError("file: "__FILE__", line: %d, " + "config file: %s, section: %s, " + "item \"accept_threads\" is invalid, " + "value: %d <= 0!", __LINE__, filename, + section_name, sf_context->accept_threads); + return EINVAL; + } + + sf_context->work_threads = iniGetIntValue(section_name, + "work_threads", pIniContext, DEFAULT_WORK_THREADS); + if (sf_context->work_threads <= 0) { + logError("file: "__FILE__", line: %d, " + "config file: %s, section: %s, " + "item \"work_threads\" is invalid, " + "value: %d <= 0!", __LINE__, filename, + section_name, sf_context->work_threads); + return EINVAL; + } + + return 0; } void sf_log_config_ex(const char *other_config) @@ -322,13 +315,13 @@ void sf_log_config_ex(const char *other_config) "log_level=%s, sync_log_buff_interval=%d, rotate_error_log=%d, " "log_file_keep_days=%d, run_by_group=%s, run_by_user=%s%s%s", g_sf_global_vars.base_path, - g_sf_global_vars.inner_port, - g_sf_global_vars.inner_bind_addr, - g_sf_global_vars.outer_port, - g_sf_global_vars.outer_bind_addr, + g_sf_context.inner_port, + g_sf_context.inner_bind_addr, + g_sf_context.outer_port, + g_sf_context.outer_bind_addr, g_sf_global_vars.max_connections, - g_sf_global_vars.accept_threads, - g_sf_global_vars.work_threads, + g_sf_context.accept_threads, + g_sf_context.work_threads, g_sf_global_vars.connect_timeout, g_sf_global_vars.network_timeout, int_to_comma_str(g_sf_global_vars.thread_stack_size, sz_thread_stack_size), diff --git a/src/sf_global.h b/src/sf_global.h index 5d43ad4..4f3fdd9 100644 --- a/src/sf_global.h +++ b/src/sf_global.h @@ -7,6 +7,7 @@ #include "fastcommon/ini_file_reader.h" #include "fastcommon/ioevent.h" #include "sf_define.h" +#include "sf_types.h" typedef struct sf_connection_stat { volatile int current_count; @@ -23,22 +24,15 @@ typedef struct sf_global_variables { int network_timeout; char base_path[MAX_PATH_SIZE]; - struct nio_thread_data *thread_data; - volatile bool continue_flag; - int outer_port; - int inner_port; int max_connections; - int accept_threads; - int work_threads; - int thread_stack_size; int max_pkg_size; int min_buff_size; int max_buff_size; + int thread_stack_size; int sync_log_buff_interval; //sync log buff to disk every interval seconds time_t up_time; - gid_t run_by_gid; uid_t run_by_uid; char run_by_group[32]; @@ -47,9 +41,6 @@ typedef struct sf_global_variables { bool rotate_error_log; int log_file_keep_days; - char inner_bind_addr[IP_ADDRESS_SIZE]; - char outer_bind_addr[IP_ADDRESS_SIZE]; - SFConnectionStat connection_stat; } SFGlobalVariables; @@ -58,13 +49,14 @@ extern "C" { #endif extern SFGlobalVariables g_sf_global_vars; +extern SFContext g_sf_context; #define SF_G_BASE_PATH g_sf_global_vars.base_path #define SF_G_CONTINUE_FLAG g_sf_global_vars.continue_flag #define SF_G_CONNECT_TIMEOUT g_sf_global_vars.connect_timeout #define SF_G_NETWORK_TIMEOUT g_sf_global_vars.network_timeout -#define SF_G_WORK_THREADS g_sf_global_vars.work_threads #define SF_G_THREAD_STACK_SIZE g_sf_global_vars.thread_stack_size +#define SF_G_WORK_THREADS g_sf_context.work_threads #define SF_SET_CUSTOM_CONFIG(cfg, prefix_name, port) \ do { \ @@ -94,8 +86,13 @@ int sf_load_config(const char *server_name, const char *filename, const int default_outer_port); int sf_load_config_ex(const char *server_name, const char *filename, - IniContext *pIniContext, const SFCustomConfig *inner_cfg, - const SFCustomConfig *outer_cfg); + IniContext *pIniContext, const char *section_name, + const int default_inner_port, const int default_outer_port); + +int sf_load_context_from_config(SFContext *sf_context, + const char *filename, IniContext *pIniContext, + const char *section_name, const int default_inner_port, + const int default_outer_port); void sf_log_config_ex(const char *other_config); diff --git a/src/sf_nio.c b/src/sf_nio.c index db957e2..c3052f2 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -24,33 +24,18 @@ #include "sf_global.h" #include "sf_nio.h" -static int sf_header_size = 0; -static bool sf_remove_from_ready_list = true; -static sf_deal_task_func sf_deal_task = NULL; -static sf_set_body_length_callback sf_set_body_length = NULL; -static TaskCleanUpCallback sf_task_cleanup_func = sf_task_finish_clean_up; -static sf_recv_timeout_callback sf_timeout_callback = NULL; +#define SF_CTX ((SFContext *)(pTask->ctx)) -void sf_set_parameters(const int header_size, sf_set_body_length_callback - set_body_length_func, sf_deal_task_func deal_func, - TaskCleanUpCallback cleanup_func, +void sf_set_parameters_ex(SFContext *sf_context, const int header_size, + sf_set_body_length_callback set_body_length_func, + sf_deal_task_func deal_func, TaskCleanUpCallback cleanup_func, sf_recv_timeout_callback timeout_callback) { - sf_header_size = header_size; - sf_set_body_length = set_body_length_func; - sf_deal_task = deal_func; - sf_task_cleanup_func = cleanup_func; - sf_timeout_callback = timeout_callback; -} - -void sf_set_remove_from_ready_list(const bool enabled) -{ - sf_remove_from_ready_list = enabled; -} - -TaskCleanUpCallback sf_get_task_cleanup_func() -{ - return sf_task_cleanup_func; + sf_context->header_size = header_size; + sf_context->set_body_length = set_body_length_func; + sf_context->deal_task = deal_func; + sf_context->task_cleanup_func = cleanup_func; + sf_context->timeout_callback = timeout_callback; } static void sf_task_detach_thread(struct fast_task_info *pTask) @@ -63,16 +48,16 @@ static void sf_task_detach_thread(struct fast_task_info *pTask) pTask->event.timer.expires = 0; } - if (sf_remove_from_ready_list) { + if (SF_CTX->remove_from_ready_list) { ioevent_remove(&pTask->thread_data->ev_puller, pTask); } } -void sf_task_switch_thread(struct fast_task_info *pTask, - const int new_thread_index) +void sf_task_switch_thread_ex(SFContext *sf_context, + struct fast_task_info *pTask, const int new_thread_index) { sf_task_detach_thread(pTask); - pTask->thread_data = g_sf_global_vars.thread_data + new_thread_index; + pTask->thread_data = sf_context->thread_data + new_thread_index; } void sf_task_finish_clean_up(struct fast_task_info *pTask) @@ -113,7 +98,7 @@ static inline int set_write_event(struct fast_task_info *pTask) pTask->event.fd, IOEVENT_WRITE, pTask) != 0) { result = errno != 0 ? errno : ENOENT; - sf_task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(pTask); logError("file: "__FILE__", line: %d, " "ioevent_modify fail, " @@ -137,7 +122,7 @@ static inline int set_read_event(struct fast_task_info *pTask) pTask->event.fd, IOEVENT_READ, pTask) != 0) { result = errno != 0 ? errno : ENOENT; - sf_task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(pTask); logError("file: "__FILE__", line: %d, " "ioevent_modify fail, " @@ -231,9 +216,12 @@ void sf_recv_notify_read(int sock, short event, void *arg) case SF_NIO_STAGE_SEND: result = sf_send_add_event(pTask); break; + case SF_NIO_STAGE_CONTINUE: //continue deal + result = SF_CTX->deal_task(pTask); + break; case SF_NIO_STAGE_FORWARDED: //forward by other thread if ((result=sf_ioevent_add(pTask)) == 0) { - result = sf_deal_task(pTask); + result = SF_CTX->deal_task(pTask); } break; case SF_NIO_STAGE_CLOSE: @@ -248,7 +236,7 @@ void sf_recv_notify_read(int sock, short event, void *arg) } if (result < 0) { - sf_task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(pTask); } } } @@ -273,13 +261,17 @@ int sf_client_sock_read(int sock, short event, void *arg) int total_read; struct fast_task_info *pTask; - assert(sock >= 0); pTask = (struct fast_task_info *)arg; + if (pTask->nio_stage != SF_NIO_STAGE_RECV) { + return 0; + } + + assert(sock >= 0); if (event & IOEVENT_TIMEOUT) { if (pTask->offset == 0 && pTask->req_count > 0) { - if (sf_timeout_callback != NULL) { - if (sf_timeout_callback(pTask) != 0) { - sf_task_cleanup_func(pTask); + if (SF_CTX->timeout_callback != NULL) { + if (SF_CTX->timeout_callback(pTask) != 0) { + SF_CTX->task_cleanup_func(pTask); return -1; } } @@ -303,7 +295,7 @@ int sf_client_sock_read(int sock, short event, void *arg) __LINE__, pTask->client_ip, pTask->req_count); } - sf_task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(pTask); return -1; } @@ -315,7 +307,7 @@ int sf_client_sock_read(int sock, short event, void *arg) "client ip: %s, recv error event: %d, " "close connection", __LINE__, pTask->client_ip, event); - sf_task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(pTask); return -1; } @@ -325,7 +317,7 @@ int sf_client_sock_read(int sock, short event, void *arg) &pTask->event.timer, g_current_time + g_sf_global_vars.network_timeout); if (pTask->length == 0) { //recv header - recv_bytes = sf_header_size - pTask->offset; + recv_bytes = SF_CTX->header_size - pTask->offset; } else { recv_bytes = pTask->length - pTask->offset; @@ -349,7 +341,7 @@ int sf_client_sock_read(int sock, short event, void *arg) __LINE__, pTask->client_ip, errno, strerror(errno)); - sf_task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(pTask); return -1; } } @@ -378,19 +370,19 @@ int sf_client_sock_read(int sock, short event, void *arg) __LINE__, pTask->client_ip, sock); } - sf_task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(pTask); return -1; } total_read += bytes; pTask->offset += bytes; if (pTask->length == 0) { //header - if (pTask->offset < sf_header_size) { + if (pTask->offset < SF_CTX->header_size) { break; } - if (sf_set_body_length(pTask) != 0) { - sf_task_cleanup_func(pTask); + if (SF_CTX->set_body_length(pTask) != 0) { + SF_CTX->task_cleanup_func(pTask); return -1; } if (pTask->length < 0) { @@ -399,11 +391,11 @@ int sf_client_sock_read(int sock, short event, void *arg) __LINE__, pTask->client_ip, pTask->length); - sf_task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(pTask); return -1; } - pTask->length += sf_header_size; + pTask->length += SF_CTX->header_size; if (pTask->length > g_sf_global_vars.max_pkg_size) { logError("file: "__FILE__", line: %d, " "client ip: %s, pkg length: %d > " @@ -411,7 +403,7 @@ int sf_client_sock_read(int sock, short event, void *arg) pTask->client_ip, pTask->length, g_sf_global_vars.max_pkg_size); - sf_task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(pTask); return -1; } @@ -424,7 +416,7 @@ int sf_client_sock_read(int sock, short event, void *arg) "from %d to %d fail", __LINE__, pTask->client_ip, pTask->size, pTask->length); - sf_task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(pTask); return -1; } @@ -438,8 +430,8 @@ int sf_client_sock_read(int sock, short event, void *arg) if (pTask->offset >= pTask->length) { //recv done pTask->req_count++; pTask->nio_stage = SF_NIO_STAGE_SEND; - if (sf_deal_task(pTask) < 0) { //fatal error - sf_task_cleanup_func(pTask); + if (SF_CTX->deal_task(pTask) < 0) { //fatal error + SF_CTX->task_cleanup_func(pTask); return -1; } break; @@ -463,7 +455,7 @@ int sf_client_sock_write(int sock, short event, void *arg) "remain: %d", __LINE__, pTask->client_ip, pTask->length, pTask->offset, pTask->length - pTask->offset); - sf_task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(pTask); return -1; } @@ -472,7 +464,7 @@ int sf_client_sock_write(int sock, short event, void *arg) "client ip: %s, recv error event: %d, " "close connection", __LINE__, pTask->client_ip, event); - sf_task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(pTask); return -1; } @@ -505,7 +497,7 @@ int sf_client_sock_write(int sock, short event, void *arg) __LINE__, pTask->client_ip, errno, strerror(errno)); - sf_task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(pTask); return -1; } } @@ -514,7 +506,7 @@ int sf_client_sock_write(int sock, short event, void *arg) "client ip: %s, sock: %d, send failed, connection disconnected", __LINE__, pTask->client_ip, sock); - sf_task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(pTask); return -1; } diff --git a/src/sf_nio.h b/src/sf_nio.h index c800674..93bfa93 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -14,12 +14,34 @@ extern "C" { #endif -void sf_set_parameters(const int header_size, sf_set_body_length_callback - set_body_length_func, sf_deal_task_func deal_func, - TaskCleanUpCallback cleanup_func, +void sf_set_parameters_ex(SFContext *sf_context, const int header_size, + sf_set_body_length_callback set_body_length_func, + sf_deal_task_func deal_func, TaskCleanUpCallback cleanup_func, sf_recv_timeout_callback timeout_callback); -void sf_set_remove_from_ready_list(const bool enabled); -TaskCleanUpCallback sf_get_task_cleanup_func(); + +#define sf_set_parameters(header_size, set_body_length_func, \ + deal_func, cleanup_func, timeout_callback) \ + sf_set_parameters_ex(&g_sf_context, header_size, \ + set_body_length_func, deal_func, \ + cleanup_func, timeout_callback) + +static inline void sf_set_remove_from_ready_list_ex(SFContext *sf_context, + const bool enabled) +{ + sf_context->remove_from_ready_list = enabled; +} + +#define sf_set_remove_from_ready_list(enabled) \ + sf_set_remove_from_ready_list_ex(&g_sf_context, enabled); + +static inline TaskCleanUpCallback sf_get_task_cleanup_func_ex( + SFContext *sf_context) +{ + return sf_context->task_cleanup_func; +} + +#define sf_get_task_cleanup_func() \ + sf_get_task_cleanup_func_ex(&g_sf_context) void sf_recv_notify_read(int sock, short event, void *arg); int sf_send_add_event(struct fast_task_info *pTask); @@ -28,18 +50,24 @@ int sf_client_sock_read(int sock, short event, void *arg); void sf_task_finish_clean_up(struct fast_task_info *pTask); -void sf_task_switch_thread(struct fast_task_info *pTask, - const int new_thread_index); +void sf_task_switch_thread_ex(SFContext *sf_context, + struct fast_task_info *pTask, const int new_thread_index); + +#define sf_task_switch_thread(pTask, new_thread_index) \ + sf_task_switch_thread_ex(&g_sf_context, pTask, new_thread_index) int sf_nio_notify(struct fast_task_info *pTask, const int stage); -static inline int sf_nio_forward_request(struct fast_task_info *pTask, - const int new_thread_index) +static inline int sf_nio_forward_request_ex(SFContext *sf_context, + struct fast_task_info *pTask, const int new_thread_index) { - sf_task_switch_thread(pTask, new_thread_index); + sf_task_switch_thread_ex(sf_context, pTask, new_thread_index); return sf_nio_notify(pTask, SF_NIO_STAGE_FORWARDED); } +#define sf_nio_forward_request(pTask, new_thread_index) \ + sf_nio_forward_request_ex(&g_sf_context, pTask, new_thread_index) + static inline bool sf_client_sock_in_read_stage(struct fast_task_info *pTask) { return (pTask->event.callback == (IOEventCallback)sf_client_sock_read); diff --git a/src/sf_service.c b/src/sf_service.c index 72b8093..273e002 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -20,10 +20,6 @@ #include "sf_nio.h" #include "sf_service.h" -int g_worker_thread_count = 0; -int g_server_outer_sock = -1; -int g_server_inner_sock = -1; -static sf_accept_done_callback sf_accept_done_func = NULL; static bool bTerminateFlag = false; @@ -35,44 +31,36 @@ static void sigUsrHandler(int sig); static void sigDumpHandler(int sig); #endif +struct worker_thread_context { + SFContext *sf_context; + struct nio_thread_data *thread_data; +}; -static void *worker_thread_entrance(void* arg); +struct accept_thread_context { + SFContext *sf_context; + int server_sock; +}; -int sf_service_init(sf_alloc_thread_extra_data_callback - alloc_thread_extra_data_callback, - ThreadLoopCallback thread_loop_callback, - sf_accept_done_callback accept_done_callback, - sf_set_body_length_callback set_body_length_func, - sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func, - sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, - const int proto_header_size, const int task_arg_size) +static void *worker_thread_entrance(void *arg); + +static int sf_init_free_queues(const int task_arg_size) { #define ALLOC_CONNECTIONS_ONCE 1024 + + static bool sf_inited = false; int result; - int bytes; int m; int init_connections; int alloc_conn_once; - struct nio_thread_data *pThreadData; - struct nio_thread_data *pDataEnd; - pthread_t tid; - pthread_attr_t thread_attr; - sf_accept_done_func = accept_done_callback; - sf_set_parameters(proto_header_size, set_body_length_func, deal_func, - task_cleanup_func, timeout_callback); - - if ((result=set_rand_seed()) != 0) { - logCrit("file: "__FILE__", line: %d, " - "set_rand_seed fail, program exit!", __LINE__); - return result; + if (sf_inited) { + return 0; } - if ((result=init_pthread_attr(&thread_attr, g_sf_global_vars. - thread_stack_size)) != 0) - { - logError("file: "__FILE__", line: %d, " - "init_pthread_attr fail, program exit!", __LINE__); + sf_inited = true; + if ((result=set_rand_seed()) != 0) { + logCrit("file: "__FILE__", line: %d, " + "set_rand_seed fail, program exit!", __LINE__); return result; } @@ -93,25 +81,72 @@ int sf_service_init(sf_alloc_thread_extra_data_callback return result; } - bytes = sizeof(struct nio_thread_data) * g_sf_global_vars.work_threads; - g_sf_global_vars.thread_data = (struct nio_thread_data *)malloc(bytes); - if (g_sf_global_vars.thread_data == NULL) { + return 0; +} + +int sf_service_init_ex(SFContext *sf_context, + sf_alloc_thread_extra_data_callback + alloc_thread_extra_data_callback, + ThreadLoopCallback thread_loop_callback, + sf_accept_done_callback accept_done_callback, + sf_set_body_length_callback set_body_length_func, + sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func, + sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, + const int proto_header_size, const int task_arg_size) +{ + int result; + int bytes; + struct worker_thread_context *thread_contexts; + struct worker_thread_context *thread_ctx; + struct nio_thread_data *pThreadData; + struct nio_thread_data *pDataEnd; + pthread_t tid; + pthread_attr_t thread_attr; + + sf_context->accept_done_func = accept_done_callback; + sf_set_parameters_ex(sf_context, proto_header_size, set_body_length_func, + deal_func, task_cleanup_func, timeout_callback); + + if ((result=sf_init_free_queues(task_arg_size)) != 0) { + return result; + } + + if ((result=init_pthread_attr(&thread_attr, g_sf_global_vars. + thread_stack_size)) != 0) + { + logError("file: "__FILE__", line: %d, " + "init_pthread_attr fail, program exit!", __LINE__); + return result; + } + + bytes = sizeof(struct nio_thread_data) * sf_context->work_threads; + sf_context->thread_data = (struct nio_thread_data *)malloc(bytes); + if (sf_context->thread_data == NULL) { + logError("file: "__FILE__", line: %d, " + "malloc %d bytes fail, errno: %d, error info: %s", + __LINE__, bytes, errno, strerror(errno)); + return errno != 0 ? errno : ENOMEM; + } + memset(sf_context->thread_data, 0, bytes); + + bytes = sizeof(struct worker_thread_context) * sf_context->work_threads; + thread_contexts = (struct worker_thread_context *)malloc(bytes); + if (thread_contexts == NULL) { logError("file: "__FILE__", line: %d, " "malloc %d bytes fail, errno: %d, error info: %s", __LINE__, bytes, errno, strerror(errno)); return errno != 0 ? errno : ENOMEM; } - memset(g_sf_global_vars.thread_data, 0, bytes); - g_worker_thread_count = 0; - pDataEnd = g_sf_global_vars.thread_data + g_sf_global_vars.work_threads; - for (pThreadData=g_sf_global_vars.thread_data; pThreadDatathread_count = 0; + pDataEnd = sf_context->thread_data + sf_context->work_threads; + for (pThreadData=sf_context->thread_data,thread_ctx=thread_contexts; + pThreadDatathread_loop_callback = thread_loop_callback; if (alloc_thread_extra_data_callback != NULL) { pThreadData->arg = alloc_thread_extra_data_callback( - (int)(pThreadData - g_sf_global_vars.thread_data)); + (int)(pThreadData - sf_context->thread_data)); } else { pThreadData->arg = NULL; @@ -161,52 +196,55 @@ int sf_service_init(sf_alloc_thread_extra_data_callback } #endif + thread_ctx->sf_context = sf_context; + thread_ctx->thread_data = pThreadData; if ((result=pthread_create(&tid, &thread_attr, - worker_thread_entrance, pThreadData)) != 0) + worker_thread_entrance, thread_ctx)) != 0) { logError("file: "__FILE__", line: %d, " - "create thread failed, startup threads: %d, " - "errno: %d, error info: %s", - __LINE__, g_worker_thread_count, - result, strerror(result)); + "create thread failed, startup threads: %d, " + "errno: %d, error info: %s", + __LINE__, (int)(pThreadData - sf_context->thread_data), + result, strerror(result)); break; } - else { - __sync_fetch_and_add(&g_worker_thread_count, 1); - } } - pthread_attr_destroy(&thread_attr); - return 0; + return result; } -int sf_service_destroy() +int sf_service_destroy_ex(SFContext *sf_context) { struct nio_thread_data *pDataEnd, *pThreadData; free_queue_destroy(); - pDataEnd = g_sf_global_vars.thread_data + g_sf_global_vars.work_threads; - for (pThreadData=g_sf_global_vars.thread_data; pThreadDatathread_data + sf_context->work_threads; + for (pThreadData=sf_context->thread_data; pThreadDatatimer); } - free(g_sf_global_vars.thread_data); - g_sf_global_vars.thread_data = NULL; + free(sf_context->thread_data); + sf_context->thread_data = NULL; return 0; } -static void *worker_thread_entrance(void* arg) +static void *worker_thread_entrance(void *arg) { - struct nio_thread_data *pThreadData; + struct worker_thread_context *thread_ctx; - pThreadData = (struct nio_thread_data *)arg; - ioevent_loop(pThreadData, sf_recv_notify_read, sf_get_task_cleanup_func(), - &g_sf_global_vars.continue_flag); - ioevent_destroy(&pThreadData->ev_puller); + thread_ctx = (struct worker_thread_context *)arg; - __sync_fetch_and_sub(&g_worker_thread_count, 1); + __sync_fetch_and_add(&thread_ctx->sf_context->thread_count, 1); + + ioevent_loop(thread_ctx->thread_data, + sf_recv_notify_read, + sf_get_task_cleanup_func(), + &g_sf_global_vars.continue_flag); + ioevent_destroy(&thread_ctx->thread_data->ev_puller); + + __sync_fetch_and_sub(&thread_ctx->sf_context->thread_count, 1); return NULL; } @@ -225,38 +263,38 @@ static int _socket_server(const char *bind_addr, int port, int *sock) return 0; } -int sf_socket_server() +int sf_socket_server_ex(SFContext *sf_context) { int result; const char *bind_addr; - if (g_sf_global_vars.outer_port == g_sf_global_vars.inner_port) { - if (*g_sf_global_vars.outer_bind_addr == '\0' || - *g_sf_global_vars.inner_bind_addr == '\0') { + if (sf_context->outer_port == sf_context->inner_port) { + if (*sf_context->outer_bind_addr == '\0' || + *sf_context->inner_bind_addr == '\0') { bind_addr = ""; - return _socket_server(bind_addr, g_sf_global_vars.outer_port, - &g_server_outer_sock); - } else if (strcmp(g_sf_global_vars.outer_bind_addr, - g_sf_global_vars.inner_bind_addr) == 0) { - bind_addr = g_sf_global_vars.outer_bind_addr; + return _socket_server(bind_addr, sf_context->outer_port, + &sf_context->outer_sock); + } else if (strcmp(sf_context->outer_bind_addr, + sf_context->inner_bind_addr) == 0) { + bind_addr = sf_context->outer_bind_addr; if (is_private_ip(bind_addr)) { - return _socket_server(bind_addr, g_sf_global_vars. - inner_port, &g_server_inner_sock); + return _socket_server(bind_addr, sf_context-> + inner_port, &sf_context->inner_sock); } else { - return _socket_server(bind_addr, g_sf_global_vars. - outer_port, &g_server_outer_sock); + return _socket_server(bind_addr, sf_context-> + outer_port, &sf_context->outer_sock); } } } - if ((result=_socket_server(g_sf_global_vars.outer_bind_addr, - g_sf_global_vars.outer_port, &g_server_outer_sock)) != 0) + if ((result=_socket_server(sf_context->outer_bind_addr, + sf_context->outer_port, &sf_context->outer_sock)) != 0) { return result; } - if ((result=_socket_server(g_sf_global_vars.inner_bind_addr, - g_sf_global_vars.inner_port, &g_server_inner_sock)) != 0) + if ((result=_socket_server(sf_context->inner_bind_addr, + sf_context->inner_port, &sf_context->inner_sock)) != 0) { return result; } @@ -266,7 +304,7 @@ int sf_socket_server() static void *accept_thread_entrance(void *arg) { - int server_sock; + struct accept_thread_context *accept_context; int incomesock; long task_ptr; struct sockaddr_in inaddr; @@ -274,11 +312,11 @@ static void *accept_thread_entrance(void *arg) struct fast_task_info *pTask; char szClientIp[IP_ADDRESS_SIZE]; - server_sock = (long)arg; + accept_context = (struct accept_thread_context *)arg; while (g_sf_global_vars.continue_flag) { sockaddr_len = sizeof(inaddr); - incomesock = accept(server_sock, (struct sockaddr*)&inaddr, - &sockaddr_len); + incomesock = accept(accept_context->server_sock, + (struct sockaddr*)&inaddr, &sockaddr_len); if (incomesock < 0) { //error if (!(errno == EINTR || errno == EAGAIN)) { logError("file: "__FILE__", line: %d, " @@ -307,12 +345,15 @@ static void *accept_thread_entrance(void *arg) } strcpy(pTask->client_ip, szClientIp); + pTask->ctx = accept_context->sf_context; pTask->nio_stage = SF_NIO_STAGE_INIT; pTask->event.fd = incomesock; - pTask->thread_data = g_sf_global_vars.thread_data + incomesock % - g_sf_global_vars.work_threads; - if (sf_accept_done_func != NULL) { - sf_accept_done_func(pTask, server_sock == g_server_inner_sock); + pTask->thread_data = accept_context->sf_context->thread_data + + incomesock % accept_context->sf_context->work_threads; + if (accept_context->sf_context->accept_done_func != NULL) { + accept_context->sf_context->accept_done_func(pTask, + accept_context->server_sock == + accept_context->sf_context->inner_sock); } task_ptr = (long)pTask; @@ -332,7 +373,8 @@ static void *accept_thread_entrance(void *arg) return NULL; } -void _accept_loop(int server_sock, const int accept_threads) +void _accept_loop(struct accept_thread_context *accept_context, + const int accept_threads) { pthread_t tid; pthread_attr_t thread_attr; @@ -353,7 +395,7 @@ void _accept_loop(int server_sock, const int accept_threads) for (i=0; i= 0) { - if (g_server_inner_sock >= 0) { - _accept_loop(g_server_inner_sock, g_sf_global_vars.accept_threads); + struct accept_thread_context *accept_contexts; + int count; + int bytes; + + if (sf_context->outer_sock >= 0) { + count = 2; + } else { + count = 1; + } + + bytes = sizeof(struct accept_thread_context) * count; + accept_contexts = (struct accept_thread_context *)malloc(bytes); + if (accept_contexts == NULL) { + logError("file: "__FILE__", line: %d, " + "malloc %d bytes fail, errno: %d, error info: %s", + __LINE__, bytes, errno, strerror(errno)); + return; + } + + accept_contexts[0].sf_context = sf_context; + accept_contexts[0].server_sock = sf_context->inner_sock; + + if (sf_context->outer_sock >= 0) { + accept_contexts[1].sf_context = sf_context; + accept_contexts[1].server_sock = sf_context->outer_sock; + + if (sf_context->inner_sock >= 0) { + _accept_loop(accept_contexts, sf_context->accept_threads); } - _accept_loop(g_server_outer_sock, g_sf_global_vars.accept_threads - 1); - accept_thread_entrance((void *)(long)g_server_outer_sock); - } else { - _accept_loop(g_server_inner_sock, g_sf_global_vars.accept_threads - 1); - accept_thread_entrance((void *)(long)g_server_inner_sock); - } + if (block) { + _accept_loop(accept_contexts + 1, sf_context->accept_threads - 1); + accept_thread_entrance(accept_contexts + 1); + } else { + _accept_loop(accept_contexts + 1, sf_context->accept_threads); + } + } else { + if (block) { + _accept_loop(accept_contexts, sf_context->accept_threads - 1); + accept_thread_entrance(accept_contexts); + } else { + _accept_loop(accept_contexts, sf_context->accept_threads); + } + } } #if defined(DEBUG_FLAG) diff --git a/src/sf_service.h b/src/sf_service.h index bbefb6b..c0a364d 100644 --- a/src/sf_service.h +++ b/src/sf_service.h @@ -22,7 +22,8 @@ extern int g_server_inner_sock; extern int g_worker_thread_count; -int sf_service_init(sf_alloc_thread_extra_data_callback +int sf_service_init_ex(SFContext *sf_context, + sf_alloc_thread_extra_data_callback alloc_thread_extra_data_callback, ThreadLoopCallback thread_loop_callback, sf_accept_done_callback accept_done_callback, @@ -30,14 +31,31 @@ int sf_service_init(sf_alloc_thread_extra_data_callback sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, const int proto_header_size, const int task_arg_size); -int sf_service_destroy(); + +#define sf_service_init(alloc_thread_extra_data_callback, \ + thread_loop_callback, accept_done_callback, set_body_length_func, \ + deal_func, task_cleanup_func, timeout_callback, net_timeout_ms, \ + proto_header_size, task_arg_size) \ + sf_service_init_ex(&g_sf_context, alloc_thread_extra_data_callback, \ + thread_loop_callback, accept_done_callback, set_body_length_func, \ + deal_func, task_cleanup_func, timeout_callback, net_timeout_ms, \ + proto_header_size, task_arg_size) + +int sf_service_destroy_ex(SFContext *sf_context); + +#define sf_service_destroy() sf_service_destroy_ex(&g_sf_context) int sf_setup_signal_handler(); int sf_startup_schedule(pthread_t *schedule_tid); -int sf_socket_server(); -void sf_accept_loop(); void sf_set_current_time(); +int sf_socket_server_ex(SFContext *sf_context); +#define sf_socket_server() sf_socket_server_ex(&g_sf_context) + +void sf_accept_loop_ex(SFContext *sf_context, const bool block); + +#define sf_accept_loop() sf_accept_loop_ex(&g_sf_context, true) + #ifdef __cplusplus } #endif diff --git a/src/sf_types.h b/src/sf_types.h index 33be6e0..74f3a0b 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -9,13 +9,36 @@ #include #include #include "fastcommon/connection_pool.h" +#include "fastcommon/fast_task_queue.h" -struct fast_task_info; typedef void (*sf_accept_done_callback)(struct fast_task_info *pTask, const bool bInnerPort); typedef int (*sf_set_body_length_callback)(struct fast_task_info *pTask); typedef int (*sf_deal_task_func)(struct fast_task_info *pTask); typedef int (*sf_recv_timeout_callback)(struct fast_task_info *pTask); +typedef struct sf_context { + struct nio_thread_data *thread_data; + int thread_count; + int outer_sock; + int inner_sock; + + int outer_port; + int inner_port; + int accept_threads; + int work_threads; + + char inner_bind_addr[IP_ADDRESS_SIZE]; + char outer_bind_addr[IP_ADDRESS_SIZE]; + + int header_size; + bool remove_from_ready_list; + sf_deal_task_func deal_task; + sf_set_body_length_callback set_body_length; + TaskCleanUpCallback task_cleanup_func; + sf_recv_timeout_callback timeout_callback; + sf_accept_done_callback accept_done_func; +} SFContext; + #endif