From 965c8277c708bc23ba0d4d1c9cb592d9c4fa3300 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Sun, 8 Mar 2020 16:14:41 +0800 Subject: [PATCH] use Linux eventfd for notify --- src/sf_nio.c | 448 +++++++++++++++++++++++++++-------------------- src/sf_nio.h | 3 + src/sf_service.c | 100 +++++------ 3 files changed, 315 insertions(+), 236 deletions(-) diff --git a/src/sf_nio.c b/src/sf_nio.c index 1cbff3f..97accdd 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -24,7 +24,7 @@ #include "sf_global.h" #include "sf_nio.h" -#define SF_CTX ((SFContext *)(pTask->ctx)) +#define SF_CTX ((SFContext *)(task->ctx)) void sf_set_parameters_ex(SFContext *sf_context, const int header_size, sf_set_body_length_callback set_body_length_func, @@ -38,67 +38,67 @@ void sf_set_parameters_ex(SFContext *sf_context, const int header_size, sf_context->timeout_callback = timeout_callback; } -static void sf_task_detach_thread(struct fast_task_info *pTask) +static void sf_task_detach_thread(struct fast_task_info *task) { - ioevent_detach(&pTask->thread_data->ev_puller, pTask->event.fd); + ioevent_detach(&task->thread_data->ev_puller, task->event.fd); - if (pTask->event.timer.expires > 0) { - fast_timer_remove(&pTask->thread_data->timer, - &pTask->event.timer); - pTask->event.timer.expires = 0; + if (task->event.timer.expires > 0) { + fast_timer_remove(&task->thread_data->timer, + &task->event.timer); + task->event.timer.expires = 0; } if (SF_CTX->remove_from_ready_list) { - ioevent_remove(&pTask->thread_data->ev_puller, pTask); + ioevent_remove(&task->thread_data->ev_puller, task); } } -void sf_task_switch_thread(struct fast_task_info *pTask, +void sf_task_switch_thread(struct fast_task_info *task, const int new_thread_index) { - sf_task_detach_thread(pTask); - pTask->thread_data = SF_CTX->thread_data + new_thread_index; + sf_task_detach_thread(task); + task->thread_data = SF_CTX->thread_data + new_thread_index; } -void sf_task_finish_clean_up(struct fast_task_info *pTask) +void sf_task_finish_clean_up(struct fast_task_info *task) { /* - assert(pTask->event.fd >= 0); - if (pTask->event.fd < 0) { + assert(task->event.fd >= 0); + if (task->event.fd < 0) { logWarning("file: "__FILE__", line: %d, " - "pTask: %p already cleaned", - __LINE__, pTask); + "task: %p already cleaned", + __LINE__, task); return; } */ - if (pTask->finish_callback != NULL) { - pTask->finish_callback(pTask); - pTask->finish_callback = NULL; + if (task->finish_callback != NULL) { + task->finish_callback(task); + task->finish_callback = NULL; } - sf_task_detach_thread(pTask); - close(pTask->event.fd); - pTask->event.fd = -1; + sf_task_detach_thread(task); + close(task->event.fd); + task->event.fd = -1; __sync_fetch_and_sub(&g_sf_global_vars.connection_stat.current_count, 1); - free_queue_push(pTask); + free_queue_push(task); } -static inline int set_write_event(struct fast_task_info *pTask) +static inline int set_write_event(struct fast_task_info *task) { int result; - if (pTask->event.callback == (IOEventCallback)sf_client_sock_write) { + if (task->event.callback == (IOEventCallback)sf_client_sock_write) { return 0; } - pTask->event.callback = (IOEventCallback)sf_client_sock_write; - if (ioevent_modify(&pTask->thread_data->ev_puller, - pTask->event.fd, IOEVENT_WRITE, pTask) != 0) + task->event.callback = (IOEventCallback)sf_client_sock_write; + if (ioevent_modify(&task->thread_data->ev_puller, + task->event.fd, IOEVENT_WRITE, task) != 0) { result = errno != 0 ? errno : ENOENT; - SF_CTX->task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(task); logError("file: "__FILE__", line: %d, " "ioevent_modify fail, " @@ -109,20 +109,20 @@ static inline int set_write_event(struct fast_task_info *pTask) return 0; } -static inline int set_read_event(struct fast_task_info *pTask) +static inline int set_read_event(struct fast_task_info *task) { int result; - if (pTask->event.callback == (IOEventCallback)sf_client_sock_read) { + if (task->event.callback == (IOEventCallback)sf_client_sock_read) { return 0; } - pTask->event.callback = (IOEventCallback)sf_client_sock_read; - if (ioevent_modify(&pTask->thread_data->ev_puller, - pTask->event.fd, IOEVENT_READ, pTask) != 0) + task->event.callback = (IOEventCallback)sf_client_sock_read; + if (ioevent_modify(&task->thread_data->ev_puller, + task->event.fd, IOEVENT_READ, task) != 0) { result = errno != 0 ? errno : ENOENT; - SF_CTX->task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(task); logError("file: "__FILE__", line: %d, " "ioevent_modify fail, " @@ -134,38 +134,17 @@ static inline int set_read_event(struct fast_task_info *pTask) return 0; } -int sf_nio_notify(struct fast_task_info *pTask, const int stage) -{ - long task_addr; - - task_addr = (long)pTask; - pTask->nio_stage = stage; - if (write(pTask->thread_data->pipe_fds[1], &task_addr, - sizeof(task_addr)) != sizeof(task_addr)) - { - int result; - result = errno != 0 ? errno : EIO; - logError("file: "__FILE__", line: %d, " - "write to pipe %d fail, errno: %d, error info: %s", - __LINE__, pTask->thread_data->pipe_fds[1], - result, STRERROR(result)); - return result; - } - - return 0; -} - -static int sf_ioevent_add(struct fast_task_info *pTask) +static int sf_ioevent_add(struct fast_task_info *task) { int result; - result = ioevent_set(pTask, pTask->thread_data, pTask->event.fd, + result = ioevent_set(task, task->thread_data, task->event.fd, IOEVENT_READ, (IOEventCallback)sf_client_sock_read, g_sf_global_vars.network_timeout); return result > 0 ? -1 * result : result; } -static int sf_nio_init(struct fast_task_info *pTask) +static int sf_nio_init(struct fast_task_info *task) { int current_connections; @@ -175,15 +154,146 @@ static int sf_nio_init(struct fast_task_info *pTask) g_sf_global_vars.connection_stat.max_count = current_connections; } - return sf_ioevent_add(pTask); + return sf_ioevent_add(task); +} + +static int sf_nio_deal_task(struct fast_task_info *task) +{ + int result; + switch (task->nio_stage) { + case SF_NIO_STAGE_INIT: + task->nio_stage = SF_NIO_STAGE_RECV; + result = sf_nio_init(task); + break; + case SF_NIO_STAGE_RECV: + if ((result=set_read_event(task)) == 0) + { + sf_client_sock_read(task->event.fd, + IOEVENT_READ, task); + } + break; + case SF_NIO_STAGE_SEND: + result = sf_send_add_event(task); + break; + case SF_NIO_STAGE_CONTINUE: //continue deal + result = SF_CTX->deal_task(task); + break; + case SF_NIO_STAGE_FORWARDED: //forward by other thread + if ((result=sf_ioevent_add(task)) == 0) { + result = SF_CTX->deal_task(task); + } + break; + case SF_NIO_STAGE_CLOSE: + result = -EIO; //close this socket + break; + default: + logError("file: "__FILE__", line: %d, " + "client ip: %s, invalid stage: %d", + __LINE__, task->client_ip, task->nio_stage); + result = -EINVAL; + break; + } + + if (result < 0) { + SF_CTX->task_cleanup_func(task); + } + + return result; +} + +#if defined(OS_LINUX) +int sf_nio_notify(struct fast_task_info *task, const int stage) +{ + int64_t n; + int result; + bool notify; + + task->nio_stage = stage; + task->next = NULL; + + pthread_mutex_lock(&task->thread_data.waiting_queue.lock); + if (task->thread_data.waiting_queue.tail == NULL) { + task->thread_data.waiting_queue.head = task; + notify = true; + } else { + task->thread_data.waiting_queue.tail->next = task; + notify = false; + } + task->thread_data.waiting_queue.tail = task; + pthread_mutex_unlock(&task->thread_data.waiting_queue.lock); + + if (notify) { + n = 1; + if (write(NOTIFY_WRITE_FD(task->thread_data), + &n, sizeof(n)) != sizeof(n)) + { + result = errno != 0 ? errno : EIO; + logError("file: "__FILE__", line: %d, " + "write eventfd %d fail, errno: %d, error info: %s", + __LINE__, NOTIFY_WRITE_FD(task->thread_data), + result, STRERROR(result)); + return result; + } + } + + return 0; +} + +void sf_recv_notify_read(int sock, short event, void *arg) +{ + int64_t n; + struct nio_thread_data *thread_data; + struct fast_task_info *task; + struct fast_task_info *current; + + thread_data = (struct nio_thread_data *)arg; + if (read(sock, &n, sizeof(n)) < 0) { + logWarning("file: "__FILE__", line: %d, " + "read from eventfd %d fail, errno: %d, error info: %s", + __LINE__, sock, errno, STRERROR(errno)); + } + + pthread_mutex_lock(&thread_data.waiting_queue.lock); + current = thread_data->waiting_queue.head; + thread_data->waiting_queue.head = thread_data->waiting_queue.tail = NULL; + pthread_mutex_unlock(&thread_data.waiting_queue.lock); + + while (current != NULL) { + task = current; + current = current->next; + + sf_nio_deal_task(task); + } +} + +#else + +int sf_nio_notify(struct fast_task_info *task, const int stage) +{ + long task_addr; + int result; + + task_addr = (long)task; + task->nio_stage = stage; + if (write(NOTIFY_WRITE_FD(task->thread_data), &task_addr, + sizeof(task_addr)) != sizeof(task_addr)) + { + result = errno != 0 ? errno : EIO; + logError("file: "__FILE__", line: %d, " + "write to pipe %d fail, errno: %d, error info: %s", + __LINE__, NOTIFY_WRITE_FD(task->thread_data), + result, STRERROR(result)); + return result; + } + + return 0; } void sf_recv_notify_read(int sock, short event, void *arg) { int bytes; - int result; long task_ptr; - struct fast_task_info *pTask; + struct fast_task_info *task; while (1) { if ((bytes=read(sock, &task_ptr, sizeof(task_ptr))) < 0) { @@ -200,53 +310,18 @@ void sf_recv_notify_read(int sock, short event, void *arg) break; } - pTask = (struct fast_task_info *)task_ptr; - switch (pTask->nio_stage) { - case SF_NIO_STAGE_INIT: - pTask->nio_stage = SF_NIO_STAGE_RECV; - result = sf_nio_init(pTask); - break; - case SF_NIO_STAGE_RECV: - if ((result=set_read_event(pTask)) == 0) - { - sf_client_sock_read(pTask->event.fd, - IOEVENT_READ, pTask); - } - break; - case SF_NIO_STAGE_SEND: - result = sf_send_add_event(pTask); - break; - case SF_NIO_STAGE_CONTINUE: //continue deal - result = SF_CTX->deal_task(pTask); - break; - case SF_NIO_STAGE_FORWARDED: //forward by other thread - if ((result=sf_ioevent_add(pTask)) == 0) { - result = SF_CTX->deal_task(pTask); - } - break; - case SF_NIO_STAGE_CLOSE: - result = -EIO; //close this socket - break; - default: - logError("file: "__FILE__", line: %d, " - "client ip: %s, invalid stage: %d", - __LINE__, pTask->client_ip, pTask->nio_stage); - result = -EINVAL; - break; - } - - if (result < 0) { - SF_CTX->task_cleanup_func(pTask); - } + task = (struct fast_task_info *)task_ptr; + sf_nio_deal_task(task); } } +#endif -int sf_send_add_event(struct fast_task_info *pTask) +int sf_send_add_event(struct fast_task_info *task) { - pTask->offset = 0; - if (pTask->length > 0) { + task->offset = 0; + if (task->length > 0) { /* direct send */ - if (sf_client_sock_write(pTask->event.fd, IOEVENT_WRITE, pTask) < 0) { + if (sf_client_sock_write(task->event.fd, IOEVENT_WRITE, task) < 0) { return errno != 0 ? errno : EIO; } } @@ -259,43 +334,43 @@ int sf_client_sock_read(int sock, short event, void *arg) int bytes; int recv_bytes; int total_read; - struct fast_task_info *pTask; + struct fast_task_info *task; - pTask = (struct fast_task_info *)arg; - if (pTask->nio_stage != SF_NIO_STAGE_RECV) { + task = (struct fast_task_info *)arg; + if (task->nio_stage != SF_NIO_STAGE_RECV) { return 0; } assert(sock >= 0); if (event & IOEVENT_TIMEOUT) { - if (pTask->offset == 0 && pTask->req_count > 0) { + if (task->offset == 0 && task->req_count > 0) { if (SF_CTX->timeout_callback != NULL) { - if (SF_CTX->timeout_callback(pTask) != 0) { - SF_CTX->task_cleanup_func(pTask); + if (SF_CTX->timeout_callback(task) != 0) { + SF_CTX->task_cleanup_func(task); return -1; } } - pTask->event.timer.expires = g_current_time + + task->event.timer.expires = g_current_time + g_sf_global_vars.network_timeout; - fast_timer_add(&pTask->thread_data->timer, - &pTask->event.timer); + fast_timer_add(&task->thread_data->timer, + &task->event.timer); } else { - if (pTask->length > 0) { + if (task->length > 0) { logWarning("file: "__FILE__", line: %d, " "client ip: %s, recv timeout, " "recv offset: %d, expect length: %d", - __LINE__, pTask->client_ip, - pTask->offset, pTask->length); + __LINE__, task->client_ip, + task->offset, task->length); } else { logWarning("file: "__FILE__", line: %d, " "client ip: %s, req_count: %"PRId64", recv timeout", - __LINE__, pTask->client_ip, pTask->req_count); + __LINE__, task->client_ip, task->req_count); } - SF_CTX->task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(task); return -1; } @@ -305,25 +380,25 @@ int sf_client_sock_read(int sock, short event, void *arg) if (event & IOEVENT_ERROR) { logDebug("file: "__FILE__", line: %d, " "client ip: %s, recv error event: %d, " - "close connection", __LINE__, pTask->client_ip, event); + "close connection", __LINE__, task->client_ip, event); - SF_CTX->task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(task); return -1; } total_read = 0; while (1) { - fast_timer_modify(&pTask->thread_data->timer, - &pTask->event.timer, g_current_time + + fast_timer_modify(&task->thread_data->timer, + &task->event.timer, g_current_time + g_sf_global_vars.network_timeout); - if (pTask->length == 0) { //recv header - recv_bytes = SF_CTX->header_size - pTask->offset; + if (task->length == 0) { //recv header + recv_bytes = SF_CTX->header_size - task->offset; } else { - recv_bytes = pTask->length - pTask->offset; + recv_bytes = task->length - task->offset; } - bytes = read(sock, pTask->data + pTask->offset, recv_bytes); + bytes = read(sock, task->data + task->offset, recv_bytes); if (bytes < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { break; @@ -331,107 +406,107 @@ int sf_client_sock_read(int sock, short event, void *arg) else if (errno == EINTR) { //should retry logDebug("file: "__FILE__", line: %d, " "client ip: %s, ignore interupt signal", - __LINE__, pTask->client_ip); + __LINE__, task->client_ip); continue; } else { logWarning("file: "__FILE__", line: %d, " "client ip: %s, recv fail, " "errno: %d, error info: %s", - __LINE__, pTask->client_ip, + __LINE__, task->client_ip, errno, strerror(errno)); - SF_CTX->task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(task); return -1; } } else if (bytes == 0) { - if (pTask->offset > 0) { - if (pTask->length > 0) { + if (task->offset > 0) { + if (task->length > 0) { logWarning("file: "__FILE__", line: %d, " "client ip: %s, connection " "disconnected, expect pkg length: %d, " "recv pkg length: %d", __LINE__, - pTask->client_ip, pTask->length, - pTask->offset); + task->client_ip, task->length, + task->offset); } else { logWarning("file: "__FILE__", line: %d, " "client ip: %s, connection " "disconnected, recv pkg length: %d", - __LINE__, pTask->client_ip, - pTask->offset); + __LINE__, task->client_ip, + task->offset); } } else { logDebug("file: "__FILE__", line: %d, " "client ip: %s, sock: %d, recv fail, " "connection disconnected", - __LINE__, pTask->client_ip, sock); + __LINE__, task->client_ip, sock); } - SF_CTX->task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(task); return -1; } total_read += bytes; - pTask->offset += bytes; - if (pTask->length == 0) { //header - if (pTask->offset < SF_CTX->header_size) { + task->offset += bytes; + if (task->length == 0) { //header + if (task->offset < SF_CTX->header_size) { break; } - if (SF_CTX->set_body_length(pTask) != 0) { - SF_CTX->task_cleanup_func(pTask); + if (SF_CTX->set_body_length(task) != 0) { + SF_CTX->task_cleanup_func(task); return -1; } - if (pTask->length < 0) { + if (task->length < 0) { logError("file: "__FILE__", line: %d, " "client ip: %s, pkg length: %d < 0", - __LINE__, pTask->client_ip, - pTask->length); + __LINE__, task->client_ip, + task->length); - SF_CTX->task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(task); return -1; } - pTask->length += SF_CTX->header_size; - if (pTask->length > g_sf_global_vars.max_pkg_size) { + task->length += SF_CTX->header_size; + if (task->length > g_sf_global_vars.max_pkg_size) { logError("file: "__FILE__", line: %d, " "client ip: %s, pkg length: %d > " "max pkg size: %d", __LINE__, - pTask->client_ip, pTask->length, + task->client_ip, task->length, g_sf_global_vars.max_pkg_size); - SF_CTX->task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(task); return -1; } - if (pTask->length > pTask->size) { + if (task->length > task->size) { int old_size; - old_size = pTask->size; - if (free_queue_realloc_buffer(pTask, pTask->length) != 0) { + old_size = task->size; + if (free_queue_realloc_buffer(task, task->length) != 0) { logError("file: "__FILE__", line: %d, " "client ip: %s, realloc buffer size " "from %d to %d fail", __LINE__, - pTask->client_ip, pTask->size, pTask->length); + task->client_ip, task->size, task->length); - SF_CTX->task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(task); return -1; } logDebug("file: "__FILE__", line: %d, " "client ip: %s, task length: %d, realloc buffer size " - "from %d to %d", __LINE__, pTask->client_ip, - pTask->length, old_size, pTask->size); + "from %d to %d", __LINE__, task->client_ip, + task->length, old_size, task->size); } } - if (pTask->offset >= pTask->length) { //recv done - pTask->req_count++; - pTask->nio_stage = SF_NIO_STAGE_SEND; - if (SF_CTX->deal_task(pTask) < 0) { //fatal error - SF_CTX->task_cleanup_func(pTask); + if (task->offset >= task->length) { //recv done + task->req_count++; + task->nio_stage = SF_NIO_STAGE_SEND; + if (SF_CTX->deal_task(task) < 0) { //fatal error + SF_CTX->task_cleanup_func(task); return -1; } break; @@ -445,41 +520,41 @@ int sf_client_sock_write(int sock, short event, void *arg) { int bytes; int total_write; - struct fast_task_info *pTask; + struct fast_task_info *task; assert(sock >= 0); - pTask = (struct fast_task_info *)arg; + task = (struct fast_task_info *)arg; if (event & IOEVENT_TIMEOUT) { logError("file: "__FILE__", line: %d, " "client ip: %s, send timeout. total length: %d, offset: %d, " - "remain: %d", __LINE__, pTask->client_ip, pTask->length, - pTask->offset, pTask->length - pTask->offset); + "remain: %d", __LINE__, task->client_ip, task->length, + task->offset, task->length - task->offset); - SF_CTX->task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(task); return -1; } if (event & IOEVENT_ERROR) { logDebug("file: "__FILE__", line: %d, " "client ip: %s, recv error event: %d, " - "close connection", __LINE__, pTask->client_ip, event); + "close connection", __LINE__, task->client_ip, event); - SF_CTX->task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(task); return -1; } total_write = 0; while (1) { - fast_timer_modify(&pTask->thread_data->timer, - &pTask->event.timer, g_current_time + + fast_timer_modify(&task->thread_data->timer, + &task->event.timer, g_current_time + g_sf_global_vars.network_timeout); - bytes = write(sock, pTask->data + pTask->offset, - pTask->length - pTask->offset); + bytes = write(sock, task->data + task->offset, + task->length - task->offset); if (bytes < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { - if (set_write_event(pTask) != 0) { + if (set_write_event(task) != 0) { return -1; } break; @@ -487,36 +562,37 @@ int sf_client_sock_write(int sock, short event, void *arg) else if (errno == EINTR) { //should retry logDebug("file: "__FILE__", line: %d, " "client ip: %s, ignore interupt signal", - __LINE__, pTask->client_ip); + __LINE__, task->client_ip); continue; } else { logWarning("file: "__FILE__", line: %d, " "client ip: %s, send fail, " "errno: %d, error info: %s", - __LINE__, pTask->client_ip, + __LINE__, task->client_ip, errno, strerror(errno)); - SF_CTX->task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(task); return -1; } } else if (bytes == 0) { logWarning("file: "__FILE__", line: %d, " - "client ip: %s, sock: %d, send failed, connection disconnected", - __LINE__, pTask->client_ip, sock); + "client ip: %s, sock: %d, send failed, " + "connection disconnected", + __LINE__, task->client_ip, sock); - SF_CTX->task_cleanup_func(pTask); + SF_CTX->task_cleanup_func(task); return -1; } total_write += bytes; - pTask->offset += bytes; - if (pTask->offset >= pTask->length) { - pTask->offset = 0; - pTask->length = 0; - pTask->nio_stage = SF_NIO_STAGE_RECV; - if (set_read_event(pTask) != 0) { + task->offset += bytes; + if (task->offset >= task->length) { + task->offset = 0; + task->length = 0; + task->nio_stage = SF_NIO_STAGE_RECV; + if (set_read_event(task) != 0) { return -1; } break; diff --git a/src/sf_nio.h b/src/sf_nio.h index 611da0a..e2e9fcc 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -10,6 +10,9 @@ #include "sf_define.h" #include "sf_types.h" +#define NOTIFY_READ_FD(tdata) (tdata)->pipe_fds[0] +#define NOTIFY_WRITE_FD(tdata) (tdata)->pipe_fds[1] + #ifdef __cplusplus extern "C" { #endif diff --git a/src/sf_service.c b/src/sf_service.c index 46665d2..19a21c6 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -10,6 +10,9 @@ #include #include #include +#if defined(OS_LINUX) +#include +#endif #include "fastcommon/logger.h" #include "fastcommon/sockopt.h" #include "fastcommon/shared_func.h" @@ -98,7 +101,7 @@ int sf_service_init_ex(SFContext *sf_context, int bytes; struct worker_thread_context *thread_contexts; struct worker_thread_context *thread_ctx; - struct nio_thread_data *pThreadData; + struct nio_thread_data *thread_data; struct nio_thread_data *pDataEnd; pthread_t tid; pthread_attr_t thread_attr; @@ -140,19 +143,19 @@ int sf_service_init_ex(SFContext *sf_context, sf_context->thread_count = 0; pDataEnd = sf_context->thread_data + sf_context->work_threads; - for (pThreadData=sf_context->thread_data,thread_ctx=thread_contexts; - pThreadDatathread_data,thread_ctx=thread_contexts; + thread_datathread_loop_callback = thread_loop_callback; + thread_data->thread_loop_callback = thread_loop_callback; if (alloc_thread_extra_data_callback != NULL) { - pThreadData->arg = alloc_thread_extra_data_callback( - (int)(pThreadData - sf_context->thread_data)); + thread_data->arg = alloc_thread_extra_data_callback( + (int)(thread_data - sf_context->thread_data)); } else { - pThreadData->arg = NULL; + thread_data->arg = NULL; } - if (ioevent_init(&pThreadData->ev_puller, + if (ioevent_init(&thread_data->ev_puller, g_sf_global_vars.max_connections + 2, net_timeout_ms, 0) != 0) { result = errno != 0 ? errno : ENOMEM; @@ -163,7 +166,7 @@ int sf_service_init_ex(SFContext *sf_context, return result; } - result = fast_timer_init(&pThreadData->timer, + result = fast_timer_init(&thread_data->timer, 2 * g_sf_global_vars.network_timeout, g_current_time); if (result != 0) { logError("file: "__FILE__", line: %d, " @@ -173,7 +176,22 @@ int sf_service_init_ex(SFContext *sf_context, return result; } - if (pipe(pThreadData->pipe_fds) != 0) { +#if defined(OS_LINUX) + if ((NOTIFY_READ_FD(thread_data)=eventfd(0, EFD_NONBLOCK) < 0) { + result = errno != 0 ? errno : EPERM; + logError("file: "__FILE__", line: %d, " + "call eventfd fail, " + "errno: %d, error info: %s", + __LINE__, result, strerror(result)); + break; + } + NOTIFY_WRITE_FD(tdata) = NOTIFY_READ_FD(thread_data); + + if ((result=init_pthread_lock(&thread_data.waiting_queue.lock)) != 0) { + return result; + } +#else + if (pipe(thread_data->pipe_fds) != 0) { result = errno != 0 ? errno : EPERM; logError("file: "__FILE__", line: %d, " "call pipe fail, " @@ -181,15 +199,7 @@ int sf_service_init_ex(SFContext *sf_context, __LINE__, result, strerror(result)); break; } - -#if defined(OS_LINUX) - if ((result=fd_add_flags(pThreadData->pipe_fds[0], - O_NONBLOCK | O_NOATIME)) != 0) - { - break; - } -#else - if ((result=fd_add_flags(pThreadData->pipe_fds[0], + if ((result=fd_add_flags(NOTIFY_READ_FD(thread_data), O_NONBLOCK)) != 0) { break; @@ -197,14 +207,14 @@ int sf_service_init_ex(SFContext *sf_context, #endif thread_ctx->sf_context = sf_context; - thread_ctx->thread_data = pThreadData; + thread_ctx->thread_data = thread_data; if ((result=pthread_create(&tid, &thread_attr, worker_thread_entrance, thread_ctx)) != 0) { logError("file: "__FILE__", line: %d, " "create thread failed, startup threads: %d, " "errno: %d, error info: %s", - __LINE__, (int)(pThreadData - sf_context->thread_data), + __LINE__, (int)(thread_data - sf_context->thread_data), result, strerror(result)); break; } @@ -216,14 +226,14 @@ int sf_service_init_ex(SFContext *sf_context, int sf_service_destroy_ex(SFContext *sf_context) { - struct nio_thread_data *pDataEnd, *pThreadData; + struct nio_thread_data *pDataEnd, *thread_data; free_queue_destroy(); pDataEnd = sf_context->thread_data + sf_context->work_threads; - for (pThreadData=sf_context->thread_data; pThreadDatathread_data; thread_datatimer); + fast_timer_destroy(&thread_data->timer); } free(sf_context->thread_data); sf_context->thread_data = NULL; @@ -306,10 +316,9 @@ static void *accept_thread_entrance(void *arg) { struct accept_thread_context *accept_context; int incomesock; - long task_ptr; struct sockaddr_in inaddr; socklen_t sockaddr_len; - struct fast_task_info *pTask; + struct fast_task_info *task; char szClientIp[IP_ADDRESS_SIZE]; accept_context = (struct accept_thread_context *)arg; @@ -320,8 +329,8 @@ static void *accept_thread_entrance(void *arg) if (incomesock < 0) { //error if (!(errno == EINTR || errno == EAGAIN)) { logError("file: "__FILE__", line: %d, " - "accept fail, errno: %d, error info: %s", - __LINE__, errno, strerror(errno)); + "accept fail, errno: %d, error info: %s", + __LINE__, errno, strerror(errno)); } continue; @@ -334,39 +343,30 @@ static void *accept_thread_entrance(void *arg) continue; } - pTask = free_queue_pop(); - if (pTask == NULL) { + task = free_queue_pop(); + if (task == NULL) { logError("file: "__FILE__", line: %d, " - "malloc task buff failed, you should " - "increase the parameter: max_connections", - __LINE__); + "malloc task buff failed, you should " + "increase the parameter: max_connections", + __LINE__); close(incomesock); continue; } - strcpy(pTask->client_ip, szClientIp); + strcpy(task->client_ip, szClientIp); - pTask->ctx = accept_context->sf_context; - pTask->nio_stage = SF_NIO_STAGE_INIT; - pTask->event.fd = incomesock; - pTask->thread_data = accept_context->sf_context->thread_data + + task->ctx = accept_context->sf_context; + task->event.fd = incomesock; + task->thread_data = accept_context->sf_context->thread_data + incomesock % accept_context->sf_context->work_threads; if (accept_context->sf_context->accept_done_func != NULL) { - accept_context->sf_context->accept_done_func(pTask, + accept_context->sf_context->accept_done_func(task, accept_context->server_sock == accept_context->sf_context->inner_sock); } - task_ptr = (long)pTask; - if (write(pTask->thread_data->pipe_fds[1], &task_ptr, - sizeof(task_ptr)) != sizeof(task_ptr)) - { - logError("file: "__FILE__", line: %d, " - "call write to pipe fd: %d fail, " - "errno: %d, error info: %s", - __LINE__, pTask->thread_data->pipe_fds[1], - errno, strerror(errno)); + if (sf_nio_notify(task, SF_NIO_STAGE_INIT) != 0) { close(incomesock); - free_queue_push(pTask); + free_queue_push(task); } }