diff --git a/HISTORY b/HISTORY index 203bb65..52b29af 100644 --- a/HISTORY +++ b/HISTORY @@ -1,4 +1,7 @@ +Version 1.51 2021-05-18 + * fast_mblock.[hc]: support batch alloc and batch free + Version 1.50 2021-05-11 * add function is_digital_string * add function fc_server_load_from_ini_context diff --git a/src/fast_mblock.c b/src/fast_mblock.c index 43f687e..197d0a3 100644 --- a/src/fast_mblock.c +++ b/src/fast_mblock.c @@ -623,21 +623,12 @@ void fast_mblock_destroy(struct fast_mblock_man *mblock) delete_from_mblock_list(mblock); } -struct fast_mblock_node *fast_mblock_alloc(struct fast_mblock_man *mblock) +static inline struct fast_mblock_node *alloc_node( + struct fast_mblock_man *mblock) { struct fast_mblock_node *pNode; int result; - if (mblock->need_lock && (result=pthread_mutex_lock( - &mblock->lcp.lock)) != 0) - { - logError("file: "__FILE__", line: %d, " - "call pthread_mutex_lock fail, " - "errno: %d, error info: %s", - __LINE__, result, STRERROR(result)); - return NULL; - } - while (1) { if (mblock->free_chain_head != NULL) @@ -688,6 +679,27 @@ struct fast_mblock_node *fast_mblock_alloc(struct fast_mblock_man *mblock) mblock->info.element_used_count++; fast_mblock_ref_counter_inc(mblock, pNode); } + + return pNode; +} + +struct fast_mblock_node *fast_mblock_alloc(struct fast_mblock_man *mblock) +{ + struct fast_mblock_node *pNode; + int result; + + if (mblock->need_lock && (result=pthread_mutex_lock( + &mblock->lcp.lock)) != 0) + { + logError("file: "__FILE__", line: %d, " + "call pthread_mutex_lock fail, " + "errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + return NULL; + } + + pNode = alloc_node(mblock); + if (mblock->need_lock && (result=pthread_mutex_unlock( &mblock->lcp.lock)) != 0) { @@ -700,7 +712,7 @@ struct fast_mblock_node *fast_mblock_alloc(struct fast_mblock_man *mblock) return pNode; } -int fast_mblock_free(struct fast_mblock_man *mblock, \ +int fast_mblock_free(struct fast_mblock_man *mblock, struct fast_mblock_node *pNode) 
 {
     int result;
@@ -729,13 +741,120 @@ int fast_mblock_free(struct fast_mblock_man *mblock, \
     if (mblock->need_lock && (result=pthread_mutex_unlock(
                 &mblock->lcp.lock)) != 0)
+    {
+        logError("file: "__FILE__", line: %d, "
+                "call pthread_mutex_unlock fail, "
+                "errno: %d, error info: %s",
+                __LINE__, result, STRERROR(result));
+    }
+
+    return 0;
+}
+
+static inline void batch_free(struct fast_mblock_man *mblock,
+        struct fast_mblock_chain *chain)
+{
+    bool notify;
+    struct fast_mblock_node *pNode;
+
+    pNode = chain->head;
+    while (pNode != NULL)
+    {
+        mblock->info.element_used_count--;
+        fast_mblock_ref_counter_dec(mblock, pNode);
+        pNode = pNode->next;
+    }
+
+    notify = (mblock->free_chain_head == NULL);
+    chain->tail->next = mblock->free_chain_head;
+    mblock->free_chain_head = chain->head;
+    if (mblock->alloc_elements.need_wait && notify)
+    {
+        pthread_cond_broadcast(&mblock->lcp.cond);
+    }
+}
+
+struct fast_mblock_node *fast_mblock_batch_alloc(
+        struct fast_mblock_man *mblock, const int count)
+{
+    struct fast_mblock_chain chain;
+    struct fast_mblock_node *pNode;
+    int i;
+    int result;
+
+    if (mblock->need_lock && (result=pthread_mutex_lock(
+                    &mblock->lcp.lock)) != 0)
     {
         logError("file: "__FILE__", line: %d, "
-                "call pthread_mutex_unlock fail, "
+                "call pthread_mutex_lock fail, "
                 "errno: %d, error info: %s",
                 __LINE__, result, STRERROR(result));
+        return NULL;
     }
 
+    if ((chain.head=alloc_node(mblock)) != NULL)
+    {
+        chain.tail = chain.head;
+        for (i=1; i<count; i++)
+        {
+            if ((pNode=alloc_node(mblock)) == NULL)
+            {
+                break;
+            }
+            chain.tail->next = pNode;
+            chain.tail = pNode;
+        }
+        chain.tail->next = NULL;
+
+        if (i != count) {  //fail
+            batch_free(mblock, &chain);
+            chain.head = NULL;
+        }
+    }
+
+    if (mblock->need_lock && (result=pthread_mutex_unlock(
+                    &mblock->lcp.lock)) != 0)
+    {
+        logError("file: "__FILE__", line: %d, " \
+                "call pthread_mutex_unlock fail, " \
+                "errno: %d, error info: %s", \
+                __LINE__, result, STRERROR(result));
+    }
+
+    return chain.head;
+}
+
+int fast_mblock_batch_free(struct fast_mblock_man *mblock,
+        struct fast_mblock_chain
*chain) +{ + int result; + + if (chain->head == NULL) { + return ENOENT; + } + + if (mblock->need_lock && (result=pthread_mutex_lock( + &mblock->lcp.lock)) != 0) + { + logError("file: "__FILE__", line: %d, " \ + "call pthread_mutex_lock fail, " \ + "errno: %d, error info: %s", \ + __LINE__, result, STRERROR(result)); + return result; + } + + batch_free(mblock, chain); + + if (mblock->need_lock && (result=pthread_mutex_unlock( + &mblock->lcp.lock)) != 0) + { + logError("file: "__FILE__", line: %d, " + "call pthread_mutex_unlock fail, " + "errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + } + return 0; } diff --git a/src/fast_mblock.h b/src/fast_mblock.h index 441ce17..54787b2 100644 --- a/src/fast_mblock.h +++ b/src/fast_mblock.h @@ -243,6 +243,26 @@ return 0 for success, return none zero if fail int fast_mblock_free(struct fast_mblock_man *mblock, struct fast_mblock_node *pNode); +/** +batch alloc nodes from the mblock +parameters: + mblock: the mblock pointer + count: alloc count +return the alloced node head, return NULL if fail +*/ +struct fast_mblock_node *fast_mblock_batch_alloc( + struct fast_mblock_man *mblock, const int count); + +/** +batch free nodes +parameters: + mblock: the mblock pointer + chain: the node chain to free +return 0 for success, return none zero if fail +*/ +int fast_mblock_batch_free(struct fast_mblock_man *mblock, + struct fast_mblock_chain *chain); + /** delay free a node (put a node to the mblock) parameters: diff --git a/src/tests/Makefile b/src/tests/Makefile index 539a728..85568f8 100644 --- a/src/tests/Makefile +++ b/src/tests/Makefile @@ -9,7 +9,7 @@ ALL_PRGS = test_allocator test_skiplist test_multi_skiplist test_mblock test_blo test_logger test_skiplist_set test_crc32 test_thourands_seperator test_sched_thread \ test_json_parser test_pthread_lock test_uniq_skiplist test_split_string \ test_server_id_func test_pipe test_atomic test_file_write_hole test_file_lock \ - test_thread_pool test_data_visible 
test_mutex_lock_perf
+	test_thread_pool test_data_visible test_mutex_lock_perf test_queue_perf

all: $(ALL_PRGS)

.c:
diff --git a/src/tests/test_queue_perf.c b/src/tests/test_queue_perf.c
new file mode 100644
index 0000000..b26c2bf
--- /dev/null
+++ b/src/tests/test_queue_perf.c
@@ -0,0 +1,188 @@
+/*
+ * Copyright (c) 2020 YuQing <384681@qq.com>
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the Lesser GNU General Public License, version 3
+ * or later ("LGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the Lesser GNU General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include "fastcommon/logger.h"
+#include "fastcommon/shared_func.h"
+#include "fastcommon/sched_thread.h"
+#include "fastcommon/fc_queue.h"
+
+typedef struct my_record {
+    char type;
+    struct my_record *next;  //for queue
+} MyRecord;
+
+const int LOOP_COUNT = 20 * 1000 * 1000;
+static volatile bool g_continue_flag = true;
+static struct fast_mblock_man record_allocator;
+static struct fc_queue queue;
+
+void *producer_thread(void *arg)
+{
+    const int BATCH_SIZE = 16;
+    int64_t count;
+    struct fast_mblock_node *node;
+    struct fc_queue_info qinfo;
+    MyRecord *record;
+
+    count = 0;
+    while (g_continue_flag && count < LOOP_COUNT) {
+        qinfo.head = qinfo.tail = NULL;
+
+        node = fast_mblock_batch_alloc(
+                &record_allocator, BATCH_SIZE);
+        if (node == NULL) {
+            g_continue_flag = false;
+            return NULL;
+        }
+
+        do {
+            record = (MyRecord *)node->data;
+            if (qinfo.head == NULL) {
+                qinfo.head = record;
+            } else {
+                ((MyRecord *)qinfo.tail)->next = record;
+            }
+            qinfo.tail = record;
+
+            node = node->next;
+        } while (node !=
NULL); + + count += BATCH_SIZE; + ((MyRecord *)qinfo.tail)->next = NULL; + fc_queue_push_queue_to_tail(&queue, &qinfo); + } + + return NULL; +} + +static void sigQuitHandler(int sig) +{ + g_continue_flag = false; + fc_queue_terminate(&queue); + + logCrit("file: "__FILE__", line: %d, " \ + "catch signal %d, program exiting...", \ + __LINE__, sig); +} + +int main(int argc, char *argv[]) +{ + const int alloc_elements_once = 8 * 1024; + int elements_limit; + pthread_t tid; + struct sigaction act; + int result; + int qps; + int64_t count; + struct fast_mblock_node *node; + MyRecord *record; + struct fast_mblock_chain chain; + int64_t start_time; + int64_t end_time; + int64_t time_used; + char time_buff[32]; + + start_time = get_current_time_ms(); + + srand(time(NULL)); + log_init(); + g_log_context.log_level = LOG_DEBUG; + + memset(&act, 0, sizeof(act)); + sigemptyset(&act.sa_mask); + act.sa_handler = sigQuitHandler; + if(sigaction(SIGINT, &act, NULL) < 0 || + sigaction(SIGTERM, &act, NULL) < 0 || + sigaction(SIGQUIT, &act, NULL) < 0) + { + logCrit("file: "__FILE__", line: %d, " \ + "call sigaction fail, errno: %d, error info: %s", \ + __LINE__, errno, STRERROR(errno)); + logCrit("exit abnormally!\n"); + return errno; + } + + fast_mblock_manager_init(); + + elements_limit = 2 * alloc_elements_once; + //elements_limit = 0; + if ((result=fast_mblock_init_ex1(&record_allocator, + "my_record", sizeof(MyRecord), + alloc_elements_once, elements_limit, + NULL, NULL, true)) != 0) + { + return result; + } + if (elements_limit > 0) { + fast_mblock_set_need_wait(&record_allocator, + true, (bool *)&g_continue_flag); + } + + if ((result=fc_queue_init(&queue, (long)( + &((MyRecord *)NULL)->next))) != 0) + { + return result; + } + + pthread_create(&tid, NULL, producer_thread, NULL); + + count = 0; + while (g_continue_flag && count < LOOP_COUNT) { + /* + record = (MyRecord *)fc_queue_pop(&queue); + if (record != NULL) { + ++count; + fast_mblock_free_object(&record_allocator, record); + 
} + */ + + if ((record=(MyRecord *)fc_queue_pop_all(&queue)) == NULL) { + continue; + } + + chain.head = chain.tail = NULL; + while (record != NULL) { + ++count; + node = fast_mblock_to_node_ptr(record); + if (chain.head == NULL) { + chain.head = node; + } else { + chain.tail->next = node; + } + chain.tail = node; + + record = record->next; + } + chain.tail->next = NULL; + fast_mblock_batch_free(&record_allocator, &chain); + } + + end_time = get_current_time_ms(); + time_used = end_time - start_time; + long_to_comma_str(time_used, time_buff); + + fast_mblock_manager_stat_print(false); + + qps = count * 1000LL / time_used; + printf("time used: %s ms, QPS: %d\n", time_buff, qps); + return 0; +}