sched_thread.c: support delay tasks

pull/5/head
yuqing 2015-10-16 16:44:46 +08:00
parent 90ea025cb5
commit 96a6464e91
4 changed files with 224 additions and 7 deletions

View File

@ -1,6 +1,7 @@
Version 1.23 2015-10-16
* sched_thread.c: task can execute in a new thread
* sched_thread.c: support delay tasks
Version 1.22 2015-10-10
* export php function: fastcommon_get_first_local_ip

View File

@ -52,6 +52,7 @@ int fast_timer_add(FastTimer *timer, FastTimerEntry *entry)
}
entry->prev = &slot->head;
slot->head.next = entry;
entry->rehash = false;
return 0;
}

View File

@ -10,10 +10,10 @@
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include "sched_thread.h"
#include "shared_func.h"
#include "pthread_func.h"
#include "logger.h"
#include "sched_thread.h"
volatile bool g_schedule_flag = false;
volatile time_t g_current_time = 0;
@ -21,6 +21,12 @@ volatile time_t g_current_time = 0;
static ScheduleArray waiting_schedule_array = {NULL, 0};
static int waiting_del_id = -1;
static ScheduleContext *schedule_context = NULL;
static int timer_slot_count = 0;
static int mblock_alloc_once = 0;
static void sched_deal_delay_tasks(ScheduleContext *pContext);
static int sched_cmp_by_next_call_time(const void *p1, const void *p2)
{
return ((ScheduleEntry *)p1)->next_call_time - \
@ -311,20 +317,23 @@ static void *sched_thread_entrance(void *args)
g_schedule_flag = true;
while (*(pContext->pcontinue_flag))
{
g_current_time = time(NULL);
sched_deal_delay_tasks(pContext);
sched_check_waiting(pContext);
if (pContext->scheduleArray.count == 0) //no schedule entry
{
sleep(1);
g_current_time = time(NULL);
continue;
}
g_current_time = time(NULL);
while (pContext->head->next_call_time > g_current_time &&
*(pContext->pcontinue_flag))
{
sleep(1);
g_current_time = time(NULL);
sched_deal_delay_tasks(pContext);
if (sched_check_waiting(pContext) == 0)
{
break;
@ -341,7 +350,7 @@ static void *sched_thread_entrance(void *args)
while (*(pContext->pcontinue_flag) && (pCurrent != NULL \
&& pCurrent->next_call_time <= g_current_time))
{
//fprintf(stderr, "exec task id=%d\n", pCurrent->id);
//logInfo("exec task id: %d", pCurrent->id);
if (!pCurrent->new_thread)
{
pCurrent->task_func(pCurrent->func_args);
@ -556,8 +565,9 @@ int sched_del_entry(const int id)
return 0;
}
int sched_start(ScheduleArray *pScheduleArray, pthread_t *ptid, \
const int stack_size, bool * volatile pcontinue_flag)
int sched_start_ex(ScheduleArray *pScheduleArray, pthread_t *ptid,
const int stack_size, bool * volatile pcontinue_flag,
ScheduleContext **ppContext)
{
int result;
pthread_attr_t thread_attr;
@ -574,6 +584,7 @@ int sched_start(ScheduleArray *pScheduleArray, pthread_t *ptid, \
result, STRERROR(result));
return result;
}
memset(pContext, 0, sizeof(ScheduleContext));
if ((result=init_pthread_attr(&thread_attr, stack_size)) != 0)
{
@ -588,6 +599,30 @@ int sched_start(ScheduleArray *pScheduleArray, pthread_t *ptid, \
return result;
}
if (timer_slot_count > 0)
{
if ((result=fast_mblock_init(&pContext->mblock,
sizeof(FastDelayTask), mblock_alloc_once)) != 0)
{
free(pContext);
return result;
}
g_current_time = time(NULL);
if ((result=fast_timer_init(&pContext->timer, timer_slot_count,
g_current_time)) != 0)
{
free(pContext);
return result;
}
if ((result=init_pthread_lock(&pContext->delay_queue.lock)) != 0)
{
free(pContext);
return result;
}
pContext->timer_init = true;
}
pContext->pcontinue_flag = pcontinue_flag;
if ((result=pthread_create(ptid, &thread_attr, \
sched_thread_entrance, pContext)) != 0)
@ -599,7 +634,149 @@ int sched_start(ScheduleArray *pScheduleArray, pthread_t *ptid, \
__LINE__, result, STRERROR(result));
}
*ppContext = pContext;
pthread_attr_destroy(&thread_attr);
return result;
}
int sched_start(ScheduleArray *pScheduleArray, pthread_t *ptid,
const int stack_size, bool * volatile pcontinue_flag)
{
return sched_start_ex(pScheduleArray, ptid, stack_size,
pcontinue_flag, &schedule_context);
}
void sched_set_delay_params(const int slot_count, const int alloc_once)
{
if (slot_count > 1)
{
timer_slot_count = slot_count;
}
else
{
timer_slot_count = 300;
}
if (alloc_once > 0)
{
mblock_alloc_once = alloc_once;
}
else
{
mblock_alloc_once = 4 * 1024;
}
}
int sched_add_delay_task_ex(ScheduleContext *pContext, TaskFunc task_func,
void *func_args, const int delay_seconds)
{
FastDelayTask *task;
if (!pContext->timer_init)
{
logError("file: "__FILE__", line: %d, "
"NOT support delay tasks, you should call sched_set_delay_params "
"before sched_start!", __LINE__);
return EOPNOTSUPP;
}
task = (FastDelayTask *)fast_mblock_alloc_object(&pContext->mblock);
if (task == NULL)
{
return ENOMEM;
}
task->task_func = task_func;
task->func_args = func_args;
task->next = NULL;
if (delay_seconds > 0)
{
task->timer.expires = g_current_time + delay_seconds;
}
else
{
task->timer.expires = g_current_time;
}
pthread_mutex_lock(&pContext->delay_queue.lock);
if (pContext->delay_queue.head == NULL)
{
pContext->delay_queue.head = task;
}
else
{
pContext->delay_queue.tail->next = task;
}
pContext->delay_queue.tail = task;
pthread_mutex_unlock(&pContext->delay_queue.lock);
return 0;
}
int sched_add_delay_task(TaskFunc task_func, void *func_args,
const int delay_seconds)
{
return sched_add_delay_task_ex(schedule_context, task_func,
func_args, delay_seconds);
}
static void sched_deal_task_queue(ScheduleContext *pContext)
{
FastDelayTask *task;
pthread_mutex_lock(&pContext->delay_queue.lock);
if (pContext->delay_queue.head == NULL)
{
pthread_mutex_unlock(&pContext->delay_queue.lock);
return;
}
task = pContext->delay_queue.head;
pContext->delay_queue.head = NULL;
pContext->delay_queue.tail = NULL;
pthread_mutex_unlock(&pContext->delay_queue.lock);
while (task != NULL)
{
fast_timer_add(&pContext->timer, (FastTimerEntry *)task);
task = task->next;
}
}
static void deal_timeout_tasks(ScheduleContext *pContext, FastTimerEntry *head)
{
FastTimerEntry *entry;
FastTimerEntry *current;
FastDelayTask *task;
entry = head->next;
while (entry != NULL)
{
current = entry;
entry = entry->next;
current->prev = current->next = NULL; //must set NULL because NOT in time wheel
task = (FastDelayTask *)current;
task->task_func(task->func_args);
fast_mblock_free_object(&pContext->mblock, task);
}
}
static void sched_deal_delay_tasks(ScheduleContext *pContext)
{
FastTimerEntry head;
int count;
if (!pContext->timer_init)
{
return;
}
sched_deal_task_queue(pContext);
count = fast_timer_timeouts_get(
&pContext->timer, g_current_time, &head);
if (count > 0)
{
deal_timeout_tasks(pContext, &head);
//logInfo("deal delay task count: %d", count);
}
}

View File

@ -10,7 +10,10 @@
#define _SCHED_THREAD_H_
#include <time.h>
#include <pthread.h>
#include "common_define.h"
#include "fast_timer.h"
#include "fast_mblock.h"
typedef int (*TaskFunc) (void *args);
@ -42,11 +45,32 @@ typedef struct
int count;
} ScheduleArray;
typedef struct fast_delay_task {
FastTimerEntry timer; //must be first field
TaskFunc task_func; //callback function
void *func_args; //arguments pass to callback function
struct fast_delay_task *next;
} FastDelayTask;
typedef struct
{
FastDelayTask *head;
FastDelayTask *tail;
pthread_mutex_t lock;
} FastDelayQueue;
typedef struct
{
ScheduleArray scheduleArray;
ScheduleEntry *head; //schedule chain head
ScheduleEntry *tail; //schedule chain tail
ScheduleEntry *tail; //schedule chain tail
struct fast_mblock_man mblock; //for timer entry
FastTimer timer; //for delay task
bool timer_init;
FastDelayQueue delay_queue;
bool *pcontinue_flag;
} ScheduleContext;
@ -83,14 +107,28 @@ extern volatile time_t g_current_time; //the current time
int sched_add_entries(const ScheduleArray *pScheduleArray);
int sched_del_entry(const int id);
#define sched_enable_delay_task() sched_set_delay_params(0, 0)
void sched_set_delay_params(const int slot_count, const int alloc_once);
int sched_add_delay_task_ex(ScheduleContext *pContext, TaskFunc task_func,
void *func_args, const int delay_seconds);
int sched_add_delay_task(TaskFunc task_func, void *func_args,
const int delay_seconds);
/** execute the schedule thread
* parameters:
* pScheduleArray: schedule task
* ptid: store the schedule thread id
* stack_size: set thread stack size (byes)
* pcontinue_flag: main process continue running flag
* ppContext: store the ScheduleContext pointer
* return: error no, 0 for success, != 0 fail
*/
int sched_start_ex(ScheduleArray *pScheduleArray, pthread_t *ptid,
const int stack_size, bool * volatile pcontinue_flag,
ScheduleContext **ppContext);
int sched_start(ScheduleArray *pScheduleArray, pthread_t *ptid, \
const int stack_size, bool * volatile pcontinue_flag);