diff --git a/HISTORY b/HISTORY index ecf51f3..7caaa26 100644 --- a/HISTORY +++ b/HISTORY @@ -1,8 +1,9 @@ -Version 1.48 2021-01-29 +Version 1.48 2021-02-01 * fast_buffer.[hc]: add function fast_buffer_append_binary * fc_check_mkdir_ex return 0 when mkdir with errno EEXIST * add function common_blocked_queue_timedpop + * add function fc_queue_timedpop Version 1.47 2021-01-20 * fc_atomic.h: add FC_ATOMIC_GET and FC_ATOMIC_SET etc. diff --git a/src/common_blocked_queue.c b/src/common_blocked_queue.c index 9fe589d..8945cf3 100644 --- a/src/common_blocked_queue.c +++ b/src/common_blocked_queue.c @@ -190,8 +190,6 @@ void *common_blocked_queue_timedpop(struct common_blocked_queue *queue, { struct common_blocked_node *node; void *data; - struct timespec ts; - int seconds; int result; if ((result=pthread_mutex_lock(&(queue->lc_pair.lock))) != 0) @@ -207,32 +205,7 @@ void *common_blocked_queue_timedpop(struct common_blocked_queue *queue, node = queue->head; if (node == NULL) { - switch (time_unit) { - case FC_TIME_UNIT_SECOND: - seconds = timeout; - ts.tv_nsec = 0; - break; - case FC_TIME_UNIT_MSECOND: - seconds = timeout / 1000; - ts.tv_nsec = (timeout % 1000) * (1000 * 1000); - break; - case FC_TIME_UNIT_USECOND: - seconds = timeout / (1000 * 1000); - ts.tv_nsec = (timeout % (1000 * 1000)) * 1000; - break; - case FC_TIME_UNIT_NSECOND: - seconds = timeout / (1000 * 1000 * 1000); - ts.tv_nsec = timeout % (1000 * 1000 * 1000); - break; - default: - seconds = timeout; - ts.tv_nsec = 0; - break; - } - - ts.tv_sec = get_current_time() + seconds; - pthread_cond_timedwait(&queue->lc_pair.cond, - &queue->lc_pair.lock, &ts); + fc_cond_timedwait(&queue->lc_pair, timeout, time_unit); node = queue->head; } diff --git a/src/fc_queue.c b/src/fc_queue.c index b48453b..5e27bf7 100644 --- a/src/fc_queue.c +++ b/src/fc_queue.c @@ -148,3 +148,28 @@ void fc_queue_pop_to_queue(struct fc_queue *queue, } PTHREAD_MUTEX_UNLOCK(&queue->lc_pair.lock); } + +void *fc_queue_timedpop(struct fc_queue *queue, + const int timeout, const int time_unit) +{ + void *data; + + PTHREAD_MUTEX_LOCK(&queue->lc_pair.lock); + do { + data = queue->head; + if (data == NULL) { + fc_cond_timedwait(&queue->lc_pair, timeout, time_unit); + data = queue->head; + } + + if (data != NULL) { + queue->head = FC_QUEUE_NEXT_PTR(queue, data); + if (queue->head == NULL) { + queue->tail = NULL; + } + } + } while (0); + + PTHREAD_MUTEX_UNLOCK(&queue->lc_pair.lock); + return data; +} diff --git a/src/fc_queue.h b/src/fc_queue.h index 2434d20..e5ce71e 100644 --- a/src/fc_queue.h +++ b/src/fc_queue.h @@ -114,6 +114,18 @@ static inline bool fc_queue_empty(struct fc_queue *queue) return empty; } +void *fc_queue_timedpop(struct fc_queue *queue, + const int timeout, const int time_unit); + +#define fc_queue_timedpop_sec(queue, timeout) \ + fc_queue_timedpop(queue, timeout, FC_TIME_UNIT_SECOND) + +#define fc_queue_timedpop_ms(queue, timeout_ms) \ + fc_queue_timedpop(queue, timeout_ms, FC_TIME_UNIT_MSECOND) + +#define fc_queue_timedpop_us(queue, timeout_us) \ + fc_queue_timedpop(queue, timeout_us, FC_TIME_UNIT_USECOND) + #ifdef __cplusplus } #endif diff --git a/src/pthread_func.h b/src/pthread_func.h index 53a0f5d..02c9080 100644 --- a/src/pthread_func.h +++ b/src/pthread_func.h @@ -95,6 +95,60 @@ static inline void fc_timedwait_ms(pthread_mutex_t *lock, PTHREAD_MUTEX_UNLOCK(lock); } +static inline int fc_timeout_to_timespec(const int timeout, + const int time_unit, struct timespec *ts) +{ + int seconds; + + switch (time_unit) { + case FC_TIME_UNIT_SECOND: + seconds = timeout; + ts->tv_nsec = 0; + break; + case FC_TIME_UNIT_MSECOND: + seconds = timeout / 1000; + ts->tv_nsec = (timeout % 1000) * (1000 * 1000); + break; + case FC_TIME_UNIT_USECOND: + seconds = timeout / (1000 * 1000); + ts->tv_nsec = (timeout % (1000 * 1000)) * 1000; + break; + case FC_TIME_UNIT_NSECOND: + seconds = timeout / (1000 * 1000 * 1000); + ts->tv_nsec = timeout % (1000 * 1000 * 1000); + break; + default: + logError("file: "__FILE__", line: %d, " + "invalid time unit: %d", __LINE__, time_unit); + return EINVAL; + } + + ts->tv_sec = get_current_time() + seconds; + return 0; +} + +static inline int fc_cond_timedwait(pthread_lock_cond_pair_t *lcp, + const int timeout, const int time_unit) +{ + struct timespec ts; + int result; + + if ((result=fc_timeout_to_timespec(timeout, time_unit, &ts)) != 0) { + return result; + } + return pthread_cond_timedwait(&lcp->cond, &lcp->lock, &ts); +} + +#define fc_cond_timedwait_sec(lcp, timeout) \ + fc_cond_timedwait(lcp, timeout, FC_TIME_UNIT_SECOND) + +#define fc_cond_timedwait_ms(lcp, timeout_ms) \ + fc_cond_timedwait(lcp, timeout_ms, FC_TIME_UNIT_MSECOND) + +#define fc_cond_timedwait_us(lcp, timeout_us) \ + fc_cond_timedwait(lcp, timeout_us, FC_TIME_UNIT_USECOND) + + int create_work_threads(int *count, void *(*start_func)(void *), void **args, pthread_t *tids, const int stack_size);