task nio_stage use atomic opt.

connection_manager
YuQing 2020-09-09 14:46:58 +08:00
parent 9bee7ad62b
commit b60912bfd6
4 changed files with 134 additions and 53 deletions

View File

@ -13,11 +13,22 @@
#define SF_NIO_STAGE_INIT 0 //set ioevent #define SF_NIO_STAGE_INIT 0 //set ioevent
#define SF_NIO_STAGE_CONNECT 1 //do connect (client only) #define SF_NIO_STAGE_CONNECT 1 //do connect (client only)
#define SF_NIO_STAGE_HANDSHAKE 2 //notify the thread to handshake (client only) #define SF_NIO_STAGE_HANDSHAKE 2 //notify the thread to handshake (client only)
#define SF_NIO_STAGE_RECV 3 //do recv #define SF_NIO_STAGE_RECV 4 //do recv
#define SF_NIO_STAGE_SEND 4 //do send #define SF_NIO_STAGE_SEND 8 //do send
#define SF_NIO_STAGE_FORWARDED 5 //deal the forwarded request #define SF_NIO_STAGE_FORWARDED 16 //deal the forwarded request
#define SF_NIO_STAGE_CONTINUE 6 //notify the thread continue deal #define SF_NIO_STAGE_CONTINUE 32 //notify the thread continue deal
#define SF_NIO_STAGE_CLOSE 9 //cleanup the task #define SF_NIO_STAGE_CLOSE 256 //cleanup the task
#define SF_NIO_FLAG_INPROGRESS 1024
#define SF_NIO_STAGE_FLAGS (SF_NIO_FLAG_INPROGRESS)
#define SF_NIO_STAGE_RECV_INPROGRESS (SF_NIO_STAGE_RECV | SF_NIO_FLAG_INPROGRESS)
#define SF_NIO_STAGE_SEND_INPROGRESS (SF_NIO_STAGE_SEND | SF_NIO_FLAG_INPROGRESS)
#define SF_NIO_TASK_STAGE_FETCH(task) __sync_add_and_fetch(&task->nio_stage, 0)
#define SF_NIO_STAGE_ONLY(stage) (stage & (~SF_NIO_STAGE_FLAGS))
#define SF_NIO_STAGE_IS_INPROGRESS(stage) \
((stage & SF_NIO_FLAG_INPROGRESS) != 0)
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {

View File

@ -50,7 +50,9 @@ extern SFContext g_sf_context;
#define SF_G_CONTINUE_FLAG g_sf_global_vars.continue_flag #define SF_G_CONTINUE_FLAG g_sf_global_vars.continue_flag
#define SF_G_CONNECT_TIMEOUT g_sf_global_vars.connect_timeout #define SF_G_CONNECT_TIMEOUT g_sf_global_vars.connect_timeout
#define SF_G_NETWORK_TIMEOUT g_sf_global_vars.network_timeout #define SF_G_NETWORK_TIMEOUT g_sf_global_vars.network_timeout
#define SF_G_MAX_CONNECTIONS g_sf_global_vars.max_connections
#define SF_G_THREAD_STACK_SIZE g_sf_global_vars.thread_stack_size #define SF_G_THREAD_STACK_SIZE g_sf_global_vars.thread_stack_size
#define SF_G_WORK_THREADS g_sf_context.work_threads #define SF_G_WORK_THREADS g_sf_context.work_threads
#define SF_G_ALIVE_THREAD_COUNT g_sf_context.thread_count #define SF_G_ALIVE_THREAD_COUNT g_sf_context.thread_count
#define SF_G_THREAD_INDEX(tdata) (int)(tdata - g_sf_context.thread_data) #define SF_G_THREAD_INDEX(tdata) (int)(tdata - g_sf_context.thread_data)

View File

@ -8,7 +8,7 @@
#include <fcntl.h> #include <fcntl.h>
#include <errno.h> #include <errno.h>
#include <signal.h> #include <signal.h>
#include <assert.h> //#include <assert.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/time.h> #include <sys/time.h>
#include <sys/socket.h> #include <sys/socket.h>
@ -113,7 +113,7 @@ int sf_set_read_event(struct fast_task_info *task)
{ {
int result; int result;
task->nio_stage = SF_NIO_STAGE_RECV; sf_nio_set_stage(task, SF_NIO_STAGE_RECV);
if (task->event.callback == (IOEventCallback)sf_client_sock_read) { if (task->event.callback == (IOEventCallback)sf_client_sock_read) {
return 0; return 0;
} }
@ -184,7 +184,7 @@ static int sf_client_sock_connect(int sock, short event, void *arg)
return -1; return -1;
} }
task->nio_stage = SF_NIO_STAGE_HANDSHAKE; sf_nio_set_stage(task, SF_NIO_STAGE_HANDSHAKE);
return SF_CTX->deal_task(task); return SF_CTX->deal_task(task);
} }
@ -207,7 +207,7 @@ static int sf_connect_server(struct fast_task_info *task)
return result; return result;
} }
task->nio_stage = SF_NIO_STAGE_HANDSHAKE; sf_nio_set_stage(task, SF_NIO_STAGE_HANDSHAKE);
return SF_CTX->deal_task(task); return SF_CTX->deal_task(task);
} else if (result == EINPROGRESS) { } else if (result == EINPROGRESS) {
return sf_ioevent_add(task, (IOEventCallback) return sf_ioevent_add(task, (IOEventCallback)
@ -224,9 +224,12 @@ static int sf_connect_server(struct fast_task_info *task)
static int sf_nio_deal_task(struct fast_task_info *task) static int sf_nio_deal_task(struct fast_task_info *task)
{ {
int result; int result;
switch (task->nio_stage) { int stage;
stage = SF_NIO_TASK_STAGE_FETCH(task);
switch (SF_NIO_STAGE_ONLY(stage)) {
case SF_NIO_STAGE_INIT: case SF_NIO_STAGE_INIT:
task->nio_stage = SF_NIO_STAGE_RECV; sf_nio_set_stage(task, SF_NIO_STAGE_RECV);
result = sf_nio_init(task); result = sf_nio_init(task);
break; break;
case SF_NIO_STAGE_CONNECT: case SF_NIO_STAGE_CONNECT:
@ -259,7 +262,7 @@ static int sf_nio_deal_task(struct fast_task_info *task)
default: default:
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"client ip: %s, invalid stage: %d", "client ip: %s, invalid stage: %d",
__LINE__, task->client_ip, task->nio_stage); __LINE__, task->client_ip, stage);
result = -EINVAL; result = -EINVAL;
break; break;
} }
@ -271,13 +274,37 @@ static int sf_nio_deal_task(struct fast_task_info *task)
return result; return result;
} }
int sf_nio_notify(struct fast_task_info *task, const int stage) int sf_nio_notify(struct fast_task_info *task, const int new_stage)
{ {
int64_t n; int64_t n;
int result; int result;
int old_stage;
bool notify; bool notify;
task->nio_stage = stage; old_stage = SF_NIO_TASK_STAGE_FETCH(task);
if (!(new_stage == SF_NIO_STAGE_INIT ||
new_stage == SF_NIO_STAGE_CONNECT ||
new_stage == SF_NIO_STAGE_CLOSE))
{
if (SF_NIO_STAGE_IS_INPROGRESS(old_stage)) {
logWarning("file: "__FILE__", line: %d, "
"client ip: %s, nio stage in progress, "
"current stage: %d, skip set to %d", __LINE__,
task->client_ip, old_stage, new_stage);
return EBUSY;
}
}
if (!__sync_bool_compare_and_swap(&task->nio_stage,
old_stage, new_stage))
{
logWarning("file: "__FILE__", line: %d, "
"client ip: %s, skip set stage to %d because stage "
"changed, current stage: %d", __LINE__, task->client_ip,
new_stage, SF_NIO_TASK_STAGE_FETCH(task));
return EEXIST;
}
task->next = NULL; task->next = NULL;
pthread_mutex_lock(&task->thread_data->waiting_queue.lock); pthread_mutex_lock(&task->thread_data->waiting_queue.lock);
@ -342,6 +369,7 @@ int sf_send_add_event(struct fast_task_info *task)
task->offset = 0; task->offset = 0;
if (task->length > 0) { if (task->length > 0) {
/* direct send */ /* direct send */
sf_nio_set_stage(task, SF_NIO_STAGE_SEND);
if (sf_client_sock_write(task->event.fd, IOEVENT_WRITE, task) < 0) { if (sf_client_sock_write(task->event.fd, IOEVENT_WRITE, task) < 0) {
return errno != 0 ? errno : EIO; return errno != 0 ? errno : EIO;
} }
@ -352,17 +380,19 @@ int sf_send_add_event(struct fast_task_info *task)
int sf_client_sock_read(int sock, short event, void *arg) int sf_client_sock_read(int sock, short event, void *arg)
{ {
int stage;
int bytes; int bytes;
int recv_bytes; int recv_bytes;
int total_read; int total_read;
struct fast_task_info *task; struct fast_task_info *task;
task = (struct fast_task_info *)arg; task = (struct fast_task_info *)arg;
if (task->canceled || (task->nio_stage != SF_NIO_STAGE_RECV)) { stage = SF_NIO_TASK_STAGE_FETCH(task);
if (task->canceled || (SF_NIO_STAGE_ONLY(stage) != SF_NIO_STAGE_RECV)) {
return 0; return 0;
} }
assert(sock >= 0); //assert(sock >= 0);
if (event & IOEVENT_TIMEOUT) { if (event & IOEVENT_TIMEOUT) {
if (task->offset == 0 && task->req_count > 0) { if (task->offset == 0 && task->req_count > 0) {
if (SF_CTX->timeout_callback != NULL) { if (SF_CTX->timeout_callback != NULL) {
@ -376,16 +406,14 @@ int sf_client_sock_read(int sock, short event, void *arg)
task->network_timeout; task->network_timeout;
fast_timer_add(&task->thread_data->timer, fast_timer_add(&task->thread_data->timer,
&task->event.timer); &task->event.timer);
} } else {
else {
if (task->length > 0) { if (task->length > 0) {
logWarning("file: "__FILE__", line: %d, " logWarning("file: "__FILE__", line: %d, "
"client ip: %s, recv timeout, " "client ip: %s, recv timeout, "
"recv offset: %d, expect length: %d", "recv offset: %d, expect length: %d",
__LINE__, task->client_ip, __LINE__, task->client_ip,
task->offset, task->length); task->offset, task->length);
} } else {
else {
logWarning("file: "__FILE__", line: %d, " logWarning("file: "__FILE__", line: %d, "
"client ip: %s, req_count: %"PRId64", recv timeout", "client ip: %s, req_count: %"PRId64", recv timeout",
__LINE__, task->client_ip, task->req_count); __LINE__, task->client_ip, task->req_count);
@ -407,6 +435,18 @@ int sf_client_sock_read(int sock, short event, void *arg)
return -1; return -1;
} }
if (stage != SF_NIO_STAGE_RECV_INPROGRESS) {
if (!__sync_bool_compare_and_swap(&task->nio_stage,
stage, SF_NIO_STAGE_RECV_INPROGRESS))
{
logWarning("file: "__FILE__", line: %d, "
"client ip: %s, nio stage change from %d to %d, "
"skip read!", __LINE__, task->client_ip, stage,
SF_NIO_TASK_STAGE_FETCH(task));
return 0;
}
}
total_read = 0; total_read = 0;
while (1) { while (1) {
fast_timer_modify(&task->thread_data->timer, fast_timer_modify(&task->thread_data->timer,
@ -414,8 +454,7 @@ int sf_client_sock_read(int sock, short event, void *arg)
task->network_timeout); task->network_timeout);
if (task->length == 0) { //recv header if (task->length == 0) { //recv header
recv_bytes = SF_CTX->header_size - task->offset; recv_bytes = SF_CTX->header_size - task->offset;
} } else {
else {
recv_bytes = task->length - task->offset; recv_bytes = task->length - task->offset;
} }
@ -423,14 +462,12 @@ int sf_client_sock_read(int sock, short event, void *arg)
if (bytes < 0) { if (bytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) { if (errno == EAGAIN || errno == EWOULDBLOCK) {
break; break;
} } else if (errno == EINTR) { //should retry
else if (errno == EINTR) { //should retry
logDebug("file: "__FILE__", line: %d, " logDebug("file: "__FILE__", line: %d, "
"client ip: %s, ignore interupt signal", "client ip: %s, ignore interupt signal",
__LINE__, task->client_ip); __LINE__, task->client_ip);
continue; continue;
} } else {
else {
logWarning("file: "__FILE__", line: %d, " logWarning("file: "__FILE__", line: %d, "
"client ip: %s, recv fail, " "client ip: %s, recv fail, "
"errno: %d, error info: %s", "errno: %d, error info: %s",
@ -440,8 +477,7 @@ int sf_client_sock_read(int sock, short event, void *arg)
iovent_add_to_deleted_list(task); iovent_add_to_deleted_list(task);
return -1; return -1;
} }
} } else if (bytes == 0) {
else if (bytes == 0) {
if (task->offset > 0) { if (task->offset > 0) {
if (task->length > 0) { if (task->length > 0) {
logWarning("file: "__FILE__", line: %d, " logWarning("file: "__FILE__", line: %d, "
@ -450,16 +486,14 @@ int sf_client_sock_read(int sock, short event, void *arg)
"recv pkg length: %d", __LINE__, "recv pkg length: %d", __LINE__,
task->client_ip, task->length, task->client_ip, task->length,
task->offset); task->offset);
} } else {
else {
logWarning("file: "__FILE__", line: %d, " logWarning("file: "__FILE__", line: %d, "
"client ip: %s, connection " "client ip: %s, connection "
"disconnected, recv pkg length: %d", "disconnected, recv pkg length: %d",
__LINE__, task->client_ip, __LINE__, task->client_ip,
task->offset); task->offset);
} }
} } else {
else {
logDebug("file: "__FILE__", line: %d, " logDebug("file: "__FILE__", line: %d, "
"client ip: %s, sock: %d, recv fail, " "client ip: %s, sock: %d, recv fail, "
"connection disconnected", "connection disconnected",
@ -537,7 +571,7 @@ int sf_client_sock_read(int sock, short event, void *arg)
if (task->offset >= task->length) { //recv done if (task->offset >= task->length) { //recv done
task->req_count++; task->req_count++;
task->nio_stage = SF_NIO_STAGE_SEND; sf_nio_set_stage(task, SF_NIO_STAGE_SEND);
if (SF_CTX->deal_task(task) < 0) { //fatal error if (SF_CTX->deal_task(task) < 0) { //fatal error
iovent_add_to_deleted_list(task); iovent_add_to_deleted_list(task);
return -1; return -1;
@ -551,13 +585,15 @@ int sf_client_sock_read(int sock, short event, void *arg)
int sf_client_sock_write(int sock, short event, void *arg) int sf_client_sock_write(int sock, short event, void *arg)
{ {
int stage;
int bytes; int bytes;
int total_write; int total_write;
struct fast_task_info *task; struct fast_task_info *task;
assert(sock >= 0); //assert(sock >= 0);
task = (struct fast_task_info *)arg; task = (struct fast_task_info *)arg;
if (task->canceled) { stage = SF_NIO_TASK_STAGE_FETCH(task);
if (task->canceled || (SF_NIO_STAGE_ONLY(stage) != SF_NIO_STAGE_SEND)) {
return 0; return 0;
} }
@ -580,6 +616,18 @@ int sf_client_sock_write(int sock, short event, void *arg)
return -1; return -1;
} }
if (stage != SF_NIO_STAGE_SEND_INPROGRESS) {
if (!__sync_bool_compare_and_swap(&task->nio_stage,
stage, SF_NIO_STAGE_SEND_INPROGRESS))
{
logWarning("file: "__FILE__", line: %d, "
"client ip: %s, nio stage change from %d to %d, "
"skip write!", __LINE__, task->client_ip, stage,
SF_NIO_TASK_STAGE_FETCH(task));
return 0;
}
}
total_write = 0; total_write = 0;
while (1) { while (1) {
fast_timer_modify(&task->thread_data->timer, fast_timer_modify(&task->thread_data->timer,
@ -595,14 +643,12 @@ int sf_client_sock_write(int sock, short event, void *arg)
return -1; return -1;
} }
break; break;
} } else if (errno == EINTR) { //should retry
else if (errno == EINTR) { //should retry
logDebug("file: "__FILE__", line: %d, " logDebug("file: "__FILE__", line: %d, "
"client ip: %s, ignore interupt signal", "client ip: %s, ignore interupt signal",
__LINE__, task->client_ip); __LINE__, task->client_ip);
continue; continue;
} } else {
else {
logWarning("file: "__FILE__", line: %d, " logWarning("file: "__FILE__", line: %d, "
"client ip: %s, send fail, " "client ip: %s, send fail, "
"errno: %d, error info: %s", "errno: %d, error info: %s",
@ -612,8 +658,7 @@ int sf_client_sock_write(int sock, short event, void *arg)
iovent_add_to_deleted_list(task); iovent_add_to_deleted_list(task);
return -1; return -1;
} }
} } else if (bytes == 0) {
else if (bytes == 0) {
logWarning("file: "__FILE__", line: %d, " logWarning("file: "__FILE__", line: %d, "
"client ip: %s, sock: %d, send failed, " "client ip: %s, sock: %d, send failed, "
"connection disconnected", "connection disconnected",

View File

@ -44,29 +44,52 @@ static inline TaskCleanUpCallback sf_get_task_cleanup_func_ex(
sf_get_task_cleanup_func_ex(&g_sf_context) sf_get_task_cleanup_func_ex(&g_sf_context)
void sf_recv_notify_read(int sock, short event, void *arg); void sf_recv_notify_read(int sock, short event, void *arg);
int sf_send_add_event(struct fast_task_info *pTask); int sf_send_add_event(struct fast_task_info *task);
int sf_client_sock_write(int sock, short event, void *arg); int sf_client_sock_write(int sock, short event, void *arg);
int sf_client_sock_read(int sock, short event, void *arg); int sf_client_sock_read(int sock, short event, void *arg);
void sf_task_finish_clean_up(struct fast_task_info *pTask); void sf_task_finish_clean_up(struct fast_task_info *task);
int sf_nio_notify(struct fast_task_info *pTask, const int stage); int sf_nio_notify(struct fast_task_info *task, const int new_stage);
int sf_set_read_event(struct fast_task_info *task); int sf_set_read_event(struct fast_task_info *task);
void sf_task_switch_thread(struct fast_task_info *pTask, void sf_task_switch_thread(struct fast_task_info *task,
const int new_thread_index); const int new_thread_index);
static inline int sf_nio_forward_request(struct fast_task_info *pTask, static inline int sf_nio_forward_request(struct fast_task_info *task,
const int new_thread_index) const int new_thread_index)
{ {
sf_task_switch_thread(pTask, new_thread_index); sf_task_switch_thread(task, new_thread_index);
return sf_nio_notify(pTask, SF_NIO_STAGE_FORWARDED); return sf_nio_notify(task, SF_NIO_STAGE_FORWARDED);
} }
static inline bool sf_client_sock_in_read_stage(struct fast_task_info *pTask) static inline bool sf_client_sock_in_read_stage(struct fast_task_info *task)
{ {
return (pTask->event.callback == (IOEventCallback)sf_client_sock_read); return (task->event.callback == (IOEventCallback)sf_client_sock_read);
}
static inline void sf_nio_set_stage(struct fast_task_info *task,
const int new_stage)
{
int old_stage;
old_stage = __sync_add_and_fetch(&task->nio_stage, 0);
if (new_stage != old_stage) {
__sync_bool_compare_and_swap(&task->nio_stage, old_stage, new_stage);
}
}
static inline bool sf_nio_swap_stage(struct fast_task_info *task,
const int old_stage, const int new_stage)
{
return __sync_bool_compare_and_swap(&task->nio_stage, old_stage, new_stage);
}
static inline bool sf_nio_task_inprogress(struct fast_task_info *task)
{
int stage;
stage = __sync_add_and_fetch(&task->nio_stage, 0);
return SF_NIO_STAGE_IS_INPROGRESS(stage);
} }
#ifdef __cplusplus #ifdef __cplusplus