add function fc_queue_timedpop

storage_pool
YuQing 2021-02-01 10:52:47 +08:00
parent 12aab5f94c
commit 02701c3781
5 changed files with 94 additions and 29 deletions

View File

@ -1,8 +1,9 @@
Version 1.48 2021-01-29
Version 1.48 2021-02-01
* fast_buffer.[hc]: add function fast_buffer_append_binary
* fc_check_mkdir_ex return 0 when mkdir with errno EEXIST
* add function common_blocked_queue_timedpop
* add function fc_queue_timedpop
Version 1.47 2021-01-20
* fc_atomic.h: add FC_ATOMIC_GET and FC_ATOMIC_SET etc.

View File

@ -190,8 +190,6 @@ void *common_blocked_queue_timedpop(struct common_blocked_queue *queue,
{
struct common_blocked_node *node;
void *data;
struct timespec ts;
int seconds;
int result;
if ((result=pthread_mutex_lock(&(queue->lc_pair.lock))) != 0)
@ -207,32 +205,7 @@ void *common_blocked_queue_timedpop(struct common_blocked_queue *queue,
node = queue->head;
if (node == NULL)
{
switch (time_unit) {
case FC_TIME_UNIT_SECOND:
seconds = timeout;
ts.tv_nsec = 0;
break;
case FC_TIME_UNIT_MSECOND:
seconds = timeout / 1000;
ts.tv_nsec = (timeout % 1000) * (1000 * 1000);
break;
case FC_TIME_UNIT_USECOND:
seconds = timeout / (1000 * 1000);
ts.tv_nsec = (timeout % (1000 * 1000)) * 1000;
break;
case FC_TIME_UNIT_NSECOND:
seconds = timeout / (1000 * 1000 * 1000);
ts.tv_nsec = timeout % (1000 * 1000 * 1000);
break;
default:
seconds = timeout;
ts.tv_nsec = 0;
break;
}
ts.tv_sec = get_current_time() + seconds;
pthread_cond_timedwait(&queue->lc_pair.cond,
&queue->lc_pair.lock, &ts);
fc_cond_timedwait(&queue->lc_pair, timeout, time_unit);
node = queue->head;
}

View File

@ -148,3 +148,28 @@ void fc_queue_pop_to_queue(struct fc_queue *queue,
}
PTHREAD_MUTEX_UNLOCK(&queue->lc_pair.lock);
}
void *fc_queue_timedpop(struct fc_queue *queue,
const int timeout, const int time_unit)
{
void *data;
PTHREAD_MUTEX_LOCK(&queue->lc_pair.lock);
do {
data = queue->head;
if (data == NULL) {
fc_cond_timedwait(&queue->lc_pair, timeout, time_unit);
data = queue->head;
}
if (data != NULL) {
queue->head = FC_QUEUE_NEXT_PTR(queue, data);
if (queue->head == NULL) {
queue->tail = NULL;
}
}
} while (0);
PTHREAD_MUTEX_UNLOCK(&queue->lc_pair.lock);
return data;
}

View File

@ -114,6 +114,18 @@ static inline bool fc_queue_empty(struct fc_queue *queue)
return empty;
}
void *fc_queue_timedpop(struct fc_queue *queue,
const int timeout, const int time_unit);
#define fc_queue_timedpop_sec(queue, timeout) \
fc_queue_timedpop(queue, timeout, FC_TIME_UNIT_SECOND)
#define fc_queue_timedpop_ms(queue, timeout_ms) \
fc_queue_timedpop(queue, timeout_ms, FC_TIME_UNIT_MSECOND)
#define fc_queue_timedpop_us(queue, timeout_us) \
fc_queue_timedpop(queue, timeout_us, FC_TIME_UNIT_USECOND)
#ifdef __cplusplus
}
#endif

View File

@ -95,6 +95,60 @@ static inline void fc_timedwait_ms(pthread_mutex_t *lock,
PTHREAD_MUTEX_UNLOCK(lock);
}
static inline int fc_timeout_to_timespec(const int timeout,
const int time_unit, struct timespec *ts)
{
int seconds;
switch (time_unit) {
case FC_TIME_UNIT_SECOND:
seconds = timeout;
ts->tv_nsec = 0;
break;
case FC_TIME_UNIT_MSECOND:
seconds = timeout / 1000;
ts->tv_nsec = (timeout % 1000) * (1000 * 1000);
break;
case FC_TIME_UNIT_USECOND:
seconds = timeout / (1000 * 1000);
ts->tv_nsec = (timeout % (1000 * 1000)) * 1000;
break;
case FC_TIME_UNIT_NSECOND:
seconds = timeout / (1000 * 1000 * 1000);
ts->tv_nsec = timeout % (1000 * 1000 * 1000);
break;
default:
logError("file: "__FILE__", line: %d, "
"invalid time unit: %d", __LINE__, time_unit);
return EINVAL;
}
ts->tv_sec = get_current_time() + seconds;
return 0;
}
static inline int fc_cond_timedwait(pthread_lock_cond_pair_t *lcp,
const int timeout, const int time_unit)
{
struct timespec ts;
int result;
if ((result=fc_timeout_to_timespec(timeout, time_unit, &ts)) != 0) {
return result;
}
return pthread_cond_timedwait(&lcp->cond, &lcp->lock, &ts);
}
#define fc_cond_timedwait_sec(lcp, timeout) \
fc_cond_timedwait(lcp, timeout, FC_TIME_UNIT_SECOND)
#define fc_cond_timedwait_ms(lcp, timeout_ms) \
fc_cond_timedwait(lcp, timeout_ms, FC_TIME_UNIT_MSECOND)
#define fc_cond_timedwait_us(lcp, timeout_us) \
fc_cond_timedwait(lcp, timeout_us, FC_TIME_UNIT_USECOND)
int create_work_threads(int *count, void *(*start_func)(void *),
void **args, pthread_t *tids, const int stack_size);