From e181349daa78822e00537dd8fcf2dd3aec2c4ba0 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Tue, 3 Mar 2020 10:21:10 +0800 Subject: [PATCH] add function sf_nio_forward_request --- src/sf_nio.c | 61 ++++++++++++++++++++++++++++++++++------------------ src/sf_nio.h | 11 ++++++++++ 2 files changed, 51 insertions(+), 21 deletions(-) diff --git a/src/sf_nio.c b/src/sf_nio.c index 5fb3f79..db957e2 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -22,7 +22,6 @@ #include "fastcommon/fast_task_queue.h" #include "fastcommon/ioevent_loop.h" #include "sf_global.h" -#include "sf_types.h" #include "sf_nio.h" static int sf_header_size = 0; @@ -54,10 +53,32 @@ TaskCleanUpCallback sf_get_task_cleanup_func() return sf_task_cleanup_func; } +static void sf_task_detach_thread(struct fast_task_info *pTask) +{ + ioevent_detach(&pTask->thread_data->ev_puller, pTask->event.fd); + + if (pTask->event.timer.expires > 0) { + fast_timer_remove(&pTask->thread_data->timer, + &pTask->event.timer); + pTask->event.timer.expires = 0; + } + + if (sf_remove_from_ready_list) { + ioevent_remove(&pTask->thread_data->ev_puller, pTask); + } +} + +void sf_task_switch_thread(struct fast_task_info *pTask, + const int new_thread_index) +{ + sf_task_detach_thread(pTask); + pTask->thread_data = g_sf_global_vars.thread_data + new_thread_index; +} + void sf_task_finish_clean_up(struct fast_task_info *pTask) { - assert(pTask->event.fd >= 0); /* + assert(pTask->event.fd >= 0); if (pTask->event.fd < 0) { logWarning("file: "__FILE__", line: %d, " "pTask: %p already cleaned", @@ -71,20 +92,10 @@ void sf_task_finish_clean_up(struct fast_task_info *pTask) pTask->finish_callback = NULL; } - ioevent_detach(&pTask->thread_data->ev_puller, pTask->event.fd); + sf_task_detach_thread(pTask); close(pTask->event.fd); pTask->event.fd = -1; - if (pTask->event.timer.expires > 0) { - fast_timer_remove(&pTask->thread_data->timer, - &pTask->event.timer); - pTask->event.timer.expires = 0; - } - - if (sf_remove_from_ready_list) { - ioevent_remove(&pTask->thread_data->ev_puller, pTask); - } - __sync_fetch_and_sub(&g_sf_global_vars.connection_stat.current_count, 1); free_queue_push(pTask); } @@ -159,10 +170,19 @@ int sf_nio_notify(struct fast_task_info *pTask, const int stage) return 0; } +static int sf_ioevent_add(struct fast_task_info *pTask) +{ + int result; + + result = ioevent_set(pTask, pTask->thread_data, pTask->event.fd, + IOEVENT_READ, (IOEventCallback)sf_client_sock_read, + g_sf_global_vars.network_timeout); + return result > 0 ? -1 * result : result; +} + static int sf_nio_init(struct fast_task_info *pTask) { int current_connections; - int result; current_connections = __sync_add_and_fetch( &g_sf_global_vars.connection_stat.current_count, 1); @@ -170,11 +190,7 @@ static int sf_nio_init(struct fast_task_info *pTask) g_sf_global_vars.connection_stat.max_count = current_connections; } - pTask->nio_stage = SF_NIO_STAGE_RECV; - result = ioevent_set(pTask, pTask->thread_data, pTask->event.fd, - IOEVENT_READ, (IOEventCallback)sf_client_sock_read, - g_sf_global_vars.network_timeout); - return result > 0 ? -1 * result : result; + return sf_ioevent_add(pTask); } void sf_recv_notify_read(int sock, short event, void *arg) @@ -202,6 +218,7 @@ void sf_recv_notify_read(int sock, short event, void *arg) pTask = (struct fast_task_info *)task_ptr; switch (pTask->nio_stage) { case SF_NIO_STAGE_INIT: + pTask->nio_stage = SF_NIO_STAGE_RECV; result = sf_nio_init(pTask); break; case SF_NIO_STAGE_RECV: @@ -214,8 +231,10 @@ void sf_recv_notify_read(int sock, short event, void *arg) case SF_NIO_STAGE_SEND: result = sf_send_add_event(pTask); break; - case SF_NIO_STAGE_FORWARDED: - result = sf_deal_task(pTask); + case SF_NIO_STAGE_FORWARDED: //forward by other thread + if ((result=sf_ioevent_add(pTask)) == 0) { + result = sf_deal_task(pTask); + } break; case SF_NIO_STAGE_CLOSE: result = -EIO; //close this socket diff --git a/src/sf_nio.h b/src/sf_nio.h index 4574b75..c800674 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -7,6 +7,7 @@ #include #include #include "fastcommon/fast_task_queue.h" +#include "sf_define.h" #include "sf_types.h" #ifdef __cplusplus @@ -27,8 +28,18 @@ int sf_client_sock_read(int sock, short event, void *arg); void sf_task_finish_clean_up(struct fast_task_info *pTask); +void sf_task_switch_thread(struct fast_task_info *pTask, + const int new_thread_index); + int sf_nio_notify(struct fast_task_info *pTask, const int stage); +static inline int sf_nio_forward_request(struct fast_task_info *pTask, + const int new_thread_index) +{ + sf_task_switch_thread(pTask, new_thread_index); + return sf_nio_notify(pTask, SF_NIO_STAGE_FORWARDED); +} + static inline bool sf_client_sock_in_read_stage(struct fast_task_info *pTask) { return (pTask->event.callback == (IOEventCallback)sf_client_sock_read);