pthread_pool bug fix
parent
d17f2873fe
commit
ea449d773c
|
|
@ -10,12 +10,14 @@
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
#include <string.h>
|
||||||
|
|
||||||
#include "pthread_pool.h"
|
#include "pthread_pool.h"
|
||||||
|
|
||||||
/*
|
/*
|
||||||
*the thread pool
|
*the thread pool
|
||||||
*/
|
*/
|
||||||
|
// global varalibale declared
|
||||||
static threadpool_info_t *pool;
|
static threadpool_info_t *pool;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
@ -35,13 +37,19 @@ static void *callback_proxy(void *arg);
|
||||||
*/
|
*/
|
||||||
static int push2pool(thread_info_t *thread);
|
static int push2pool(thread_info_t *thread);
|
||||||
|
|
||||||
|
// proxy the thread running, use pthread_cond_wait to wait for arg to be update by
|
||||||
|
// other users that need to use this thread
|
||||||
static void *callback_proxy(void *arg)
|
static void *callback_proxy(void *arg)
|
||||||
{
|
{
|
||||||
thread_info_t* thread = (thread_info_t *) arg;
|
thread_info_t* thread = (thread_info_t *) arg;
|
||||||
|
// runs only when the pool->state is initialized
|
||||||
while(initialized == pool->state)
|
while(initialized == pool->state)
|
||||||
{
|
{
|
||||||
|
// run what the caller want to do
|
||||||
thread->func(thread->arg);
|
thread->func(thread->arg);
|
||||||
|
|
||||||
|
// if the state of thread pool is changed
|
||||||
|
// we termiate the execution of this thread by returning the result
|
||||||
if(pool == NULL || initialized != pool->state) break;
|
if(pool == NULL || initialized != pool->state) break;
|
||||||
|
|
||||||
pthread_mutex_lock(&thread->mutex_locker);
|
pthread_mutex_lock(&thread->mutex_locker);
|
||||||
|
|
@ -69,6 +77,7 @@ static void *callback_proxy(void *arg)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// push the free thread_info_t back to the thread pool
|
||||||
static int push2pool(thread_info_t *thread)
|
static int push2pool(thread_info_t *thread)
|
||||||
{
|
{
|
||||||
int result = -1;
|
int result = -1;
|
||||||
|
|
@ -81,10 +90,14 @@ static int push2pool(thread_info_t *thread)
|
||||||
pool->current_index++;
|
pool->current_index++;
|
||||||
result = 0;
|
result = 0;
|
||||||
|
|
||||||
|
// there is new thread for use, call phtread_cond_signal to
|
||||||
|
// notice the caller
|
||||||
pthread_cond_signal( &pool->run_locker);
|
pthread_cond_signal( &pool->run_locker);
|
||||||
|
|
||||||
if( pool->current_index >= pool->current_size )
|
if( pool->current_index >= pool->current_size )
|
||||||
{
|
{
|
||||||
|
// current_index reach the max
|
||||||
|
// notice other thread that I am full
|
||||||
pthread_cond_signal( &pool->full_locker );
|
pthread_cond_signal( &pool->full_locker );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -94,6 +107,7 @@ static int push2pool(thread_info_t *thread)
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// initialize a thread pool of [size] for later use
|
||||||
int threadpool_init(int size)
|
int threadpool_init(int size)
|
||||||
{
|
{
|
||||||
if(0 >= size)
|
if(0 >= size)
|
||||||
|
|
@ -111,11 +125,12 @@ int threadpool_init(int size)
|
||||||
pool->total_size = size;
|
pool->total_size = size;
|
||||||
pool->current_size = 0;
|
pool->current_size = 0;
|
||||||
pool->current_index = 0;
|
pool->current_index = 0;
|
||||||
|
// initialize sync data structures
|
||||||
pthread_mutex_init(&pool->mutex_locker,NULL);
|
pthread_mutex_init(&pool->mutex_locker,NULL);
|
||||||
pthread_cond_init(&pool->run_locker,NULL);
|
pthread_cond_init(&pool->run_locker,NULL);
|
||||||
pthread_cond_init(&pool->empty_locker,NULL);
|
pthread_cond_init(&pool->empty_locker,NULL);
|
||||||
pthread_cond_init(&pool->full_locker,NULL);
|
pthread_cond_init(&pool->full_locker,NULL);
|
||||||
|
// initialize a list of thread_info_t structs
|
||||||
pool->list = (thread_info_t **) malloc(sizeof(thread_info_t*) * size);
|
pool->list = (thread_info_t **) malloc(sizeof(thread_info_t*) * size);
|
||||||
if(NULL == pool->list)
|
if(NULL == pool->list)
|
||||||
{
|
{
|
||||||
|
|
@ -123,6 +138,7 @@ int threadpool_init(int size)
|
||||||
pthread_cond_destroy(&pool->empty_locker);
|
pthread_cond_destroy(&pool->empty_locker);
|
||||||
pthread_cond_destroy(&pool->full_locker);
|
pthread_cond_destroy(&pool->full_locker);
|
||||||
pthread_mutex_destroy(&pool->mutex_locker);
|
pthread_mutex_destroy(&pool->mutex_locker);
|
||||||
|
// free the memory pointed by pool pointer
|
||||||
free(pool);
|
free(pool);
|
||||||
return -2;
|
return -2;
|
||||||
}
|
}
|
||||||
|
|
@ -131,6 +147,7 @@ int threadpool_init(int size)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// run the callback within the thread pool, arg is its var
|
||||||
int threadpool_run(callback func,void *arg)
|
int threadpool_run(callback func,void *arg)
|
||||||
{
|
{
|
||||||
if(NULL == pool)
|
if(NULL == pool)
|
||||||
|
|
@ -151,11 +168,14 @@ int threadpool_run(callback func,void *arg)
|
||||||
//current size is >= the max pool size and all thread are busy now
|
//current size is >= the max pool size and all thread are busy now
|
||||||
while(pool->current_index <= 0 && pool->current_size >= pool->total_size)
|
while(pool->current_index <= 0 && pool->current_size >= pool->total_size)
|
||||||
{
|
{
|
||||||
|
// wait on the run locker when there is no spare thread for the caller to use
|
||||||
pthread_cond_wait(&pool->run_locker,&pool->mutex_locker);
|
pthread_cond_wait(&pool->run_locker,&pool->mutex_locker);
|
||||||
}
|
}
|
||||||
|
|
||||||
if(0 >= pool->current_index)
|
if(0 >= pool->current_index)
|
||||||
{
|
{
|
||||||
|
// when the current_index is smaler or equal to 0
|
||||||
|
// we create a new thread_info_t data structure and put it into the pool later
|
||||||
thread_info_t * thread = (thread_info_t *) malloc(sizeof(thread_info_t));
|
thread_info_t * thread = (thread_info_t *) malloc(sizeof(thread_info_t));
|
||||||
if(NULL == thread)
|
if(NULL == thread)
|
||||||
{
|
{
|
||||||
|
|
@ -164,6 +184,7 @@ int threadpool_run(callback func,void *arg)
|
||||||
}
|
}
|
||||||
memset(thread,0,sizeof(thread_info_t));
|
memset(thread,0,sizeof(thread_info_t));
|
||||||
|
|
||||||
|
// create thread and set it to detached mode(it will end by itself when it finishes)
|
||||||
pthread_mutex_init(&thread->mutex_locker,NULL);
|
pthread_mutex_init(&thread->mutex_locker,NULL);
|
||||||
pthread_cond_init(&thread->run_locker,NULL);
|
pthread_cond_init(&thread->run_locker,NULL);
|
||||||
pthread_attr_t attr;
|
pthread_attr_t attr;
|
||||||
|
|
@ -206,19 +227,29 @@ int threadpool_run(callback func,void *arg)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int threadpool_free()
|
// destory the thread pool
|
||||||
|
int threadpool_destroy()
|
||||||
{
|
{
|
||||||
if(NULL == pool) return 0;
|
if(NULL == pool) return 0;
|
||||||
|
|
||||||
pthread_mutex_lock( &pool->mutex_locker);
|
pthread_mutex_lock( &pool->mutex_locker);
|
||||||
|
|
||||||
|
// when current_index is biger or equals to current_size
|
||||||
|
// this means all the job in the thread pool is finished
|
||||||
|
// which means we can do the free related jobs
|
||||||
if( pool->current_index < pool->current_size )
|
if( pool->current_index < pool->current_size )
|
||||||
{
|
{
|
||||||
|
// if current_index < current_size, wait for current_index to reach current_size
|
||||||
|
// then change the state of the pool, thus stopping the caller to put more task
|
||||||
|
// into the thread pool, then it use pthread_cond_signal to cause all worker thread
|
||||||
|
// to exit.
|
||||||
pthread_cond_wait( &pool->full_locker, &pool->mutex_locker );
|
pthread_cond_wait( &pool->full_locker, &pool->mutex_locker );
|
||||||
}
|
}
|
||||||
|
|
||||||
pool->state = uninstalling;
|
pool->state = uninstalling;
|
||||||
int i = 0;
|
int i = 0;
|
||||||
|
|
||||||
|
// cause all the thread to run to its end
|
||||||
for( i = 0; i < pool->current_index; i++ )
|
for( i = 0; i < pool->current_index; i++ )
|
||||||
{
|
{
|
||||||
thread_info_t *thread = pool->list[i];
|
thread_info_t *thread = pool->list[i];
|
||||||
|
|
@ -228,6 +259,8 @@ int threadpool_free()
|
||||||
pthread_mutex_unlock ( &thread->mutex_locker );
|
pthread_mutex_unlock ( &thread->mutex_locker );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// wait for all threads to exit, when all threads exit, it will
|
||||||
|
// issue a signal of empty_locker
|
||||||
if(0 < pool->current_size)
|
if(0 < pool->current_size)
|
||||||
{
|
{
|
||||||
pthread_cond_wait( &pool->empty_locker, &pool->mutex_locker);
|
pthread_cond_wait( &pool->empty_locker, &pool->mutex_locker);
|
||||||
|
|
|
||||||
|
|
@ -20,9 +20,9 @@ typedef void (*callback)(void *);
|
||||||
/*
|
/*
|
||||||
* the thread pool state
|
* the thread pool state
|
||||||
* member:
|
* member:
|
||||||
* uninit : not initialize the thread pool.
|
* uninitialized : not initialize the thread pool.
|
||||||
* initing : initializing the thread pool.
|
* initializing : initializing the thread pool.
|
||||||
* using : the pool can use.
|
* initialized : the pool can use.
|
||||||
* uninstalling : uninstalling the thread pool.
|
* uninstalling : uninstalling the thread pool.
|
||||||
* uninstalled : uninstall the thread pool is over.
|
* uninstalled : uninstall the thread pool is over.
|
||||||
*/
|
*/
|
||||||
|
|
@ -61,7 +61,7 @@ typedef struct thread_info
|
||||||
* mutex_locker : the mutex locker for the thread operation.
|
* mutex_locker : the mutex locker for the thread operation.
|
||||||
* run_locker : the locker for noticing the thread do running or waitting.
|
* run_locker : the locker for noticing the thread do running or waitting.
|
||||||
* full_locker : the locker notice the thread is stoping when free the thread pool and the pool is not full .
|
* full_locker : the locker notice the thread is stoping when free the thread pool and the pool is not full .
|
||||||
* empry_Locker : the locker notice the thread waitting for the busy thread work over,then do with the thread.
|
* empty_Locker : the locker notice the thread waitting for the busy thread work over,then do with the thread.
|
||||||
* state : the pool's current state.
|
* state : the pool's current state.
|
||||||
* total_size : the pool max size;
|
* total_size : the pool max size;
|
||||||
* current_size : the thread count for the current pool ;
|
* current_size : the thread count for the current pool ;
|
||||||
|
|
@ -80,6 +80,10 @@ typedef struct threadpool_info
|
||||||
int current_index;
|
int current_index;
|
||||||
}threadpool_info_t;
|
}threadpool_info_t;
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* initialize the thread pool
|
* initialize the thread pool
|
||||||
* parameters:
|
* parameters:
|
||||||
|
|
@ -112,5 +116,8 @@ int threadpool_run(callback func,void *arg);
|
||||||
*/
|
*/
|
||||||
int threadpool_destroy();
|
int threadpool_destroy();
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
#endif /* PTHREAD_POOL_H_ */
|
#endif /* PTHREAD_POOL_H_ */
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue