fast_timer.[hc]: support lock for option

pull/37/head
YuQing 2020-11-23 11:45:33 +08:00
parent 58e1aea32b
commit f5028fcbe0
5 changed files with 297 additions and 162 deletions

View File

@ -1,5 +1,5 @@
Version 1.44 2020-11-03 Version 1.44 2020-11-23
* add test file src/tests/test_pthread_lock.c * add test file src/tests/test_pthread_lock.c
* add uniq_skiplist.[hc] * add uniq_skiplist.[hc]
* add function split_string_ex * add function split_string_ex
@ -37,6 +37,7 @@ Version 1.44 2020-11-03
* shared_func.[hc]: add function fc_path_contains * shared_func.[hc]: add function fc_path_contains
* fast_mblock.[hc]: support alloc elements limit * fast_mblock.[hc]: support alloc elements limit
* sockopt.[hc]: add function asyncconnectserverbyip * sockopt.[hc]: add function asyncconnectserverbyip
* fast_timer.[hc]: support lock for option
Version 1.43 2019-12-25 Version 1.43 2019-12-25
* replace function call system to getExecResult, * replace function call system to getExecResult,

View File

@ -45,9 +45,9 @@ struct fast_task_info;
typedef struct ioevent_entry typedef struct ioevent_entry
{ {
int fd; FastTimerEntry timer; //must first
FastTimerEntry timer; int fd;
IOEventCallback callback; IOEventCallback callback;
} IOEventEntry; } IOEventEntry;
struct nio_thread_data struct nio_thread_data
@ -70,6 +70,12 @@ struct nio_thread_data
} notify; //for thread notify } notify; //for thread notify
}; };
struct ioevent_notify_entry
{
IOEventEntry event; //must first
struct nio_thread_data *thread_data;
};
struct fast_task_info struct fast_task_info
{ {
IOEventEntry event; //must first IOEventEntry event; //must first

View File

@ -20,174 +20,303 @@
#include <errno.h> #include <errno.h>
#include "logger.h" #include "logger.h"
#include "fc_memory.h" #include "fc_memory.h"
#include "pthread_func.h"
#include "fast_timer.h" #include "fast_timer.h"
int fast_timer_init(FastTimer *timer, const int slot_count, static int fast_timer_init_locks(FastTimer *timer)
const int64_t current_time)
{ {
int bytes; int result;
if (slot_count <= 0 || current_time <= 0) { FastTimerSlot *slot;
return EINVAL; FastTimerSlot *end;
}
timer->slot_count = slot_count; end = timer->slots + timer->slot_count;
timer->base_time = current_time; //base time for slot 0 for (slot=timer->slots; slot<end; slot++) {
timer->current_time = current_time; if ((result=init_pthread_lock(&slot->lock)) != 0) {
bytes = sizeof(FastTimerSlot) * slot_count; return result;
timer->slots = (FastTimerSlot *)fc_malloc(bytes); }
if (timer->slots == NULL) { }
return ENOMEM;
} return 0;
memset(timer->slots, 0, bytes); }
return 0;
int fast_timer_init_ex(FastTimer *timer, const int slot_count,
const int64_t current_time, const bool need_lock)
{
int result;
int bytes;
if (slot_count <= 0 || current_time <= 0) {
return EINVAL;
}
timer->need_lock = need_lock;
timer->slot_count = slot_count;
timer->base_time = current_time; //base time for slot 0
timer->current_time = current_time;
bytes = sizeof(FastTimerSlot) * slot_count;
timer->slots = (FastTimerSlot *)fc_malloc(bytes);
if (timer->slots == NULL) {
return ENOMEM;
}
memset(timer->slots, 0, bytes);
if (need_lock) {
if ((result=fast_timer_init_locks(timer)) != 0) {
return result;
}
}
return 0;
} }
void fast_timer_destroy(FastTimer *timer) void fast_timer_destroy(FastTimer *timer)
{ {
if (timer->slots != NULL) { if (timer->slots != NULL) {
free(timer->slots); if (timer->need_lock) {
timer->slots = NULL; FastTimerSlot *slot;
} FastTimerSlot *end;
end = timer->slots + timer->slot_count;
for (slot=timer->slots; slot<end; slot++) {
pthread_mutex_destroy(&slot->lock);
}
}
free(timer->slots);
timer->slots = NULL;
}
} }
#define TIMER_CHECK_LOCK(timer, slot) \
do { \
if (timer->need_lock) { \
PTHREAD_MUTEX_LOCK(&(slot)->lock); \
} \
} while (0)
#define TIMER_CHECK_UNLOCK(timer, slot) \
do { \
if (timer->need_lock) { \
PTHREAD_MUTEX_UNLOCK(&(slot)->lock); \
} \
} while (0)
#define TIMER_CHECK_LOCK_AND_SET_SLOT(timer, slot, entry) \
do { \
if (timer->need_lock) { \
PTHREAD_MUTEX_LOCK(&(slot)->lock); \
entry->slot_index = slot - timer->slots; \
} \
} while (0)
#define TIMER_CHECK_LOCK_BY_ENTRY(timer, entry) \
do { \
if (timer->need_lock && entry->slot_index >= 0) { \
PTHREAD_MUTEX_LOCK(&(timer->slots + entry->slot_index)->lock); \
} \
} while (0)
#define TIMER_CHECK_UNLOCK_AND_REMOVE_BY_ENTRY(timer, entry) \
do { \
if (timer->need_lock && entry->slot_index >= 0) { \
PTHREAD_MUTEX_UNLOCK(&(timer->slots + entry->slot_index)->lock); \
entry->slot_index = -1; \
} \
} while (0)
#define TIMER_CHECK_LOCK_BY_SINDEX(timer, slot_index) \
do { \
if (timer->need_lock) { \
PTHREAD_MUTEX_LOCK(&(timer->slots + slot_index)->lock); \
} \
} while (0)
#define TIMER_CHECK_UNLOCK_BY_SINDEX(timer, slot_index) \
do { \
if (timer->need_lock) { \
PTHREAD_MUTEX_UNLOCK(&(timer->slots + slot_index)->lock); \
} \
} while (0)
#define TIMER_GET_SLOT_INDEX(timer, expires) \ #define TIMER_GET_SLOT_INDEX(timer, expires) \
(((expires) - timer->base_time) % timer->slot_count) (((expires) - timer->base_time) % timer->slot_count)
#define TIMER_GET_SLOT_POINTER(timer, expires) \ #define TIMER_GET_SLOT_POINTER(timer, expires) \
(timer->slots + TIMER_GET_SLOT_INDEX(timer, expires)) (timer->slots + TIMER_GET_SLOT_INDEX(timer, expires))
int fast_timer_add(FastTimer *timer, FastTimerEntry *entry) static inline void add_entry(FastTimer *timer, FastTimerSlot *slot,
FastTimerEntry *entry, const int64_t expires, const bool set_expires)
{ {
FastTimerSlot *slot; TIMER_CHECK_LOCK_AND_SET_SLOT(timer, slot, entry);
if (set_expires) {
slot = TIMER_GET_SLOT_POINTER(timer, entry->expires > entry->expires = expires;
timer->current_time ? entry->expires : timer->current_time); }
entry->next = slot->head.next; entry->next = slot->head.next;
if (slot->head.next != NULL) { if (slot->head.next != NULL) {
slot->head.next->prev = entry; slot->head.next->prev = entry;
} }
entry->prev = &slot->head; entry->prev = &slot->head;
slot->head.next = entry; slot->head.next = entry;
entry->rehash = false; entry->rehash = false;
return 0; TIMER_CHECK_UNLOCK(timer, slot);
} }
int fast_timer_modify(FastTimer *timer, FastTimerEntry *entry, void fast_timer_add_ex(FastTimer *timer, FastTimerEntry *entry,
const int64_t expires, const bool set_expires)
{
FastTimerSlot *slot;
int64_t new_expires;
bool new_set_expires;
if (expires > timer->current_time) {
new_expires = expires;
new_set_expires = set_expires;
} else {
new_expires = timer->current_time;
new_set_expires = true;
}
slot = TIMER_GET_SLOT_POINTER(timer, new_expires);
add_entry(timer, slot, entry, new_expires, new_set_expires);
}
void fast_timer_modify(FastTimer *timer, FastTimerEntry *entry,
const int64_t new_expires) const int64_t new_expires)
{ {
if (new_expires == entry->expires) { int slot_index;
return 0;
}
if (new_expires < entry->expires) { if (new_expires > entry->expires) {
fast_timer_remove(timer, entry); if (timer->need_lock && entry->slot_index >= 0) {
entry->expires = new_expires; slot_index = entry->slot_index;
return fast_timer_add(timer, entry); if (slot_index < 0) {
} slot_index = TIMER_GET_SLOT_INDEX(timer, entry->expires);
}
} else {
slot_index = TIMER_GET_SLOT_INDEX(timer, entry->expires);
}
entry->rehash = TIMER_GET_SLOT_INDEX(timer, new_expires) != TIMER_CHECK_LOCK_BY_SINDEX(timer, slot_index);
TIMER_GET_SLOT_INDEX(timer, entry->expires); entry->rehash = TIMER_GET_SLOT_INDEX(timer, new_expires) != slot_index;
entry->expires = new_expires; //lazy move entry->expires = new_expires; //lazy move
return 0; TIMER_CHECK_UNLOCK_BY_SINDEX(timer, slot_index);
} else if (new_expires < entry->expires) {
fast_timer_remove(timer, entry);
fast_timer_add_ex(timer, entry, new_expires, true);
}
}
static inline void remove_entry(FastTimerEntry *entry)
{
if (entry->next != NULL) {
entry->next->prev = entry->prev;
entry->prev->next = entry->next;
entry->next = NULL;
}
else {
entry->prev->next = NULL;
}
entry->prev = NULL;
} }
int fast_timer_remove(FastTimer *timer, FastTimerEntry *entry) int fast_timer_remove(FastTimer *timer, FastTimerEntry *entry)
{ {
if (entry->prev == NULL) { int result;
return ENOENT; //already removed
}
if (entry->next != NULL) { TIMER_CHECK_LOCK_BY_ENTRY(timer, entry);
entry->next->prev = entry->prev; if (entry->prev == NULL) {
entry->prev->next = entry->next; result = ENOENT; //already removed
entry->next = NULL; } else {
} remove_entry(entry);
else { result = 0;
entry->prev->next = NULL; }
} TIMER_CHECK_UNLOCK_AND_REMOVE_BY_ENTRY(timer, entry);
return result;
entry->prev = NULL;
return 0;
} }
FastTimerSlot *fast_timer_slot_get(FastTimer *timer, const int64_t current_time) FastTimerSlot *fast_timer_slot_get(FastTimer *timer, const int64_t current_time)
{ {
if (timer->current_time >= current_time) { if (timer->current_time >= current_time) {
return NULL; return NULL;
} }
return TIMER_GET_SLOT_POINTER(timer, timer->current_time++); return TIMER_GET_SLOT_POINTER(timer, timer->current_time++);
} }
int fast_timer_timeouts_get(FastTimer *timer, const int64_t current_time, int fast_timer_timeouts_get(FastTimer *timer, const int64_t current_time,
FastTimerEntry *head) FastTimerEntry *head)
{ {
FastTimerSlot *slot; FastTimerSlot *slot;
FastTimerEntry *entry; FastTimerSlot *new_slot;
FastTimerEntry *first; FastTimerEntry *entry;
FastTimerEntry *last; FastTimerEntry *first;
FastTimerEntry *tail; FastTimerEntry *last;
int count; FastTimerEntry *tail;
int count;
head->prev = NULL; head->prev = NULL;
head->next = NULL; head->next = NULL;
if (timer->current_time >= current_time) { if (timer->current_time >= current_time) {
return 0; return 0;
} }
first = NULL; first = NULL;
last = NULL; last = NULL;
tail = head; tail = head;
count = 0; count = 0;
while (timer->current_time < current_time) { while (timer->current_time < current_time) {
slot = TIMER_GET_SLOT_POINTER(timer, timer->current_time++); slot = TIMER_GET_SLOT_POINTER(timer, timer->current_time++);
entry = slot->head.next; TIMER_CHECK_LOCK(timer, slot);
while (entry != NULL) { entry = slot->head.next;
if (entry->expires >= current_time) { //not expired while (entry != NULL) {
if (first != NULL) { if (entry->expires >= current_time) { //not expired
first->prev->next = entry; if (first != NULL) {
entry->prev = first->prev; first->prev->next = entry;
entry->prev = first->prev;
tail->next = first;
first->prev = tail;
tail = last;
first = NULL;
}
if (entry->rehash) {
last = entry;
entry = entry->next;
new_slot = TIMER_GET_SLOT_POINTER(timer, last->expires);
if (new_slot != slot) { //check to avoid deadlock
remove_entry(last);
add_entry(timer, new_slot, last,
last->expires, false);
} else {
last->rehash = false;
}
continue;
}
} else { //expired
count++;
if (first == NULL) {
first = entry;
}
}
last = entry;
entry = entry->next;
}
if (first != NULL) {
first->prev->next = NULL;
tail->next = first; tail->next = first;
first->prev = tail; first->prev = tail;
tail = last; tail = last;
first = NULL; first = NULL;
} }
if (entry->rehash) { TIMER_CHECK_UNLOCK(timer, slot);
last = entry;
entry = entry->next;
last->rehash = false;
fast_timer_remove(timer, last);
fast_timer_add(timer, last);
continue;
}
}
else { //expired
count++;
if (first == NULL) {
first = entry;
}
}
last = entry;
entry = entry->next;
} }
if (first != NULL) { if (count > 0) {
first->prev->next = NULL; tail->next = NULL;
tail->next = first;
first->prev = tail;
tail = last;
first = NULL;
} }
}
if (count > 0) { return count;
tail->next = NULL;
}
return count;
} }

View File

@ -17,38 +17,49 @@
#define __FAST_TIMER_H__ #define __FAST_TIMER_H__
#include <stdint.h> #include <stdint.h>
#include <pthread.h>
#include "common_define.h" #include "common_define.h"
struct fast_timer_slot;
typedef struct fast_timer_entry { typedef struct fast_timer_entry {
int64_t expires; int64_t expires;
void *data; struct fast_timer_entry *prev;
struct fast_timer_entry *prev; struct fast_timer_entry *next;
struct fast_timer_entry *next; int slot_index;
bool rehash; bool rehash;
} FastTimerEntry; } FastTimerEntry;
typedef struct fast_timer_slot { typedef struct fast_timer_slot {
struct fast_timer_entry head; struct fast_timer_entry head;
pthread_mutex_t lock;
} FastTimerSlot; } FastTimerSlot;
typedef struct fast_timer { typedef struct fast_timer {
int slot_count; //time wheel slot count bool need_lock;
int64_t base_time; //base time for slot 0 int slot_count; //time wheel slot count
int64_t current_time; int64_t base_time; //base time for slot 0
FastTimerSlot *slots; int64_t current_time;
FastTimerSlot *slots;
} FastTimer; } FastTimer;
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
int fast_timer_init(FastTimer *timer, const int slot_count, #define fast_timer_init(timer, slot_count, current_time) \
const int64_t current_time); fast_timer_init_ex(timer, slot_count, current_time, false)
#define fast_timer_add(timer, entry) \
fast_timer_add_ex(timer, entry, (entry)->expires, false)
int fast_timer_init_ex(FastTimer *timer, const int slot_count,
const int64_t current_time, const bool need_lock);
void fast_timer_destroy(FastTimer *timer); void fast_timer_destroy(FastTimer *timer);
int fast_timer_add(FastTimer *timer, FastTimerEntry *entry); void fast_timer_add_ex(FastTimer *timer, FastTimerEntry *entry,
const int64_t expires, const bool set_expires);
int fast_timer_remove(FastTimer *timer, FastTimerEntry *entry); int fast_timer_remove(FastTimer *timer, FastTimerEntry *entry);
int fast_timer_modify(FastTimer *timer, FastTimerEntry *entry, void fast_timer_modify(FastTimer *timer, FastTimerEntry *entry,
const int64_t new_expires); const int64_t new_expires);
FastTimerSlot *fast_timer_slot_get(FastTimer *timer, const int64_t current_time); FastTimerSlot *fast_timer_slot_get(FastTimer *timer, const int64_t current_time);

View File

@ -29,7 +29,7 @@ static void deal_ioevents(IOEventPoller *ioevent)
pEntry = (IOEventEntry *)IOEVENT_GET_DATA(ioevent, pEntry = (IOEventEntry *)IOEVENT_GET_DATA(ioevent,
ioevent->iterator.index); ioevent->iterator.index);
if (pEntry != NULL) { if (pEntry != NULL) {
pEntry->callback(pEntry->fd, event, pEntry->timer.data); pEntry->callback(pEntry->fd, event, pEntry);
} }
else { else {
logDebug("file: "__FILE__", line: %d, " logDebug("file: "__FILE__", line: %d, "
@ -51,7 +51,7 @@ int ioevent_remove(IOEventPoller *ioevent, void *data)
pEntry = (IOEventEntry *)IOEVENT_GET_DATA(ioevent, pEntry = (IOEventEntry *)IOEVENT_GET_DATA(ioevent,
ioevent->iterator.index); ioevent->iterator.index);
if (pEntry != NULL && pEntry->timer.data == data) { if (pEntry != NULL && (void *)pEntry == data) {
return 0; //do NOT clear current entry return 0; //do NOT clear current entry
} }
@ -59,7 +59,7 @@ int ioevent_remove(IOEventPoller *ioevent, void *data)
index++) index++)
{ {
pEntry = (IOEventEntry *)IOEVENT_GET_DATA(ioevent, index); pEntry = (IOEventEntry *)IOEVENT_GET_DATA(ioevent, index);
if (pEntry != NULL && pEntry->timer.data == data) { if (pEntry != NULL && (void *)pEntry == data) {
logDebug("file: "__FILE__", line: %d, " logDebug("file: "__FILE__", line: %d, "
"clear ioevent data: %p", __LINE__, data); "clear ioevent data: %p", __LINE__, data);
IOEVENT_CLEAR_DATA(ioevent, index); IOEVENT_CLEAR_DATA(ioevent, index);
@ -83,11 +83,10 @@ static void deal_timeouts(FastTimerEntry *head)
entry = entry->next; entry = entry->next;
current->prev = current->next = NULL; //must set NULL because NOT in time wheel current->prev = current->next = NULL; //must set NULL because NOT in time wheel
pEventEntry = (IOEventEntry *)current->data; pEventEntry = (IOEventEntry *)current;
if (pEventEntry != NULL) if (pEventEntry != NULL)
{ {
pEventEntry->callback(pEventEntry->fd, IOEVENT_TIMEOUT, pEventEntry->callback(pEventEntry->fd, IOEVENT_TIMEOUT, current);
current->data);
} }
} }
} }
@ -97,16 +96,16 @@ int ioevent_loop(struct nio_thread_data *pThreadData,
clean_up_callback, volatile bool *continue_flag) clean_up_callback, volatile bool *continue_flag)
{ {
int result; int result;
IOEventEntry ev_notify; struct ioevent_notify_entry ev_notify;
FastTimerEntry head; FastTimerEntry head;
struct fast_task_info *task; struct fast_task_info *task;
time_t last_check_time; time_t last_check_time;
int count; int count;
memset(&ev_notify, 0, sizeof(ev_notify)); memset(&ev_notify, 0, sizeof(ev_notify));
ev_notify.fd = FC_NOTIFY_READ_FD(pThreadData); ev_notify.event.fd = FC_NOTIFY_READ_FD(pThreadData);
ev_notify.callback = recv_notify_callback; ev_notify.event.callback = recv_notify_callback;
ev_notify.timer.data = pThreadData; ev_notify.thread_data = pThreadData;
if (ioevent_attach(&pThreadData->ev_puller, if (ioevent_attach(&pThreadData->ev_puller,
pThreadData->pipe_fds[0], IOEVENT_READ, pThreadData->pipe_fds[0], IOEVENT_READ,
&ev_notify) != 0) &ev_notify) != 0)
@ -210,18 +209,7 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread,
return result; return result;
} }
task->event.timer.data = task;
task->event.timer.expires = g_current_time + timeout; task->event.timer.expires = g_current_time + timeout;
result = fast_timer_add(&pThread->timer, &task->event.timer); fast_timer_add(&pThread->timer, &task->event.timer);
if (result != 0)
{
logError("file: "__FILE__", line: %d, " \
"fast_timer_add fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
return result;
}
return 0; return 0;
} }