From 719f8b2b321d8ddfe0b83cc4373e74e3672f786a Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Wed, 28 Oct 2020 14:16:58 +0800 Subject: [PATCH] task use reffer_count for share --- src/idempotency/client/client_channel.c | 14 +---- src/sf_nio.c | 75 ++++++++++++++++--------- src/sf_service.c | 16 ++---- src/sf_service.h | 57 +++++++++++++++++++ 4 files changed, 114 insertions(+), 48 deletions(-) diff --git a/src/idempotency/client/client_channel.c b/src/idempotency/client/client_channel.c index 537cc9b..6d50c65 100644 --- a/src/idempotency/client/client_channel.c +++ b/src/idempotency/client/client_channel.c @@ -176,21 +176,13 @@ struct fast_task_info *alloc_channel_task(IdempotencyClientChannel *channel, { struct fast_task_info *task; - task = free_queue_pop(); - if (task == NULL) { - logError("file: "__FILE__", line: %d, " - "malloc task buff failed, you should " - "increase the parameter: max_connections", - __LINE__); + if ((task=sf_alloc_init_task(&g_sf_context, -1)) == NULL) { *err_no = ENOMEM; return NULL; } snprintf(task->server_ip, sizeof(task->server_ip), "%s", server_ip); task->port = port; - task->canceled = false; - task->ctx = &g_sf_context; - task->event.fd = -1; task->arg = channel; task->thread_data = g_sf_context.thread_data + hash_code % g_sf_context.work_threads; @@ -198,7 +190,7 @@ struct fast_task_info *alloc_channel_task(IdempotencyClientChannel *channel, channel->last_connect_time = g_current_time; if ((*err_no=sf_nio_notify(task, SF_NIO_STAGE_CONNECT)) != 0) { channel->in_ioevent = 0; //rollback - free_queue_push(task); + sf_release_task(task); return NULL; } return task; @@ -223,7 +215,7 @@ int idempotency_client_channel_check_reconnect( __LINE__, channel->task->server_ip, channel->task->port); - channel->task->canceled = false; + __sync_bool_compare_and_swap(&channel->task->canceled, 1, 0); if ((result=sf_nio_notify(channel->task, SF_NIO_STAGE_CONNECT)) == 0) { channel->last_connect_time = g_current_time; channel->last_report_time = g_current_time; diff --git a/src/sf_nio.c b/src/sf_nio.c index 236b0a3..85f3599 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -37,6 +37,7 @@ #include "fastcommon/fast_task_queue.h" #include "fastcommon/ioevent_loop.h" #include "sf_global.h" +#include "sf_service.h" #include "sf_nio.h" #define SF_CTX ((SFContext *)(task->ctx)) @@ -97,7 +98,7 @@ void sf_task_finish_clean_up(struct fast_task_info *task) task->event.fd = -1; __sync_fetch_and_sub(&g_sf_global_vars.connection_stat.current_count, 1); - free_queue_push(task); + sf_release_task(task); } static inline int set_write_event(struct fast_task_info *task) @@ -305,10 +306,25 @@ int sf_nio_notify(struct fast_task_info *task, const int stage) int result; bool notify; + if (__sync_add_and_fetch(&task->canceled, 0)) { + if (stage == SF_NIO_STAGE_CONTINUE) { + if (task->continue_callback != NULL) { + return task->continue_callback(task); + } else { + return 0; + } + } else { + logWarning("file: "__FILE__", line: %d, " + "unexpected notify stage: %d, task %p " + "already canceled", __LINE__, stage, task); + return ECANCELED; + } + } + if (!__sync_bool_compare_and_swap(&task->nio_stages.notify, SF_NIO_STAGE_NONE, stage)) { - logDebug("file: "__FILE__", line: %d, " + logWarning("file: "__FILE__", line: %d, " "current stage: %d != %d, skip set stage to %d", __LINE__, __sync_fetch_and_sub(&task->nio_stages.notify, 0), SF_NIO_STAGE_NONE, stage); @@ -374,6 +390,11 @@ void sf_recv_notify_read(int sock, short event, void *arg) } else { stage = __sync_add_and_fetch(&task->nio_stages.notify, 0); if (stage != SF_NIO_STAGE_NONE) { + if (stage == SF_NIO_STAGE_CONTINUE && + task->continue_callback != NULL) + { + task->continue_callback(task); + } __sync_bool_compare_and_swap(&task->nio_stages.notify, stage, SF_NIO_STAGE_NONE); } @@ -395,19 +416,39 @@ int sf_send_add_event(struct fast_task_info *task) return 0; } +static inline int check_task(struct fast_task_info *task, + const short event, const int expect_stage) +{ + if (task->canceled) { + return ENOTCONN; + } + + if (event & IOEVENT_ERROR) { + logDebug("file: "__FILE__", line: %d, " + "client ip: %s, expect stage: %d, recv error event: %d, " + "close connection", __LINE__, task->client_ip, + expect_stage, event); + + ioevent_add_to_deleted_list(task); + return -1; + } + + return task->nio_stages.current == expect_stage ? 0 : EAGAIN; +} + int sf_client_sock_read(int sock, short event, void *arg) { + int result; int bytes; int recv_bytes; int total_read; struct fast_task_info *task; task = (struct fast_task_info *)arg; - if (task->canceled || (task->nio_stages.current != SF_NIO_STAGE_RECV)) { - return 0; + if ((result=check_task(task, event, SF_NIO_STAGE_RECV)) != 0) { + return result >= 0 ? 0 : -1; } - //assert(sock >= 0); if (event & IOEVENT_TIMEOUT) { if (task->offset == 0 && task->req_count > 0) { if (SF_CTX->timeout_callback != NULL) { @@ -441,15 +482,6 @@ int sf_client_sock_read(int sock, short event, void *arg) return 0; } - if (event & IOEVENT_ERROR) { - logDebug("file: "__FILE__", line: %d, " - "client ip: %s, recv error event: %d, " - "close connection", __LINE__, task->client_ip, event); - - ioevent_add_to_deleted_list(task); - return -1; - } - total_read = 0; while (1) { fast_timer_modify(&task->thread_data->timer, @@ -588,14 +620,14 @@ int sf_client_sock_read(int sock, short event, void *arg) int sf_client_sock_write(int sock, short event, void *arg) { + int result; int bytes; int total_write; struct fast_task_info *task; - //assert(sock >= 0); task = (struct fast_task_info *)arg; - if (task->canceled || (task->nio_stages.current != SF_NIO_STAGE_SEND)) { - return 0; + if ((result=check_task(task, event, SF_NIO_STAGE_SEND)) != 0) { + return result >= 0 ? 0 : -1; } if (event & IOEVENT_TIMEOUT) { @@ -608,15 +640,6 @@ int sf_client_sock_write(int sock, short event, void *arg) return -1; } - if (event & IOEVENT_ERROR) { - logDebug("file: "__FILE__", line: %d, " - "client ip: %s, recv error event: %d, " - "close connection", __LINE__, task->client_ip, event); - - ioevent_add_to_deleted_list(task); - return -1; - } - total_write = 0; while (1) { fast_timer_modify(&task->thread_data->timer, diff --git a/src/sf_service.c b/src/sf_service.c index e1e0cee..ea0d372 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -380,22 +380,16 @@ static void *accept_thread_entrance(void *arg) continue; } - task = free_queue_pop(); - if (task == NULL) { - logError("file: "__FILE__", line: %d, " - "malloc task buff failed, you should " - "increase the parameter: max_connections", - __LINE__); + if ((task=sf_alloc_init_task(accept_context-> + sf_context, incomesock)) == NULL) + { close(incomesock); continue; } + getPeerIpAddPort(incomesock, task->client_ip, sizeof(task->client_ip), &port); task->port = port; - - task->canceled = false; - task->ctx = accept_context->sf_context; - task->event.fd = incomesock; task->thread_data = accept_context->sf_context->thread_data + incomesock % accept_context->sf_context->work_threads; if (accept_context->sf_context->accept_done_func != NULL) { @@ -406,7 +400,7 @@ static void *accept_thread_entrance(void *arg) if (sf_nio_notify(task, SF_NIO_STAGE_INIT) != 0) { close(incomesock); - free_queue_push(task); + sf_release_task(task); } } diff --git a/src/sf_service.h b/src/sf_service.h index 0757f0d..478b386 100644 --- a/src/sf_service.h +++ b/src/sf_service.h @@ -100,6 +100,63 @@ void sf_set_sig_quit_handler(sf_sig_quit_handler quit_handler); int sf_init_task(struct fast_task_info *task); +#define sf_alloc_init_task(sf_context, sock) \ + sf_alloc_init_task_ex(sf_context, sock, 1) + +static inline struct fast_task_info *sf_alloc_init_task_ex( + SFContext *sf_context, const int sock, const int init_reffer) +{ + struct fast_task_info *task; + + task = free_queue_pop(); + if (task == NULL) { + logError("file: "__FILE__", line: %d, " + "malloc task buff failed, you should " + "increase the parameter: max_connections", + __LINE__); + return NULL; + } + __sync_add_and_fetch(&task->reffer_count, init_reffer); + __sync_bool_compare_and_swap(&task->canceled, 1, 0); + task->ctx = sf_context; + task->event.fd = sock; + + return task; +} + +#define sf_hold_task(task) __sync_add_and_fetch(&task->reffer_count, 1) + +/* +#define sf_hold_task(task) \ + logInfo("file: "__FILE__", line: %d, " \ + "hold task %p, reffer: %d", \ + __LINE__, task, __sync_add_and_fetch(&task->reffer_count, 1)) + */ + +#define sf_try_hold_task_to_twice(task) \ + __sync_bool_compare_and_swap(&task->reffer_count, 1, 2) + +static inline void sf_release_task(struct fast_task_info *task) +{ + int reffer_count; + if ((reffer_count=__sync_sub_and_fetch(&task->reffer_count, 1)) == 0) { + int free_count = free_queue_count(); + int alloc_count = free_queue_alloc_connections(); + logInfo("file: "__FILE__", line: %d, " + "push task %p to queue, alloc: %d, " + "used: %d, freed: %d", __LINE__, task, + alloc_count, alloc_count - free_count, free_count); + + free_queue_push(task); + } else { + /* + logInfo("file: "__FILE__", line: %d, " + "release task %p, current reffer: %d", + __LINE__, task, reffer_count); + */ + } +} + #ifdef __cplusplus } #endif