diff --git a/.gitignore b/.gitignore index be3d730..034e85a 100644 --- a/.gitignore +++ b/.gitignore @@ -52,6 +52,7 @@ src/tests/test_pipe src/tests/test_atomic src/tests/test_file_write_hole src/tests/test_file_lock +src/tests/test_thread_pool # other *.swp diff --git a/HISTORY b/HISTORY index 3a9b60c..40174c1 100644 --- a/HISTORY +++ b/HISTORY @@ -1,5 +1,5 @@ -Version 1.44 2020-07-10 +Version 1.44 2020-07-20 * add test file src/tests/test_pthread_lock.c * add uniq_skiplist.[hc] * add function split_string_ex @@ -33,6 +33,7 @@ Version 1.44 2020-07-10 * add files: fc_queue.[hc] * add files: fc_memory.[hc] * add files: shared_buffer.[hc] + * add files: thread_pool.[hc] Version 1.43 2019-12-25 * replace function call system to getExecResult, diff --git a/src/Makefile.in b/src/Makefile.in index 8c2cdfe..23993ec 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -16,7 +16,7 @@ FAST_SHARED_OBJS = hash.lo chain.lo shared_func.lo ini_file_reader.lo \ char_converter.lo char_convert_loader.lo common_blocked_queue.lo \ multi_socket_client.lo skiplist_set.lo uniq_skiplist.lo \ json_parser.lo buffered_file_writer.lo server_id_func.lo \ - fc_queue.lo fc_memory.lo shared_buffer.lo + fc_queue.lo fc_memory.lo shared_buffer.lo thread_pool.lo FAST_STATIC_OBJS = hash.o chain.o shared_func.o ini_file_reader.o \ logger.o sockopt.o base64.o sched_thread.o \ @@ -29,7 +29,7 @@ FAST_STATIC_OBJS = hash.o chain.o shared_func.o ini_file_reader.o \ char_converter.o char_convert_loader.o common_blocked_queue.o \ multi_socket_client.o skiplist_set.o uniq_skiplist.o \ json_parser.o buffered_file_writer.o server_id_func.o \ - fc_queue.o fc_memory.o shared_buffer.o + fc_queue.o fc_memory.o shared_buffer.o thread_pool.o HEADER_FILES = common_define.h hash.h chain.h logger.h base64.h \ shared_func.h pthread_func.h ini_file_reader.h _os_define.h \ @@ -43,7 +43,7 @@ HEADER_FILES = common_define.h hash.h chain.h logger.h base64.h \ char_convert_loader.h common_blocked_queue.h \ multi_socket_client.h skiplist_set.h uniq_skiplist.h \ fc_list.h json_parser.h buffered_file_writer.h server_id_func.h \ - fc_queue.h fc_memory.h shared_buffer.h + fc_queue.h fc_memory.h shared_buffer.h thread_pool.h ALL_OBJS = $(FAST_STATIC_OBJS) $(FAST_SHARED_OBJS) diff --git a/src/pthread_pool.c b/src/pthread_pool.c deleted file mode 100644 index 951846b..0000000 --- a/src/pthread_pool.c +++ /dev/null @@ -1,289 +0,0 @@ -/** -* Copyright (C) 2008 Seapeak.Xu / xvhfeng@gmail.com -* -* FastLib may be copied only under the terms of the GNU General -* Public License V3, which may be found in the FastLib source kit. -* Please visit the FastLib Home Page http://www.fastken.com/ for more detail. -**/ - -#include -#include -#include -#include -#include - -#include "pthread_pool.h" - -/* - *the thread pool - */ - // global varalibale declared -static threadpool_info_t *pool; - -/* - * the thread callback function proxy - * parameters: - * arg:the thread callback function parameter - */ -static void *callback_proxy(void *arg); - -/* - * push the thread into the pool - * parameters: - * thread:the thread will push into the poolbool - * return: - * 0:success - * >0 : fail - */ -static int push2pool(thread_info_t *thread); - -// proxy the thread running, use pthread_cond_wait to wait for arg to be update by -// other users that need to use this thread -static void *callback_proxy(void *arg) -{ - thread_info_t* thread = (thread_info_t *) arg; - // runs only when the pool->state is initialized - while(initialized == pool->state) - { - // run what the caller want to do - thread->func(thread->arg); - - // if the state of thread pool is changed - // we termiate the execution of this thread by returning the result - if(pool == NULL || initialized != pool->state) break; - - pthread_mutex_lock(&thread->mutex_locker); - - if(0 == push2pool(thread)) - { - pthread_cond_wait(&thread->run_locker,&thread->mutex_locker); - pthread_mutex_unlock(&thread->mutex_locker); - } - else - { - pthread_mutex_unlock( &thread->mutex_locker ); - pthread_cond_destroy( &thread->run_locker ); - pthread_mutex_destroy( &thread->mutex_locker ); - - free( thread ); - break; - } - } - - pthread_mutex_lock(&pool->mutex_locker); - pool->current_size --; - if(0 >= pool->current_size) pthread_cond_signal(&pool->empty_locker); - pthread_mutex_unlock(&pool->mutex_locker); - return NULL; -} - -// push the free thread_info_t back to the thread pool -static int push2pool(thread_info_t *thread) -{ - int result = -1; - do - { - pthread_mutex_lock(&pool->mutex_locker); - if( pool->current_index < pool->total_size ) - { - pool->list[ pool->current_index ] = thread; - pool->current_index++; - result = 0; - - // there is new thread for use, call phtread_cond_signal to - // notice the caller - pthread_cond_signal( &pool->run_locker); - - if( pool->current_index >= pool->current_size ) - { - // current_index reach the max - // notice other thread that I am full - pthread_cond_signal( &pool->full_locker ); - } - } - }while(0); - pthread_mutex_unlock(&pool->mutex_locker); - - return result; -} - -// initialize a thread pool of [size] for later use -int threadpool_init(int size) -{ - if(0 >= size) - { - return -1; - } - - pool = (threadpool_info_t *) malloc(sizeof(threadpool_info_t)); - if(NULL == pool) - { - return -2; - } - memset(pool,0,sizeof(threadpool_info_t)); - pool->state = initializing; - pool->total_size = size; - pool->current_size = 0; - pool->current_index = 0; - // initialize sync data structures - pthread_mutex_init(&pool->mutex_locker,NULL); - pthread_cond_init(&pool->run_locker,NULL); - pthread_cond_init(&pool->empty_locker,NULL); - pthread_cond_init(&pool->full_locker,NULL); - // initialize a list of thread_info_t structs - pool->list = (thread_info_t **) malloc(sizeof(thread_info_t*) * size); - if(NULL == pool->list) - { - pthread_cond_destroy(&pool->run_locker); - pthread_cond_destroy(&pool->empty_locker); - pthread_cond_destroy(&pool->full_locker); - pthread_mutex_destroy(&pool->mutex_locker); - // free the memory pointed by pool pointer - free(pool); - return -2; - } - - pool->state = initialized; - return 0; -} - -// run the callback within the thread pool, arg is its var -int threadpool_run(callback func,void *arg) -{ - if(NULL == pool) - { - return -1; - } - - int result = 0; - do - { - pthread_mutex_lock(&pool->mutex_locker); - if(NULL == pool || initialized != pool->state) //the pool cannot use - { - result = -1; - break; - } - - //current size is >= the max pool size and all thread are busy now - while(pool->current_index <= 0 && pool->current_size >= pool->total_size) - { - // wait on the run locker when there is no spare thread for the caller to use - pthread_cond_wait(&pool->run_locker,&pool->mutex_locker); - } - - if(0 >= pool->current_index) - { - // when the current_index is smaler or equal to 0 - // we create a new thread_info_t data structure and put it into the pool later - thread_info_t * thread = (thread_info_t *) malloc(sizeof(thread_info_t)); - if(NULL == thread) - { - result = -2; - break; - } - memset(thread,0,sizeof(thread_info_t)); - - // create thread and set it to detached mode(it will end by itself when it finishes) - pthread_mutex_init(&thread->mutex_locker,NULL); - pthread_cond_init(&thread->run_locker,NULL); - pthread_attr_t attr; - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED); - - thread->arg = arg; - thread->func = func; - - if(0 == pthread_create(&thread->id,&attr,callback_proxy,thread)) - { - pool->current_size ++; - } - else - { - result = -3; - pthread_mutex_destroy(&thread->mutex_locker); - pthread_cond_destroy(&thread->run_locker); - free(thread); - } - break; - } - else - { - pool->current_index --;//because the array begin with 0 - thread_info_t *thread = pool->list[ pool->current_index ]; - pool->list[ pool->current_index ] = NULL; - - thread->func = func; - thread->arg = arg; - - pthread_mutex_lock( &thread->mutex_locker ); - pthread_cond_signal( &thread->run_locker ) ; - pthread_mutex_unlock ( &thread->mutex_locker ); - } - }while(0); - pthread_mutex_unlock(&pool->mutex_locker); - return result; - - return 0; -} - -// destory the thread pool -int threadpool_destroy() -{ - if(NULL == pool) return 0; - - pthread_mutex_lock( &pool->mutex_locker); - - // when current_index is biger or equals to current_size - // this means all the job in the thread pool is finished - // which means we can do the free related jobs - if( pool->current_index < pool->current_size ) - { - // if current_index < current_size, wait for current_index to reach current_size - // then change the state of the pool, thus stopping the caller to put more task - // into the thread pool, then it use pthread_cond_signal to cause all worker thread - // to exit. - pthread_cond_wait( &pool->full_locker, &pool->mutex_locker ); - } - - pool->state = uninstalling; - int i = 0; - - // cause all the thread to run to its end - for( i = 0; i < pool->current_index; i++ ) - { - thread_info_t *thread = pool->list[i]; - - pthread_mutex_lock( &thread->mutex_locker ); - pthread_cond_signal( &thread->run_locker ) ; - pthread_mutex_unlock ( &thread->mutex_locker ); - } - - // wait for all threads to exit, when all threads exit, it will - // issue a signal of empty_locker - if(0 < pool->current_size) - { - pthread_cond_wait( &pool->empty_locker, &pool->mutex_locker); - } - - for( i = 0; i < pool->current_index; i++ ) - { - free( pool->list[ i ] ); - pool->list[ i ] = NULL; - } - - pthread_mutex_unlock( &pool->mutex_locker ); - - pool->current_index = 0; - - pthread_mutex_destroy( &pool->mutex_locker ); - pthread_cond_destroy( &pool->run_locker ); - pthread_cond_destroy( &pool->full_locker ); - pthread_cond_destroy( &pool->empty_locker ); - - free( pool->list ); - pool->list = NULL; - free( pool); - pool = NULL; - return 0; -} diff --git a/src/pthread_pool.h b/src/pthread_pool.h deleted file mode 100644 index f2796fa..0000000 --- a/src/pthread_pool.h +++ /dev/null @@ -1,123 +0,0 @@ -/** -* Copyright (C) 2008 Seapeak.Xu / xvhfeng@gmail.com -* -* FastLib may be copied only under the terms of the GNU General -* Public License V3, which may be found in the FastLib source kit. -* Please visit the FastLib Home Page http://www.fastken.com/ for more detail. -**/ - -#ifndef PTHREAD_POOL_H_ -#define PTHREAD_POOL_H_ - -#include - -/* - * define the callback function type of thread - */ -typedef void (*callback)(void *); - - -/* - * the thread pool state - * member: - * uninitialized : not initialize the thread pool. - * initializing : initializing the thread pool. - * initialized : the pool can use. - * uninstalling : uninstalling the thread pool. - * uninstalled : uninstall the thread pool is over. - */ -typedef enum threadpool_state -{ - uninitialized, - initializing, - initialized, - uninstalling, - uninstalled, -}thread_state_t; - - -/* - * define the thread type which in the pool - * members: - * id : the thread id - * mutex_locker : the mutext locker - * run_locker : the locker for noticing the thread do running or waitting - * func : the callback function for thread - * arg : the callback parameter - */ -typedef struct thread_info -{ - pthread_t id; - pthread_mutex_t mutex_locker; - pthread_cond_t run_locker; - callback func; - void *arg; -}thread_info_t; - -/* - * the structure for the thread pool - * member: - * list : the initialazed thread list - * mutex_locker : the mutex locker for the thread operation. - * run_locker : the locker for noticing the thread do running or waitting. - * full_locker : the locker notice the thread is stoping when free the thread pool and the pool is not full . - * empty_Locker : the locker notice the thread waitting for the busy thread work over,then do with the thread. - * state : the pool's current state. - * total_size : the pool max size; - * current_size : the thread count for the current pool ; - * current_index : the busy thread in the pool index. - */ -typedef struct threadpool_info -{ - thread_info_t **list; - pthread_mutex_t mutex_locker; - pthread_cond_t run_locker; - pthread_cond_t full_locker; - pthread_cond_t empty_locker; - thread_state_t state; - int total_size; - int current_size; - int current_index; -}threadpool_info_t; - -#ifdef __cplusplus -extern "C" { -#endif - -/* - * initialize the thread pool - * parameters: - * size : thread pool max size - * return: - * 0:initialize pool success; - * -1:the size parameter is less 0; - * -2:initialize pool is fail,malloc memory for pool or pool->list is error; - */ -int threadpool_init(int size); - -/* - * run the function with the thread from pool - * parameter: - * func:the thread callback function - * arg:the parameter of callback function - * return: - * 0 : success - * -1: the pool is NULL; - * -2 : malloc memory for thread is error; - * -3 : create thread is error; - */ -int threadpool_run(callback func,void *arg); - -/* - * free and destroy the thread pool memory - * return: - * 0 : success - * less 0 : fail - */ -int threadpool_destroy(); - -#ifdef __cplusplus -} -#endif - -#endif /* PTHREAD_POOL_H_ */ diff --git a/src/tests/Makefile b/src/tests/Makefile index 2cca679..dec1334 100644 --- a/src/tests/Makefile +++ b/src/tests/Makefile @@ -8,7 +8,8 @@ ALL_PRGS = test_allocator test_skiplist test_multi_skiplist test_mblock test_blo test_id_generator test_ini_parser test_char_convert test_char_convert_loader \ test_logger test_skiplist_set test_crc32 test_thourands_seperator test_sched_thread \ test_json_parser test_pthread_lock test_uniq_skiplist test_split_string \ - test_server_id_func test_pipe test_atomic test_file_write_hole test_file_lock + test_server_id_func test_pipe test_atomic test_file_write_hole test_file_lock \ + test_thread_pool all: $(ALL_PRGS) .c: diff --git a/src/tests/test_thread_pool.c b/src/tests/test_thread_pool.c new file mode 100644 index 0000000..10f0401 --- /dev/null +++ b/src/tests/test_thread_pool.c @@ -0,0 +1,128 @@ +#include +#include +#include +#include +#include +#include +#include +#include "fastcommon/logger.h" +#include "fastcommon/shared_func.h" +#include "fastcommon/sched_thread.h" +#include "fastcommon/pthread_func.h" +#include "fastcommon/ini_file_reader.h" +#include "fastcommon/thread_pool.h" + +#define LOOP_COUNT (10 * 1000 * 1000) + +static volatile int counter = 0; +static volatile int64_t total = 0; + +#define TASK_COUNT 10 + +void thread2_func(void *args) +{ + int i; + for (i=0; i +#include +#include +#include +#include +#include "pthread_func.h" +#include "sched_thread.h" +#include "fc_memory.h" +#include "thread_pool.h" + +static void *thread_entrance(void *arg) +{ + FCThreadInfo *thread; + FCThreadPool *pool; + struct timespec ts; + fc_thread_pool_callback callback; + time_t last_run_time; + bool running; + bool notify; + int idle_count; + + thread = (FCThreadInfo *)arg; + pool = thread->pool; + + PTHREAD_MUTEX_LOCK(&thread->lock); + thread->inited = true; + PTHREAD_MUTEX_UNLOCK(&thread->lock); + + PTHREAD_MUTEX_LOCK(&pool->lock); + pool->thread_counts.running++; + logInfo("tindex: %d start, tcount: %d", + thread->index, pool->thread_counts.running); + PTHREAD_MUTEX_UNLOCK(&pool->lock); + + running = true; + ts.tv_nsec = 0; + last_run_time = get_current_time(); + while (running && *pool->pcontinue_flag) { + + PTHREAD_MUTEX_LOCK(&thread->lock); + if (thread->func == NULL) { + ts.tv_sec = get_current_time() + 2; + pthread_cond_timedwait(&thread->cond, &thread->lock, &ts); + } + + callback = thread->func; + if (callback == NULL) { + if (pool->max_idle_time > 0 && get_current_time() - + last_run_time > pool->max_idle_time) + { + PTHREAD_MUTEX_LOCK(&pool->lock); + idle_count = pool->thread_counts.running - + __sync_add_and_fetch(&pool->thread_counts.dealing, 0); + + if (idle_count > pool->min_idle_count) { + thread->inited = false; + pool->thread_counts.running--; + running = false; + } + PTHREAD_MUTEX_UNLOCK(&pool->lock); + } + } else { + thread->func = NULL; + } + PTHREAD_MUTEX_UNLOCK(&thread->lock); + + if (callback != NULL) { + __sync_add_and_fetch(&pool->thread_counts.dealing, 1); + callback(thread->arg); + last_run_time = get_current_time(); + __sync_sub_and_fetch(&pool->thread_counts.dealing, 1); + + PTHREAD_MUTEX_LOCK(&pool->lock); + notify = (pool->freelist == NULL); + thread->next = pool->freelist; + pool->freelist = thread; + if (notify) { + pthread_cond_signal(&pool->cond); + } + PTHREAD_MUTEX_UNLOCK(&pool->lock); + } + } + + if (running) { + PTHREAD_MUTEX_LOCK(&thread->lock); + thread->inited = false; + PTHREAD_MUTEX_UNLOCK(&thread->lock); + + PTHREAD_MUTEX_LOCK(&pool->lock); + pool->thread_counts.running--; + PTHREAD_MUTEX_UNLOCK(&pool->lock); + } + + PTHREAD_MUTEX_LOCK(&pool->lock); + logInfo("tindex: %d exit, tcount: %d", + thread->index, pool->thread_counts.running); + PTHREAD_MUTEX_UNLOCK(&pool->lock); + + return NULL; +} + +static int init_pthread_lock_cond(pthread_mutex_t *lock, pthread_cond_t *cond) +{ + int result; + if ((result=init_pthread_lock(lock)) != 0) { + logError("file: "__FILE__", line: %d, " + "init_pthread_lock fail, errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + return result; + } + + if ((result=pthread_cond_init(cond, NULL)) != 0) { + logError("file: "__FILE__", line: %d, " + "pthread_cond_init fail, " + "errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + return result; + } + + return 0; +} + +static int thread_pool_alloc_init(FCThreadPool *pool) +{ + int result; + int bytes; + FCThreadInfo *thread; + FCThreadInfo *end; + FCThreadInfo *last; + + bytes = sizeof(FCThreadInfo) * pool->thread_counts.limit; + pool->threads = (FCThreadInfo *)fc_malloc(bytes); + if (pool->threads == NULL) { + return ENOMEM; + } + memset(pool->threads, 0, bytes); + + end = pool->threads + pool->thread_counts.limit; + for (thread=pool->threads; threadpool = pool; + thread->index = thread - pool->threads; + if ((result=init_pthread_lock_cond(&thread->lock, + &thread->cond)) != 0) + { + return result; + } + } + + last = end - 1; + pool->freelist = pool->threads; + for (thread=pool->threads; threadnext = thread + 1; + } + + if (pool->min_idle_count > 0) { + end = pool->threads + pool->min_idle_count; + for (thread=pool->threads; threadinited = true; + if ((result=fc_create_thread(&thread->tid, thread_entrance, + thread, pool->stack_size)) != 0) + { + return result; + } + } + } + + return 0; +} + +int fc_thread_pool_init(FCThreadPool *pool, const int limit, + const int stack_size, const int max_idle_time, + const int min_idle_count, bool * volatile pcontinue_flag) +{ + int result; + + if ((result=init_pthread_lock_cond(&pool->lock, &pool->cond)) != 0) { + return result; + } + + pool->stack_size = stack_size; + pool->max_idle_time = max_idle_time; + if (min_idle_count > limit) { + pool->min_idle_count = limit; + } else { + pool->min_idle_count = min_idle_count; + } + pool->thread_counts.limit = limit; + pool->thread_counts.running = 0; + pool->thread_counts.dealing = 0; + pool->pcontinue_flag = pcontinue_flag; + + return thread_pool_alloc_init(pool); +} + +void fc_thread_pool_destroy(FCThreadPool *pool) +{ + +} + +int fc_thread_pool_run(FCThreadPool *pool, fc_thread_pool_callback func, + void *arg) +{ + FCThreadInfo *thread; + struct timespec ts; + int result; + + if (func == NULL) { + return EINVAL; + } + + thread = NULL; + ts.tv_nsec = 0; + PTHREAD_MUTEX_LOCK(&pool->lock); + while (*pool->pcontinue_flag) { + if (pool->freelist != NULL) { + thread = pool->freelist; + pool->freelist = pool->freelist->next; + break; + } + + ts.tv_sec = get_current_time() + 2; + pthread_cond_timedwait(&pool->cond, &pool->lock, &ts); + } + PTHREAD_MUTEX_UNLOCK(&pool->lock); + + if (thread == NULL) { + return EINTR; + } + + PTHREAD_MUTEX_LOCK(&thread->lock); + thread->func = func; + thread->arg = arg; + if (!thread->inited) { + result = fc_create_thread(&thread->tid, thread_entrance, + thread, pool->stack_size); + } else { + pthread_cond_signal(&thread->cond); + result = 0; + } + PTHREAD_MUTEX_UNLOCK(&thread->lock); + + return result; +} diff --git a/src/thread_pool.h b/src/thread_pool.h new file mode 100644 index 0000000..27be837 --- /dev/null +++ b/src/thread_pool.h @@ -0,0 +1,59 @@ +#ifndef FC_THREAD_POOL_H_ +#define FC_THREAD_POOL_H_ + +#include +#include +#include "fast_mblock.h" + +typedef void (*fc_thread_pool_callback)(void *arg); + +struct fc_thread_pool; +typedef struct fc_thread_info +{ + volatile int inited; + int index; + pthread_t tid; + pthread_mutex_t lock; + pthread_cond_t cond; + fc_thread_pool_callback func; + void *arg; + struct fc_thread_pool *pool; + struct fc_thread_info *next; +} FCThreadInfo; + +typedef struct fc_thread_pool +{ + FCThreadInfo *threads; //all thread info + FCThreadInfo *freelist; + pthread_mutex_t lock; + pthread_cond_t cond; + + int stack_size; + int max_idle_time; //in seconds + int min_idle_count; + struct { + int limit; + volatile int running; //running thread count + volatile int dealing; //dealing task thread count + } thread_counts; + bool * volatile pcontinue_flag; +} FCThreadPool; + +#ifdef __cplusplus +extern "C" { +#endif + +int fc_thread_pool_init(FCThreadPool *pool, const int limit, + const int stack_size, const int max_idle_time, + const int min_idle_count, bool * volatile pcontinue_flag); + +void fc_thread_pool_destroy(FCThreadPool *pool); + +int fc_thread_pool_run(FCThreadPool *pool, fc_thread_pool_callback func, + void *arg); + +#ifdef __cplusplus +} +#endif + +#endif