add function sf_nio_forward_request

connection_manager
YuQing 2020-03-03 10:21:10 +08:00
parent 5825a64e72
commit e181349daa
2 changed files with 51 additions and 21 deletions

View File

@ -22,7 +22,6 @@
#include "fastcommon/fast_task_queue.h" #include "fastcommon/fast_task_queue.h"
#include "fastcommon/ioevent_loop.h" #include "fastcommon/ioevent_loop.h"
#include "sf_global.h" #include "sf_global.h"
#include "sf_types.h"
#include "sf_nio.h" #include "sf_nio.h"
static int sf_header_size = 0; static int sf_header_size = 0;
@ -54,10 +53,32 @@ TaskCleanUpCallback sf_get_task_cleanup_func()
return sf_task_cleanup_func; return sf_task_cleanup_func;
} }
static void sf_task_detach_thread(struct fast_task_info *pTask)
{
ioevent_detach(&pTask->thread_data->ev_puller, pTask->event.fd);
if (pTask->event.timer.expires > 0) {
fast_timer_remove(&pTask->thread_data->timer,
&pTask->event.timer);
pTask->event.timer.expires = 0;
}
if (sf_remove_from_ready_list) {
ioevent_remove(&pTask->thread_data->ev_puller, pTask);
}
}
void sf_task_switch_thread(struct fast_task_info *pTask,
const int new_thread_index)
{
sf_task_detach_thread(pTask);
pTask->thread_data = g_sf_global_vars.thread_data + new_thread_index;
}
void sf_task_finish_clean_up(struct fast_task_info *pTask) void sf_task_finish_clean_up(struct fast_task_info *pTask)
{ {
assert(pTask->event.fd >= 0);
/* /*
assert(pTask->event.fd >= 0);
if (pTask->event.fd < 0) { if (pTask->event.fd < 0) {
logWarning("file: "__FILE__", line: %d, " logWarning("file: "__FILE__", line: %d, "
"pTask: %p already cleaned", "pTask: %p already cleaned",
@ -71,20 +92,10 @@ void sf_task_finish_clean_up(struct fast_task_info *pTask)
pTask->finish_callback = NULL; pTask->finish_callback = NULL;
} }
ioevent_detach(&pTask->thread_data->ev_puller, pTask->event.fd); sf_task_detach_thread(pTask);
close(pTask->event.fd); close(pTask->event.fd);
pTask->event.fd = -1; pTask->event.fd = -1;
if (pTask->event.timer.expires > 0) {
fast_timer_remove(&pTask->thread_data->timer,
&pTask->event.timer);
pTask->event.timer.expires = 0;
}
if (sf_remove_from_ready_list) {
ioevent_remove(&pTask->thread_data->ev_puller, pTask);
}
__sync_fetch_and_sub(&g_sf_global_vars.connection_stat.current_count, 1); __sync_fetch_and_sub(&g_sf_global_vars.connection_stat.current_count, 1);
free_queue_push(pTask); free_queue_push(pTask);
} }
@ -159,10 +170,19 @@ int sf_nio_notify(struct fast_task_info *pTask, const int stage)
return 0; return 0;
} }
static int sf_ioevent_add(struct fast_task_info *pTask)
{
int result;
result = ioevent_set(pTask, pTask->thread_data, pTask->event.fd,
IOEVENT_READ, (IOEventCallback)sf_client_sock_read,
g_sf_global_vars.network_timeout);
return result > 0 ? -1 * result : result;
}
static int sf_nio_init(struct fast_task_info *pTask) static int sf_nio_init(struct fast_task_info *pTask)
{ {
int current_connections; int current_connections;
int result;
current_connections = __sync_add_and_fetch( current_connections = __sync_add_and_fetch(
&g_sf_global_vars.connection_stat.current_count, 1); &g_sf_global_vars.connection_stat.current_count, 1);
@ -170,11 +190,7 @@ static int sf_nio_init(struct fast_task_info *pTask)
g_sf_global_vars.connection_stat.max_count = current_connections; g_sf_global_vars.connection_stat.max_count = current_connections;
} }
pTask->nio_stage = SF_NIO_STAGE_RECV; return sf_ioevent_add(pTask);
result = ioevent_set(pTask, pTask->thread_data, pTask->event.fd,
IOEVENT_READ, (IOEventCallback)sf_client_sock_read,
g_sf_global_vars.network_timeout);
return result > 0 ? -1 * result : result;
} }
void sf_recv_notify_read(int sock, short event, void *arg) void sf_recv_notify_read(int sock, short event, void *arg)
@ -202,6 +218,7 @@ void sf_recv_notify_read(int sock, short event, void *arg)
pTask = (struct fast_task_info *)task_ptr; pTask = (struct fast_task_info *)task_ptr;
switch (pTask->nio_stage) { switch (pTask->nio_stage) {
case SF_NIO_STAGE_INIT: case SF_NIO_STAGE_INIT:
pTask->nio_stage = SF_NIO_STAGE_RECV;
result = sf_nio_init(pTask); result = sf_nio_init(pTask);
break; break;
case SF_NIO_STAGE_RECV: case SF_NIO_STAGE_RECV:
@ -214,8 +231,10 @@ void sf_recv_notify_read(int sock, short event, void *arg)
case SF_NIO_STAGE_SEND: case SF_NIO_STAGE_SEND:
result = sf_send_add_event(pTask); result = sf_send_add_event(pTask);
break; break;
case SF_NIO_STAGE_FORWARDED: case SF_NIO_STAGE_FORWARDED: //forward by other thread
result = sf_deal_task(pTask); if ((result=sf_ioevent_add(pTask)) == 0) {
result = sf_deal_task(pTask);
}
break; break;
case SF_NIO_STAGE_CLOSE: case SF_NIO_STAGE_CLOSE:
result = -EIO; //close this socket result = -EIO; //close this socket

View File

@ -7,6 +7,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include "fastcommon/fast_task_queue.h" #include "fastcommon/fast_task_queue.h"
#include "sf_define.h"
#include "sf_types.h" #include "sf_types.h"
#ifdef __cplusplus #ifdef __cplusplus
@ -27,8 +28,18 @@ int sf_client_sock_read(int sock, short event, void *arg);
void sf_task_finish_clean_up(struct fast_task_info *pTask); void sf_task_finish_clean_up(struct fast_task_info *pTask);
void sf_task_switch_thread(struct fast_task_info *pTask,
const int new_thread_index);
int sf_nio_notify(struct fast_task_info *pTask, const int stage); int sf_nio_notify(struct fast_task_info *pTask, const int stage);
static inline int sf_nio_forward_request(struct fast_task_info *pTask,
const int new_thread_index)
{
sf_task_switch_thread(pTask, new_thread_index);
return sf_nio_notify(pTask, SF_NIO_STAGE_FORWARDED);
}
static inline bool sf_client_sock_in_read_stage(struct fast_task_info *pTask) static inline bool sf_client_sock_in_read_stage(struct fast_task_info *pTask)
{ {
return (pTask->event.callback == (IOEventCallback)sf_client_sock_read); return (pTask->event.callback == (IOEventCallback)sf_client_sock_read);