diff --git a/src/sf_define.h b/src/sf_define.h index 15ae9c7..5539b50 100644 --- a/src/sf_define.h +++ b/src/sf_define.h @@ -10,6 +10,12 @@ #define SF_DEF_MIN_BUFF_SIZE (64 * 1024) #define SF_DEF_MAX_BUFF_SIZE (64 * 1024) +#define SF_NIO_STAGE_INIT 0 //set ioevent +#define SF_NIO_STAGE_RECV 1 //recv +#define SF_NIO_STAGE_SEND 2 //send +#define SF_NIO_STAGE_FORWARDED 3 //deal the forwarded request +#define SF_NIO_STAGE_CLOSE 4 //cleanup the task + #ifdef __cplusplus extern "C" { #endif diff --git a/src/sf_global.h b/src/sf_global.h index 563a8e1..08c4a1b 100644 --- a/src/sf_global.h +++ b/src/sf_global.h @@ -6,6 +6,7 @@ #include "fastcommon/common_define.h" #include "fastcommon/ini_file_reader.h" #include "fastcommon/ioevent.h" +#include "sf_define.h" typedef struct sf_connection_stat { volatile int current_count; diff --git a/src/sf_nio.c b/src/sf_nio.c index 309b766..286953e 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -89,45 +89,6 @@ void sf_task_finish_clean_up(struct fast_task_info *pTask) free_queue_push(pTask); } -void sf_recv_notify_read(int sock, short event, void *arg) -{ - int bytes; - int current_connections; - long task_ptr; - struct fast_task_info *pTask; - - while (1) { - if ((bytes=read(sock, &task_ptr, sizeof(task_ptr))) < 0) { - if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { - logError("file: "__FILE__", line: %d, " - "call read failed, " - "errno: %d, error info: %s", - __LINE__, errno, strerror(errno)); - } - - break; - } - else if (bytes == 0) { - break; - } - - current_connections = __sync_add_and_fetch( - &g_sf_global_vars.connection_stat.current_count, 1); - if (current_connections > g_sf_global_vars.connection_stat.max_count) { - g_sf_global_vars.connection_stat.max_count = current_connections; - } - - pTask = (struct fast_task_info *)task_ptr; - if (ioevent_set(pTask, pTask->thread_data, pTask->event.fd, - IOEVENT_READ, (IOEventCallback)sf_client_sock_read, - g_sf_global_vars.network_timeout) != 0) - { - sf_task_cleanup_func(pTask); - continue; - } - } -} - static inline int set_write_event(struct fast_task_info *pTask) { int result; @@ -177,6 +138,102 @@ static inline int set_read_event(struct fast_task_info *pTask) return 0; } +int sf_nio_notify(struct fast_task_info *pTask, const int stage) +{ + long task_addr; + + task_addr = (long)pTask; + pTask->nio_stage = stage; + if (write(pTask->thread_data->pipe_fds[1], &task_addr, + sizeof(task_addr)) != sizeof(task_addr)) + { + int result; + result = errno != 0 ? errno : EIO; + logError("file: "__FILE__", line: %d, " + "write to pipe %d fail, errno: %d, error info: %s", + __LINE__, pTask->thread_data->pipe_fds[1], + result, STRERROR(result)); + return result; + } + + return 0; +} + +static int sf_nio_init(struct fast_task_info *pTask) +{ + int current_connections; + int result; + + current_connections = __sync_add_and_fetch( + &g_sf_global_vars.connection_stat.current_count, 1); + if (current_connections > g_sf_global_vars.connection_stat.max_count) { + g_sf_global_vars.connection_stat.max_count = current_connections; + } + + pTask->nio_stage = SF_NIO_STAGE_RECV; + result = ioevent_set(pTask, pTask->thread_data, pTask->event.fd, + IOEVENT_READ, (IOEventCallback)sf_client_sock_read, + g_sf_global_vars.network_timeout); + return result > 0 ? -1 * result : result; +} + +void sf_recv_notify_read(int sock, short event, void *arg) +{ + int bytes; + int result; + long task_ptr; + struct fast_task_info *pTask; + + while (1) { + if ((bytes=read(sock, &task_ptr, sizeof(task_ptr))) < 0) { + if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { + logError("file: "__FILE__", line: %d, " + "call read failed, " + "errno: %d, error info: %s", + __LINE__, errno, strerror(errno)); + } + + break; + } + else if (bytes == 0) { + break; + } + + pTask = (struct fast_task_info *)task_ptr; + switch (pTask->nio_stage) { + case SF_NIO_STAGE_INIT: + result = sf_nio_init(pTask); + break; + case SF_NIO_STAGE_RECV: + if ((result=set_read_event(pTask)) == 0) + { + sf_client_sock_read(pTask->event.fd, + IOEVENT_READ, pTask); + } + break; + case SF_NIO_STAGE_SEND: + result = sf_send_add_event(pTask); + break; + case SF_NIO_STAGE_FORWARDED: + result = sf_deal_task(pTask); + break; + case SF_NIO_STAGE_CLOSE: + result = -EIO; //close this socket + break; + default: + logError("file: "__FILE__", line: %d, " + "client ip: %s, invalid stage: %d", + __LINE__, pTask->client_ip, pTask->nio_stage); + result = -EINVAL; + break; + } + + if (result < 0) { + sf_task_cleanup_func(pTask); + } + } +} + int sf_send_add_event(struct fast_task_info *pTask) { pTask->offset = 0; @@ -342,6 +399,7 @@ int sf_client_sock_read(int sock, short event, void *arg) if (pTask->offset >= pTask->length) { //recv done pTask->req_count++; + pTask->nio_stage = SF_NIO_STAGE_SEND; if (sf_deal_task(pTask) < 0) { //fatal error sf_task_cleanup_func(pTask); return -1; @@ -427,6 +485,7 @@ int sf_client_sock_write(int sock, short event, void *arg) if (pTask->offset >= pTask->length) { pTask->offset = 0; pTask->length = 0; + pTask->nio_stage = SF_NIO_STAGE_RECV; if (set_read_event(pTask) != 0) { return -1; } diff --git a/src/sf_nio.h b/src/sf_nio.h index 45e2cef..4574b75 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -27,6 +27,8 @@ int sf_client_sock_read(int sock, short event, void *arg); void sf_task_finish_clean_up(struct fast_task_info *pTask); +int sf_nio_notify(struct fast_task_info *pTask, const int stage); + static inline bool sf_client_sock_in_read_stage(struct fast_task_info *pTask) { return (pTask->event.callback == (IOEventCallback)sf_client_sock_read); diff --git a/src/sf_service.c b/src/sf_service.c index 0812aa4..4c7cbf8 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -265,7 +265,8 @@ static void *accept_thread_entrance(void* arg) server_sock = (long)arg; while (g_sf_global_vars.continue_flag) { sockaddr_len = sizeof(inaddr); - incomesock = accept(server_sock, (struct sockaddr*)&inaddr, &sockaddr_len); + incomesock = accept(server_sock, (struct sockaddr*)&inaddr, + &sockaddr_len); if (incomesock < 0) { //error if (!(errno == EINTR || errno == EAGAIN)) { logError("file: "__FILE__", line: %d, " @@ -294,8 +295,10 @@ static void *accept_thread_entrance(void* arg) } strcpy(pTask->client_ip, szClientIp); + pTask->nio_stage = SF_NIO_STAGE_INIT; pTask->event.fd = incomesock; - pTask->thread_data = g_sf_global_vars.thread_data + incomesock % g_sf_global_vars.work_threads; + pTask->thread_data = g_sf_global_vars.thread_data + incomesock % + g_sf_global_vars.work_threads; if (sf_accept_done_func != NULL) { sf_accept_done_func(pTask, server_sock == g_server_inner_sock); }