diff --git a/HISTORY b/HISTORY index 40b795c..5cd3ef0 100644 --- a/HISTORY +++ b/HISTORY @@ -5,6 +5,7 @@ Version 1.23 2015-11-02 * add function get_current_time_us and get_current_time_ms * mblock add stat function * add function get_sys_total_mem_size, ONLY support Linux and FreeBSD + * delay task can execute in a new thread Version 1.22 2015-10-10 * export php function: fastcommon_get_first_local_ip diff --git a/src/fast_mblock.h b/src/fast_mblock.h index 40f05ac..21eb0b0 100644 --- a/src/fast_mblock.h +++ b/src/fast_mblock.h @@ -60,8 +60,8 @@ struct fast_mblock_man fast_mblock_alloc_init_func alloc_init_func; bool need_lock; //if need mutex lock pthread_mutex_t lock; //the lock for read / write free node chain - struct fast_mblock_man *prev; //for manager - struct fast_mblock_man *next; //for manager + struct fast_mblock_man *prev; //for stat manager + struct fast_mblock_man *next; //for stat manager }; #define GET_BLOCK_SIZE(info) \ diff --git a/src/sched_thread.c b/src/sched_thread.c index 5fcc4ef..cc0e407 100644 --- a/src/sched_thread.c +++ b/src/sched_thread.c @@ -369,13 +369,16 @@ static void *sched_thread_entrance(void *args) "errno: %d, error info: %s", \ __LINE__, result, STRERROR(result)); } - usleep(1*1000); - for (i=1; !pCurrent->thread_running && i<100; i++) + else { - logDebug("file: "__FILE__", line: %d, " - "task_id: %d, waiting thread ready, count %d", - __LINE__, pCurrent->id, i); usleep(1*1000); + for (i=1; !pCurrent->thread_running && i<100; i++) + { + logDebug("file: "__FILE__", line: %d, " + "task_id: %d, waiting thread ready, count %d", + __LINE__, pCurrent->id, i); + usleep(1*1000); + } } } @@ -668,7 +671,7 @@ void sched_set_delay_params(const int slot_count, const int alloc_once) } int sched_add_delay_task_ex(ScheduleContext *pContext, TaskFunc task_func, - void *func_args, const int delay_seconds) + void *func_args, const int delay_seconds, const bool new_thread) { FastDelayTask *task; if (!pContext->timer_init) @@ -686,6 +689,7 @@ int sched_add_delay_task_ex(ScheduleContext *pContext, TaskFunc task_func, } task->task_func = task_func; task->func_args = func_args; + task->new_thread = new_thread; task->next = NULL; if (delay_seconds > 0) { @@ -712,10 +716,10 @@ int sched_add_delay_task_ex(ScheduleContext *pContext, TaskFunc task_func, } int sched_add_delay_task(TaskFunc task_func, void *func_args, - const int delay_seconds) + const int delay_seconds, const bool new_thread) { return sched_add_delay_task_ex(schedule_context, task_func, - func_args, delay_seconds); + func_args, delay_seconds, new_thread); } static void sched_deal_task_queue(ScheduleContext *pContext) @@ -740,6 +744,34 @@ static void sched_deal_task_queue(ScheduleContext *pContext) } } +struct delay_thread_context { + ScheduleContext *schedule_context; + FastDelayTask *task; +}; + +static void *sched_call_delay_func(void *args) +{ + struct delay_thread_context *delay_context; + ScheduleContext *pContext; + FastDelayTask *task; + + delay_context = (struct delay_thread_context *)args; + task = delay_context->task; + pContext = delay_context->schedule_context; + + logDebug("file: "__FILE__", line: %d, " \ + "delay thread enter, task args: %p", __LINE__, task->func_args); + + task->thread_running = true; + task->task_func(task->func_args); + + logDebug("file: "__FILE__", line: %d, " \ + "delay thread exit, task args: %p", __LINE__, task->func_args); + + fast_mblock_free_object(&pContext->mblock, task); + return NULL; +} + static void deal_timeout_tasks(ScheduleContext *pContext, FastTimerEntry *head) { FastTimerEntry *entry; @@ -755,9 +787,43 @@ static void deal_timeout_tasks(ScheduleContext *pContext, FastTimerEntry *head) current->prev = current->next = NULL; //must set NULL because NOT in time wheel task = (FastDelayTask *)current; - task->task_func(task->func_args); - fast_mblock_free_object(&pContext->mblock, task); - } + + if (!task->new_thread) + { + task->task_func(task->func_args); + fast_mblock_free_object(&pContext->mblock, task); + } + else + { + struct delay_thread_context delay_context; + pthread_t tid; + int result; + int i; + + task->thread_running = false; + delay_context.task = task; + delay_context.schedule_context = pContext; + if ((result=pthread_create(&tid, NULL, + sched_call_delay_func, &delay_context)) != 0) + { + logError("file: "__FILE__", line: %d, " \ + "create thread failed, " \ + "errno: %d, error info: %s", \ + __LINE__, result, STRERROR(result)); + } + else + { + usleep(1*1000); + for (i=1; !task->thread_running && i<100; i++) + { + logDebug("file: "__FILE__", line: %d, " + "task args: %p, waiting thread ready, count %d", + __LINE__, task->func_args, i); + usleep(1*1000); + } + } + } + } } static void sched_deal_delay_tasks(ScheduleContext *pContext) diff --git a/src/sched_thread.h b/src/sched_thread.h index b1df533..8f31456 100644 --- a/src/sched_thread.h +++ b/src/sched_thread.h @@ -48,6 +48,10 @@ typedef struct typedef struct fast_delay_task { FastTimerEntry timer; //must be first field + bool new_thread; //run in a new thread + + bool thread_running; //if new thread running, for internal use + TaskFunc task_func; //callback function void *func_args; //arguments pass to callback function struct fast_delay_task *next; @@ -112,9 +116,9 @@ int sched_del_entry(const int id); void sched_set_delay_params(const int slot_count, const int alloc_once); int sched_add_delay_task_ex(ScheduleContext *pContext, TaskFunc task_func, - void *func_args, const int delay_seconds); + void *func_args, const int delay_seconds, const bool new_thread); int sched_add_delay_task(TaskFunc task_func, void *func_args, - const int delay_seconds); + const int delay_seconds, const bool new_thread); /** execute the schedule thread * parameters: