From 96a6464e91aadf5670a9c00848b2a18ab50a47dc Mon Sep 17 00:00:00 2001 From: yuqing Date: Fri, 16 Oct 2015 16:44:46 +0800 Subject: [PATCH] sched_thread.c: support delay tasks --- HISTORY | 1 + src/fast_timer.c | 1 + src/sched_thread.c | 189 +++++++++++++++++++++++++++++++++++++++++++-- src/sched_thread.h | 40 +++++++++- 4 files changed, 224 insertions(+), 7 deletions(-) diff --git a/HISTORY b/HISTORY index 828e872..112eb08 100644 --- a/HISTORY +++ b/HISTORY @@ -1,6 +1,7 @@ Version 1.23 2015-10-16 * sched_thread.c: task can execute in a new thread + * sched_thread.c: support delay tasks Version 1.22 2015-10-10 * export php function: fastcommon_get_first_local_ip diff --git a/src/fast_timer.c b/src/fast_timer.c index 19b0882..b9c3c5c 100644 --- a/src/fast_timer.c +++ b/src/fast_timer.c @@ -52,6 +52,7 @@ int fast_timer_add(FastTimer *timer, FastTimerEntry *entry) } entry->prev = &slot->head; slot->head.next = entry; + entry->rehash = false; return 0; } diff --git a/src/sched_thread.c b/src/sched_thread.c index 393a6fc..5fcc4ef 100644 --- a/src/sched_thread.c +++ b/src/sched_thread.c @@ -10,10 +10,10 @@ #include #include #include -#include "sched_thread.h" #include "shared_func.h" #include "pthread_func.h" #include "logger.h" +#include "sched_thread.h" volatile bool g_schedule_flag = false; volatile time_t g_current_time = 0; @@ -21,6 +21,12 @@ volatile time_t g_current_time = 0; static ScheduleArray waiting_schedule_array = {NULL, 0}; static int waiting_del_id = -1; +static ScheduleContext *schedule_context = NULL; +static int timer_slot_count = 0; +static int mblock_alloc_once = 0; + +static void sched_deal_delay_tasks(ScheduleContext *pContext); + static int sched_cmp_by_next_call_time(const void *p1, const void *p2) { return ((ScheduleEntry *)p1)->next_call_time - \ @@ -311,20 +317,23 @@ static void *sched_thread_entrance(void *args) g_schedule_flag = true; while (*(pContext->pcontinue_flag)) { + g_current_time = time(NULL); + sched_deal_delay_tasks(pContext); + sched_check_waiting(pContext); if (pContext->scheduleArray.count == 0) //no schedule entry { sleep(1); - g_current_time = time(NULL); continue; } - g_current_time = time(NULL); while (pContext->head->next_call_time > g_current_time && *(pContext->pcontinue_flag)) { sleep(1); g_current_time = time(NULL); + + sched_deal_delay_tasks(pContext); if (sched_check_waiting(pContext) == 0) { break; @@ -341,7 +350,7 @@ static void *sched_thread_entrance(void *args) while (*(pContext->pcontinue_flag) && (pCurrent != NULL \ && pCurrent->next_call_time <= g_current_time)) { - //fprintf(stderr, "exec task id=%d\n", pCurrent->id); + //logInfo("exec task id: %d", pCurrent->id); if (!pCurrent->new_thread) { pCurrent->task_func(pCurrent->func_args); @@ -556,8 +565,9 @@ int sched_del_entry(const int id) return 0; } -int sched_start(ScheduleArray *pScheduleArray, pthread_t *ptid, \ - const int stack_size, bool * volatile pcontinue_flag) +int sched_start_ex(ScheduleArray *pScheduleArray, pthread_t *ptid, + const int stack_size, bool * volatile pcontinue_flag, + ScheduleContext **ppContext) { int result; pthread_attr_t thread_attr; @@ -574,6 +584,7 @@ int sched_start(ScheduleArray *pScheduleArray, pthread_t *ptid, \ result, STRERROR(result)); return result; } + memset(pContext, 0, sizeof(ScheduleContext)); if ((result=init_pthread_attr(&thread_attr, stack_size)) != 0) { @@ -588,6 +599,30 @@ int sched_start(ScheduleArray *pScheduleArray, pthread_t *ptid, \ return result; } + if (timer_slot_count > 0) + { + if ((result=fast_mblock_init(&pContext->mblock, + sizeof(FastDelayTask), mblock_alloc_once)) != 0) + { + free(pContext); + return result; + } + + g_current_time = time(NULL); + if ((result=fast_timer_init(&pContext->timer, timer_slot_count, + g_current_time)) != 0) + { + free(pContext); + return result; + } + if ((result=init_pthread_lock(&pContext->delay_queue.lock)) != 0) + { + free(pContext); + return result; + } + pContext->timer_init = true; + } + pContext->pcontinue_flag = pcontinue_flag; if ((result=pthread_create(ptid, &thread_attr, \ sched_thread_entrance, pContext)) != 0) @@ -599,7 +634,149 @@ int sched_start(ScheduleArray *pScheduleArray, pthread_t *ptid, \ __LINE__, result, STRERROR(result)); } + *ppContext = pContext; pthread_attr_destroy(&thread_attr); return result; } +int sched_start(ScheduleArray *pScheduleArray, pthread_t *ptid, + const int stack_size, bool * volatile pcontinue_flag) +{ + return sched_start_ex(pScheduleArray, ptid, stack_size, + pcontinue_flag, &schedule_context); +} + +void sched_set_delay_params(const int slot_count, const int alloc_once) +{ + if (slot_count > 1) + { + timer_slot_count = slot_count; + } + else + { + timer_slot_count = 300; + } + + if (alloc_once > 0) + { + mblock_alloc_once = alloc_once; + } + else + { + mblock_alloc_once = 4 * 1024; + } +} + +int sched_add_delay_task_ex(ScheduleContext *pContext, TaskFunc task_func, + void *func_args, const int delay_seconds) +{ + FastDelayTask *task; + if (!pContext->timer_init) + { + logError("file: "__FILE__", line: %d, " + "NOT support delay tasks, you should call sched_set_delay_params " + "before sched_start!", __LINE__); + return EOPNOTSUPP; + } + + task = (FastDelayTask *)fast_mblock_alloc_object(&pContext->mblock); + if (task == NULL) + { + return ENOMEM; + } + task->task_func = task_func; + task->func_args = func_args; + task->next = NULL; + if (delay_seconds > 0) + { + task->timer.expires = g_current_time + delay_seconds; + } + else + { + task->timer.expires = g_current_time; + } + + pthread_mutex_lock(&pContext->delay_queue.lock); + if (pContext->delay_queue.head == NULL) + { + pContext->delay_queue.head = task; + } + else + { + pContext->delay_queue.tail->next = task; + } + pContext->delay_queue.tail = task; + pthread_mutex_unlock(&pContext->delay_queue.lock); + + return 0; +} + +int sched_add_delay_task(TaskFunc task_func, void *func_args, + const int delay_seconds) +{ + return sched_add_delay_task_ex(schedule_context, task_func, + func_args, delay_seconds); +} + +static void sched_deal_task_queue(ScheduleContext *pContext) +{ + FastDelayTask *task; + + pthread_mutex_lock(&pContext->delay_queue.lock); + if (pContext->delay_queue.head == NULL) + { + pthread_mutex_unlock(&pContext->delay_queue.lock); + return; + } + task = pContext->delay_queue.head; + pContext->delay_queue.head = NULL; + pContext->delay_queue.tail = NULL; + pthread_mutex_unlock(&pContext->delay_queue.lock); + + while (task != NULL) + { + fast_timer_add(&pContext->timer, (FastTimerEntry *)task); + task = task->next; + } +} + +static void deal_timeout_tasks(ScheduleContext *pContext, FastTimerEntry *head) +{ + FastTimerEntry *entry; + FastTimerEntry *current; + FastDelayTask *task; + + entry = head->next; + while (entry != NULL) + { + current = entry; + entry = entry->next; + + current->prev = current->next = NULL; //must set NULL because NOT in time wheel + + task = (FastDelayTask *)current; + task->task_func(task->func_args); + fast_mblock_free_object(&pContext->mblock, task); + } +} + +static void sched_deal_delay_tasks(ScheduleContext *pContext) +{ + FastTimerEntry head; + int count; + + if (!pContext->timer_init) + { + return; + } + + sched_deal_task_queue(pContext); + count = fast_timer_timeouts_get( + &pContext->timer, g_current_time, &head); + if (count > 0) + { + deal_timeout_tasks(pContext, &head); + //logInfo("deal delay task count: %d", count); + } +} + diff --git a/src/sched_thread.h b/src/sched_thread.h index 72d9048..b1df533 100644 --- a/src/sched_thread.h +++ b/src/sched_thread.h @@ -10,7 +10,10 @@ #define _SCHED_THREAD_H_ #include +#include #include "common_define.h" +#include "fast_timer.h" +#include "fast_mblock.h" typedef int (*TaskFunc) (void *args); @@ -42,11 +45,32 @@ typedef struct int count; } ScheduleArray; +typedef struct fast_delay_task { + FastTimerEntry timer; //must be first field + + TaskFunc task_func; //callback function + void *func_args; //arguments pass to callback function + struct fast_delay_task *next; +} FastDelayTask; + +typedef struct +{ + FastDelayTask *head; + FastDelayTask *tail; + pthread_mutex_t lock; +} FastDelayQueue; + typedef struct { ScheduleArray scheduleArray; ScheduleEntry *head; //schedule chain head - ScheduleEntry *tail; //schedule chain tail + ScheduleEntry *tail; //schedule chain tail + + struct fast_mblock_man mblock; //for timer entry + FastTimer timer; //for delay task + bool timer_init; + FastDelayQueue delay_queue; + bool *pcontinue_flag; } ScheduleContext; @@ -83,14 +107,28 @@ extern volatile time_t g_current_time; //the current time int sched_add_entries(const ScheduleArray *pScheduleArray); int sched_del_entry(const int id); +#define sched_enable_delay_task() sched_set_delay_params(0, 0) + +void sched_set_delay_params(const int slot_count, const int alloc_once); + +int sched_add_delay_task_ex(ScheduleContext *pContext, TaskFunc task_func, + void *func_args, const int delay_seconds); +int sched_add_delay_task(TaskFunc task_func, void *func_args, + const int delay_seconds); + /** execute the schedule thread * parameters: * pScheduleArray: schedule task * ptid: store the schedule thread id * stack_size: set thread stack size (byes) * pcontinue_flag: main process continue running flag + * ppContext: store the ScheduleContext pointer * return: error no, 0 for success, != 0 fail */ +int sched_start_ex(ScheduleArray *pScheduleArray, pthread_t *ptid, + const int stack_size, bool * volatile pcontinue_flag, + ScheduleContext **ppContext); + int sched_start(ScheduleArray *pScheduleArray, pthread_t *ptid, \ const int stack_size, bool * volatile pcontinue_flag);