From f5028fcbe0080aead532dedd842dcac21bc43e41 Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Mon, 23 Nov 2020 11:45:33 +0800 Subject: [PATCH] fast_timer.[hc]: support lock for option --- HISTORY | 3 +- src/fast_task_queue.h | 12 +- src/fast_timer.c | 373 ++++++++++++++++++++++++++++-------------- src/fast_timer.h | 39 +++-- src/ioevent_loop.c | 32 ++-- 5 files changed, 297 insertions(+), 162 deletions(-) diff --git a/HISTORY b/HISTORY index 35111eb..e0b67e1 100644 --- a/HISTORY +++ b/HISTORY @@ -1,5 +1,5 @@ -Version 1.44 2020-11-03 +Version 1.44 2020-11-23 * add test file src/tests/test_pthread_lock.c * add uniq_skiplist.[hc] * add function split_string_ex @@ -37,6 +37,7 @@ Version 1.44 2020-11-03 * shared_func.[hc]: add function fc_path_contains * fast_mblock.[hc]: support alloc elements limit * sockopt.[hc]: add function asyncconnectserverbyip + * fast_timer.[hc]: support lock for option Version 1.43 2019-12-25 * replace function call system to getExecResult, diff --git a/src/fast_task_queue.h b/src/fast_task_queue.h index 5e2d4c3..aaad817 100644 --- a/src/fast_task_queue.h +++ b/src/fast_task_queue.h @@ -45,9 +45,9 @@ struct fast_task_info; typedef struct ioevent_entry { - int fd; - FastTimerEntry timer; - IOEventCallback callback; + FastTimerEntry timer; //must first + int fd; + IOEventCallback callback; } IOEventEntry; struct nio_thread_data @@ -70,6 +70,12 @@ struct nio_thread_data } notify; //for thread notify }; +struct ioevent_notify_entry +{ + IOEventEntry event; //must first + struct nio_thread_data *thread_data; +}; + struct fast_task_info { IOEventEntry event; //must first diff --git a/src/fast_timer.c b/src/fast_timer.c index deb0ed5..f325780 100644 --- a/src/fast_timer.c +++ b/src/fast_timer.c @@ -20,174 +20,303 @@ #include #include "logger.h" #include "fc_memory.h" +#include "pthread_func.h" #include "fast_timer.h" -int fast_timer_init(FastTimer *timer, const int slot_count, - const int64_t current_time) +static int fast_timer_init_locks(FastTimer *timer) { - int bytes; - if (slot_count <= 0 || current_time <= 0) { - return EINVAL; - } + int result; + FastTimerSlot *slot; + FastTimerSlot *end; - timer->slot_count = slot_count; - timer->base_time = current_time; //base time for slot 0 - timer->current_time = current_time; - bytes = sizeof(FastTimerSlot) * slot_count; - timer->slots = (FastTimerSlot *)fc_malloc(bytes); - if (timer->slots == NULL) { - return ENOMEM; - } - memset(timer->slots, 0, bytes); - return 0; + end = timer->slots + timer->slot_count; + for (slot=timer->slots; slotlock)) != 0) { + return result; + } + } + + return 0; +} + +int fast_timer_init_ex(FastTimer *timer, const int slot_count, + const int64_t current_time, const bool need_lock) +{ + int result; + int bytes; + + if (slot_count <= 0 || current_time <= 0) { + return EINVAL; + } + + timer->need_lock = need_lock; + timer->slot_count = slot_count; + timer->base_time = current_time; //base time for slot 0 + timer->current_time = current_time; + bytes = sizeof(FastTimerSlot) * slot_count; + timer->slots = (FastTimerSlot *)fc_malloc(bytes); + if (timer->slots == NULL) { + return ENOMEM; + } + memset(timer->slots, 0, bytes); + + if (need_lock) { + if ((result=fast_timer_init_locks(timer)) != 0) { + return result; + } + } + return 0; } void fast_timer_destroy(FastTimer *timer) { - if (timer->slots != NULL) { - free(timer->slots); - timer->slots = NULL; - } + if (timer->slots != NULL) { + if (timer->need_lock) { + FastTimerSlot *slot; + FastTimerSlot *end; + end = timer->slots + timer->slot_count; + for (slot=timer->slots; slotlock); + } + } + + free(timer->slots); + timer->slots = NULL; + } } +#define TIMER_CHECK_LOCK(timer, slot) \ + do { \ + if (timer->need_lock) { \ + PTHREAD_MUTEX_LOCK(&(slot)->lock); \ + } \ + } while (0) + +#define TIMER_CHECK_UNLOCK(timer, slot) \ + do { \ + if (timer->need_lock) { \ + PTHREAD_MUTEX_UNLOCK(&(slot)->lock); \ + } \ + } while (0) + +#define TIMER_CHECK_LOCK_AND_SET_SLOT(timer, slot, entry) \ + do { \ + if (timer->need_lock) { \ + PTHREAD_MUTEX_LOCK(&(slot)->lock); \ + entry->slot_index = slot - timer->slots; \ + } \ + } while (0) + +#define TIMER_CHECK_LOCK_BY_ENTRY(timer, entry) \ + do { \ + if (timer->need_lock && entry->slot_index >= 0) { \ + PTHREAD_MUTEX_LOCK(&(timer->slots + entry->slot_index)->lock); \ + } \ + } while (0) + + +#define TIMER_CHECK_UNLOCK_AND_REMOVE_BY_ENTRY(timer, entry) \ + do { \ + if (timer->need_lock && entry->slot_index >= 0) { \ + PTHREAD_MUTEX_UNLOCK(&(timer->slots + entry->slot_index)->lock); \ + entry->slot_index = -1; \ + } \ + } while (0) + +#define TIMER_CHECK_LOCK_BY_SINDEX(timer, slot_index) \ + do { \ + if (timer->need_lock) { \ + PTHREAD_MUTEX_LOCK(&(timer->slots + slot_index)->lock); \ + } \ + } while (0) + +#define TIMER_CHECK_UNLOCK_BY_SINDEX(timer, slot_index) \ + do { \ + if (timer->need_lock) { \ + PTHREAD_MUTEX_UNLOCK(&(timer->slots + slot_index)->lock); \ + } \ + } while (0) + + #define TIMER_GET_SLOT_INDEX(timer, expires) \ (((expires) - timer->base_time) % timer->slot_count) #define TIMER_GET_SLOT_POINTER(timer, expires) \ (timer->slots + TIMER_GET_SLOT_INDEX(timer, expires)) -int fast_timer_add(FastTimer *timer, FastTimerEntry *entry) +static inline void add_entry(FastTimer *timer, FastTimerSlot *slot, + FastTimerEntry *entry, const int64_t expires, const bool set_expires) { - FastTimerSlot *slot; - - slot = TIMER_GET_SLOT_POINTER(timer, entry->expires > - timer->current_time ? entry->expires : timer->current_time); - entry->next = slot->head.next; - if (slot->head.next != NULL) { - slot->head.next->prev = entry; - } - entry->prev = &slot->head; - slot->head.next = entry; - entry->rehash = false; - return 0; + TIMER_CHECK_LOCK_AND_SET_SLOT(timer, slot, entry); + if (set_expires) { + entry->expires = expires; + } + entry->next = slot->head.next; + if (slot->head.next != NULL) { + slot->head.next->prev = entry; + } + entry->prev = &slot->head; + slot->head.next = entry; + entry->rehash = false; + TIMER_CHECK_UNLOCK(timer, slot); } -int fast_timer_modify(FastTimer *timer, FastTimerEntry *entry, +void fast_timer_add_ex(FastTimer *timer, FastTimerEntry *entry, + const int64_t expires, const bool set_expires) +{ + FastTimerSlot *slot; + int64_t new_expires; + bool new_set_expires; + + if (expires > timer->current_time) { + new_expires = expires; + new_set_expires = set_expires; + } else { + new_expires = timer->current_time; + new_set_expires = true; + } + slot = TIMER_GET_SLOT_POINTER(timer, new_expires); + add_entry(timer, slot, entry, new_expires, new_set_expires); +} + +void fast_timer_modify(FastTimer *timer, FastTimerEntry *entry, const int64_t new_expires) { - if (new_expires == entry->expires) { - return 0; - } + int slot_index; - if (new_expires < entry->expires) { - fast_timer_remove(timer, entry); - entry->expires = new_expires; - return fast_timer_add(timer, entry); - } + if (new_expires > entry->expires) { + if (timer->need_lock && entry->slot_index >= 0) { + slot_index = entry->slot_index; + if (slot_index < 0) { + slot_index = TIMER_GET_SLOT_INDEX(timer, entry->expires); + } + } else { + slot_index = TIMER_GET_SLOT_INDEX(timer, entry->expires); + } - entry->rehash = TIMER_GET_SLOT_INDEX(timer, new_expires) != - TIMER_GET_SLOT_INDEX(timer, entry->expires); - entry->expires = new_expires; //lazy move - return 0; + TIMER_CHECK_LOCK_BY_SINDEX(timer, slot_index); + entry->rehash = TIMER_GET_SLOT_INDEX(timer, new_expires) != slot_index; + entry->expires = new_expires; //lazy move + TIMER_CHECK_UNLOCK_BY_SINDEX(timer, slot_index); + } else if (new_expires < entry->expires) { + fast_timer_remove(timer, entry); + fast_timer_add_ex(timer, entry, new_expires, true); + } +} + +static inline void remove_entry(FastTimerEntry *entry) +{ + if (entry->next != NULL) { + entry->next->prev = entry->prev; + entry->prev->next = entry->next; + entry->next = NULL; + } + else { + entry->prev->next = NULL; + } + + entry->prev = NULL; } int fast_timer_remove(FastTimer *timer, FastTimerEntry *entry) { - if (entry->prev == NULL) { - return ENOENT; //already removed - } + int result; - if (entry->next != NULL) { - entry->next->prev = entry->prev; - entry->prev->next = entry->next; - entry->next = NULL; - } - else { - entry->prev->next = NULL; - } - - entry->prev = NULL; - return 0; + TIMER_CHECK_LOCK_BY_ENTRY(timer, entry); + if (entry->prev == NULL) { + result = ENOENT; //already removed + } else { + remove_entry(entry); + result = 0; + } + TIMER_CHECK_UNLOCK_AND_REMOVE_BY_ENTRY(timer, entry); + return result; } FastTimerSlot *fast_timer_slot_get(FastTimer *timer, const int64_t current_time) { - if (timer->current_time >= current_time) { - return NULL; - } + if (timer->current_time >= current_time) { + return NULL; + } - return TIMER_GET_SLOT_POINTER(timer, timer->current_time++); + return TIMER_GET_SLOT_POINTER(timer, timer->current_time++); } int fast_timer_timeouts_get(FastTimer *timer, const int64_t current_time, - FastTimerEntry *head) + FastTimerEntry *head) { - FastTimerSlot *slot; - FastTimerEntry *entry; - FastTimerEntry *first; - FastTimerEntry *last; - FastTimerEntry *tail; - int count; + FastTimerSlot *slot; + FastTimerSlot *new_slot; + FastTimerEntry *entry; + FastTimerEntry *first; + FastTimerEntry *last; + FastTimerEntry *tail; + int count; - head->prev = NULL; - head->next = NULL; - if (timer->current_time >= current_time) { - return 0; - } + head->prev = NULL; + head->next = NULL; + if (timer->current_time >= current_time) { + return 0; + } - first = NULL; - last = NULL; - tail = head; - count = 0; - while (timer->current_time < current_time) { - slot = TIMER_GET_SLOT_POINTER(timer, timer->current_time++); - entry = slot->head.next; - while (entry != NULL) { - if (entry->expires >= current_time) { //not expired - if (first != NULL) { - first->prev->next = entry; - entry->prev = first->prev; + first = NULL; + last = NULL; + tail = head; + count = 0; + while (timer->current_time < current_time) { + slot = TIMER_GET_SLOT_POINTER(timer, timer->current_time++); + TIMER_CHECK_LOCK(timer, slot); + entry = slot->head.next; + while (entry != NULL) { + if (entry->expires >= current_time) { //not expired + if (first != NULL) { + first->prev->next = entry; + entry->prev = first->prev; + + tail->next = first; + first->prev = tail; + tail = last; + first = NULL; + } + if (entry->rehash) { + last = entry; + entry = entry->next; + + new_slot = TIMER_GET_SLOT_POINTER(timer, last->expires); + if (new_slot != slot) { //check to avoid deadlock + remove_entry(last); + add_entry(timer, new_slot, last, + last->expires, false); + } else { + last->rehash = false; + } + continue; + } + } else { //expired + count++; + if (first == NULL) { + first = entry; + } + } + + last = entry; + entry = entry->next; + } + + if (first != NULL) { + first->prev->next = NULL; tail->next = first; first->prev = tail; tail = last; first = NULL; - } - if (entry->rehash) { - last = entry; - entry = entry->next; - - last->rehash = false; - fast_timer_remove(timer, last); - fast_timer_add(timer, last); - continue; - } - } - else { //expired - count++; - if (first == NULL) { - first = entry; - } - } - - last = entry; - entry = entry->next; + } + TIMER_CHECK_UNLOCK(timer, slot); } - if (first != NULL) { - first->prev->next = NULL; - - tail->next = first; - first->prev = tail; - tail = last; - first = NULL; + if (count > 0) { + tail->next = NULL; } - } - if (count > 0) { - tail->next = NULL; - } - - return count; + return count; } - diff --git a/src/fast_timer.h b/src/fast_timer.h index d6a1edf..e040f69 100644 --- a/src/fast_timer.h +++ b/src/fast_timer.h @@ -17,38 +17,49 @@ #define __FAST_TIMER_H__ #include +#include #include "common_define.h" +struct fast_timer_slot; typedef struct fast_timer_entry { - int64_t expires; - void *data; - struct fast_timer_entry *prev; - struct fast_timer_entry *next; - bool rehash; + int64_t expires; + struct fast_timer_entry *prev; + struct fast_timer_entry *next; + int slot_index; + bool rehash; } FastTimerEntry; typedef struct fast_timer_slot { - struct fast_timer_entry head; + struct fast_timer_entry head; + pthread_mutex_t lock; } FastTimerSlot; typedef struct fast_timer { - int slot_count; //time wheel slot count - int64_t base_time; //base time for slot 0 - int64_t current_time; - FastTimerSlot *slots; + bool need_lock; + int slot_count; //time wheel slot count + int64_t base_time; //base time for slot 0 + int64_t current_time; + FastTimerSlot *slots; } FastTimer; #ifdef __cplusplus extern "C" { #endif -int fast_timer_init(FastTimer *timer, const int slot_count, - const int64_t current_time); +#define fast_timer_init(timer, slot_count, current_time) \ + fast_timer_init_ex(timer, slot_count, current_time, false) + +#define fast_timer_add(timer, entry) \ + fast_timer_add_ex(timer, entry, (entry)->expires, false) + +int fast_timer_init_ex(FastTimer *timer, const int slot_count, + const int64_t current_time, const bool need_lock); void fast_timer_destroy(FastTimer *timer); -int fast_timer_add(FastTimer *timer, FastTimerEntry *entry); +void fast_timer_add_ex(FastTimer *timer, FastTimerEntry *entry, + const int64_t expires, const bool set_expires); int fast_timer_remove(FastTimer *timer, FastTimerEntry *entry); -int fast_timer_modify(FastTimer *timer, FastTimerEntry *entry, +void fast_timer_modify(FastTimer *timer, FastTimerEntry *entry, const int64_t new_expires); FastTimerSlot *fast_timer_slot_get(FastTimer *timer, const int64_t current_time); diff --git a/src/ioevent_loop.c b/src/ioevent_loop.c index 82b2177..ce0ee19 100644 --- a/src/ioevent_loop.c +++ b/src/ioevent_loop.c @@ -29,7 +29,7 @@ static void deal_ioevents(IOEventPoller *ioevent) pEntry = (IOEventEntry *)IOEVENT_GET_DATA(ioevent, ioevent->iterator.index); if (pEntry != NULL) { - pEntry->callback(pEntry->fd, event, pEntry->timer.data); + pEntry->callback(pEntry->fd, event, pEntry); } else { logDebug("file: "__FILE__", line: %d, " @@ -51,7 +51,7 @@ int ioevent_remove(IOEventPoller *ioevent, void *data) pEntry = (IOEventEntry *)IOEVENT_GET_DATA(ioevent, ioevent->iterator.index); - if (pEntry != NULL && pEntry->timer.data == data) { + if (pEntry != NULL && (void *)pEntry == data) { return 0; //do NOT clear current entry } @@ -59,7 +59,7 @@ int ioevent_remove(IOEventPoller *ioevent, void *data) index++) { pEntry = (IOEventEntry *)IOEVENT_GET_DATA(ioevent, index); - if (pEntry != NULL && pEntry->timer.data == data) { + if (pEntry != NULL && (void *)pEntry == data) { logDebug("file: "__FILE__", line: %d, " "clear ioevent data: %p", __LINE__, data); IOEVENT_CLEAR_DATA(ioevent, index); @@ -83,11 +83,10 @@ static void deal_timeouts(FastTimerEntry *head) entry = entry->next; current->prev = current->next = NULL; //must set NULL because NOT in time wheel - pEventEntry = (IOEventEntry *)current->data; + pEventEntry = (IOEventEntry *)current; if (pEventEntry != NULL) { - pEventEntry->callback(pEventEntry->fd, IOEVENT_TIMEOUT, - current->data); + pEventEntry->callback(pEventEntry->fd, IOEVENT_TIMEOUT, current); } } } @@ -97,16 +96,16 @@ int ioevent_loop(struct nio_thread_data *pThreadData, clean_up_callback, volatile bool *continue_flag) { int result; - IOEventEntry ev_notify; + struct ioevent_notify_entry ev_notify; FastTimerEntry head; struct fast_task_info *task; time_t last_check_time; int count; memset(&ev_notify, 0, sizeof(ev_notify)); - ev_notify.fd = FC_NOTIFY_READ_FD(pThreadData); - ev_notify.callback = recv_notify_callback; - ev_notify.timer.data = pThreadData; + ev_notify.event.fd = FC_NOTIFY_READ_FD(pThreadData); + ev_notify.event.callback = recv_notify_callback; + ev_notify.thread_data = pThreadData; if (ioevent_attach(&pThreadData->ev_puller, pThreadData->pipe_fds[0], IOEVENT_READ, &ev_notify) != 0) @@ -210,18 +209,7 @@ int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread, return result; } - task->event.timer.data = task; task->event.timer.expires = g_current_time + timeout; - result = fast_timer_add(&pThread->timer, &task->event.timer); - if (result != 0) - { - logError("file: "__FILE__", line: %d, " \ - "fast_timer_add fail, " \ - "errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - return result; - } - + fast_timer_add(&pThread->timer, &task->event.timer); return 0; } -