add files: thread_pool.[hc]
parent
8e0f5794d9
commit
29586005ba
|
|
@ -52,6 +52,7 @@ src/tests/test_pipe
|
|||
src/tests/test_atomic
|
||||
src/tests/test_file_write_hole
|
||||
src/tests/test_file_lock
|
||||
src/tests/test_thread_pool
|
||||
|
||||
# other
|
||||
*.swp
|
||||
|
|
|
|||
3
HISTORY
3
HISTORY
|
|
@ -1,5 +1,5 @@
|
|||
|
||||
Version 1.44 2020-07-10
|
||||
Version 1.44 2020-07-20
|
||||
* add test file src/tests/test_pthread_lock.c
|
||||
* add uniq_skiplist.[hc]
|
||||
* add function split_string_ex
|
||||
|
|
@ -33,6 +33,7 @@ Version 1.44 2020-07-10
|
|||
* add files: fc_queue.[hc]
|
||||
* add files: fc_memory.[hc]
|
||||
* add files: shared_buffer.[hc]
|
||||
* add files: thread_pool.[hc]
|
||||
|
||||
Version 1.43 2019-12-25
|
||||
* replace function call system to getExecResult,
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ FAST_SHARED_OBJS = hash.lo chain.lo shared_func.lo ini_file_reader.lo \
|
|||
char_converter.lo char_convert_loader.lo common_blocked_queue.lo \
|
||||
multi_socket_client.lo skiplist_set.lo uniq_skiplist.lo \
|
||||
json_parser.lo buffered_file_writer.lo server_id_func.lo \
|
||||
fc_queue.lo fc_memory.lo shared_buffer.lo
|
||||
fc_queue.lo fc_memory.lo shared_buffer.lo thread_pool.lo
|
||||
|
||||
FAST_STATIC_OBJS = hash.o chain.o shared_func.o ini_file_reader.o \
|
||||
logger.o sockopt.o base64.o sched_thread.o \
|
||||
|
|
@ -29,7 +29,7 @@ FAST_STATIC_OBJS = hash.o chain.o shared_func.o ini_file_reader.o \
|
|||
char_converter.o char_convert_loader.o common_blocked_queue.o \
|
||||
multi_socket_client.o skiplist_set.o uniq_skiplist.o \
|
||||
json_parser.o buffered_file_writer.o server_id_func.o \
|
||||
fc_queue.o fc_memory.o shared_buffer.o
|
||||
fc_queue.o fc_memory.o shared_buffer.o thread_pool.o
|
||||
|
||||
HEADER_FILES = common_define.h hash.h chain.h logger.h base64.h \
|
||||
shared_func.h pthread_func.h ini_file_reader.h _os_define.h \
|
||||
|
|
@ -43,7 +43,7 @@ HEADER_FILES = common_define.h hash.h chain.h logger.h base64.h \
|
|||
char_convert_loader.h common_blocked_queue.h \
|
||||
multi_socket_client.h skiplist_set.h uniq_skiplist.h \
|
||||
fc_list.h json_parser.h buffered_file_writer.h server_id_func.h \
|
||||
fc_queue.h fc_memory.h shared_buffer.h
|
||||
fc_queue.h fc_memory.h shared_buffer.h thread_pool.h
|
||||
|
||||
ALL_OBJS = $(FAST_STATIC_OBJS) $(FAST_SHARED_OBJS)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,289 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2008 Seapeak.Xu / xvhfeng@gmail.com
|
||||
*
|
||||
* FastLib may be copied only under the terms of the GNU General
|
||||
* Public License V3, which may be found in the FastLib source kit.
|
||||
* Please visit the FastLib Home Page http://www.fastken.com/ for more detail.
|
||||
**/
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <pthread.h>
|
||||
#include <unistd.h>
|
||||
#include <string.h>
|
||||
|
||||
#include "pthread_pool.h"
|
||||
|
||||
/*
|
||||
*the thread pool
|
||||
*/
|
||||
// global varalibale declared
|
||||
static threadpool_info_t *pool;
|
||||
|
||||
/*
|
||||
* the thread callback function proxy
|
||||
* parameters:
|
||||
* arg:the thread callback function parameter
|
||||
*/
|
||||
static void *callback_proxy(void *arg);
|
||||
|
||||
/*
|
||||
* push the thread into the pool
|
||||
* parameters:
|
||||
* thread:the thread will push into the poolbool
|
||||
* return:
|
||||
* 0:success
|
||||
* >0 : fail
|
||||
*/
|
||||
static int push2pool(thread_info_t *thread);
|
||||
|
||||
// proxy the thread running, use pthread_cond_wait to wait for arg to be update by
|
||||
// other users that need to use this thread
|
||||
static void *callback_proxy(void *arg)
|
||||
{
|
||||
thread_info_t* thread = (thread_info_t *) arg;
|
||||
// runs only when the pool->state is initialized
|
||||
while(initialized == pool->state)
|
||||
{
|
||||
// run what the caller want to do
|
||||
thread->func(thread->arg);
|
||||
|
||||
// if the state of thread pool is changed
|
||||
// we termiate the execution of this thread by returning the result
|
||||
if(pool == NULL || initialized != pool->state) break;
|
||||
|
||||
pthread_mutex_lock(&thread->mutex_locker);
|
||||
|
||||
if(0 == push2pool(thread))
|
||||
{
|
||||
pthread_cond_wait(&thread->run_locker,&thread->mutex_locker);
|
||||
pthread_mutex_unlock(&thread->mutex_locker);
|
||||
}
|
||||
else
|
||||
{
|
||||
pthread_mutex_unlock( &thread->mutex_locker );
|
||||
pthread_cond_destroy( &thread->run_locker );
|
||||
pthread_mutex_destroy( &thread->mutex_locker );
|
||||
|
||||
free( thread );
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&pool->mutex_locker);
|
||||
pool->current_size --;
|
||||
if(0 >= pool->current_size) pthread_cond_signal(&pool->empty_locker);
|
||||
pthread_mutex_unlock(&pool->mutex_locker);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// push the free thread_info_t back to the thread pool
|
||||
static int push2pool(thread_info_t *thread)
|
||||
{
|
||||
int result = -1;
|
||||
do
|
||||
{
|
||||
pthread_mutex_lock(&pool->mutex_locker);
|
||||
if( pool->current_index < pool->total_size )
|
||||
{
|
||||
pool->list[ pool->current_index ] = thread;
|
||||
pool->current_index++;
|
||||
result = 0;
|
||||
|
||||
// there is new thread for use, call phtread_cond_signal to
|
||||
// notice the caller
|
||||
pthread_cond_signal( &pool->run_locker);
|
||||
|
||||
if( pool->current_index >= pool->current_size )
|
||||
{
|
||||
// current_index reach the max
|
||||
// notice other thread that I am full
|
||||
pthread_cond_signal( &pool->full_locker );
|
||||
}
|
||||
}
|
||||
}while(0);
|
||||
pthread_mutex_unlock(&pool->mutex_locker);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// initialize a thread pool of [size] for later use
|
||||
int threadpool_init(int size)
|
||||
{
|
||||
if(0 >= size)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
pool = (threadpool_info_t *) malloc(sizeof(threadpool_info_t));
|
||||
if(NULL == pool)
|
||||
{
|
||||
return -2;
|
||||
}
|
||||
memset(pool,0,sizeof(threadpool_info_t));
|
||||
pool->state = initializing;
|
||||
pool->total_size = size;
|
||||
pool->current_size = 0;
|
||||
pool->current_index = 0;
|
||||
// initialize sync data structures
|
||||
pthread_mutex_init(&pool->mutex_locker,NULL);
|
||||
pthread_cond_init(&pool->run_locker,NULL);
|
||||
pthread_cond_init(&pool->empty_locker,NULL);
|
||||
pthread_cond_init(&pool->full_locker,NULL);
|
||||
// initialize a list of thread_info_t structs
|
||||
pool->list = (thread_info_t **) malloc(sizeof(thread_info_t*) * size);
|
||||
if(NULL == pool->list)
|
||||
{
|
||||
pthread_cond_destroy(&pool->run_locker);
|
||||
pthread_cond_destroy(&pool->empty_locker);
|
||||
pthread_cond_destroy(&pool->full_locker);
|
||||
pthread_mutex_destroy(&pool->mutex_locker);
|
||||
// free the memory pointed by pool pointer
|
||||
free(pool);
|
||||
return -2;
|
||||
}
|
||||
|
||||
pool->state = initialized;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// run the callback within the thread pool, arg is its var
|
||||
int threadpool_run(callback func,void *arg)
|
||||
{
|
||||
if(NULL == pool)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
int result = 0;
|
||||
do
|
||||
{
|
||||
pthread_mutex_lock(&pool->mutex_locker);
|
||||
if(NULL == pool || initialized != pool->state) //the pool cannot use
|
||||
{
|
||||
result = -1;
|
||||
break;
|
||||
}
|
||||
|
||||
//current size is >= the max pool size and all thread are busy now
|
||||
while(pool->current_index <= 0 && pool->current_size >= pool->total_size)
|
||||
{
|
||||
// wait on the run locker when there is no spare thread for the caller to use
|
||||
pthread_cond_wait(&pool->run_locker,&pool->mutex_locker);
|
||||
}
|
||||
|
||||
if(0 >= pool->current_index)
|
||||
{
|
||||
// when the current_index is smaler or equal to 0
|
||||
// we create a new thread_info_t data structure and put it into the pool later
|
||||
thread_info_t * thread = (thread_info_t *) malloc(sizeof(thread_info_t));
|
||||
if(NULL == thread)
|
||||
{
|
||||
result = -2;
|
||||
break;
|
||||
}
|
||||
memset(thread,0,sizeof(thread_info_t));
|
||||
|
||||
// create thread and set it to detached mode(it will end by itself when it finishes)
|
||||
pthread_mutex_init(&thread->mutex_locker,NULL);
|
||||
pthread_cond_init(&thread->run_locker,NULL);
|
||||
pthread_attr_t attr;
|
||||
pthread_attr_init(&attr);
|
||||
pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
|
||||
|
||||
thread->arg = arg;
|
||||
thread->func = func;
|
||||
|
||||
if(0 == pthread_create(&thread->id,&attr,callback_proxy,thread))
|
||||
{
|
||||
pool->current_size ++;
|
||||
}
|
||||
else
|
||||
{
|
||||
result = -3;
|
||||
pthread_mutex_destroy(&thread->mutex_locker);
|
||||
pthread_cond_destroy(&thread->run_locker);
|
||||
free(thread);
|
||||
}
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
pool->current_index --;//because the array begin with 0
|
||||
thread_info_t *thread = pool->list[ pool->current_index ];
|
||||
pool->list[ pool->current_index ] = NULL;
|
||||
|
||||
thread->func = func;
|
||||
thread->arg = arg;
|
||||
|
||||
pthread_mutex_lock( &thread->mutex_locker );
|
||||
pthread_cond_signal( &thread->run_locker ) ;
|
||||
pthread_mutex_unlock ( &thread->mutex_locker );
|
||||
}
|
||||
}while(0);
|
||||
pthread_mutex_unlock(&pool->mutex_locker);
|
||||
return result;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// destory the thread pool
|
||||
int threadpool_destroy()
|
||||
{
|
||||
if(NULL == pool) return 0;
|
||||
|
||||
pthread_mutex_lock( &pool->mutex_locker);
|
||||
|
||||
// when current_index is biger or equals to current_size
|
||||
// this means all the job in the thread pool is finished
|
||||
// which means we can do the free related jobs
|
||||
if( pool->current_index < pool->current_size )
|
||||
{
|
||||
// if current_index < current_size, wait for current_index to reach current_size
|
||||
// then change the state of the pool, thus stopping the caller to put more task
|
||||
// into the thread pool, then it use pthread_cond_signal to cause all worker thread
|
||||
// to exit.
|
||||
pthread_cond_wait( &pool->full_locker, &pool->mutex_locker );
|
||||
}
|
||||
|
||||
pool->state = uninstalling;
|
||||
int i = 0;
|
||||
|
||||
// cause all the thread to run to its end
|
||||
for( i = 0; i < pool->current_index; i++ )
|
||||
{
|
||||
thread_info_t *thread = pool->list[i];
|
||||
|
||||
pthread_mutex_lock( &thread->mutex_locker );
|
||||
pthread_cond_signal( &thread->run_locker ) ;
|
||||
pthread_mutex_unlock ( &thread->mutex_locker );
|
||||
}
|
||||
|
||||
// wait for all threads to exit, when all threads exit, it will
|
||||
// issue a signal of empty_locker
|
||||
if(0 < pool->current_size)
|
||||
{
|
||||
pthread_cond_wait( &pool->empty_locker, &pool->mutex_locker);
|
||||
}
|
||||
|
||||
for( i = 0; i < pool->current_index; i++ )
|
||||
{
|
||||
free( pool->list[ i ] );
|
||||
pool->list[ i ] = NULL;
|
||||
}
|
||||
|
||||
pthread_mutex_unlock( &pool->mutex_locker );
|
||||
|
||||
pool->current_index = 0;
|
||||
|
||||
pthread_mutex_destroy( &pool->mutex_locker );
|
||||
pthread_cond_destroy( &pool->run_locker );
|
||||
pthread_cond_destroy( &pool->full_locker );
|
||||
pthread_cond_destroy( &pool->empty_locker );
|
||||
|
||||
free( pool->list );
|
||||
pool->list = NULL;
|
||||
free( pool);
|
||||
pool = NULL;
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -1,123 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2008 Seapeak.Xu / xvhfeng@gmail.com
|
||||
*
|
||||
* FastLib may be copied only under the terms of the GNU General
|
||||
* Public License V3, which may be found in the FastLib source kit.
|
||||
* Please visit the FastLib Home Page http://www.fastken.com/ for more detail.
|
||||
**/
|
||||
|
||||
#ifndef PTHREAD_POOL_H_
|
||||
#define PTHREAD_POOL_H_
|
||||
|
||||
#include <pthread.h>
|
||||
|
||||
/*
|
||||
* define the callback function type of thread
|
||||
*/
|
||||
typedef void (*callback)(void *);
|
||||
|
||||
|
||||
/*
|
||||
* the thread pool state
|
||||
* member:
|
||||
* uninitialized : not initialize the thread pool.
|
||||
* initializing : initializing the thread pool.
|
||||
* initialized : the pool can use.
|
||||
* uninstalling : uninstalling the thread pool.
|
||||
* uninstalled : uninstall the thread pool is over.
|
||||
*/
|
||||
typedef enum threadpool_state
|
||||
{
|
||||
uninitialized,
|
||||
initializing,
|
||||
initialized,
|
||||
uninstalling,
|
||||
uninstalled,
|
||||
}thread_state_t;
|
||||
|
||||
|
||||
/*
|
||||
* define the thread type which in the pool
|
||||
* members:
|
||||
* id : the thread id
|
||||
* mutex_locker : the mutext locker
|
||||
* run_locker : the locker for noticing the thread do running or waitting
|
||||
* func : the callback function for thread
|
||||
* arg : the callback parameter
|
||||
*/
|
||||
typedef struct thread_info
|
||||
{
|
||||
pthread_t id;
|
||||
pthread_mutex_t mutex_locker;
|
||||
pthread_cond_t run_locker;
|
||||
callback func;
|
||||
void *arg;
|
||||
}thread_info_t;
|
||||
|
||||
/*
|
||||
* the structure for the thread pool
|
||||
* member:
|
||||
* list : the initialazed thread list
|
||||
* mutex_locker : the mutex locker for the thread operation.
|
||||
* run_locker : the locker for noticing the thread do running or waitting.
|
||||
* full_locker : the locker notice the thread is stoping when free the thread pool and the pool is not full .
|
||||
* empty_Locker : the locker notice the thread waitting for the busy thread work over,then do with the thread.
|
||||
* state : the pool's current state.
|
||||
* total_size : the pool max size;
|
||||
* current_size : the thread count for the current pool ;
|
||||
* current_index : the busy thread in the pool index.
|
||||
*/
|
||||
typedef struct threadpool_info
|
||||
{
|
||||
thread_info_t **list;
|
||||
pthread_mutex_t mutex_locker;
|
||||
pthread_cond_t run_locker;
|
||||
pthread_cond_t full_locker;
|
||||
pthread_cond_t empty_locker;
|
||||
thread_state_t state;
|
||||
int total_size;
|
||||
int current_size;
|
||||
int current_index;
|
||||
}threadpool_info_t;
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
/*
|
||||
* initialize the thread pool
|
||||
* parameters:
|
||||
* size : thread pool max size
|
||||
* return:
|
||||
* 0:initialize pool success;
|
||||
* -1:the size parameter is less 0;
|
||||
* -2:initialize pool is fail,malloc memory for pool or pool->list is error;
|
||||
*/
|
||||
int threadpool_init(int size);
|
||||
|
||||
/*
|
||||
* run the function with the thread from pool
|
||||
* parameter:
|
||||
* func:the thread callback function
|
||||
* arg:the parameter of callback function
|
||||
* return:
|
||||
* 0 : success
|
||||
* -1: the pool is NULL;
|
||||
* -2 : malloc memory for thread is error;
|
||||
* -3 : create thread is error;
|
||||
*/
|
||||
int threadpool_run(callback func,void *arg);
|
||||
|
||||
/*
|
||||
* free and destroy the thread pool memory
|
||||
* return:
|
||||
* 0 : success
|
||||
* less 0 : fail
|
||||
*/
|
||||
int threadpool_destroy();
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /* PTHREAD_POOL_H_ */
|
||||
|
|
@ -8,7 +8,8 @@ ALL_PRGS = test_allocator test_skiplist test_multi_skiplist test_mblock test_blo
|
|||
test_id_generator test_ini_parser test_char_convert test_char_convert_loader \
|
||||
test_logger test_skiplist_set test_crc32 test_thourands_seperator test_sched_thread \
|
||||
test_json_parser test_pthread_lock test_uniq_skiplist test_split_string \
|
||||
test_server_id_func test_pipe test_atomic test_file_write_hole test_file_lock
|
||||
test_server_id_func test_pipe test_atomic test_file_write_hole test_file_lock \
|
||||
test_thread_pool
|
||||
|
||||
all: $(ALL_PRGS)
|
||||
.c:
|
||||
|
|
|
|||
|
|
@ -0,0 +1,128 @@
|
|||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <math.h>
|
||||
#include <time.h>
|
||||
#include <inttypes.h>
|
||||
#include <sys/time.h>
|
||||
#include "fastcommon/logger.h"
|
||||
#include "fastcommon/shared_func.h"
|
||||
#include "fastcommon/sched_thread.h"
|
||||
#include "fastcommon/pthread_func.h"
|
||||
#include "fastcommon/ini_file_reader.h"
|
||||
#include "fastcommon/thread_pool.h"
|
||||
|
||||
#define LOOP_COUNT (10 * 1000 * 1000)
|
||||
|
||||
static volatile int counter = 0;
|
||||
static volatile int64_t total = 0;
|
||||
|
||||
#define TASK_COUNT 10
|
||||
|
||||
void thread2_func(void *args)
|
||||
{
|
||||
int i;
|
||||
for (i=0; i<LOOP_COUNT; i++) {
|
||||
__sync_add_and_fetch(&counter, 1);
|
||||
__sync_add_and_fetch(&total, 1);
|
||||
}
|
||||
}
|
||||
|
||||
void thread1_func(void *args)
|
||||
{
|
||||
int i;
|
||||
for (i=0; i<LOOP_COUNT; i++) {
|
||||
__sync_sub_and_fetch(&counter, 1);
|
||||
__sync_add_and_fetch(&total, 1);
|
||||
}
|
||||
}
|
||||
|
||||
void wait_thread_func(void *args)
|
||||
{
|
||||
int i;
|
||||
for (i=0; i<LOOP_COUNT; i++) {
|
||||
__sync_add_and_fetch(&counter, 0);
|
||||
}
|
||||
}
|
||||
|
||||
int test(FCThreadPool *pool)
|
||||
{
|
||||
int result;
|
||||
int i;
|
||||
|
||||
for (i=0; i<TASK_COUNT / 2; i++) {
|
||||
if ((result=fc_thread_pool_run(pool, thread1_func, NULL)) != 0) {
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
"thread_pool_run fail, "
|
||||
"errno: %d, error info: %s",
|
||||
__LINE__, result, STRERROR(result));
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
for (i=0; i<TASK_COUNT / 2; i++) {
|
||||
if ((result=fc_thread_pool_run(pool, thread2_func, NULL)) != 0) {
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
"thread_pool_run fail, "
|
||||
"errno: %d, error info: %s",
|
||||
__LINE__, result, STRERROR(result));
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
if ((result=fc_thread_pool_run(pool, wait_thread_func, NULL)) != 0) {
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
"thread_pool_run fail, "
|
||||
"errno: %d, error info: %s",
|
||||
__LINE__, result, STRERROR(result));
|
||||
return result;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
FCThreadPool pool;
|
||||
const int limit = 8;
|
||||
const int stack_size = 128 * 1024;
|
||||
const int max_idle_time = 5;
|
||||
const int min_idle_count = 2;
|
||||
volatile bool continue_flag = true;
|
||||
int result;
|
||||
int64_t start_time;
|
||||
|
||||
log_init();
|
||||
srand(time(NULL));
|
||||
g_log_context.log_level = LOG_DEBUG;
|
||||
|
||||
start_time = get_current_time_ms();
|
||||
if ((result=fc_thread_pool_init(&pool, limit, stack_size, max_idle_time,
|
||||
min_idle_count, (bool * volatile)&continue_flag)) != 0)
|
||||
{
|
||||
return result;
|
||||
}
|
||||
|
||||
result = test(&pool);
|
||||
|
||||
sleep(10);
|
||||
printf("counter: %d, total: %"PRId64", time used: %"PRId64" ms\n",
|
||||
__sync_add_and_fetch(&counter, 0),
|
||||
__sync_add_and_fetch(&total, 0),
|
||||
get_current_time_ms() - start_time);
|
||||
|
||||
result = test(&pool);
|
||||
sleep(5);
|
||||
|
||||
continue_flag = false;
|
||||
|
||||
sleep(2);
|
||||
printf("counter: %d, total: %"PRId64", time used: %"PRId64" ms\n",
|
||||
__sync_add_and_fetch(&counter, 0),
|
||||
__sync_add_and_fetch(&total, 0),
|
||||
get_current_time_ms() - start_time);
|
||||
|
||||
fc_thread_pool_destroy(&pool);
|
||||
logInfo("exit");
|
||||
return result;
|
||||
}
|
||||
|
|
@ -0,0 +1,243 @@
|
|||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <pthread.h>
|
||||
#include <unistd.h>
|
||||
#include <string.h>
|
||||
#include "pthread_func.h"
|
||||
#include "sched_thread.h"
|
||||
#include "fc_memory.h"
|
||||
#include "thread_pool.h"
|
||||
|
||||
static void *thread_entrance(void *arg)
|
||||
{
|
||||
FCThreadInfo *thread;
|
||||
FCThreadPool *pool;
|
||||
struct timespec ts;
|
||||
fc_thread_pool_callback callback;
|
||||
time_t last_run_time;
|
||||
bool running;
|
||||
bool notify;
|
||||
int idle_count;
|
||||
|
||||
thread = (FCThreadInfo *)arg;
|
||||
pool = thread->pool;
|
||||
|
||||
PTHREAD_MUTEX_LOCK(&thread->lock);
|
||||
thread->inited = true;
|
||||
PTHREAD_MUTEX_UNLOCK(&thread->lock);
|
||||
|
||||
PTHREAD_MUTEX_LOCK(&pool->lock);
|
||||
pool->thread_counts.running++;
|
||||
logInfo("tindex: %d start, tcount: %d",
|
||||
thread->index, pool->thread_counts.running);
|
||||
PTHREAD_MUTEX_UNLOCK(&pool->lock);
|
||||
|
||||
running = true;
|
||||
ts.tv_nsec = 0;
|
||||
last_run_time = get_current_time();
|
||||
while (running && *pool->pcontinue_flag) {
|
||||
|
||||
PTHREAD_MUTEX_LOCK(&thread->lock);
|
||||
if (thread->func == NULL) {
|
||||
ts.tv_sec = get_current_time() + 2;
|
||||
pthread_cond_timedwait(&thread->cond, &thread->lock, &ts);
|
||||
}
|
||||
|
||||
callback = thread->func;
|
||||
if (callback == NULL) {
|
||||
if (pool->max_idle_time > 0 && get_current_time() -
|
||||
last_run_time > pool->max_idle_time)
|
||||
{
|
||||
PTHREAD_MUTEX_LOCK(&pool->lock);
|
||||
idle_count = pool->thread_counts.running -
|
||||
__sync_add_and_fetch(&pool->thread_counts.dealing, 0);
|
||||
|
||||
if (idle_count > pool->min_idle_count) {
|
||||
thread->inited = false;
|
||||
pool->thread_counts.running--;
|
||||
running = false;
|
||||
}
|
||||
PTHREAD_MUTEX_UNLOCK(&pool->lock);
|
||||
}
|
||||
} else {
|
||||
thread->func = NULL;
|
||||
}
|
||||
PTHREAD_MUTEX_UNLOCK(&thread->lock);
|
||||
|
||||
if (callback != NULL) {
|
||||
__sync_add_and_fetch(&pool->thread_counts.dealing, 1);
|
||||
callback(thread->arg);
|
||||
last_run_time = get_current_time();
|
||||
__sync_sub_and_fetch(&pool->thread_counts.dealing, 1);
|
||||
|
||||
PTHREAD_MUTEX_LOCK(&pool->lock);
|
||||
notify = (pool->freelist == NULL);
|
||||
thread->next = pool->freelist;
|
||||
pool->freelist = thread;
|
||||
if (notify) {
|
||||
pthread_cond_signal(&pool->cond);
|
||||
}
|
||||
PTHREAD_MUTEX_UNLOCK(&pool->lock);
|
||||
}
|
||||
}
|
||||
|
||||
if (running) {
|
||||
PTHREAD_MUTEX_LOCK(&thread->lock);
|
||||
thread->inited = false;
|
||||
PTHREAD_MUTEX_UNLOCK(&thread->lock);
|
||||
|
||||
PTHREAD_MUTEX_LOCK(&pool->lock);
|
||||
pool->thread_counts.running--;
|
||||
PTHREAD_MUTEX_UNLOCK(&pool->lock);
|
||||
}
|
||||
|
||||
PTHREAD_MUTEX_LOCK(&pool->lock);
|
||||
logInfo("tindex: %d exit, tcount: %d",
|
||||
thread->index, pool->thread_counts.running);
|
||||
PTHREAD_MUTEX_UNLOCK(&pool->lock);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int init_pthread_lock_cond(pthread_mutex_t *lock, pthread_cond_t *cond)
|
||||
{
|
||||
int result;
|
||||
if ((result=init_pthread_lock(lock)) != 0) {
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
"init_pthread_lock fail, errno: %d, error info: %s",
|
||||
__LINE__, result, STRERROR(result));
|
||||
return result;
|
||||
}
|
||||
|
||||
if ((result=pthread_cond_init(cond, NULL)) != 0) {
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
"pthread_cond_init fail, "
|
||||
"errno: %d, error info: %s",
|
||||
__LINE__, result, STRERROR(result));
|
||||
return result;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int thread_pool_alloc_init(FCThreadPool *pool)
|
||||
{
|
||||
int result;
|
||||
int bytes;
|
||||
FCThreadInfo *thread;
|
||||
FCThreadInfo *end;
|
||||
FCThreadInfo *last;
|
||||
|
||||
bytes = sizeof(FCThreadInfo) * pool->thread_counts.limit;
|
||||
pool->threads = (FCThreadInfo *)fc_malloc(bytes);
|
||||
if (pool->threads == NULL) {
|
||||
return ENOMEM;
|
||||
}
|
||||
memset(pool->threads, 0, bytes);
|
||||
|
||||
end = pool->threads + pool->thread_counts.limit;
|
||||
for (thread=pool->threads; thread<end; thread++) {
|
||||
thread->pool = pool;
|
||||
thread->index = thread - pool->threads;
|
||||
if ((result=init_pthread_lock_cond(&thread->lock,
|
||||
&thread->cond)) != 0)
|
||||
{
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
last = end - 1;
|
||||
pool->freelist = pool->threads;
|
||||
for (thread=pool->threads; thread<last; thread++) {
|
||||
thread->next = thread + 1;
|
||||
}
|
||||
|
||||
if (pool->min_idle_count > 0) {
|
||||
end = pool->threads + pool->min_idle_count;
|
||||
for (thread=pool->threads; thread<end; thread++) {
|
||||
thread->inited = true;
|
||||
if ((result=fc_create_thread(&thread->tid, thread_entrance,
|
||||
thread, pool->stack_size)) != 0)
|
||||
{
|
||||
return result;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int fc_thread_pool_init(FCThreadPool *pool, const int limit,
|
||||
const int stack_size, const int max_idle_time,
|
||||
const int min_idle_count, bool * volatile pcontinue_flag)
|
||||
{
|
||||
int result;
|
||||
|
||||
if ((result=init_pthread_lock_cond(&pool->lock, &pool->cond)) != 0) {
|
||||
return result;
|
||||
}
|
||||
|
||||
pool->stack_size = stack_size;
|
||||
pool->max_idle_time = max_idle_time;
|
||||
if (min_idle_count > limit) {
|
||||
pool->min_idle_count = limit;
|
||||
} else {
|
||||
pool->min_idle_count = min_idle_count;
|
||||
}
|
||||
pool->thread_counts.limit = limit;
|
||||
pool->thread_counts.running = 0;
|
||||
pool->thread_counts.dealing = 0;
|
||||
pool->pcontinue_flag = pcontinue_flag;
|
||||
|
||||
return thread_pool_alloc_init(pool);
|
||||
}
|
||||
|
||||
void fc_thread_pool_destroy(FCThreadPool *pool)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
int fc_thread_pool_run(FCThreadPool *pool, fc_thread_pool_callback func,
|
||||
void *arg)
|
||||
{
|
||||
FCThreadInfo *thread;
|
||||
struct timespec ts;
|
||||
int result;
|
||||
|
||||
if (func == NULL) {
|
||||
return EINVAL;
|
||||
}
|
||||
|
||||
thread = NULL;
|
||||
ts.tv_nsec = 0;
|
||||
PTHREAD_MUTEX_LOCK(&pool->lock);
|
||||
while (*pool->pcontinue_flag) {
|
||||
if (pool->freelist != NULL) {
|
||||
thread = pool->freelist;
|
||||
pool->freelist = pool->freelist->next;
|
||||
break;
|
||||
}
|
||||
|
||||
ts.tv_sec = get_current_time() + 2;
|
||||
pthread_cond_timedwait(&pool->cond, &pool->lock, &ts);
|
||||
}
|
||||
PTHREAD_MUTEX_UNLOCK(&pool->lock);
|
||||
|
||||
if (thread == NULL) {
|
||||
return EINTR;
|
||||
}
|
||||
|
||||
PTHREAD_MUTEX_LOCK(&thread->lock);
|
||||
thread->func = func;
|
||||
thread->arg = arg;
|
||||
if (!thread->inited) {
|
||||
result = fc_create_thread(&thread->tid, thread_entrance,
|
||||
thread, pool->stack_size);
|
||||
} else {
|
||||
pthread_cond_signal(&thread->cond);
|
||||
result = 0;
|
||||
}
|
||||
PTHREAD_MUTEX_UNLOCK(&thread->lock);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
|
@ -0,0 +1,59 @@
|
|||
#ifndef FC_THREAD_POOL_H_
|
||||
#define FC_THREAD_POOL_H_
|
||||
|
||||
#include <time.h>
|
||||
#include <pthread.h>
|
||||
#include "fast_mblock.h"
|
||||
|
||||
typedef void (*fc_thread_pool_callback)(void *arg);
|
||||
|
||||
struct fc_thread_pool;
|
||||
typedef struct fc_thread_info
|
||||
{
|
||||
volatile int inited;
|
||||
int index;
|
||||
pthread_t tid;
|
||||
pthread_mutex_t lock;
|
||||
pthread_cond_t cond;
|
||||
fc_thread_pool_callback func;
|
||||
void *arg;
|
||||
struct fc_thread_pool *pool;
|
||||
struct fc_thread_info *next;
|
||||
} FCThreadInfo;
|
||||
|
||||
typedef struct fc_thread_pool
|
||||
{
|
||||
FCThreadInfo *threads; //all thread info
|
||||
FCThreadInfo *freelist;
|
||||
pthread_mutex_t lock;
|
||||
pthread_cond_t cond;
|
||||
|
||||
int stack_size;
|
||||
int max_idle_time; //in seconds
|
||||
int min_idle_count;
|
||||
struct {
|
||||
int limit;
|
||||
volatile int running; //running thread count
|
||||
volatile int dealing; //dealing task thread count
|
||||
} thread_counts;
|
||||
bool * volatile pcontinue_flag;
|
||||
} FCThreadPool;
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
int fc_thread_pool_init(FCThreadPool *pool, const int limit,
|
||||
const int stack_size, const int max_idle_time,
|
||||
const int min_idle_count, bool * volatile pcontinue_flag);
|
||||
|
||||
void fc_thread_pool_destroy(FCThreadPool *pool);
|
||||
|
||||
int fc_thread_pool_run(FCThreadPool *pool, fc_thread_pool_callback func,
|
||||
void *arg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif
|
||||
Loading…
Reference in New Issue