From ea449d773cc6e10785cc9d4bdba2a9377cd17d2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9B=B8=E8=A7=81=E6=AC=A2?= Date: Sun, 7 Feb 2016 13:04:01 +0800 Subject: [PATCH] pthread_pool bug fix --- src/pthread_pool.c | 37 +++++++++++++++++++++++++++++++++++-- src/pthread_pool.h | 15 +++++++++++---- 2 files changed, 46 insertions(+), 6 deletions(-) diff --git a/src/pthread_pool.c b/src/pthread_pool.c index 2fa3966..308385e 100644 --- a/src/pthread_pool.c +++ b/src/pthread_pool.c @@ -10,12 +10,14 @@ #include #include #include +#include #include "pthread_pool.h" /* *the thread pool */ + // global varalibale declared static threadpool_info_t *pool; /* @@ -35,13 +37,19 @@ static void *callback_proxy(void *arg); */ static int push2pool(thread_info_t *thread); +// proxy the thread running, use pthread_cond_wait to wait for arg to be update by +// other users that need to use this thread static void *callback_proxy(void *arg) { thread_info_t* thread = (thread_info_t *) arg; + // runs only when the pool->state is initialized while(initialized == pool->state) { + // run what the caller want to do thread->func(thread->arg); + // if the state of thread pool is changed + // we termiate the execution of this thread by returning the result if(pool == NULL || initialized != pool->state) break; pthread_mutex_lock(&thread->mutex_locker); @@ -69,6 +77,7 @@ static void *callback_proxy(void *arg) return NULL; } +// push the free thread_info_t back to the thread pool static int push2pool(thread_info_t *thread) { int result = -1; @@ -81,10 +90,14 @@ static int push2pool(thread_info_t *thread) pool->current_index++; result = 0; + // there is new thread for use, call phtread_cond_signal to + // notice the caller pthread_cond_signal( &pool->run_locker); if( pool->current_index >= pool->current_size ) { + // current_index reach the max + // notice other thread that I am full pthread_cond_signal( &pool->full_locker ); } } @@ -94,6 +107,7 @@ static int push2pool(thread_info_t *thread) return result; } +// initialize a thread pool of [size] for later use int threadpool_init(int size) { if(0 >= size) @@ -111,11 +125,12 @@ int threadpool_init(int size) pool->total_size = size; pool->current_size = 0; pool->current_index = 0; + // initialize sync data structures pthread_mutex_init(&pool->mutex_locker,NULL); pthread_cond_init(&pool->run_locker,NULL); pthread_cond_init(&pool->empty_locker,NULL); pthread_cond_init(&pool->full_locker,NULL); - + // initialize a list of thread_info_t structs pool->list = (thread_info_t **) malloc(sizeof(thread_info_t*) * size); if(NULL == pool->list) { @@ -123,6 +138,7 @@ int threadpool_init(int size) pthread_cond_destroy(&pool->empty_locker); pthread_cond_destroy(&pool->full_locker); pthread_mutex_destroy(&pool->mutex_locker); + // free the memory pointed by pool pointer free(pool); return -2; } @@ -131,6 +147,7 @@ int threadpool_init(int size) return 0; } +// run the callback within the thread pool, arg is its var int threadpool_run(callback func,void *arg) { if(NULL == pool) @@ -151,11 +168,14 @@ int threadpool_run(callback func,void *arg) //current size is >= the max pool size and all thread are busy now while(pool->current_index <= 0 && pool->current_size >= pool->total_size) { + // wait on the run locker when there is no spare thread for the caller to use pthread_cond_wait(&pool->run_locker,&pool->mutex_locker); } if(0 >= pool->current_index) { + // when the current_index is smaler or equal to 0 + // we create a new thread_info_t data structure and put it into the pool later thread_info_t * thread = (thread_info_t *) malloc(sizeof(thread_info_t)); if(NULL == thread) { @@ -164,6 +184,7 @@ int threadpool_run(callback func,void *arg) } memset(thread,0,sizeof(thread_info_t)); + // create thread and set it to detached mode(it will end by itself when it finishes) pthread_mutex_init(&thread->mutex_locker,NULL); pthread_cond_init(&thread->run_locker,NULL); pthread_attr_t attr; @@ -206,19 +227,29 @@ int threadpool_run(callback func,void *arg) return 0; } -int threadpool_free() +// destory the thread pool +int threadpool_destroy() { if(NULL == pool) return 0; pthread_mutex_lock( &pool->mutex_locker); + // when current_index is biger or equals to current_size + // this means all the job in the thread pool is finished + // which means we can do the free related jobs if( pool->current_index < pool->current_size ) { + // if current_index < current_size, wait for current_index to reach current_size + // then change the state of the pool, thus stopping the caller to put more task + // into the thread pool, then it use pthread_cond_signal to cause all worker thread + // to exit. pthread_cond_wait( &pool->full_locker, &pool->mutex_locker ); } pool->state = uninstalling; int i = 0; + + // cause all the thread to run to its end for( i = 0; i < pool->current_index; i++ ) { thread_info_t *thread = pool->list[i]; @@ -228,6 +259,8 @@ int threadpool_free() pthread_mutex_unlock ( &thread->mutex_locker ); } + // wait for all threads to exit, when all threads exit, it will + // issue a signal of empty_locker if(0 < pool->current_size) { pthread_cond_wait( &pool->empty_locker, &pool->mutex_locker); diff --git a/src/pthread_pool.h b/src/pthread_pool.h index e6da86f..98a5a62 100644 --- a/src/pthread_pool.h +++ b/src/pthread_pool.h @@ -20,9 +20,9 @@ typedef void (*callback)(void *); /* * the thread pool state * member: - * uninit : not initialize the thread pool. - * initing : initializing the thread pool. - * using : the pool can use. + * uninitialized : not initialize the thread pool. + * initializing : initializing the thread pool. + * initialized : the pool can use. * uninstalling : uninstalling the thread pool. * uninstalled : uninstall the thread pool is over. */ @@ -61,7 +61,7 @@ typedef struct thread_info * mutex_locker : the mutex locker for the thread operation. * run_locker : the locker for noticing the thread do running or waitting. * full_locker : the locker notice the thread is stoping when free the thread pool and the pool is not full . - * empry_Locker : the locker notice the thread waitting for the busy thread work over,then do with the thread. + * empty_Locker : the locker notice the thread waitting for the busy thread work over,then do with the thread. * state : the pool's current state. * total_size : the pool max size; * current_size : the thread count for the current pool ; @@ -80,6 +80,10 @@ typedef struct threadpool_info int current_index; }threadpool_info_t; +#ifdef __cplusplus +extern "C" { +#endif + /* * initialize the thread pool * parameters: @@ -112,5 +116,8 @@ int threadpool_run(callback func,void *arg); */ int threadpool_destroy(); +#ifdef __cplusplus +} +#endif #endif /* PTHREAD_POOL_H_ */