add function common_blocked_queue_timedpop

storage_pool
YuQing 2021-01-29 17:39:12 +08:00
parent 41a4ca78c7
commit 35aa996333
4 changed files with 98 additions and 1 deletions

View File

@ -1,7 +1,8 @@
Version 1.48 2021-01-27 Version 1.48 2021-01-29
* fast_buffer.[hc]: add function fast_buffer_append_binary * fast_buffer.[hc]: add function fast_buffer_append_binary
* fc_check_mkdir_ex return 0 when mkdir with errno EEXIST * fc_check_mkdir_ex return 0 when mkdir with errno EEXIST
* add function common_blocked_queue_timedpop
Version 1.47 2021-01-20 Version 1.47 2021-01-20
* fc_atomic.h: add FC_ATOMIC_GET and FC_ATOMIC_SET etc. * fc_atomic.h: add FC_ATOMIC_GET and FC_ATOMIC_SET etc.

View File

@ -185,6 +185,85 @@ void *common_blocked_queue_pop_ex(struct common_blocked_queue *queue,
return data; return data;
} }
void *common_blocked_queue_timedpop(struct common_blocked_queue *queue,
const int timeout, const int time_unit)
{
struct common_blocked_node *node;
void *data;
struct timespec ts;
int seconds;
int result;
if ((result=pthread_mutex_lock(&(queue->lc_pair.lock))) != 0)
{
logError("file: "__FILE__", line: %d, "
"call pthread_mutex_lock fail, "
"errno: %d, error info: %s",
__LINE__, result, STRERROR(result));
return NULL;
}
do {
node = queue->head;
if (node == NULL)
{
switch (time_unit) {
case FC_TIME_UNIT_SECOND:
seconds = timeout;
ts.tv_nsec = 0;
break;
case FC_TIME_UNIT_MSECOND:
seconds = timeout / 1000;
ts.tv_nsec = (timeout % 1000) * (1000 * 1000);
break;
case FC_TIME_UNIT_USECOND:
seconds = timeout / (1000 * 1000);
ts.tv_nsec = (timeout % (1000 * 1000)) * 1000;
break;
case FC_TIME_UNIT_NSECOND:
seconds = timeout / (1000 * 1000 * 1000);
ts.tv_nsec = timeout % (1000 * 1000 * 1000);
break;
default:
seconds = timeout;
ts.tv_nsec = 0;
break;
}
ts.tv_sec = get_current_time() + seconds;
pthread_cond_timedwait(&queue->lc_pair.cond,
&queue->lc_pair.lock, &ts);
node = queue->head;
}
if (node != NULL)
{
queue->head = node->next;
if (queue->head == NULL)
{
queue->tail = NULL;
}
data = node->data;
fast_mblock_free_object(&queue->mblock, node);
}
else
{
data = NULL;
}
} while (0);
if ((result=pthread_mutex_unlock(&(queue->lc_pair.lock))) != 0)
{
logError("file: "__FILE__", line: %d, "
"call pthread_mutex_unlock fail, "
"errno: %d, error info: %s",
__LINE__, result, STRERROR(result));
}
return data;
}
struct common_blocked_node *common_blocked_queue_pop_all_nodes_ex( struct common_blocked_node *common_blocked_queue_pop_all_nodes_ex(
struct common_blocked_queue *queue, const bool blocked) struct common_blocked_queue *queue, const bool blocked)
{ {

View File

@ -116,6 +116,18 @@ struct common_blocked_node *common_blocked_queue_pop_all_nodes_ex(
void common_blocked_queue_free_all_nodes(struct common_blocked_queue *queue, void common_blocked_queue_free_all_nodes(struct common_blocked_queue *queue,
struct common_blocked_node *node); struct common_blocked_node *node);
void *common_blocked_queue_timedpop(struct common_blocked_queue *queue,
const int timeout, const int time_unit);
#define common_blocked_queue_timedpop_sec(queue, timeout) \
common_blocked_queue_timedpop(queue, timeout, FC_TIME_UNIT_SECOND)
#define common_blocked_queue_timedpop_ms(queue, timeout) \
common_blocked_queue_timedpop(queue, timeout, FC_TIME_UNIT_MSECOND)
#define common_blocked_queue_timedpop_us(queue, timeout) \
common_blocked_queue_timedpop(queue, timeout, FC_TIME_UNIT_USECOND)
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -134,6 +134,11 @@ extern int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int kind);
#define FC_MIN(v1, v2) (v1 < v2 ? v1 : v2) #define FC_MIN(v1, v2) (v1 < v2 ? v1 : v2)
#define FC_MAX(v1, v2) (v1 > v2 ? v1 : v2) #define FC_MAX(v1, v2) (v1 > v2 ? v1 : v2)
#define FC_TIME_UNIT_SECOND 's' //second
#define FC_TIME_UNIT_MSECOND 'm' //millisecond
#define FC_TIME_UNIT_USECOND 'u' //microsecond
#define FC_TIME_UNIT_NSECOND 'n' //nanosecond
#define STRERROR(no) (strerror(no) != NULL ? strerror(no) : "Unkown error") #define STRERROR(no) (strerror(no) != NULL ? strerror(no) : "Unkown error")
#if defined(OS_LINUX) #if defined(OS_LINUX)