diff --git a/src/idempotency/client/client_channel.c b/src/idempotency/client/client_channel.c index 25c6d0f..c30b21f 100644 --- a/src/idempotency/client/client_channel.c +++ b/src/idempotency/client/client_channel.c @@ -333,7 +333,7 @@ int idempotency_client_channel_push(struct idempotency_client_channel *channel, if (notify) { if (__sync_add_and_fetch(&channel->in_ioevent, 0)) { if (__sync_add_and_fetch(&channel->established, 0)) { - sf_nio_notify_silence(channel->task, SF_NIO_STAGE_CONTINUE); + sf_nio_notify(channel->task, SF_NIO_STAGE_CONTINUE); } } else { return idempotency_client_channel_check_reconnect(channel); diff --git a/src/idempotency/client/receipt_handler.c b/src/idempotency/client/receipt_handler.c index 99582d6..253e576 100644 --- a/src/idempotency/client/receipt_handler.c +++ b/src/idempotency/client/receipt_handler.c @@ -324,13 +324,11 @@ static inline int deal_report_req_receipt_response(struct fast_task_info *task) return 0; } -static int receipt_deal_task(struct fast_task_info *task) +static int receipt_deal_task(struct fast_task_info *task, const int stage) { int result; - int stage; do { - stage = SF_NIO_TASK_STAGE_FETCH(task); if (stage == SF_NIO_STAGE_HANDSHAKE) { setup_channel_request(task); result = 0; diff --git a/src/sf_define.h b/src/sf_define.h index d7b9d45..fe009d0 100644 --- a/src/sf_define.h +++ b/src/sf_define.h @@ -28,23 +28,13 @@ #define SF_NIO_STAGE_INIT 0 //set ioevent #define SF_NIO_STAGE_CONNECT 1 //do connect (client only) #define SF_NIO_STAGE_HANDSHAKE 2 //notify the thread to handshake (client only) -#define SF_NIO_STAGE_RECV 4 //do recv -#define SF_NIO_STAGE_SEND 8 //do send -#define SF_NIO_STAGE_FORWARDED 16 //deal the forwarded request -#define SF_NIO_STAGE_CONTINUE 32 //notify the thread continue deal -#define SF_NIO_STAGE_CLOSE 256 //cleanup the task - -#define SF_NIO_FLAG_INPROGRESS 1024 -#define SF_NIO_STAGE_FLAGS (SF_NIO_FLAG_INPROGRESS) -#define SF_NIO_STAGE_RECV_INPROGRESS (SF_NIO_STAGE_RECV | SF_NIO_FLAG_INPROGRESS) -#define SF_NIO_STAGE_SEND_INPROGRESS (SF_NIO_STAGE_SEND | SF_NIO_FLAG_INPROGRESS) - -#define SF_NIO_TASK_STAGE_FETCH(task) __sync_add_and_fetch(&task->nio_stage, 0) -#define SF_NIO_STAGE_ONLY(stage) (stage & (~SF_NIO_STAGE_FLAGS)) - -#define SF_NIO_STAGE_IS_INPROGRESS(stage) \ - ((stage & SF_NIO_FLAG_INPROGRESS) != 0) +#define SF_NIO_STAGE_RECV 3 //do recv +#define SF_NIO_STAGE_SEND 4 //do send +#define SF_NIO_STAGE_FORWARDED 5 //deal the forwarded request +#define SF_NIO_STAGE_CONTINUE 6 //notify the thread continue deal +#define SF_NIO_STAGE_CLOSE 127 //cleanup the task +#define SF_NIO_TASK_STAGE_FETCH(task) task->nio_stages.current #define SF_CLUSTER_ERROR_BINLOG_INCONSISTENT 9998 #define SF_CLUSTER_ERROR_LEADER_INCONSISTENT 9999 diff --git a/src/sf_nio.c b/src/sf_nio.c index b5c937b..e7659e3 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -128,7 +128,7 @@ int sf_set_read_event(struct fast_task_info *task) { int result; - sf_nio_set_stage(task, SF_NIO_STAGE_RECV); + task->nio_stages.current = SF_NIO_STAGE_RECV; if (task->event.callback == (IOEventCallback)sf_client_sock_read) { return 0; } @@ -202,8 +202,7 @@ static int sf_client_sock_connect(int sock, short event, void *arg) logInfo("file: "__FILE__", line: %d, " "connect to server %s:%u successfully", __LINE__, task->server_ip, task->port); - sf_nio_set_stage(task, SF_NIO_STAGE_HANDSHAKE); - return SF_CTX->deal_task(task); + return SF_CTX->deal_task(task, SF_NIO_STAGE_HANDSHAKE); } static int sf_connect_server(struct fast_task_info *task) @@ -228,8 +227,7 @@ static int sf_connect_server(struct fast_task_info *task) logInfo("file: "__FILE__", line: %d, " "connect to server %s:%u successfully", __LINE__, task->server_ip, task->port); - sf_nio_set_stage(task, SF_NIO_STAGE_HANDSHAKE); - return SF_CTX->deal_task(task); + return SF_CTX->deal_task(task, SF_NIO_STAGE_HANDSHAKE); } else if (result == EINPROGRESS) { result = ioevent_set(task, task->thread_data, task->event.fd, IOEVENT_READ | IOEVENT_WRITE, (IOEventCallback) @@ -249,12 +247,10 @@ static int sf_connect_server(struct fast_task_info *task) static int sf_nio_deal_task(struct fast_task_info *task) { int result; - int stage; - stage = SF_NIO_TASK_STAGE_FETCH(task); - switch (SF_NIO_STAGE_ONLY(stage)) { + switch (task->nio_stages.notify) { case SF_NIO_STAGE_INIT: - sf_nio_set_stage(task, SF_NIO_STAGE_RECV); + task->nio_stages.current = SF_NIO_STAGE_RECV; result = sf_nio_init(task); break; case SF_NIO_STAGE_CONNECT: @@ -271,14 +267,14 @@ static int sf_nio_deal_task(struct fast_task_info *task) result = sf_send_add_event(task); break; case SF_NIO_STAGE_CONTINUE: //continue deal - result = SF_CTX->deal_task(task); + result = SF_CTX->deal_task(task, SF_NIO_STAGE_CONTINUE); break; case SF_NIO_STAGE_FORWARDED: //forward by other thread if ((result=sf_ioevent_add(task, (IOEventCallback) sf_client_sock_read, task->network_timeout)) == 0) { - result = SF_CTX->deal_task(task); + result = SF_CTX->deal_task(task, SF_NIO_STAGE_SEND); } break; case SF_NIO_STAGE_CLOSE: @@ -286,8 +282,8 @@ static int sf_nio_deal_task(struct fast_task_info *task) break; default: logError("file: "__FILE__", line: %d, " - "client ip: %s, invalid stage: %d", - __LINE__, task->client_ip, stage); + "client ip: %s, invalid notify stage: %d", + __LINE__, task->client_ip, task->nio_stages.notify); result = -EINVAL; break; } @@ -299,49 +295,16 @@ static int sf_nio_deal_task(struct fast_task_info *task) return result; } -int sf_nio_notify_ex(struct fast_task_info *task, const int new_stage, - const int log_level, const char *file, const int line) +int sf_nio_notify(struct fast_task_info *task, const int stage) { int64_t n; int result; - int old_stage; bool notify; - old_stage = SF_NIO_TASK_STAGE_FETCH(task); - if (!(new_stage == SF_NIO_STAGE_INIT || - new_stage == SF_NIO_STAGE_CONNECT || - new_stage == SF_NIO_STAGE_CLOSE)) - { - if (SF_NIO_STAGE_IS_INPROGRESS(old_stage)) { - if (FC_LOG_BY_LEVEL(log_level)) { - log_it_ex(&g_log_context, log_level, - "file: "__FILE__", line: %d, " - "from caller {file: %s, line: %d}, " - "client ip: %s, nio stage in progress, " - "current stage: %d, skip set to %d", __LINE__, - file, line, task->client_ip, old_stage, new_stage); - } - return EBUSY; - } - } - - if (!__sync_bool_compare_and_swap(&task->nio_stage, - old_stage, new_stage)) - { - if (FC_LOG_BY_LEVEL(log_level)) { - log_it_ex(&g_log_context, log_level, - "file: "__FILE__", line: %d, " - "from caller {file: %s, line: %d}, " - "client ip: %s, skip set stage to %d because stage " - "changed, current stage: %d", __LINE__, file, line, - task->client_ip, new_stage, SF_NIO_TASK_STAGE_FETCH(task)); - } - return EEXIST; - } - + PTHREAD_MUTEX_LOCK(&task->thread_data->waiting_queue.lock); + task->nio_stages.notify = stage; task->next = NULL; - pthread_mutex_lock(&task->thread_data->waiting_queue.lock); if (task->thread_data->waiting_queue.tail == NULL) { task->thread_data->waiting_queue.head = task; notify = true; @@ -350,7 +313,7 @@ int sf_nio_notify_ex(struct fast_task_info *task, const int new_stage, notify = false; } task->thread_data->waiting_queue.tail = task; - pthread_mutex_unlock(&task->thread_data->waiting_queue.lock); + PTHREAD_MUTEX_UNLOCK(&task->thread_data->waiting_queue.lock); if (notify) { n = 1; @@ -383,10 +346,10 @@ void sf_recv_notify_read(int sock, short event, void *arg) __LINE__, sock, errno, STRERROR(errno)); } - pthread_mutex_lock(&thread_data->waiting_queue.lock); + PTHREAD_MUTEX_LOCK(&thread_data->waiting_queue.lock); current = thread_data->waiting_queue.head; thread_data->waiting_queue.head = thread_data->waiting_queue.tail = NULL; - pthread_mutex_unlock(&thread_data->waiting_queue.lock); + PTHREAD_MUTEX_UNLOCK(&thread_data->waiting_queue.lock); while (current != NULL) { task = current; @@ -403,7 +366,7 @@ int sf_send_add_event(struct fast_task_info *task) task->offset = 0; if (task->length > 0) { /* direct send */ - sf_nio_set_stage(task, SF_NIO_STAGE_SEND); + task->nio_stages.current = SF_NIO_STAGE_SEND; if (sf_client_sock_write(task->event.fd, IOEVENT_WRITE, task) < 0) { return errno != 0 ? errno : EIO; } @@ -414,15 +377,13 @@ int sf_send_add_event(struct fast_task_info *task) int sf_client_sock_read(int sock, short event, void *arg) { - int stage; int bytes; int recv_bytes; int total_read; struct fast_task_info *task; task = (struct fast_task_info *)arg; - stage = SF_NIO_TASK_STAGE_FETCH(task); - if (task->canceled || (SF_NIO_STAGE_ONLY(stage) != SF_NIO_STAGE_RECV)) { + if (task->canceled || (task->nio_stages.current != SF_NIO_STAGE_RECV)) { return 0; } @@ -469,18 +430,6 @@ int sf_client_sock_read(int sock, short event, void *arg) return -1; } - if (stage != SF_NIO_STAGE_RECV_INPROGRESS) { - if (!__sync_bool_compare_and_swap(&task->nio_stage, - stage, SF_NIO_STAGE_RECV_INPROGRESS)) - { - logWarning("file: "__FILE__", line: %d, " - "client ip: %s, nio stage change from %d to %d, " - "skip read!", __LINE__, task->client_ip, stage, - SF_NIO_TASK_STAGE_FETCH(task)); - return 0; - } - } - total_read = 0; while (1) { fast_timer_modify(&task->thread_data->timer, @@ -605,8 +554,8 @@ int sf_client_sock_read(int sock, short event, void *arg) if (task->offset >= task->length) { //recv done task->req_count++; - sf_nio_set_stage(task, SF_NIO_STAGE_SEND); - if (SF_CTX->deal_task(task) < 0) { //fatal error + task->nio_stages.current = SF_NIO_STAGE_SEND; + if (SF_CTX->deal_task(task, SF_NIO_STAGE_SEND) < 0) { //fatal error ioevent_add_to_deleted_list(task); return -1; } @@ -619,15 +568,13 @@ int sf_client_sock_read(int sock, short event, void *arg) int sf_client_sock_write(int sock, short event, void *arg) { - int stage; int bytes; int total_write; struct fast_task_info *task; //assert(sock >= 0); task = (struct fast_task_info *)arg; - stage = SF_NIO_TASK_STAGE_FETCH(task); - if (task->canceled || (SF_NIO_STAGE_ONLY(stage) != SF_NIO_STAGE_SEND)) { + if (task->canceled || (task->nio_stages.current != SF_NIO_STAGE_SEND)) { return 0; } @@ -650,18 +597,6 @@ int sf_client_sock_write(int sock, short event, void *arg) return -1; } - if (stage != SF_NIO_STAGE_SEND_INPROGRESS) { - if (!__sync_bool_compare_and_swap(&task->nio_stage, - stage, SF_NIO_STAGE_SEND_INPROGRESS)) - { - logWarning("file: "__FILE__", line: %d, " - "client ip: %s, nio stage change from %d to %d, " - "skip write!", __LINE__, task->client_ip, stage, - SF_NIO_TASK_STAGE_FETCH(task)); - return 0; - } - } - total_write = 0; while (1) { fast_timer_modify(&task->thread_data->timer, diff --git a/src/sf_nio.h b/src/sf_nio.h index 358560e..f52431a 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -58,6 +58,9 @@ static inline TaskCleanUpCallback sf_get_task_cleanup_func_ex( #define sf_get_task_cleanup_func() \ sf_get_task_cleanup_func_ex(&g_sf_context) +#define sf_nio_task_is_idle(task) \ + (task->offset == 0 && task->length == 0) + void sf_recv_notify_read(int sock, short event, void *arg); int sf_send_add_event(struct fast_task_info *task); int sf_client_sock_write(int sock, short event, void *arg); @@ -65,17 +68,7 @@ int sf_client_sock_read(int sock, short event, void *arg); void sf_task_finish_clean_up(struct fast_task_info *task); -int sf_nio_notify_ex(struct fast_task_info *task, const int new_stage, - const int log_level, const char *file, const int line); - -#define sf_nio_notify(task, new_stage) \ - sf_nio_notify_ex(task, new_stage, LOG_WARNING, __FILE__, __LINE__) - -#define sf_nio_notify_silence(task, new_stage) \ - sf_nio_notify_ex(task, new_stage, LOG_NOTHING, __FILE__, __LINE__) - -#define sf_nio_task_is_idle(task) \ - (task->offset == 0 && task->length == 0) +int sf_nio_notify(struct fast_task_info *task, const int stage); int sf_set_read_event(struct fast_task_info *task); @@ -96,29 +89,6 @@ static inline bool sf_client_sock_in_read_stage(struct fast_task_info *task) return (task->event.callback == (IOEventCallback)sf_client_sock_read); } -static inline void sf_nio_set_stage(struct fast_task_info *task, - const int new_stage) -{ - int old_stage; - old_stage = __sync_add_and_fetch(&task->nio_stage, 0); - if (new_stage != old_stage) { - __sync_bool_compare_and_swap(&task->nio_stage, old_stage, new_stage); - } -} - -static inline bool sf_nio_swap_stage(struct fast_task_info *task, - const int old_stage, const int new_stage) -{ - return __sync_bool_compare_and_swap(&task->nio_stage, old_stage, new_stage); -} - -static inline bool sf_nio_task_inprogress(struct fast_task_info *task) -{ - int stage; - stage = __sync_add_and_fetch(&task->nio_stage, 0); - return SF_NIO_STAGE_IS_INPROGRESS(stage); -} - #ifdef __cplusplus } #endif diff --git a/src/sf_types.h b/src/sf_types.h index 73a2a0c..e8d9296 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -35,7 +35,7 @@ typedef void (*sf_accept_done_callback)(struct fast_task_info *task, const bool bInnerPort); typedef int (*sf_set_body_length_callback)(struct fast_task_info *task); -typedef int (*sf_deal_task_func)(struct fast_task_info *task); +typedef int (*sf_deal_task_func)(struct fast_task_info *task, const int stage); typedef int (*sf_recv_timeout_callback)(struct fast_task_info *task); typedef struct sf_context {