diff --git a/HISTORY b/HISTORY index 2033768..adb90ce 100644 --- a/HISTORY +++ b/HISTORY @@ -1,9 +1,10 @@ -Version 1.54 2021-08-17 +Version 1.54 2021-09-02 * fast_allocator.[hc]: correct reclaim_interval logic * shared_func.[hc]: add functions getFileContentEx1 and getFileContent1 * fc_queue.[hc]: add function fc_queue_timedpeek * pthread_func.[hc]: add function init_pthread_rwlock + * add files: sorted_queue.[hc] Version 1.53 2021-06-30 * process_action support action status diff --git a/src/Makefile.in b/src/Makefile.in index b5d7477..b651b44 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -14,9 +14,10 @@ FAST_SHARED_OBJS = hash.lo chain.lo shared_func.lo ini_file_reader.lo \ fast_buffer.lo multi_skiplist.lo flat_skiplist.lo \ system_info.lo fast_blocked_queue.lo id_generator.lo \ char_converter.lo char_convert_loader.lo common_blocked_queue.lo \ - multi_socket_client.lo skiplist_set.lo uniq_skiplist.lo \ - json_parser.lo buffered_file_writer.lo server_id_func.lo \ - fc_queue.lo fc_memory.lo shared_buffer.lo thread_pool.lo + multi_socket_client.lo skiplist_set.lo uniq_skiplist.lo \ + json_parser.lo buffered_file_writer.lo server_id_func.lo \ + fc_queue.lo sorted_queue.lo fc_memory.lo shared_buffer.lo \ + thread_pool.lo FAST_STATIC_OBJS = hash.o chain.o shared_func.o ini_file_reader.o \ logger.o sockopt.o base64.o sched_thread.o \ @@ -27,9 +28,10 @@ FAST_STATIC_OBJS = hash.o chain.o shared_func.o ini_file_reader.o \ fast_buffer.o multi_skiplist.o flat_skiplist.o \ system_info.o fast_blocked_queue.o id_generator.o \ char_converter.o char_convert_loader.o common_blocked_queue.o \ - multi_socket_client.o skiplist_set.o uniq_skiplist.o \ + multi_socket_client.o skiplist_set.o uniq_skiplist.o \ json_parser.o buffered_file_writer.o server_id_func.o \ - fc_queue.o fc_memory.o shared_buffer.o thread_pool.o + fc_queue.o sorted_queue.o fc_memory.o shared_buffer.o \ + thread_pool.o HEADER_FILES = common_define.h hash.h chain.h logger.h base64.h \ shared_func.h pthread_func.h ini_file_reader.h _os_define.h \ @@ -43,8 +45,8 @@ HEADER_FILES = common_define.h hash.h chain.h logger.h base64.h \ char_convert_loader.h common_blocked_queue.h \ multi_socket_client.h skiplist_set.h uniq_skiplist.h \ fc_list.h locked_list.h json_parser.h buffered_file_writer.h \ - server_id_func.h fc_queue.h fc_memory.h shared_buffer.h \ - thread_pool.h fc_atomic.h + server_id_func.h fc_queue.h sorted_queue.h fc_memory.h \ + shared_buffer.h thread_pool.h fc_atomic.h ALL_OBJS = $(FAST_STATIC_OBJS) $(FAST_SHARED_OBJS) diff --git a/src/fc_queue.c b/src/fc_queue.c index 2da4fca..f31ab0c 100644 --- a/src/fc_queue.c +++ b/src/fc_queue.c @@ -15,17 +15,9 @@ //fc_queue.c -#include -#include -#include -#include "logger.h" -#include "shared_func.h" #include "pthread_func.h" #include "fc_queue.h" -#define FC_QUEUE_NEXT_PTR(queue, data) \ - *((void **)(((char *)data) + queue->next_ptr_offset)) - int fc_queue_init(struct fc_queue *queue, const int next_ptr_offset) { int result; diff --git a/src/fc_queue.h b/src/fc_queue.h index 17ea458..742de5a 100644 --- a/src/fc_queue.h +++ b/src/fc_queue.h @@ -18,10 +18,6 @@ #ifndef _FC_QUEUE_H #define _FC_QUEUE_H -#include -#include -#include -#include #include "common_define.h" struct fc_queue_info @@ -38,6 +34,10 @@ struct fc_queue int next_ptr_offset; }; + +#define FC_QUEUE_NEXT_PTR(queue, data) \ + *((void **)(((char *)data) + (queue)->next_ptr_offset)) + #ifdef __cplusplus extern "C" { #endif diff --git a/src/sorted_queue.c b/src/sorted_queue.c new file mode 100644 index 0000000..58e8cdc --- /dev/null +++ b/src/sorted_queue.c @@ -0,0 +1,146 @@ +/* + * Copyright (c) 2020 YuQing <384681@qq.com> + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the Lesser GNU General Public License, version 3 + * or later ("LGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the Lesser GNU General Public License + * along with this program. If not, see . + */ + +//sorted_queue.c + +#include "pthread_func.h" +#include "sorted_queue.h" + +int sorted_queue_init(struct sorted_queue *sq, const int next_ptr_offset, + int (*compare_func)(const void *, const void *)) +{ + sq->compare_func = compare_func; + return fc_queue_init(&sq->queue, next_ptr_offset); +} + +void sorted_queue_destroy(struct sorted_queue *sq) +{ + fc_queue_destroy(&sq->queue); +} + +void sorted_queue_push_ex(struct sorted_queue *sq, void *data, bool *notify) +{ + void *previous; + void *current; + + PTHREAD_MUTEX_LOCK(&sq->queue.lc_pair.lock); + if (sq->queue.tail == NULL) { + FC_QUEUE_NEXT_PTR(&sq->queue, data) = NULL; + sq->queue.head = sq->queue.tail = data; + *notify = true; + } else { + if (sq->compare_func(data, sq->queue.tail) >= 0) { + FC_QUEUE_NEXT_PTR(&sq->queue, data) = NULL; + FC_QUEUE_NEXT_PTR(&sq->queue, sq->queue.tail) = data; + sq->queue.tail = data; + } else if (sq->compare_func(data, sq->queue.head) < 0) { + FC_QUEUE_NEXT_PTR(&sq->queue, data) = sq->queue.head; + sq->queue.head = data; + } else { + previous = sq->queue.head; + current = FC_QUEUE_NEXT_PTR(&sq->queue, previous); + while (sq->compare_func(data, current) >= 0) { + previous = current; + current = FC_QUEUE_NEXT_PTR(&sq->queue, previous); + } + + FC_QUEUE_NEXT_PTR(&sq->queue, data) = FC_QUEUE_NEXT_PTR( + &sq->queue, previous); + FC_QUEUE_NEXT_PTR(&sq->queue, previous) = data; + } + *notify = false; + } + + PTHREAD_MUTEX_UNLOCK(&sq->queue.lc_pair.lock); +} + +void *sorted_queue_pop_ex(struct sorted_queue *sq, + void *less_equal, const bool blocked) +{ + void *data; + + PTHREAD_MUTEX_LOCK(&sq->queue.lc_pair.lock); + do { + if (sq->queue.head == NULL) { + if (!blocked) { + data = NULL; + break; + } + + pthread_cond_wait(&sq->queue.lc_pair.cond, + &sq->queue.lc_pair.lock); + } + + if (sq->queue.head == NULL) { + data = NULL; + } else { + if (sq->compare_func(sq->queue.head, less_equal) <= 0) { + data = sq->queue.head; + sq->queue.head = FC_QUEUE_NEXT_PTR(&sq->queue, data); + if (sq->queue.head == NULL) { + sq->queue.tail = NULL; + } + } else { + data = NULL; + } + } + } while (0); + + PTHREAD_MUTEX_UNLOCK(&sq->queue.lc_pair.lock); + return data; +} + +void *sorted_queue_pop_all_ex(struct sorted_queue *sq, + void *less_equal, const bool blocked) +{ + struct fc_queue_info chain; + + PTHREAD_MUTEX_LOCK(&sq->queue.lc_pair.lock); + do { + if (sq->queue.head == NULL) { + if (!blocked) { + chain.head = NULL; + break; + } + + pthread_cond_wait(&sq->queue.lc_pair.cond, + &sq->queue.lc_pair.lock); + } + + if (sq->queue.head == NULL) { + chain.head = NULL; + } else { + if (sq->compare_func(sq->queue.head, less_equal) <= 0) { + chain.head = chain.tail = sq->queue.head; + sq->queue.head = FC_QUEUE_NEXT_PTR(&sq->queue, sq->queue.head); + while (sq->compare_func(sq->queue.head, less_equal) <= 0) { + chain.tail = sq->queue.head; + sq->queue.head = FC_QUEUE_NEXT_PTR(&sq->queue, sq->queue.head); + } + + if (sq->queue.head == NULL) { + sq->queue.tail = NULL; + } else { + FC_QUEUE_NEXT_PTR(&sq->queue, chain.tail) = NULL; + } + } else { + chain.head = NULL; + } + } + } while (0); + + PTHREAD_MUTEX_UNLOCK(&sq->queue.lc_pair.lock); + return chain.head; +} diff --git a/src/sorted_queue.h b/src/sorted_queue.h new file mode 100644 index 0000000..5f997dc --- /dev/null +++ b/src/sorted_queue.h @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2020 YuQing <384681@qq.com> + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the Lesser GNU General Public License, version 3 + * or later ("LGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the Lesser GNU General Public License + * along with this program. If not, see . + */ + +//sorted_queue.h + +#ifndef _FC_SORTED_QUEUE_H +#define _FC_SORTED_QUEUE_H + +#include "fc_queue.h" + +struct sorted_queue +{ + struct fc_queue queue; + int next_ptr_offset; + int (*compare_func)(const void *, const void *); +}; + +#ifdef __cplusplus +extern "C" { +#endif + +int sorted_queue_init(struct sorted_queue *sq, const int next_ptr_offset, + int (*compare_func)(const void *, const void *)); + +void sorted_queue_destroy(struct sorted_queue *sq); + +static inline void sorted_queue_terminate(struct sorted_queue *sq) +{ + fc_queue_terminate(&sq->queue); +} + +static inline void sorted_queue_terminate_all( + struct sorted_queue *sq, const int count) +{ + fc_queue_terminate_all(&sq->queue, count); +} + +//notify by the caller +void sorted_queue_push_ex(struct sorted_queue *sq, void *data, bool *notify); + +static inline void sorted_queue_push(struct sorted_queue *sq, void *data) +{ + bool notify; + + sorted_queue_push_ex(sq, data, ¬ify); + if (notify) { + pthread_cond_signal(&(sq->queue.lc_pair.cond)); + } +} + +static inline void sorted_queue_push_silence(struct sorted_queue *sq, void *data) +{ + bool notify; + sorted_queue_push_ex(sq, data, ¬ify); +} + +void *sorted_queue_pop_ex(struct sorted_queue *sq, + void *less_equal, const bool blocked); + +#define sorted_queue_pop(sq, less_equal) \ + sorted_queue_pop_ex(sq, less_equal, true) + +#define sorted_queue_try_pop(sq, less_equal) \ + sorted_queue_pop_ex(sq, less_equal, false) + +void *sorted_queue_pop_all_ex(struct sorted_queue *sq, + void *less_equal, const bool blocked); + +#define sorted_queue_pop_all(sq, less_equal) \ + sorted_queue_pop_all_ex(sq, less_equal, true) + +#define sorted_queue_try_pop_all(sq, less_equal) \ + sorted_queue_pop_all_ex(sq, less_equal, false) + +static inline bool sorted_queue_empty(struct sorted_queue *sq) +{ + return fc_queue_empty(&sq->queue); +} + +#ifdef __cplusplus +} +#endif + +#endif