diff --git a/src/sched_thread.c b/src/sched_thread.c index 4b659b4..154b613 100644 --- a/src/sched_thread.c +++ b/src/sched_thread.c @@ -16,10 +16,10 @@ #include "fc_memory.h" #include "sched_thread.h" -volatile bool g_schedule_flag = false; +volatile int g_schedule_flag = false; volatile time_t g_current_time = 0; -static ScheduleArray waiting_schedule_array = {NULL, 0}; +static volatile ScheduleArray waiting_schedule_array = {NULL, 0}; static int waiting_del_id = -1; static ScheduleContext *schedule_context = NULL; @@ -226,6 +226,7 @@ static int print_all_sched_entries(ScheduleArray *pScheduleArray) static int do_check_waiting(ScheduleContext *pContext) { ScheduleArray *pScheduleArray; + ScheduleEntry *waitingEntries; ScheduleEntry *newEntries; ScheduleEntry *pWaitingEntry; ScheduleEntry *pWaitingEnd; @@ -234,6 +235,7 @@ static int do_check_waiting(ScheduleContext *pContext) int allocCount; int newCount; int deleteCount; + int waitingCount; pScheduleArray = &(pContext->scheduleArray); deleteCount = 0; @@ -271,7 +273,9 @@ static int do_check_waiting(ScheduleContext *pContext) waiting_del_id = -1; } - if (waiting_schedule_array.count == 0) + waitingCount = __sync_add_and_fetch( + &waiting_schedule_array.count, 0); + if (waitingCount == 0) { if (deleteCount > 0) { @@ -282,7 +286,9 @@ static int do_check_waiting(ScheduleContext *pContext) return ENOENT; } - allocCount = pScheduleArray->count + waiting_schedule_array.count; + waitingEntries = __sync_add_and_fetch( + &waiting_schedule_array.entries, 0); + allocCount = pScheduleArray->count + waitingCount; newEntries = (ScheduleEntry *)fc_malloc(sizeof(ScheduleEntry) * allocCount); if (newEntries == NULL) { @@ -295,13 +301,13 @@ static int do_check_waiting(ScheduleContext *pContext) if (pScheduleArray->count > 0) { - memcpy(newEntries, pScheduleArray->entries, \ + memcpy(newEntries, pScheduleArray->entries, sizeof(ScheduleEntry) * pScheduleArray->count); } newCount = pScheduleArray->count; - pWaitingEnd = waiting_schedule_array.entries + waiting_schedule_array.count; - for (pWaitingEntry=waiting_schedule_array.entries; \ - pWaitingEntrycount, \ - waiting_schedule_array.count - (newCount - pScheduleArray->count)); + waitingCount - (newCount - pScheduleArray->count)); if (pScheduleArray->entries != NULL) { @@ -333,12 +339,13 @@ static int do_check_waiting(ScheduleContext *pContext) pScheduleArray->entries = newEntries; pScheduleArray->count = newCount; - free(waiting_schedule_array.entries); - waiting_schedule_array.count = 0; - waiting_schedule_array.entries = NULL; + __sync_bool_compare_and_swap(&waiting_schedule_array.count, + waitingCount, 0); + __sync_bool_compare_and_swap(&waiting_schedule_array.entries, + waitingEntries, NULL); + free(waitingEntries); sched_make_chain(pContext); - return 0; } @@ -398,7 +405,7 @@ static void *sched_thread_entrance(void *args) } sched_make_chain(pContext); - g_schedule_flag = true; + __sync_bool_compare_and_swap(&g_schedule_flag, 0, 1); while (*(pContext->pcontinue_flag)) { g_current_time = time(NULL); @@ -527,7 +534,7 @@ static void *sched_thread_entrance(void *args) } } - g_schedule_flag = false; + __sync_bool_compare_and_swap(&g_schedule_flag, 1, 0); logDebug("file: "__FILE__", line: %d, " \ "schedule thread exit", __LINE__); @@ -598,6 +605,7 @@ static int sched_append_array(const ScheduleArray *pSrcArray, \ int sched_add_entries(const ScheduleArray *pScheduleArray) { int result; + int i; ScheduleArray temp_schedule_array; if (pScheduleArray->count == 0) @@ -617,21 +625,39 @@ int sched_add_entries(const ScheduleArray *pScheduleArray) return result; } - if (waiting_schedule_array.entries != NULL) + if (__sync_add_and_fetch(&waiting_schedule_array.entries, 0) != NULL) { - if (g_schedule_flag) + if (!__sync_add_and_fetch(&g_schedule_flag, 0)) { - while (waiting_schedule_array.entries != NULL) + logDebug("file: "__FILE__", line: %d, " + "schedule thread not ready yet", __LINE__); + } + i = 0; + while (__sync_add_and_fetch(&waiting_schedule_array.entries, 0) != NULL) + { + if (i == 0) { logDebug("file: "__FILE__", line: %d, " - "waiting for schedule array ready ...", __LINE__); - sleep(1); + "waiting for schedule array ready, " + "schedule_flag: %d ...", __LINE__, + __sync_add_and_fetch(&g_schedule_flag, 0)); } + else if (++i == 10) + { + logError("file: "__FILE__", line: %d, " + "waiting for schedule array ready timeout, " + "schedule_flag: %d, wait count: %d", __LINE__, + __sync_add_and_fetch(&g_schedule_flag, 0), i); + break; + } + sleep(1); } } - waiting_schedule_array.entries = temp_schedule_array.entries; - waiting_schedule_array.count = temp_schedule_array.count; + __sync_bool_compare_and_swap(&waiting_schedule_array.entries, + NULL, temp_schedule_array.entries); + __sync_bool_compare_and_swap(&waiting_schedule_array.count, + 0, temp_schedule_array.count); return 0; } @@ -685,8 +711,9 @@ int sched_start_ex(ScheduleArray *pScheduleArray, pthread_t *ptid, if (timer_slot_count > 0) { - if ((result=fast_mblock_init(&pContext->mblock, - sizeof(FastDelayTask), mblock_alloc_once)) != 0) + if ((result=fast_mblock_init_ex1(&pContext->delay_task_allocator, + "sched_delay_task", sizeof(FastDelayTask), + mblock_alloc_once, 0, NULL, NULL, true)) != 0) { free(pContext); return result; @@ -699,7 +726,9 @@ int sched_start_ex(ScheduleArray *pScheduleArray, pthread_t *ptid, free(pContext); return result; } - if ((result=init_pthread_lock(&pContext->delay_queue.lock)) != 0) + + if ((result=fc_queue_init(&pContext->delay_queue, (long) + (&((FastDelayTask *)NULL)->next))) != 0) { free(pContext); return result; @@ -755,6 +784,8 @@ int sched_add_delay_task_ex(ScheduleContext *pContext, TaskFunc task_func, void *func_args, const int delay_seconds, const bool new_thread) { FastDelayTask *task; + bool notify; + if (!pContext->timer_init) { logError("file: "__FILE__", line: %d, " @@ -763,7 +794,8 @@ int sched_add_delay_task_ex(ScheduleContext *pContext, TaskFunc task_func, return EOPNOTSUPP; } - task = (FastDelayTask *)fast_mblock_alloc_object(&pContext->mblock); + task = (FastDelayTask *)fast_mblock_alloc_object( + &pContext->delay_task_allocator); if (task == NULL) { return ENOMEM; @@ -781,18 +813,7 @@ int sched_add_delay_task_ex(ScheduleContext *pContext, TaskFunc task_func, task->timer.expires = g_current_time; } - pthread_mutex_lock(&pContext->delay_queue.lock); - if (pContext->delay_queue.head == NULL) - { - pContext->delay_queue.head = task; - } - else - { - pContext->delay_queue.tail->next = task; - } - pContext->delay_queue.tail = task; - pthread_mutex_unlock(&pContext->delay_queue.lock); - + fc_queue_push_ex(&pContext->delay_queue, task, ¬ify); return 0; } @@ -806,18 +827,10 @@ int sched_add_delay_task(TaskFunc task_func, void *func_args, static void sched_deal_task_queue(ScheduleContext *pContext) { FastDelayTask *task; + struct fc_queue_info qinfo; - pthread_mutex_lock(&pContext->delay_queue.lock); - if (pContext->delay_queue.head == NULL) - { - pthread_mutex_unlock(&pContext->delay_queue.lock); - return; - } - task = pContext->delay_queue.head; - pContext->delay_queue.head = NULL; - pContext->delay_queue.tail = NULL; - pthread_mutex_unlock(&pContext->delay_queue.lock); - + fc_queue_pop_to_queue(&pContext->delay_queue, &qinfo); + task = qinfo.head; while (task != NULL) { fast_timer_add(&pContext->timer, (FastTimerEntry *)task); @@ -849,7 +862,7 @@ static void *sched_call_delay_func(void *args) logDebug("file: "__FILE__", line: %d, " \ "delay thread exit, task args: %p", __LINE__, task->func_args); - fast_mblock_free_object(&pContext->mblock, task); + fast_mblock_free_object(&pContext->delay_task_allocator, task); pthread_detach(pthread_self()); return NULL; } @@ -873,7 +886,7 @@ static void deal_timeout_tasks(ScheduleContext *pContext, FastTimerEntry *head) if (!task->new_thread) { task->task_func(task->func_args); - fast_mblock_free_object(&pContext->mblock, task); + fast_mblock_free_object(&pContext->delay_task_allocator, task); } else { diff --git a/src/sched_thread.h b/src/sched_thread.h index 288220f..ef364f1 100644 --- a/src/sched_thread.h +++ b/src/sched_thread.h @@ -15,6 +15,7 @@ #include "common_define.h" #include "fast_timer.h" #include "fast_mblock.h" +#include "fc_queue.h" typedef int (*TaskFunc) (void *args); @@ -58,23 +59,16 @@ typedef struct fast_delay_task { struct fast_delay_task *next; } FastDelayTask; -typedef struct -{ - FastDelayTask *head; - FastDelayTask *tail; - pthread_mutex_t lock; -} FastDelayQueue; - typedef struct { ScheduleArray scheduleArray; ScheduleEntry *head; //schedule chain head ScheduleEntry *tail; //schedule chain tail - struct fast_mblock_man mblock; //for timer entry + struct fast_mblock_man delay_task_allocator; //for FastDelayTask FastTimer timer; //for delay task bool timer_init; - FastDelayQueue delay_queue; + struct fc_queue delay_queue; bool *pcontinue_flag; } ScheduleContext; @@ -103,10 +97,9 @@ typedef struct extern "C" { #endif -extern volatile bool g_schedule_flag; //schedule continue running flag +extern volatile int g_schedule_flag; //schedule continue running flag extern volatile time_t g_current_time; //the current time - #define get_current_time() (g_schedule_flag ? g_current_time: time(NULL)) /** generate next id