From a1914ea249c86dfbc00b44964bf0149d25bc38b2 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Sat, 30 Sep 2023 14:46:56 +0800 Subject: [PATCH] add functions: fc_queue_push_with_check and fc_queue_peek --- HISTORY | 3 ++- src/fc_queue.c | 45 ++++++++++++++++++++++++++++++++++++++++++++- src/fc_queue.h | 25 +++++++++++++++++++++++++ 3 files changed, 71 insertions(+), 2 deletions(-) diff --git a/HISTORY b/HISTORY index 66015e4..a89b36c 100644 --- a/HISTORY +++ b/HISTORY @@ -1,5 +1,5 @@ -Version 1.70 2023-09-25 +Version 1.70 2023-09-30 * get full mac address of infiniband NIC under Linux * struct fast_task_info add field conn for RDMA connection * server_id_func.[hc]: support communication type @@ -7,6 +7,7 @@ Version 1.70 2023-09-25 * nio thread data support busy_polling_callback * connection_pool.[hc] support thread local for performance * struct fast_task_info support send and recv double buffers + * add functions: fc_queue_push_with_check and fc_queue_peek Version 1.69 2023-08-05 * bugfixed: array_allocator_alloc MUST init the array diff --git a/src/fc_queue.c b/src/fc_queue.c index c8f30c4..2a40c45 100644 --- a/src/fc_queue.c +++ b/src/fc_queue.c @@ -50,10 +50,53 @@ void fc_queue_push_ex(struct fc_queue *queue, void *data, bool *notify) *notify = false; } queue->tail = data; - PTHREAD_MUTEX_UNLOCK(&queue->lcp.lock); } +static inline bool fc_queue_exists(struct fc_queue *queue, void *data) +{ + void *current; + if (queue->head == NULL) { + return false; + } + + current = queue->head; + do { + if (current == data) { + return true; + } + current = FC_QUEUE_NEXT_PTR(queue, current); + } while (current != NULL); + + return false; +} + +int fc_queue_push_with_check_ex(struct fc_queue *queue, + void *data, bool *notify) +{ + int result; + + PTHREAD_MUTEX_LOCK(&queue->lcp.lock); + if (fc_queue_exists(queue, data)) { + result = EEXIST; + *notify = false; + } else { + result = 0; + FC_QUEUE_NEXT_PTR(queue, data) = NULL; + if (queue->tail == NULL) { + queue->head = data; + *notify = true; + } else { + FC_QUEUE_NEXT_PTR(queue, queue->tail) = data; + *notify = false; + } + queue->tail = data; + } + PTHREAD_MUTEX_UNLOCK(&queue->lcp.lock); + + return result; +} + void *fc_queue_pop_ex(struct fc_queue *queue, const bool blocked) { void *data; diff --git a/src/fc_queue.h b/src/fc_queue.h index 1533435..7765811 100644 --- a/src/fc_queue.h +++ b/src/fc_queue.h @@ -68,6 +68,8 @@ static inline void fc_queue_terminate_all( //notify by the caller void fc_queue_push_ex(struct fc_queue *queue, void *data, bool *notify); +int fc_queue_push_with_check_ex(struct fc_queue *queue, + void *data, bool *notify); static inline void fc_queue_push(struct fc_queue *queue, void *data) { @@ -79,6 +81,19 @@ static inline void fc_queue_push(struct fc_queue *queue, void *data) } } +static inline int fc_queue_push_with_check(struct fc_queue *queue, void *data) +{ + int result; + bool notify; + + result = fc_queue_push_with_check_ex(queue, data, ¬ify); + if (notify) { + pthread_cond_signal(&(queue->lcp.cond)); + } + + return result; +} + static inline void fc_queue_push_silence(struct fc_queue *queue, void *data) { bool notify; @@ -171,6 +186,16 @@ static inline int fc_queue_count(struct fc_queue *queue) return count; } +static inline void *fc_queue_peek(struct fc_queue *queue) +{ + void *data; + + pthread_mutex_lock(&queue->lcp.lock); + data = queue->head; + pthread_mutex_unlock(&queue->lcp.lock); + return data; +} + void *fc_queue_timedpop(struct fc_queue *queue, const int timeout, const int time_unit);