diff --git a/src/fast_task_queue.c b/src/fast_task_queue.c index 2319fc3..71c7465 100644 --- a/src/fast_task_queue.c +++ b/src/fast_task_queue.c @@ -11,11 +11,16 @@ static struct fast_task_queue g_free_queue; -struct mpool_chain { +struct mpool_node { struct fast_task_info *blocks; struct fast_task_info *last_block; //last block - struct mpool_chain *next; -} *g_mpool = NULL; + struct mpool_node *next; +}; + +struct mpool_chain { + struct mpool_node *head; + struct mpool_node *tail; +} g_mpool = {NULL, NULL}; #define ALIGNED_TASK_INFO_SIZE MEM_ALIGN(sizeof(struct fast_task_info)) @@ -37,21 +42,20 @@ int task_queue_init(struct fast_task_queue *pQueue) return 0; } -static struct mpool_chain *malloc_mpool(const int block_size, \ - const int total_alloc_size) +static struct mpool_node *malloc_mpool(const int total_alloc_size) { struct fast_task_info *pTask; char *p; char *pCharEnd; - struct mpool_chain *mpool; + struct mpool_node *mpool; - mpool = (struct mpool_chain *)malloc(sizeof(struct mpool_chain)); + mpool = (struct mpool_node *)malloc(sizeof(struct mpool_node)); if (mpool == NULL) { logError("file: "__FILE__", line: %d, " \ "malloc %d bytes fail, " \ "errno: %d, error info: %s", \ - __LINE__, (int)sizeof(struct mpool_chain), \ + __LINE__, (int)sizeof(struct mpool_node), \ errno, STRERROR(errno)); return NULL; } @@ -71,7 +75,7 @@ static struct mpool_chain *malloc_mpool(const int block_size, \ memset(mpool->blocks, 0, total_alloc_size); pCharEnd = ((char *)mpool->blocks) + total_alloc_size; - for (p=(char *)mpool->blocks; pblocks; psize = g_free_queue.min_buff_size; @@ -96,7 +100,7 @@ static struct mpool_chain *malloc_mpool(const int block_size, \ errno, STRERROR(errno)); for (pt=(char *)mpool->blocks; pt < p; \ - pt += block_size) + pt += g_free_queue.block_size) { free(((struct fast_task_info *)pt)->data); } @@ -108,23 +112,23 @@ static struct mpool_chain *malloc_mpool(const int block_size, \ } } - mpool->last_block = (struct fast_task_info *)(pCharEnd - block_size); - for (p=(char *)mpool->blocks; p<(char *)mpool->last_block; p += block_size) + mpool->last_block = (struct fast_task_info *)(pCharEnd - g_free_queue.block_size); + for (p=(char *)mpool->blocks; p<(char *)mpool->last_block; p += g_free_queue.block_size) { pTask = (struct fast_task_info *)p; - pTask->next = (struct fast_task_info *)(p + block_size); + pTask->next = (struct fast_task_info *)(p + g_free_queue.block_size); } mpool->last_block->next = NULL; return mpool; } -int free_queue_init(const int max_connections, const int min_buff_size, \ - const int max_buff_size, const int arg_size) +int free_queue_init_ex(const int max_connections, const int init_connections, + const int alloc_task_once, const int min_buff_size, + const int max_buff_size, const int arg_size) { int64_t total_size; - struct mpool_chain *mpool; - int block_size; + struct mpool_node *mpool; int alloc_size; int result; int loop_count; @@ -144,8 +148,8 @@ int free_queue_init(const int max_connections, const int min_buff_size, \ aligned_min_size = MEM_ALIGN(min_buff_size); aligned_max_size = MEM_ALIGN(max_buff_size); aligned_arg_size = MEM_ALIGN(arg_size); - block_size = ALIGNED_TASK_INFO_SIZE + aligned_arg_size; - alloc_size = block_size * max_connections; + g_free_queue.block_size = ALIGNED_TASK_INFO_SIZE + aligned_arg_size; + alloc_size = g_free_queue.block_size * init_connections; if (aligned_max_size > aligned_min_size) { total_size = alloc_size; @@ -177,13 +181,13 @@ int free_queue_init(const int max_connections, const int min_buff_size, \ } } - if (max_data_size >= (int64_t)(block_size + aligned_min_size) * - (int64_t)max_connections) + if (max_data_size >= (int64_t)(g_free_queue.block_size + aligned_min_size) * + (int64_t)init_connections) { total_size = alloc_size + (int64_t)aligned_min_size * - max_connections; + init_connections; g_free_queue.malloc_whole_block = true; - block_size += aligned_min_size; + g_free_queue.block_size += aligned_min_size; } else { @@ -193,64 +197,72 @@ int free_queue_init(const int max_connections, const int min_buff_size, \ } } - logDebug("file: "__FILE__", line: %d, " \ - "max_connections: %d, min_buff_size: %d, max_buff_size: %d, " \ - "block_size: %d, arg_size: %d, max_data_size: %d, " \ - "total_size: %"PRId64, __LINE__, \ - max_connections, aligned_min_size, aligned_max_size, \ - block_size, aligned_arg_size, (int)max_data_size, total_size); - g_free_queue.max_connections = max_connections; + g_free_queue.current_connections = init_connections; + if (alloc_task_once <= 0) + { + g_free_queue.alloc_task_once = 256; + } + else + { + g_free_queue.alloc_task_once = alloc_task_once; + } g_free_queue.min_buff_size = aligned_min_size; g_free_queue.max_buff_size = aligned_max_size; g_free_queue.arg_size = aligned_arg_size; + logDebug("file: "__FILE__", line: %d, " + "max_connections: %d, init_connections: %d, alloc_task_once: %d, " + "min_buff_size: %d, max_buff_size: %d, block_size: %d, " + "arg_size: %d, max_data_size: %d, total_size: %"PRId64, + __LINE__, max_connections, init_connections, + g_free_queue.alloc_task_once, aligned_min_size, aligned_max_size, + g_free_queue.block_size, aligned_arg_size, (int)max_data_size, total_size); + if ((!g_free_queue.malloc_whole_block) || \ (total_size <= max_data_size)) { loop_count = 1; - mpool = malloc_mpool(block_size, total_size); + mpool = malloc_mpool(total_size); if (mpool == NULL) { return errno != 0 ? errno : ENOMEM; } - g_mpool = mpool; + g_mpool.head = mpool; + g_mpool.tail = mpool; } else { - struct mpool_chain *previous_mpool; int remain_count; int alloc_once; int current_count; int current_alloc_size; - mpool = NULL; - previous_mpool = NULL; loop_count = 0; - remain_count = max_connections; - alloc_once = max_data_size / block_size; + remain_count = init_connections; + alloc_once = max_data_size / g_free_queue.block_size; while (remain_count > 0) { current_count = (remain_count > alloc_once) ? \ alloc_once : remain_count; - current_alloc_size = block_size * current_count; - mpool = malloc_mpool(block_size, current_alloc_size); + current_alloc_size = g_free_queue.block_size * current_count; + mpool = malloc_mpool(current_alloc_size); if (mpool == NULL) { free_queue_destroy(); return errno != 0 ? errno : ENOMEM; } - if (previous_mpool == NULL) + if (g_mpool.tail == NULL) { - g_mpool = mpool; + g_mpool.head = mpool; } else { - previous_mpool->next = mpool; - previous_mpool->last_block->next = mpool->blocks; + g_mpool.tail->next = mpool; + g_mpool.tail->last_block->next = mpool->blocks; } - previous_mpool = mpool; + g_mpool.tail = mpool; remain_count -= current_count; loop_count++; @@ -264,35 +276,28 @@ int free_queue_init(const int max_connections, const int min_buff_size, \ "malloc task info as whole: %d, malloc loop count: %d", \ __LINE__, g_free_queue.malloc_whole_block, loop_count); - if (g_mpool != NULL) + if (g_mpool.head != NULL) { - g_free_queue.head = g_mpool->blocks; - g_free_queue.tail = mpool->last_block; - - /* - struct fast_task_info *pTask; - int task_count = 0; - - pTask = g_free_queue.head; - while (pTask != NULL) - { - task_count++; - pTask = pTask->next; - } - logDebug("file: "__FILE__", line: %d, " \ - "task count: %d", __LINE__, task_count); - */ + g_free_queue.head = g_mpool.head->blocks; + g_free_queue.tail = g_mpool.tail->last_block; } return 0; } +int free_queue_init(const int max_connections, const int min_buff_size, + const int max_buff_size, const int arg_size) +{ + return free_queue_init_ex(max_connections, max_connections, + 0, min_buff_size, max_buff_size, arg_size); +} + void free_queue_destroy() { - struct mpool_chain *mpool; - struct mpool_chain *mp; + struct mpool_node *mpool; + struct mpool_node *mp; - if (g_mpool == NULL) + if (g_mpool.head == NULL) { return; } @@ -301,13 +306,11 @@ void free_queue_destroy() { char *p; char *pCharEnd; - int block_size; struct fast_task_info *pTask; - block_size = ALIGNED_TASK_INFO_SIZE + g_free_queue.arg_size; - pCharEnd = ((char *)g_mpool->blocks) + block_size * \ - g_free_queue.max_connections; - for (p=(char *)g_mpool->blocks; pblocks) + g_free_queue.block_size * \ + g_free_queue.current_connections; + for (p=(char *)g_mpool.head->blocks; pdata != NULL) @@ -318,7 +321,7 @@ void free_queue_destroy() } } - mpool = g_mpool; + mpool = g_mpool.head; while (mpool != NULL) { mp = mpool; @@ -327,14 +330,114 @@ void free_queue_destroy() free(mp->blocks); free(mp); } - g_mpool = NULL; + g_mpool.head = g_mpool.tail = NULL; pthread_mutex_destroy(&(g_free_queue.lock)); } +static int free_queue_realloc() +{ + struct mpool_node *mpool; + struct fast_task_info *head; + struct fast_task_info *tail; + int remain_count; + int current_count; + int current_alloc_size; + int total_alloc_count; + + head = tail = NULL; + total_alloc_count = 0; + remain_count = g_free_queue.max_connections - + g_free_queue.current_connections; + while (remain_count > 0) + { + current_count = (remain_count > g_free_queue.alloc_task_once) ? + g_free_queue.alloc_task_once : remain_count; + current_alloc_size = g_free_queue.block_size * current_count; + mpool = malloc_mpool(current_alloc_size); + if (mpool == NULL) + { + break; + } + + total_alloc_count += current_count; + if (g_mpool.tail == NULL) + { + g_mpool.head = mpool; + } + else + { + g_mpool.tail->next = mpool; + g_mpool.tail->last_block->next = mpool->blocks; + } + g_mpool.tail = mpool; + + if (head == NULL) + { + head = mpool->blocks; + } + tail = mpool->last_block; + + remain_count -= current_count; + } + + if (g_free_queue.head == NULL) + { + g_free_queue.head = head; + } + if (g_free_queue.tail != NULL) + { + g_free_queue.tail->next = head; + } + g_free_queue.tail = tail; + + logDebug("file: "__FILE__", line: %d, " + "alloc %d elements", __LINE__, total_alloc_count); + + if (total_alloc_count > 0) + { + g_free_queue.current_connections += total_alloc_count; + return 0; + } + else + { + return ENOMEM; + } +} + struct fast_task_info *free_queue_pop() { - return task_queue_pop(&g_free_queue);; + struct fast_task_info *pTask; + if ((pTask=task_queue_pop(&g_free_queue)) != NULL) + { + return pTask; + } + + if (g_free_queue.current_connections >= g_free_queue.max_connections) + { + return NULL; + } + + pthread_mutex_lock(&g_free_queue.lock); + if (g_free_queue.current_connections >= g_free_queue.max_connections) + { + if (g_free_queue.head == NULL) + { + pthread_mutex_unlock(&g_free_queue.lock); + return NULL; + } + } + else + { + if (free_queue_realloc() != 0) + { + pthread_mutex_unlock(&g_free_queue.lock); + return NULL; + } + } + pthread_mutex_unlock(&g_free_queue.lock); + + return task_queue_pop(&g_free_queue); } int free_queue_push(struct fast_task_info *pTask) diff --git a/src/fast_task_queue.h b/src/fast_task_queue.h index 785e656..9b33bbc 100644 --- a/src/fast_task_queue.h +++ b/src/fast_task_queue.h @@ -66,9 +66,12 @@ struct fast_task_queue struct fast_task_info *tail; pthread_mutex_t lock; int max_connections; + int current_connections; + int alloc_task_once; int min_buff_size; int max_buff_size; int arg_size; + int block_size; bool malloc_whole_block; }; @@ -78,6 +81,9 @@ extern "C" { int free_queue_init(const int max_connections, const int min_buff_size, \ const int max_buff_size, const int arg_size); +int free_queue_init_ex(const int max_connections, const int init_connections, + const int alloc_task_once, const int min_buff_size, + const int max_buff_size, const int arg_size); void free_queue_destroy();