delay task can execute in a new thread

pull/5/head
yuqing 2015-11-02 18:24:26 +08:00
parent 3e1d1e2ae1
commit 07c93f6206
4 changed files with 86 additions and 15 deletions

View File

@ -5,6 +5,7 @@ Version 1.23 2015-11-02
* add function get_current_time_us and get_current_time_ms
* mblock add stat function
* add function get_sys_total_mem_size, ONLY support Linux and FreeBSD
* delay task can execute in a new thread
Version 1.22 2015-10-10
* export php function: fastcommon_get_first_local_ip

View File

@ -60,8 +60,8 @@ struct fast_mblock_man
fast_mblock_alloc_init_func alloc_init_func;
bool need_lock; //if need mutex lock
pthread_mutex_t lock; //the lock for read / write free node chain
struct fast_mblock_man *prev; //for manager
struct fast_mblock_man *next; //for manager
struct fast_mblock_man *prev; //for stat manager
struct fast_mblock_man *next; //for stat manager
};
#define GET_BLOCK_SIZE(info) \

View File

@ -369,13 +369,16 @@ static void *sched_thread_entrance(void *args)
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
}
usleep(1*1000);
for (i=1; !pCurrent->thread_running && i<100; i++)
else
{
logDebug("file: "__FILE__", line: %d, "
"task_id: %d, waiting thread ready, count %d",
__LINE__, pCurrent->id, i);
usleep(1*1000);
for (i=1; !pCurrent->thread_running && i<100; i++)
{
logDebug("file: "__FILE__", line: %d, "
"task_id: %d, waiting thread ready, count %d",
__LINE__, pCurrent->id, i);
usleep(1*1000);
}
}
}
@ -668,7 +671,7 @@ void sched_set_delay_params(const int slot_count, const int alloc_once)
}
int sched_add_delay_task_ex(ScheduleContext *pContext, TaskFunc task_func,
void *func_args, const int delay_seconds)
void *func_args, const int delay_seconds, const bool new_thread)
{
FastDelayTask *task;
if (!pContext->timer_init)
@ -686,6 +689,7 @@ int sched_add_delay_task_ex(ScheduleContext *pContext, TaskFunc task_func,
}
task->task_func = task_func;
task->func_args = func_args;
task->new_thread = new_thread;
task->next = NULL;
if (delay_seconds > 0)
{
@ -712,10 +716,10 @@ int sched_add_delay_task_ex(ScheduleContext *pContext, TaskFunc task_func,
}
int sched_add_delay_task(TaskFunc task_func, void *func_args,
const int delay_seconds)
const int delay_seconds, const bool new_thread)
{
return sched_add_delay_task_ex(schedule_context, task_func,
func_args, delay_seconds);
func_args, delay_seconds, new_thread);
}
static void sched_deal_task_queue(ScheduleContext *pContext)
@ -740,6 +744,34 @@ static void sched_deal_task_queue(ScheduleContext *pContext)
}
}
struct delay_thread_context {
ScheduleContext *schedule_context;
FastDelayTask *task;
};
static void *sched_call_delay_func(void *args)
{
struct delay_thread_context *delay_context;
ScheduleContext *pContext;
FastDelayTask *task;
delay_context = (struct delay_thread_context *)args;
task = delay_context->task;
pContext = delay_context->schedule_context;
logDebug("file: "__FILE__", line: %d, " \
"delay thread enter, task args: %p", __LINE__, task->func_args);
task->thread_running = true;
task->task_func(task->func_args);
logDebug("file: "__FILE__", line: %d, " \
"delay thread exit, task args: %p", __LINE__, task->func_args);
fast_mblock_free_object(&pContext->mblock, task);
return NULL;
}
static void deal_timeout_tasks(ScheduleContext *pContext, FastTimerEntry *head)
{
FastTimerEntry *entry;
@ -755,9 +787,43 @@ static void deal_timeout_tasks(ScheduleContext *pContext, FastTimerEntry *head)
current->prev = current->next = NULL; //must set NULL because NOT in time wheel
task = (FastDelayTask *)current;
task->task_func(task->func_args);
fast_mblock_free_object(&pContext->mblock, task);
}
if (!task->new_thread)
{
task->task_func(task->func_args);
fast_mblock_free_object(&pContext->mblock, task);
}
else
{
struct delay_thread_context delay_context;
pthread_t tid;
int result;
int i;
task->thread_running = false;
delay_context.task = task;
delay_context.schedule_context = pContext;
if ((result=pthread_create(&tid, NULL,
sched_call_delay_func, &delay_context)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"create thread failed, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
}
else
{
usleep(1*1000);
for (i=1; !task->thread_running && i<100; i++)
{
logDebug("file: "__FILE__", line: %d, "
"task args: %p, waiting thread ready, count %d",
__LINE__, task->func_args, i);
usleep(1*1000);
}
}
}
}
}
static void sched_deal_delay_tasks(ScheduleContext *pContext)

View File

@ -48,6 +48,10 @@ typedef struct
typedef struct fast_delay_task {
FastTimerEntry timer; //must be first field
bool new_thread; //run in a new thread
bool thread_running; //if new thread running, for internal use
TaskFunc task_func; //callback function
void *func_args; //arguments pass to callback function
struct fast_delay_task *next;
@ -112,9 +116,9 @@ int sched_del_entry(const int id);
void sched_set_delay_params(const int slot_count, const int alloc_once);
int sched_add_delay_task_ex(ScheduleContext *pContext, TaskFunc task_func,
void *func_args, const int delay_seconds);
void *func_args, const int delay_seconds, const bool new_thread);
int sched_add_delay_task(TaskFunc task_func, void *func_args,
const int delay_seconds);
const int delay_seconds, const bool new_thread);
/** execute the schedule thread
* parameters: