diff --git a/HISTORY b/HISTORY index 14f8873..ecf51f3 100644 --- a/HISTORY +++ b/HISTORY @@ -1,7 +1,8 @@ -Version 1.48 2021-01-27 +Version 1.48 2021-01-29 * fast_buffer.[hc]: add function fast_buffer_append_binary * fc_check_mkdir_ex return 0 when mkdir with errno EEXIST + * add function common_blocked_queue_timedpop Version 1.47 2021-01-20 * fc_atomic.h: add FC_ATOMIC_GET and FC_ATOMIC_SET etc. diff --git a/src/common_blocked_queue.c b/src/common_blocked_queue.c index e69e024..9fe589d 100644 --- a/src/common_blocked_queue.c +++ b/src/common_blocked_queue.c @@ -185,6 +185,85 @@ void *common_blocked_queue_pop_ex(struct common_blocked_queue *queue, return data; } +void *common_blocked_queue_timedpop(struct common_blocked_queue *queue, + const int timeout, const int time_unit) +{ + struct common_blocked_node *node; + void *data; + struct timespec ts; + int seconds; + int result; + + if ((result=pthread_mutex_lock(&(queue->lc_pair.lock))) != 0) + { + logError("file: "__FILE__", line: %d, " + "call pthread_mutex_lock fail, " + "errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + return NULL; + } + + do { + node = queue->head; + if (node == NULL) + { + switch (time_unit) { + case FC_TIME_UNIT_SECOND: + seconds = timeout; + ts.tv_nsec = 0; + break; + case FC_TIME_UNIT_MSECOND: + seconds = timeout / 1000; + ts.tv_nsec = (timeout % 1000) * (1000 * 1000); + break; + case FC_TIME_UNIT_USECOND: + seconds = timeout / (1000 * 1000); + ts.tv_nsec = (timeout % (1000 * 1000)) * 1000; + break; + case FC_TIME_UNIT_NSECOND: + seconds = timeout / (1000 * 1000 * 1000); + ts.tv_nsec = timeout % (1000 * 1000 * 1000); + break; + default: + seconds = timeout; + ts.tv_nsec = 0; + break; + } + + ts.tv_sec = get_current_time() + seconds; + pthread_cond_timedwait(&queue->lc_pair.cond, + &queue->lc_pair.lock, &ts); + node = queue->head; + } + + if (node != NULL) + { + queue->head = node->next; + if (queue->head == NULL) + { + queue->tail = NULL; + } + + data = node->data; + fast_mblock_free_object(&queue->mblock, node); + } + else + { + data = NULL; + } + } while (0); + + if ((result=pthread_mutex_unlock(&(queue->lc_pair.lock))) != 0) + { + logError("file: "__FILE__", line: %d, " + "call pthread_mutex_unlock fail, " + "errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + } + + return data; +} + struct common_blocked_node *common_blocked_queue_pop_all_nodes_ex( struct common_blocked_queue *queue, const bool blocked) { diff --git a/src/common_blocked_queue.h b/src/common_blocked_queue.h index 667710f..e938816 100644 --- a/src/common_blocked_queue.h +++ b/src/common_blocked_queue.h @@ -116,6 +116,18 @@ struct common_blocked_node *common_blocked_queue_pop_all_nodes_ex( void common_blocked_queue_free_all_nodes(struct common_blocked_queue *queue, struct common_blocked_node *node); +void *common_blocked_queue_timedpop(struct common_blocked_queue *queue, + const int timeout, const int time_unit); + +#define common_blocked_queue_timedpop_sec(queue, timeout) \ + common_blocked_queue_timedpop(queue, timeout, FC_TIME_UNIT_SECOND) + +#define common_blocked_queue_timedpop_ms(queue, timeout) \ + common_blocked_queue_timedpop(queue, timeout, FC_TIME_UNIT_MSECOND) + +#define common_blocked_queue_timedpop_us(queue, timeout) \ + common_blocked_queue_timedpop(queue, timeout, FC_TIME_UNIT_USECOND) + #ifdef __cplusplus } #endif diff --git a/src/common_define.h b/src/common_define.h index 0e98cfe..b667c80 100644 --- a/src/common_define.h +++ b/src/common_define.h @@ -134,6 +134,11 @@ extern int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int kind); #define FC_MIN(v1, v2) (v1 < v2 ? v1 : v2) #define FC_MAX(v1, v2) (v1 > v2 ? v1 : v2) +#define FC_TIME_UNIT_SECOND 's' //second +#define FC_TIME_UNIT_MSECOND 'm' //millisecond +#define FC_TIME_UNIT_USECOND 'u' //microsecond +#define FC_TIME_UNIT_NSECOND 'n' //nanosecond + #define STRERROR(no) (strerror(no) != NULL ? strerror(no) : "Unkown error") #if defined(OS_LINUX)