/* * Copyright (c) 2020 YuQing <384681@qq.com> * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the Lesser GNU General Public License, version 3 * or later ("LGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the Lesser GNU General Public License * along with this program. If not, see . */ #include #include #include #include #include #include "sched_thread.h" #include "fc_memory.h" #include "thread_pool.h" static void *thread_entrance(void *arg) { FCThreadInfo *thread; FCThreadPool *pool; struct timespec ts; fc_thread_pool_callback callback; time_t last_run_time; bool running; bool notify; int idle_count; thread = (FCThreadInfo *)arg; pool = thread->pool; #ifdef OS_LINUX { char thread_name[64]; snprintf(thread_name, sizeof(thread_name), "%s[%d]", pool->name, thread->index); prctl(PR_SET_NAME, thread_name); } #endif if (pool->extra_data_callbacks.alloc != NULL) { thread->tdata = pool->extra_data_callbacks.alloc(); } PTHREAD_MUTEX_LOCK(&thread->lock); thread->inited = true; PTHREAD_MUTEX_UNLOCK(&thread->lock); PTHREAD_MUTEX_LOCK(&pool->lock); pool->thread_counts.running++; logDebug("thread pool: %s, index: %d start, running count: %d", pool->name, thread->index, pool->thread_counts.running); PTHREAD_MUTEX_UNLOCK(&pool->lock); running = true; ts.tv_nsec = 0; last_run_time = get_current_time(); while (running && *pool->pcontinue_flag) { PTHREAD_MUTEX_LOCK(&thread->lock); if (thread->callback.func == NULL) { ts.tv_sec = get_current_time() + 2; pthread_cond_timedwait(&thread->cond, &thread->lock, &ts); } callback = thread->callback.func; if (callback == NULL) { if (pool->max_idle_time > 0 && get_current_time() - last_run_time > pool->max_idle_time) { PTHREAD_MUTEX_LOCK(&pool->lock); idle_count = pool->thread_counts.running - __sync_add_and_fetch(&pool->thread_counts.dealing, 0); if (idle_count > pool->min_idle_count) { thread->inited = false; pool->thread_counts.running--; running = false; } PTHREAD_MUTEX_UNLOCK(&pool->lock); } } else { thread->callback.func = NULL; } PTHREAD_MUTEX_UNLOCK(&thread->lock); if (callback != NULL) { __sync_add_and_fetch(&pool->thread_counts.dealing, 1); callback(thread->callback.arg, thread->tdata); last_run_time = get_current_time(); __sync_sub_and_fetch(&pool->thread_counts.dealing, 1); PTHREAD_MUTEX_LOCK(&pool->lock); notify = (pool->freelist == NULL); thread->next = pool->freelist; pool->freelist = thread; if (notify) { pthread_cond_signal(&pool->cond); } PTHREAD_MUTEX_UNLOCK(&pool->lock); } } if (pool->extra_data_callbacks.free != NULL && thread->tdata != NULL) { pool->extra_data_callbacks.free(thread->tdata); thread->tdata = NULL; } if (running) { PTHREAD_MUTEX_LOCK(&thread->lock); thread->inited = false; PTHREAD_MUTEX_UNLOCK(&thread->lock); PTHREAD_MUTEX_LOCK(&pool->lock); pool->thread_counts.running--; PTHREAD_MUTEX_UNLOCK(&pool->lock); } PTHREAD_MUTEX_LOCK(&pool->lock); logDebug("thread pool: %s, index: %d exit, running count: %d", pool->name, thread->index, pool->thread_counts.running); PTHREAD_MUTEX_UNLOCK(&pool->lock); return NULL; } static int init_pthread_lock_cond(pthread_mutex_t *lock, pthread_cond_t *cond) { int result; if ((result=init_pthread_lock(lock)) != 0) { logError("file: "__FILE__", line: %d, " "init_pthread_lock fail, errno: %d, error info: %s", __LINE__, result, STRERROR(result)); return result; } if ((result=pthread_cond_init(cond, NULL)) != 0) { logError("file: "__FILE__", line: %d, " "pthread_cond_init fail, " "errno: %d, error info: %s", __LINE__, result, STRERROR(result)); return result; } return 0; } static int thread_pool_alloc_init(FCThreadPool *pool) { int result; int bytes; FCThreadInfo *thread; FCThreadInfo *end; FCThreadInfo *last; bytes = sizeof(FCThreadInfo) * pool->thread_counts.limit; pool->threads = (FCThreadInfo *)fc_malloc(bytes); if (pool->threads == NULL) { return ENOMEM; } memset(pool->threads, 0, bytes); end = pool->threads + pool->thread_counts.limit; for (thread=pool->threads; threadpool = pool; thread->index = thread - pool->threads; if ((result=init_pthread_lock_cond(&thread->lock, &thread->cond)) != 0) { return result; } } last = end - 1; pool->freelist = pool->threads; for (thread=pool->threads; threadnext = thread + 1; } if (pool->min_idle_count > 0) { end = pool->threads + pool->min_idle_count; for (thread=pool->threads; threadinited = true; if ((result=fc_create_thread(&thread->tid, thread_entrance, thread, pool->stack_size)) != 0) { return result; } } } return 0; } int fc_thread_pool_init_ex(FCThreadPool *pool, const char *name, const int limit, const int stack_size, const int max_idle_time, const int min_idle_count, bool * volatile pcontinue_flag, FCThreadExtraDataCallbacks *extra_data_callbacks) { int result; if ((result=init_pthread_lock_cond(&pool->lock, &pool->cond)) != 0) { return result; } snprintf(pool->name, sizeof(pool->name), "%s", name); pool->stack_size = stack_size; pool->max_idle_time = max_idle_time; if (min_idle_count > limit) { pool->min_idle_count = limit; } else { pool->min_idle_count = min_idle_count; } pool->thread_counts.limit = limit; pool->thread_counts.running = 0; pool->thread_counts.dealing = 0; pool->pcontinue_flag = pcontinue_flag; if (extra_data_callbacks != NULL) { pool->extra_data_callbacks = *extra_data_callbacks; } else { pool->extra_data_callbacks.alloc = NULL; pool->extra_data_callbacks.free = NULL; } return thread_pool_alloc_init(pool); } void fc_thread_pool_destroy(FCThreadPool *pool) { } int fc_thread_pool_run(FCThreadPool *pool, fc_thread_pool_callback func, void *arg) { FCThreadInfo *thread; struct timespec ts; int result; thread = NULL; ts.tv_nsec = 0; PTHREAD_MUTEX_LOCK(&pool->lock); while (*pool->pcontinue_flag) { if (pool->freelist != NULL) { thread = pool->freelist; pool->freelist = pool->freelist->next; break; } ts.tv_sec = get_current_time() + 2; pthread_cond_timedwait(&pool->cond, &pool->lock, &ts); } PTHREAD_MUTEX_UNLOCK(&pool->lock); if (thread == NULL) { return EINTR; } PTHREAD_MUTEX_LOCK(&thread->lock); thread->callback.func = func; thread->callback.arg = arg; if (!thread->inited) { result = fc_create_thread(&thread->tid, thread_entrance, thread, pool->stack_size); } else { pthread_cond_signal(&thread->cond); result = 0; } PTHREAD_MUTEX_UNLOCK(&thread->lock); return result; }