diff --git a/HISTORY b/HISTORY index 9c9d8ab..b9e5e66 100644 --- a/HISTORY +++ b/HISTORY @@ -1,5 +1,5 @@ -Version 1.44 2020-04-26 +Version 1.44 2020-05-06 * add test file src/tests/test_pthread_lock.c * add uniq_skiplist.[hc] * add function split_string_ex @@ -27,6 +27,7 @@ Version 1.44 2020-04-26 * add function fc_get_file_line_count_ex * uniq_skiplist add function find_ge and support bidirection * connection_pool support validate connection on error + * fast_task_queue.[hc]: free_queue support init_callback Version 1.43 2019-12-25 * replace function call system to getExecResult, diff --git a/src/common_define.h b/src/common_define.h index a690e7f..0fbf94b 100644 --- a/src/common_define.h +++ b/src/common_define.h @@ -293,6 +293,19 @@ static inline bool fc_string_equal2(const string_t *s1, #define fc_string_equals(s1, s2) fc_string_equal(s1, s2) #define fc_string_equals2(s1, str2, len2) fc_string_equal2(s1, str2, len2) +static inline int fc_compare_int64(const int64_t n1, const int64_t n2) +{ + int64_t sub; + sub = n1 - n2; + if (sub < 0) { + return -1; + } else if (sub > 0) { + return 1; + } else { + return 0; + } +} + #ifdef __cplusplus } #endif diff --git a/src/fast_task_queue.c b/src/fast_task_queue.c index 3a68cda..be83b45 100644 --- a/src/fast_task_queue.c +++ b/src/fast_task_queue.c @@ -42,6 +42,18 @@ int task_queue_init(struct fast_task_queue *pQueue) return 0; } +static void free_mpool(struct mpool_node *mpool, char *end) +{ + char *pt; + for (pt=(char *)mpool->blocks; pt < end; pt += g_free_queue.block_size) + { + free(((struct fast_task_info *)pt)->data); + } + + free(mpool->blocks); + free(mpool); +} + static struct mpool_node *malloc_mpool(const int total_alloc_size) { struct fast_task_info *pTask; @@ -91,29 +103,31 @@ static struct mpool_node *malloc_mpool(const int total_alloc_size) pTask->data = (char *)malloc(pTask->size); if (pTask->data == NULL) { - char *pt; - logError("file: "__FILE__", line: %d, " \ "malloc %d bytes fail, " \ "errno: %d, error info: %s", \ __LINE__, pTask->size, \ errno, STRERROR(errno)); - for (pt=(char *)mpool->blocks; pt < p; \ - pt += g_free_queue.block_size) - { - free(((struct fast_task_info *)pt)->data); - } - - free(mpool->blocks); - free(mpool); + free_mpool(mpool, p); return NULL; } } + + if (g_free_queue.init_callback != NULL) + { + if (g_free_queue.init_callback(pTask) != 0) + { + free_mpool(mpool, p); + return NULL; + } + } } - mpool->last_block = (struct fast_task_info *)(pCharEnd - g_free_queue.block_size); - for (p=(char *)mpool->blocks; p<(char *)mpool->last_block; p += g_free_queue.block_size) + mpool->last_block = (struct fast_task_info *) + (pCharEnd - g_free_queue.block_size); + for (p=(char *)mpool->blocks; p<(char *)mpool->last_block; + p += g_free_queue.block_size) { pTask = (struct fast_task_info *)p; pTask->next = (struct fast_task_info *)(p + g_free_queue.block_size); @@ -123,9 +137,10 @@ static struct mpool_node *malloc_mpool(const int total_alloc_size) return mpool; } -int free_queue_init_ex(const int max_connections, const int init_connections, +int free_queue_init_ex2(const int max_connections, const int init_connections, const int alloc_task_once, const int min_buff_size, - const int max_buff_size, const int arg_size) + const int max_buff_size, const int arg_size, + TaskInitCallback init_callback) { #define MAX_DATA_SIZE (256 * 1024 * 1024) int64_t total_size; @@ -183,8 +198,8 @@ int free_queue_init_ex(const int max_connections, const int init_connections, } } - if (max_data_size >= (int64_t)(g_free_queue.block_size + aligned_min_size) * - (int64_t)init_connections) + if (max_data_size >= (int64_t)(g_free_queue.block_size + + aligned_min_size) * (int64_t)init_connections) { total_size = alloc_size + (int64_t)aligned_min_size * init_connections; @@ -217,6 +232,7 @@ int free_queue_init_ex(const int max_connections, const int init_connections, g_free_queue.min_buff_size = aligned_min_size; g_free_queue.max_buff_size = aligned_max_size; g_free_queue.arg_size = aligned_arg_size; + g_free_queue.init_callback = init_callback; logDebug("file: "__FILE__", line: %d, " "max_connections: %d, init_connections: %d, alloc_task_once: %d, " @@ -290,13 +306,6 @@ int free_queue_init_ex(const int max_connections, const int init_connections, return 0; } -int free_queue_init(const int max_connections, const int min_buff_size, - const int max_buff_size, const int arg_size) -{ - return free_queue_init_ex(max_connections, max_connections, - 0, min_buff_size, max_buff_size, arg_size); -} - void free_queue_destroy() { struct mpool_node *mpool; diff --git a/src/fast_task_queue.h b/src/fast_task_queue.h index 550fd77..e71b552 100644 --- a/src/fast_task_queue.h +++ b/src/fast_task_queue.h @@ -30,6 +30,7 @@ struct fast_task_info; typedef int (*ThreadLoopCallback) (struct nio_thread_data *pThreadData); typedef int (*TaskFinishCallback) (struct fast_task_info *pTask); typedef void (*TaskCleanUpCallback) (struct fast_task_info *pTask); +typedef int (*TaskInitCallback)(struct fast_task_info *pTask); typedef void (*IOEventCallback) (int sock, short event, void *arg); @@ -94,17 +95,32 @@ struct fast_task_queue int arg_size; int block_size; bool malloc_whole_block; + TaskInitCallback init_callback; }; #ifdef __cplusplus extern "C" { #endif -int free_queue_init(const int max_connections, const int min_buff_size, \ - const int max_buff_size, const int arg_size); -int free_queue_init_ex(const int max_connections, const int init_connections, +int free_queue_init_ex2(const int max_connections, const int init_connections, const int alloc_task_once, const int min_buff_size, - const int max_buff_size, const int arg_size); + const int max_buff_size, const int arg_size, + TaskInitCallback init_callback); + +static inline int free_queue_init_ex(const int max_connections, + const int init_connections, const int alloc_task_once, + const int min_buff_size, const int max_buff_size, const int arg_size) +{ + return free_queue_init_ex2(max_connections, init_connections, + alloc_task_once, min_buff_size, max_buff_size, arg_size, NULL); +} + +static inline int free_queue_init(const int max_connections, + const int min_buff_size, const int max_buff_size, const int arg_size) +{ + return free_queue_init_ex2(max_connections, max_connections, + 0, min_buff_size, max_buff_size, arg_size, NULL); +} void free_queue_destroy();