//sf_service.c #include #include #include #include #include #include #include #include #include #include #include "fastcommon/logger.h" #include "fastcommon/sockopt.h" #include "fastcommon/shared_func.h" #include "fastcommon/pthread_func.h" #include "fastcommon/sched_thread.h" #include "fastcommon/ioevent_loop.h" #include "sf_global.h" #include "sf_nio.h" #include "sf_service.h" #if defined(OS_LINUX) #include #endif static bool bTerminateFlag = false; static void sigQuitHandler(int sig); static void sigHupHandler(int sig); static void sigUsrHandler(int sig); #if defined(DEBUG_FLAG) static void sigDumpHandler(int sig); #endif struct worker_thread_context { SFContext *sf_context; struct nio_thread_data *thread_data; }; struct accept_thread_context { SFContext *sf_context; int server_sock; }; static void *worker_thread_entrance(void *arg); static int sf_init_free_queues(const int task_arg_size) { #define ALLOC_CONNECTIONS_ONCE 1024 static bool sf_inited = false; int result; int m; int init_connections; int alloc_conn_once; if (sf_inited) { return 0; } sf_inited = true; if ((result=set_rand_seed()) != 0) { logCrit("file: "__FILE__", line: %d, " "set_rand_seed fail, program exit!", __LINE__); return result; } m = g_sf_global_vars.min_buff_size / (64 * 1024); if (m == 0) { m = 1; } else if (m > 16) { m = 16; } alloc_conn_once = ALLOC_CONNECTIONS_ONCE / m; init_connections = g_sf_global_vars.max_connections < alloc_conn_once ? g_sf_global_vars.max_connections : alloc_conn_once; if ((result=free_queue_init_ex(g_sf_global_vars.max_connections, init_connections, alloc_conn_once, g_sf_global_vars. min_buff_size, g_sf_global_vars.max_buff_size, task_arg_size)) != 0) { return result; } return 0; } int sf_service_init_ex(SFContext *sf_context, sf_alloc_thread_extra_data_callback alloc_thread_extra_data_callback, ThreadLoopCallback thread_loop_callback, sf_accept_done_callback accept_done_callback, sf_set_body_length_callback set_body_length_func, sf_deal_task_func deal_func, TaskCleanUpCallback task_cleanup_func, sf_recv_timeout_callback timeout_callback, const int net_timeout_ms, const int proto_header_size, const int task_arg_size) { int result; int bytes; struct worker_thread_context *thread_contexts; struct worker_thread_context *thread_ctx; struct nio_thread_data *thread_data; struct nio_thread_data *pDataEnd; pthread_t tid; pthread_attr_t thread_attr; sf_context->realloc_task_buffer = g_sf_global_vars. min_buff_size < g_sf_global_vars.max_buff_size; sf_context->accept_done_func = accept_done_callback; sf_set_parameters_ex(sf_context, proto_header_size, set_body_length_func, deal_func, task_cleanup_func, timeout_callback); if ((result=sf_init_free_queues(task_arg_size)) != 0) { return result; } if ((result=init_pthread_attr(&thread_attr, g_sf_global_vars. thread_stack_size)) != 0) { logError("file: "__FILE__", line: %d, " "init_pthread_attr fail, program exit!", __LINE__); return result; } bytes = sizeof(struct nio_thread_data) * sf_context->work_threads; sf_context->thread_data = (struct nio_thread_data *)malloc(bytes); if (sf_context->thread_data == NULL) { logError("file: "__FILE__", line: %d, " "malloc %d bytes fail, errno: %d, error info: %s", __LINE__, bytes, errno, strerror(errno)); return errno != 0 ? errno : ENOMEM; } memset(sf_context->thread_data, 0, bytes); bytes = sizeof(struct worker_thread_context) * sf_context->work_threads; thread_contexts = (struct worker_thread_context *)malloc(bytes); if (thread_contexts == NULL) { logError("file: "__FILE__", line: %d, " "malloc %d bytes fail, errno: %d, error info: %s", __LINE__, bytes, errno, strerror(errno)); return errno != 0 ? errno : ENOMEM; } sf_context->thread_count = 0; pDataEnd = sf_context->thread_data + sf_context->work_threads; for (thread_data=sf_context->thread_data,thread_ctx=thread_contexts; thread_datathread_loop_callback = thread_loop_callback; if (alloc_thread_extra_data_callback != NULL) { thread_data->arg = alloc_thread_extra_data_callback( (int)(thread_data - sf_context->thread_data)); } else { thread_data->arg = NULL; } if (ioevent_init(&thread_data->ev_puller, g_sf_global_vars.max_connections + 2, net_timeout_ms, 0) != 0) { result = errno != 0 ? errno : ENOMEM; logError("file: "__FILE__", line: %d, " "ioevent_init fail, " "errno: %d, error info: %s", __LINE__, result, strerror(result)); return result; } result = fast_timer_init(&thread_data->timer, 2 * g_sf_global_vars.network_timeout, g_current_time); if (result != 0) { logError("file: "__FILE__", line: %d, " "fast_timer_init fail, " "errno: %d, error info: %s", __LINE__, result, strerror(result)); return result; } if ((result=init_pthread_lock(&thread_data->waiting_queue.lock)) != 0) { return result; } #if defined(OS_LINUX) FC_NOTIFY_READ_FD(thread_data) = eventfd(0, EFD_NONBLOCK); if (FC_NOTIFY_READ_FD(thread_data) < 0) { result = errno != 0 ? errno : EPERM; logError("file: "__FILE__", line: %d, " "call eventfd fail, " "errno: %d, error info: %s", __LINE__, result, strerror(result)); break; } FC_NOTIFY_WRITE_FD(thread_data) = FC_NOTIFY_READ_FD(thread_data); #else if (pipe(thread_data->pipe_fds) != 0) { result = errno != 0 ? errno : EPERM; logError("file: "__FILE__", line: %d, " "call pipe fail, " "errno: %d, error info: %s", __LINE__, result, strerror(result)); break; } if ((result=fd_add_flags(FC_NOTIFY_READ_FD(thread_data), O_NONBLOCK)) != 0) { break; } #endif thread_ctx->sf_context = sf_context; thread_ctx->thread_data = thread_data; if ((result=pthread_create(&tid, &thread_attr, worker_thread_entrance, thread_ctx)) != 0) { logError("file: "__FILE__", line: %d, " "create thread failed, startup threads: %d, " "errno: %d, error info: %s", __LINE__, (int)(thread_data - sf_context->thread_data), result, strerror(result)); break; } } pthread_attr_destroy(&thread_attr); return result; } int sf_service_destroy_ex(SFContext *sf_context) { struct nio_thread_data *pDataEnd, *thread_data; free_queue_destroy(); pDataEnd = sf_context->thread_data + sf_context->work_threads; for (thread_data=sf_context->thread_data; thread_datatimer); } free(sf_context->thread_data); sf_context->thread_data = NULL; return 0; } static void *worker_thread_entrance(void *arg) { struct worker_thread_context *thread_ctx; thread_ctx = (struct worker_thread_context *)arg; __sync_fetch_and_add(&thread_ctx->sf_context->thread_count, 1); ioevent_loop(thread_ctx->thread_data, sf_recv_notify_read, thread_ctx->sf_context->task_cleanup_func, &g_sf_global_vars.continue_flag); ioevent_destroy(&thread_ctx->thread_data->ev_puller); __sync_fetch_and_sub(&thread_ctx->sf_context->thread_count, 1); return NULL; } static int _socket_server(const char *bind_addr, int port, int *sock) { int result; *sock = socketServer(bind_addr, port, &result); if (*sock < 0) { return result; } if ((result=tcpsetserveropt(*sock, g_sf_global_vars.network_timeout)) != 0) { return result; } return 0; } int sf_socket_server_ex(SFContext *sf_context) { int result; const char *bind_addr; sf_context->inner_sock = sf_context->outer_sock = -1; if (sf_context->outer_port == sf_context->inner_port) { if (*sf_context->outer_bind_addr == '\0' || *sf_context->inner_bind_addr == '\0') { bind_addr = ""; return _socket_server(bind_addr, sf_context->outer_port, &sf_context->outer_sock); } else if (strcmp(sf_context->outer_bind_addr, sf_context->inner_bind_addr) == 0) { bind_addr = sf_context->outer_bind_addr; if (is_private_ip(bind_addr)) { return _socket_server(bind_addr, sf_context-> inner_port, &sf_context->inner_sock); } else { return _socket_server(bind_addr, sf_context-> outer_port, &sf_context->outer_sock); } } } if ((result=_socket_server(sf_context->outer_bind_addr, sf_context->outer_port, &sf_context->outer_sock)) != 0) { return result; } if ((result=_socket_server(sf_context->inner_bind_addr, sf_context->inner_port, &sf_context->inner_sock)) != 0) { return result; } return 0; } static void *accept_thread_entrance(void *arg) { struct accept_thread_context *accept_context; int incomesock; struct sockaddr_in inaddr; socklen_t sockaddr_len; struct fast_task_info *task; char szClientIp[IP_ADDRESS_SIZE]; accept_context = (struct accept_thread_context *)arg; while (g_sf_global_vars.continue_flag) { sockaddr_len = sizeof(inaddr); incomesock = accept(accept_context->server_sock, (struct sockaddr*)&inaddr, &sockaddr_len); if (incomesock < 0) { //error if (!(errno == EINTR || errno == EAGAIN)) { logError("file: "__FILE__", line: %d, " "accept fail, errno: %d, error info: %s", __LINE__, errno, strerror(errno)); } continue; } getPeerIpaddr(incomesock, szClientIp, IP_ADDRESS_SIZE); if (tcpsetnonblockopt(incomesock) != 0) { close(incomesock); continue; } task = free_queue_pop(); if (task == NULL) { logError("file: "__FILE__", line: %d, " "malloc task buff failed, you should " "increase the parameter: max_connections", __LINE__); close(incomesock); continue; } strcpy(task->client_ip, szClientIp); task->canceled = false; task->ctx = accept_context->sf_context; task->event.fd = incomesock; task->thread_data = accept_context->sf_context->thread_data + incomesock % accept_context->sf_context->work_threads; if (accept_context->sf_context->accept_done_func != NULL) { accept_context->sf_context->accept_done_func(task, accept_context->server_sock == accept_context->sf_context->inner_sock); } if (sf_nio_notify(task, SF_NIO_STAGE_INIT) != 0) { close(incomesock); free_queue_push(task); } } return NULL; } void _accept_loop(struct accept_thread_context *accept_context, const int accept_threads) { pthread_t tid; pthread_attr_t thread_attr; int result; int i; if (accept_threads <= 0) { return; } if ((result=init_pthread_attr(&thread_attr, g_sf_global_vars. thread_stack_size)) != 0) { logWarning("file: "__FILE__", line: %d, " "init_pthread_attr fail!", __LINE__); } else { for (i=0; iouter_sock >= 0) { count = 2; } else { count = 1; } bytes = sizeof(struct accept_thread_context) * count; accept_contexts = (struct accept_thread_context *)malloc(bytes); if (accept_contexts == NULL) { logError("file: "__FILE__", line: %d, " "malloc %d bytes fail, errno: %d, error info: %s", __LINE__, bytes, errno, strerror(errno)); return; } accept_contexts[0].sf_context = sf_context; accept_contexts[0].server_sock = sf_context->inner_sock; if (sf_context->outer_sock >= 0) { accept_contexts[1].sf_context = sf_context; accept_contexts[1].server_sock = sf_context->outer_sock; if (sf_context->inner_sock >= 0) { _accept_loop(accept_contexts, sf_context->accept_threads); } if (block) { _accept_loop(accept_contexts + 1, sf_context->accept_threads - 1); accept_thread_entrance(accept_contexts + 1); } else { _accept_loop(accept_contexts + 1, sf_context->accept_threads); } } else { if (block) { _accept_loop(accept_contexts, sf_context->accept_threads - 1); accept_thread_entrance(accept_contexts); } else { _accept_loop(accept_contexts, sf_context->accept_threads); } } } #if defined(DEBUG_FLAG) static void sigDumpHandler(int sig) { static bool bDumpFlag = false; char filename[256]; if (bDumpFlag) { return; } bDumpFlag = true; snprintf(filename, sizeof(filename), "%s/logs/sf_dump.log", g_sf_global_vars.base_path); //manager_dump_global_vars_to_file(filename); bDumpFlag = false; } #endif static void sigQuitHandler(int sig) { if (!bTerminateFlag) { bTerminateFlag = true; g_sf_global_vars.continue_flag = false; logCrit("file: "__FILE__", line: %d, " "catch signal %d, program exiting...", __LINE__, sig); } } static void sigHupHandler(int sig) { logInfo("file: "__FILE__", line: %d, " "catch signal %d", __LINE__, sig); } static void sigUsrHandler(int sig) { logInfo("file: "__FILE__", line: %d, " "catch signal %d, ignore it", __LINE__, sig); } int sf_setup_signal_handler() { struct sigaction act; memset(&act, 0, sizeof(act)); sigemptyset(&act.sa_mask); act.sa_handler = sigUsrHandler; if(sigaction(SIGUSR1, &act, NULL) < 0 || sigaction(SIGUSR2, &act, NULL) < 0) { logCrit("file: "__FILE__", line: %d, " "call sigaction fail, errno: %d, error info: %s", __LINE__, errno, strerror(errno)); logCrit("exit abnormally!\n"); return errno; } act.sa_handler = sigHupHandler; if(sigaction(SIGHUP, &act, NULL) < 0) { logCrit("file: "__FILE__", line: %d, " "call sigaction fail, errno: %d, error info: %s", __LINE__, errno, strerror(errno)); logCrit("exit abnormally!\n"); return errno; } act.sa_handler = SIG_IGN; if(sigaction(SIGPIPE, &act, NULL) < 0) { logCrit("file: "__FILE__", line: %d, " "call sigaction fail, errno: %d, error info: %s", __LINE__, errno, strerror(errno)); logCrit("exit abnormally!\n"); return errno; } act.sa_handler = sigQuitHandler; if(sigaction(SIGINT, &act, NULL) < 0 || sigaction(SIGTERM, &act, NULL) < 0 || sigaction(SIGQUIT, &act, NULL) < 0) { logCrit("file: "__FILE__", line: %d, " "call sigaction fail, errno: %d, error info: %s", __LINE__, errno, strerror(errno)); logCrit("exit abnormally!\n"); return errno; } #if defined(DEBUG_FLAG) memset(&act, 0, sizeof(act)); sigemptyset(&act.sa_mask); act.sa_handler = sigDumpHandler; if(sigaction(SIGUSR1, &act, NULL) < 0 || sigaction(SIGUSR2, &act, NULL) < 0) { logCrit("file: "__FILE__", line: %d, " "call sigaction fail, errno: %d, error info: %s", __LINE__, errno, strerror(errno)); logCrit("exit abnormally!\n"); return errno; } #endif return 0; } int sf_startup_schedule(pthread_t *schedule_tid) { #define SCHEDULE_ENTRIES_COUNT 3 ScheduleArray scheduleArray; ScheduleEntry scheduleEntries[SCHEDULE_ENTRIES_COUNT]; int index; scheduleArray.entries = scheduleEntries; scheduleArray.count = 0; memset(scheduleEntries, 0, sizeof(scheduleEntries)); index = scheduleArray.count++; INIT_SCHEDULE_ENTRY(scheduleEntries[index], sched_generate_next_id(), TIME_NONE, TIME_NONE, 0, g_sf_global_vars.sync_log_buff_interval, log_sync_func, &g_log_context); if (g_sf_global_vars.rotate_error_log) { log_set_rotate_time_format(&g_log_context, "%Y%m%d"); index = scheduleArray.count++; INIT_SCHEDULE_ENTRY(scheduleEntries[index], sched_generate_next_id(), 0, 0, 0, 86400, log_notify_rotate, &g_log_context); if (g_sf_global_vars.log_file_keep_days > 0) { log_set_keep_days(&g_log_context, g_sf_global_vars.log_file_keep_days); index = scheduleArray.count++; INIT_SCHEDULE_ENTRY(scheduleEntries[index], sched_generate_next_id(), 1, 0, 0, 86400, log_delete_old_files, &g_log_context); } } return sched_start(&scheduleArray, schedule_tid, g_sf_global_vars.thread_stack_size, (bool * volatile) &g_sf_global_vars.continue_flag); } void sf_set_current_time() { g_current_time = time(NULL); g_sf_global_vars.up_time = g_current_time; srand(g_sf_global_vars.up_time); } void sf_enable_thread_notify_ex(SFContext *sf_context, const bool enabled) { struct nio_thread_data *thread_data; struct nio_thread_data *pDataEnd; pDataEnd = sf_context->thread_data + sf_context->work_threads; for (thread_data=sf_context->thread_data; thread_datanotify.enabled = enabled; } }