diff --git a/.gitignore b/.gitignore index c32eebb..6c44ddd 100644 --- a/.gitignore +++ b/.gitignore @@ -59,6 +59,7 @@ src/tests/test_mutex_lock_perf src/tests/test_queue_perf src/tests/test_normalize_path src/tests/test_sorted_array +src/tests/test_sorted_queue # other *.swp diff --git a/HISTORY b/HISTORY index 4517a15..fc6fc35 100644 --- a/HISTORY +++ b/HISTORY @@ -1,7 +1,8 @@ -Version 1.66 2023-03-27 +Version 1.66 2023-05-04 * struct fast_task_info add field: notify_next for nio notify queue * lc_pair in struct fc_queue change to lcp + * sorted queue use double link chain for quick push Version 1.65 2023-01-09 * locked_list.h: add functions locked_list_move and locked_list_move_tail diff --git a/src/fc_list.h b/src/fc_list.h index f9e362c..a3e4c92 100644 --- a/src/fc_list.h +++ b/src/fc_list.h @@ -45,6 +45,16 @@ fc_list_add_before (struct fc_list_head *_new, struct fc_list_head *current) _new->next->prev = _new; } +static inline void +fc_list_add_after (struct fc_list_head *_new, struct fc_list_head *current) +{ + _new->prev = current; + _new->next = current->next; + + current->next->prev = _new; + current->next = _new; +} + static inline void fc_list_add_internal (struct fc_list_head *_new, struct fc_list_head *prev, struct fc_list_head *next) diff --git a/src/sorted_queue.c b/src/sorted_queue.c index 7183f15..481e482 100644 --- a/src/sorted_queue.c +++ b/src/sorted_queue.c @@ -18,135 +18,143 @@ #include "pthread_func.h" #include "sorted_queue.h" -int sorted_queue_init(struct sorted_queue *sq, const int next_ptr_offset, +int sorted_queue_init(struct sorted_queue *sq, const int dlink_offset, int (*compare_func)(const void *, const void *)) { + int result; + + if ((result=init_pthread_lock_cond_pair(&sq->lcp)) != 0) { + return result; + } + + FC_INIT_LIST_HEAD(&sq->head); + sq->dlink_offset = dlink_offset; sq->compare_func = compare_func; - return fc_queue_init(&sq->queue, next_ptr_offset); + return 0; } void sorted_queue_destroy(struct sorted_queue *sq) { - fc_queue_destroy(&sq->queue); + destroy_pthread_lock_cond_pair(&sq->lcp); } void sorted_queue_push_ex(struct sorted_queue *sq, void *data, bool *notify) { - void *previous; - void *current; + struct fc_list_head *dlink; + struct fc_list_head *current; + int count = 0; - PTHREAD_MUTEX_LOCK(&sq->queue.lcp.lock); - if (sq->queue.tail == NULL) { - FC_QUEUE_NEXT_PTR(&sq->queue, data) = NULL; - sq->queue.head = sq->queue.tail = data; + dlink = FC_SORTED_QUEUE_DLINK_PTR(sq, data); + PTHREAD_MUTEX_LOCK(&sq->lcp.lock); + if (fc_list_empty(&sq->head)) { + fc_list_add(dlink, &sq->head); *notify = true; } else { - if (sq->compare_func(data, sq->queue.tail) >= 0) { - FC_QUEUE_NEXT_PTR(&sq->queue, data) = NULL; - FC_QUEUE_NEXT_PTR(&sq->queue, sq->queue.tail) = data; - sq->queue.tail = data; + if (sq->compare_func(data, FC_SORTED_QUEUE_DATA_PTR( + sq, sq->head.prev)) >= 0) + { + fc_list_add_tail(dlink, &sq->head); *notify = false; - } else if (sq->compare_func(data, sq->queue.head) < 0) { - FC_QUEUE_NEXT_PTR(&sq->queue, data) = sq->queue.head; - sq->queue.head = data; + } else if (sq->compare_func(data, FC_SORTED_QUEUE_DATA_PTR( + sq, sq->head.next)) < 0) + { + fc_list_add(dlink, &sq->head); *notify = true; } else { - previous = sq->queue.head; - current = FC_QUEUE_NEXT_PTR(&sq->queue, previous); - while (sq->compare_func(data, current) >= 0) { - previous = current; - current = FC_QUEUE_NEXT_PTR(&sq->queue, previous); + current = sq->head.prev->prev; + while (sq->compare_func(data, FC_SORTED_QUEUE_DATA_PTR( + sq, current)) < 0) + { + current = current->prev; + ++count; } - - FC_QUEUE_NEXT_PTR(&sq->queue, data) = FC_QUEUE_NEXT_PTR( - &sq->queue, previous); - FC_QUEUE_NEXT_PTR(&sq->queue, previous) = data; + fc_list_add_after(dlink, current); *notify = false; + + if (count >= 10) { + logInfo("================== compare count: %d ==========", count); + } } } - PTHREAD_MUTEX_UNLOCK(&sq->queue.lcp.lock); + PTHREAD_MUTEX_UNLOCK(&sq->lcp.lock); } -void *sorted_queue_pop_ex(struct sorted_queue *sq, - void *less_equal, const bool blocked) +void sorted_queue_pop_ex(struct sorted_queue *sq, void *less_equal, + struct fc_list_head *head, const bool blocked) { - void *data; + struct fc_list_head *current; - PTHREAD_MUTEX_LOCK(&sq->queue.lcp.lock); + PTHREAD_MUTEX_LOCK(&sq->lcp.lock); do { - if (sq->queue.head == NULL || sq->compare_func( - sq->queue.head, less_equal) > 0) - { + if (fc_list_empty(&sq->head)) { if (!blocked) { - data = NULL; + FC_INIT_LIST_HEAD(head); break; } - pthread_cond_wait(&sq->queue.lcp.cond, - &sq->queue.lcp.lock); + pthread_cond_wait(&sq->lcp.cond, + &sq->lcp.lock); } - if (sq->queue.head == NULL) { - data = NULL; + if (fc_list_empty(&sq->head)) { + FC_INIT_LIST_HEAD(head); } else { - if (sq->compare_func(sq->queue.head, less_equal) <= 0) { - data = sq->queue.head; - sq->queue.head = FC_QUEUE_NEXT_PTR(&sq->queue, data); - if (sq->queue.head == NULL) { - sq->queue.tail = NULL; - } - } else { - data = NULL; - } - } - } while (0); - - PTHREAD_MUTEX_UNLOCK(&sq->queue.lcp.lock); - return data; -} - -void sorted_queue_pop_to_queue_ex(struct sorted_queue *sq, - void *less_equal, struct fc_queue_info *qinfo, - const bool blocked) -{ - PTHREAD_MUTEX_LOCK(&sq->queue.lcp.lock); - do { - if (sq->queue.head == NULL) { - if (!blocked) { - qinfo->head = qinfo->tail = NULL; - break; - } - - pthread_cond_wait(&sq->queue.lcp.cond, - &sq->queue.lcp.lock); - } - - if (sq->queue.head == NULL) { - qinfo->head = qinfo->tail = NULL; - } else { - if (sq->compare_func(sq->queue.head, less_equal) <= 0) { - qinfo->head = qinfo->tail = sq->queue.head; - sq->queue.head = FC_QUEUE_NEXT_PTR(&sq->queue, - sq->queue.head); - while (sq->queue.head != NULL && sq->compare_func( - sq->queue.head, less_equal) <= 0) + current = sq->head.next; + if (sq->compare_func(FC_SORTED_QUEUE_DATA_PTR( + sq, current), less_equal) <= 0) + { + head->next = current; + current->prev = head; + current = current->next; + while (current != &sq->head && sq->compare_func( + FC_SORTED_QUEUE_DATA_PTR(sq, current), + less_equal) <= 0) { - qinfo->tail = sq->queue.head; - sq->queue.head = FC_QUEUE_NEXT_PTR( - &sq->queue, sq->queue.head); + current = current->next; } - if (sq->queue.head == NULL) { - sq->queue.tail = NULL; + head->prev = current->prev; + current->prev->next = head; + if (current == &sq->head) { + FC_INIT_LIST_HEAD(&sq->head); } else { - FC_QUEUE_NEXT_PTR(&sq->queue, qinfo->tail) = NULL; + sq->head.next = current; + current->prev = &sq->head; } } else { - qinfo->head = qinfo->tail = NULL; + FC_INIT_LIST_HEAD(head); } } } while (0); - PTHREAD_MUTEX_UNLOCK(&sq->queue.lcp.lock); + PTHREAD_MUTEX_UNLOCK(&sq->lcp.lock); +} + +int sorted_queue_free_chain(struct sorted_queue *sq, + struct fast_mblock_man *mblock, struct fc_list_head *head) +{ + struct fast_mblock_node *previous; + struct fast_mblock_node *current; + struct fast_mblock_chain chain; + struct fc_list_head *node; + + if (fc_list_empty(&sq->head)) { + return 0; + } + + node = head->next; + chain.head = previous = fast_mblock_to_node_ptr( + FC_SORTED_QUEUE_DATA_PTR(sq, node)); + node = node->next; + while (node != head) { + current = fast_mblock_to_node_ptr(FC_SORTED_QUEUE_DATA_PTR(sq, node)); + previous->next = current; + previous = current; + node = node->next; + } + + previous->next = NULL; + chain.tail = previous; + return fast_mblock_batch_free(mblock, &chain); } diff --git a/src/sorted_queue.h b/src/sorted_queue.h index 83c0683..c878b13 100644 --- a/src/sorted_queue.h +++ b/src/sorted_queue.h @@ -18,32 +18,44 @@ #ifndef _FC_SORTED_QUEUE_H #define _FC_SORTED_QUEUE_H -#include "fc_queue.h" +#include "fast_mblock.h" +#include "fc_list.h" struct sorted_queue { - struct fc_queue queue; + struct fc_list_head head; + pthread_lock_cond_pair_t lcp; + int dlink_offset; int (*compare_func)(const void *, const void *); }; +#define FC_SORTED_QUEUE_DLINK_PTR(sq, data) \ + ((void *)(((char *)data) + (sq)->dlink_offset)) + +#define FC_SORTED_QUEUE_DATA_PTR(sq, dlink) \ + ((void *)((char *)(dlink) - (sq)->dlink_offset)) + #ifdef __cplusplus extern "C" { #endif -int sorted_queue_init(struct sorted_queue *sq, const int next_ptr_offset, +int sorted_queue_init(struct sorted_queue *sq, const int dlink_offset, int (*compare_func)(const void *, const void *)); void sorted_queue_destroy(struct sorted_queue *sq); static inline void sorted_queue_terminate(struct sorted_queue *sq) { - fc_queue_terminate(&sq->queue); + pthread_cond_signal(&sq->lcp.cond); } static inline void sorted_queue_terminate_all( struct sorted_queue *sq, const int count) { - fc_queue_terminate_all(&sq->queue, count); + int i; + for (i=0; ilcp.cond)); + } } //notify by the caller @@ -55,76 +67,33 @@ static inline void sorted_queue_push(struct sorted_queue *sq, void *data) sorted_queue_push_ex(sq, data, ¬ify); if (notify) { - pthread_cond_signal(&(sq->queue.lcp.cond)); + pthread_cond_signal(&(sq->lcp.cond)); } } -static inline void sorted_queue_push_silence(struct sorted_queue *sq, void *data) +static inline void sorted_queue_push_silence( + struct sorted_queue *sq, void *data) { bool notify; sorted_queue_push_ex(sq, data, ¬ify); } -void *sorted_queue_pop_ex(struct sorted_queue *sq, - void *less_equal, const bool blocked); +void sorted_queue_pop_ex(struct sorted_queue *sq, void *less_equal, + struct fc_list_head *head, const bool blocked); -#define sorted_queue_pop(sq, less_equal) \ - sorted_queue_pop_ex(sq, less_equal, true) +#define sorted_queue_pop(sq, less_equal, head) \ + sorted_queue_pop_ex(sq, less_equal, head, true) -#define sorted_queue_try_pop(sq, less_equal) \ - sorted_queue_pop_ex(sq, less_equal, false) - - -void sorted_queue_pop_to_queue_ex(struct sorted_queue *sq, - void *less_equal, struct fc_queue_info *qinfo, - const bool blocked); - -#define sorted_queue_pop_to_queue(sq, less_equal, qinfo) \ - sorted_queue_pop_to_queue_ex(sq, less_equal, qinfo, true) - -#define sorted_queue_try_pop_to_queue(sq, less_equal, qinfo) \ - sorted_queue_pop_to_queue_ex(sq, less_equal, qinfo, false) - - -static inline void *sorted_queue_pop_all_ex(struct sorted_queue *sq, - void *less_equal, const bool blocked) -{ - struct fc_queue_info chain; - sorted_queue_pop_to_queue_ex(sq, less_equal, &chain, blocked); - return chain.head; -} - -#define sorted_queue_pop_all(sq, less_equal) \ - sorted_queue_pop_all_ex(sq, less_equal, true) - -#define sorted_queue_try_pop_all(sq, less_equal) \ - sorted_queue_pop_all_ex(sq, less_equal, false) +#define sorted_queue_try_pop(sq, less_equal, head) \ + sorted_queue_pop_ex(sq, less_equal, head, false) static inline bool sorted_queue_empty(struct sorted_queue *sq) { - return fc_queue_empty(&sq->queue); + return fc_list_empty(&sq->head); } -static inline void *sorted_queue_timedpeek(struct sorted_queue *sq, - const int timeout, const int time_unit) -{ - return fc_queue_timedpeek(&sq->queue, timeout, time_unit); -} - -#define sorted_queue_timedpeek_sec(sq, timeout) \ - sorted_queue_timedpeek(sq, timeout, FC_TIME_UNIT_SECOND) - -#define sorted_queue_timedpeek_ms(sq, timeout_ms) \ - sorted_queue_timedpeek(sq, timeout_ms, FC_TIME_UNIT_MSECOND) - -#define sorted_queue_timedpeek_us(sq, timeout_us) \ - sorted_queue_timedpeek(sq, timeout_us, FC_TIME_UNIT_USECOND) - -static inline int sorted_queue_free_chain(struct sorted_queue *sq, - struct fast_mblock_man *mblock, struct fc_queue_info *qinfo) -{ - return fc_queue_free_chain(&sq->queue, mblock, qinfo); -} +int sorted_queue_free_chain(struct sorted_queue *sq, + struct fast_mblock_man *mblock, struct fc_list_head *head); #ifdef __cplusplus } diff --git a/src/tests/Makefile b/src/tests/Makefile index 25370c4..c4f78ef 100644 --- a/src/tests/Makefile +++ b/src/tests/Makefile @@ -10,7 +10,7 @@ ALL_PRGS = test_allocator test_skiplist test_multi_skiplist test_mblock test_blo test_json_parser test_pthread_lock test_uniq_skiplist test_split_string \ test_server_id_func test_pipe test_atomic test_file_write_hole test_file_lock \ test_pthread_wait test_thread_pool test_data_visible test_mutex_lock_perf \ - test_queue_perf test_normalize_path test_sorted_array + test_queue_perf test_normalize_path test_sorted_array test_sorted_queue all: $(ALL_PRGS) .c: diff --git a/src/tests/test_sorted_queue.c b/src/tests/test_sorted_queue.c new file mode 100644 index 0000000..ae63509 --- /dev/null +++ b/src/tests/test_sorted_queue.c @@ -0,0 +1,168 @@ +/* + * Copyright (c) 2020 YuQing <384681@qq.com> + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the Lesser GNU General Public License, version 3 + * or later ("LGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the Lesser GNU General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include "fastcommon/sorted_queue.h" +#include "fastcommon/logger.h" +#include "fastcommon/shared_func.h" + +#define COUNT 100 +#define LAST_INDEX (COUNT - 1) + +typedef struct { + int n; + struct fc_list_head dlink; +} DoubleLinkNumber; + +static DoubleLinkNumber *numbers; +static struct sorted_queue sq; + +static int compare_func(const void *p1, const void *p2) +{ + return ((DoubleLinkNumber *)p1)->n - ((DoubleLinkNumber *)p2)->n; +} + +void set_rand_numbers(const int multiple) +{ + int i; + int tmp; + int index1; + int index2; + + for (i=0; in) { + fprintf(stderr, "i: %d != value: %d\n", i, number->n); + break; + } + } + assert(i == COUNT); + + sorted_queue_try_pop(&sq, &less_equal, &head); + assert(fc_list_empty(&head)); +} + +static void test2() +{ +#define MULTIPLE 2 + int i; + int n; + DoubleLinkNumber less_equal; + DoubleLinkNumber *number; + struct fc_list_head head; + + set_rand_numbers(MULTIPLE); + + for (i=0; in) { + fprintf(stderr, "%d. n: %d != value: %d\n", i, n, number->n); + break; + } + } + + less_equal.n = 2 * COUNT + 1; + sorted_queue_try_pop(&sq, &less_equal, &head); + assert(sorted_queue_empty(&sq)); + fc_list_for_each_entry (number, &head, dlink) { + n = i++ * MULTIPLE + 1; + if (n != number->n) { + fprintf(stderr, "%d. n: %d != value: %d\n", i, n, number->n); + break; + } + } + assert(i == COUNT); +} + +int main(int argc, char *argv[]) +{ + int result; + int64_t start_time; + int64_t end_time; + + start_time = get_current_time_ms(); + + log_init(); + numbers = (DoubleLinkNumber *)malloc(sizeof(DoubleLinkNumber) * COUNT); + srand(time(NULL)); + + if ((result=sorted_queue_init(&sq, (long)(&((DoubleLinkNumber *) + NULL)->dlink), compare_func)) != 0) + { + return result; + } + + test1(); + test2(); + + end_time = get_current_time_ms(); + printf("pass OK, time used: %"PRId64" ms\n", end_time - start_time); + return 0; +} +