From 8718818dc023d252eb562553d063e2b3e0f4fc1d Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Thu, 26 Nov 2020 16:18:37 +0800 Subject: [PATCH] add locked_timer.[hc]: time wheel timer with lock --- HISTORY | 4 +- src/Makefile.in | 6 +- src/locked_timer.c | 323 +++++++++++++++++++++++++++++++++++++++++++++ src/locked_timer.h | 91 +++++++++++++ 4 files changed, 419 insertions(+), 5 deletions(-) create mode 100644 src/locked_timer.c create mode 100644 src/locked_timer.h diff --git a/HISTORY b/HISTORY index e0b67e1..6fdafd6 100644 --- a/HISTORY +++ b/HISTORY @@ -1,5 +1,5 @@ -Version 1.44 2020-11-23 +Version 1.44 2020-11-26 * add test file src/tests/test_pthread_lock.c * add uniq_skiplist.[hc] * add function split_string_ex @@ -37,7 +37,7 @@ Version 1.44 2020-11-23 * shared_func.[hc]: add function fc_path_contains * fast_mblock.[hc]: support alloc elements limit * sockopt.[hc]: add function asyncconnectserverbyip - * fast_timer.[hc]: support lock for option + * add locked_timer.[hc]: time wheel timer with lock Version 1.43 2019-12-25 * replace function call system to getExecResult, diff --git a/src/Makefile.in b/src/Makefile.in index 10f595e..c662e43 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -9,7 +9,7 @@ FAST_SHARED_OBJS = hash.lo chain.lo shared_func.lo ini_file_reader.lo \ logger.lo sockopt.lo base64.lo sched_thread.lo \ http_func.lo md5.lo pthread_func.lo local_ip_func.lo \ avl_tree.lo ioevent.lo ioevent_loop.lo fast_task_queue.lo \ - fast_timer.lo process_ctrl.lo fast_mblock.lo \ + fast_timer.lo locked_timer.lo process_ctrl.lo fast_mblock.lo \ connection_pool.lo fast_mpool.lo fast_allocator.lo \ fast_buffer.lo multi_skiplist.lo flat_skiplist.lo \ system_info.lo fast_blocked_queue.lo id_generator.lo \ @@ -22,7 +22,7 @@ FAST_STATIC_OBJS = hash.o chain.o shared_func.o ini_file_reader.o \ logger.o sockopt.o base64.o sched_thread.o \ http_func.o md5.o pthread_func.o local_ip_func.o \ avl_tree.o ioevent.o ioevent_loop.o fast_task_queue.o \ - fast_timer.o process_ctrl.o fast_mblock.o \ + fast_timer.o locked_timer.o process_ctrl.o fast_mblock.o \ connection_pool.o fast_mpool.o fast_allocator.o \ fast_buffer.o multi_skiplist.o flat_skiplist.o \ system_info.o fast_blocked_queue.o id_generator.o \ @@ -35,7 +35,7 @@ HEADER_FILES = common_define.h hash.h chain.h logger.h base64.h \ shared_func.h pthread_func.h ini_file_reader.h _os_define.h \ sockopt.h sched_thread.h http_func.h md5.h local_ip_func.h \ avl_tree.h ioevent.h ioevent_loop.h fast_task_queue.h \ - fast_timer.h process_ctrl.h fast_mblock.h \ + fast_timer.h locked_timer.h process_ctrl.h fast_mblock.h \ connection_pool.h fast_mpool.h fast_allocator.h \ fast_buffer.h skiplist.h multi_skiplist.h flat_skiplist.h \ skiplist_common.h system_info.h fast_blocked_queue.h \ diff --git a/src/locked_timer.c b/src/locked_timer.c new file mode 100644 index 0000000..f8df8ad --- /dev/null +++ b/src/locked_timer.c @@ -0,0 +1,323 @@ +/* + * Copyright (c) 2020 YuQing <384681@qq.com> + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the Lesser GNU General Public License, version 3 + * or later ("LGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the Lesser GNU General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include "logger.h" +#include "fc_memory.h" +#include "shared_func.h" +#include "pthread_func.h" +#include "locked_timer.h" + +static int locked_timer_init_slots(LockedTimer *timer) +{ + int bytes; + int result; + LockedTimerSlot *slot; + LockedTimerSlot *send; + pthread_mutex_t *lock; + pthread_mutex_t *lend; + + bytes = sizeof(LockedTimerSlot) * timer->slot_count; + timer->slots = (LockedTimerSlot *)fc_malloc(bytes); + if (timer->slots == NULL) { + return ENOMEM; + } + memset(timer->slots, 0, bytes); + + send = timer->slots + timer->slot_count; + for (slot=timer->slots; slotlock)) != 0) { + return result; + } + FC_INIT_LIST_HEAD(&slot->head); + } + + timer->entry_shares.locks = (pthread_mutex_t *)fc_malloc( + sizeof(pthread_mutex_t) * timer->entry_shares.count); + if (timer->entry_shares.locks == NULL) { + return ENOMEM; + } + + lend = timer->entry_shares.locks + timer->entry_shares.count; + for (lock=timer->entry_shares.locks; lockslot_count = slot_count; + timer->entry_shares.count = shared_lock_count; + timer->base_time = current_time; //base time for slot 0 + timer->current_time = current_time; + return locked_timer_init_slots(timer); +} + +void locked_timer_destroy(LockedTimer *timer) +{ + LockedTimerSlot *slot; + LockedTimerSlot *send; + pthread_mutex_t *lock; + pthread_mutex_t *lend; + + if (timer->slots != NULL) { + send = timer->slots + timer->slot_count; + for (slot=timer->slots; slotlock); + } + + lend = timer->entry_shares.locks + timer->entry_shares.count; + for (lock=timer->entry_shares.locks; lockentry_shares.locks); + timer->entry_shares.locks = NULL; + timer->entry_shares.count = 0; + + free(timer->slots); + timer->slots = NULL; + } +} + +#define TIMER_GET_SLOT_INDEX(timer, expires) \ + (((expires) - timer->base_time) % timer->slot_count) + +#define TIMER_GET_SLOT_POINTER(timer, expires) \ + (timer->slots + TIMER_GET_SLOT_INDEX(timer, expires)) + +#define LOCKED_TIMER_ENTRY_LOCK(timer, entry) \ + PTHREAD_MUTEX_LOCK(timer->entry_shares.locks + entry->lock_index) + +#define LOCKED_TIMER_ENTRY_UNLOCK(timer, entry) \ + PTHREAD_MUTEX_UNLOCK(timer->entry_shares.locks + entry->lock_index) + +static inline void add_entry(LockedTimer *timer, LockedTimerSlot *slot, + LockedTimerEntry *entry, const int64_t expires, + const bool set_expires, const bool set_entry_lock) +{ + if (set_entry_lock) { + entry->lock_index = ((unsigned long)entry) % + timer->entry_shares.count; + } + + LOCKED_TIMER_ENTRY_LOCK(timer, entry); + entry->status = FAST_TIMER_STATUS_NORMAL; + entry->slot_index = slot - timer->slots; + LOCKED_TIMER_ENTRY_UNLOCK(timer, entry); + + PTHREAD_MUTEX_LOCK(&slot->lock); + if (set_expires) { + entry->expires = expires; + } + + fc_list_add_tail(&entry->dlink, &slot->head); + entry->rehash = false; + PTHREAD_MUTEX_UNLOCK(&slot->lock); +} + +#define check_entry_status(timer, entry, slot_index) \ + check_set_entry_status(timer, entry, slot_index, \ + FAST_TIMER_STATUS_NONE) + +static inline int check_set_entry_status(LockedTimer *timer, + LockedTimerEntry *entry, int *slot_index, const int new_status) +{ + int result; + + while (1) { + LOCKED_TIMER_ENTRY_LOCK(timer, entry); + switch (entry->status) { + case FAST_TIMER_STATUS_CLEARED: + result = ECANCELED; + break; + case FAST_TIMER_STATUS_TIMEOUT: + result = ETIMEDOUT; + break; + case FAST_TIMER_STATUS_MOVING: + result = EAGAIN; + break; + default: + result = 0; + if (new_status != FAST_TIMER_STATUS_NONE) { + entry->status = new_status; + } + break; + } + *slot_index = entry->slot_index; + LOCKED_TIMER_ENTRY_UNLOCK(timer, entry); + + if (result == EAGAIN) { + fc_sleep_ms(1); + } else { + return result; + } + } +} + +void locked_timer_add_ex(LockedTimer *timer, LockedTimerEntry *entry, + const int64_t expires, const bool set_expires) +{ + LockedTimerSlot *slot; + int64_t new_expires; + bool new_set_expires; + + if (expires > timer->current_time) { + new_expires = expires; + new_set_expires = set_expires; + } else { + new_expires = timer->current_time; + new_set_expires = true; + } + slot = TIMER_GET_SLOT_POINTER(timer, new_expires); + add_entry(timer, slot, entry, new_expires, new_set_expires, true); +} + +int locked_timer_modify(LockedTimer *timer, LockedTimerEntry *entry, + const int64_t new_expires) +{ + int result; + int slot_index; + + if (new_expires > entry->expires) { + if ((result=check_entry_status(timer, entry, &slot_index)) != 0) { + return result; + } + + PTHREAD_MUTEX_LOCK(&(timer->slots + slot_index)->lock); + entry->rehash = TIMER_GET_SLOT_INDEX(timer, + new_expires) != slot_index; + entry->expires = new_expires; //lazy move + PTHREAD_MUTEX_UNLOCK(&(timer->slots + slot_index)->lock); + } else if (new_expires < entry->expires) { + if ((result=locked_timer_remove_ex(timer, entry, + FAST_TIMER_STATUS_MOVING)) == 0) + { + locked_timer_add_ex(timer, entry, new_expires, true); + } + return result; + } + + return 0; +} + +int locked_timer_remove_ex(LockedTimer *timer, LockedTimerEntry *entry, + const int new_status) +{ + int result; + int slot_index; + + if ((result=check_set_entry_status(timer, entry, + &slot_index, new_status)) != 0) + { + return result; + } + + PTHREAD_MUTEX_LOCK(&(timer->slots + slot_index)->lock); + fc_list_del_init(&entry->dlink); + PTHREAD_MUTEX_UNLOCK(&(timer->slots + slot_index)->lock); + return 0; +} + +LockedTimerSlot *locked_timer_slot_get(LockedTimer *timer, const int64_t current_time) +{ + if (timer->current_time >= current_time) { + return NULL; + } + + return TIMER_GET_SLOT_POINTER(timer, timer->current_time++); +} + +int locked_timer_timeouts_get(LockedTimer *timer, const int64_t current_time, + LockedTimerEntry *head) +{ + LockedTimerSlot *slot; + LockedTimerSlot *new_slot; + LockedTimerEntry *entry; + LockedTimerEntry *tmp; + LockedTimerEntry *tail; + bool is_valid; + int count; + + if (timer->current_time >= current_time) { + head->next = NULL; + return 0; + } + + tail = head; + count = 0; + while (timer->current_time < current_time) { + slot = TIMER_GET_SLOT_POINTER(timer, timer->current_time++); + PTHREAD_MUTEX_LOCK(&slot->lock); + fc_list_for_each_entry_safe(entry, tmp, &slot->head, dlink) { + if (entry->expires >= current_time) { //not expired + if (entry->rehash) { + new_slot = TIMER_GET_SLOT_POINTER(timer, entry->expires); + if (new_slot != slot) { //check to avoid deadlock + LOCKED_TIMER_ENTRY_LOCK(timer, entry); + if (entry->status == FAST_TIMER_STATUS_NORMAL) { + entry->status = FAST_TIMER_STATUS_MOVING; + is_valid = true; + } else { + is_valid = false; + } + LOCKED_TIMER_ENTRY_UNLOCK(timer, entry); + + if (is_valid) { + fc_list_del_init(&entry->dlink); + add_entry(timer, new_slot, entry, + entry->expires, false, false); + } + } else { + entry->rehash = false; + } + } + } else { //expired + LOCKED_TIMER_ENTRY_LOCK(timer, entry); + if (entry->status == FAST_TIMER_STATUS_NORMAL) { + entry->status = FAST_TIMER_STATUS_TIMEOUT; + is_valid = true; + } else { + is_valid = false; + } + LOCKED_TIMER_ENTRY_UNLOCK(timer, entry); + + if (is_valid) { + fc_list_del_init(&entry->dlink); + tail->next = entry; + tail = entry; + count++; + } + } + } + + PTHREAD_MUTEX_UNLOCK(&slot->lock); + } + + tail->next = NULL; + return count; +} diff --git a/src/locked_timer.h b/src/locked_timer.h new file mode 100644 index 0000000..ffb90ca --- /dev/null +++ b/src/locked_timer.h @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2020 YuQing <384681@qq.com> + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the Lesser GNU General Public License, version 3 + * or later ("LGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the Lesser GNU General Public License + * along with this program. If not, see . + */ + +#ifndef __LOCKED_TIMER_H__ +#define __LOCKED_TIMER_H__ + +#include +#include +#include "common_define.h" +#include "fc_list.h" + +#define FAST_TIMER_STATUS_NONE 0 +#define FAST_TIMER_STATUS_NORMAL 1 +#define FAST_TIMER_STATUS_MOVING 2 +#define FAST_TIMER_STATUS_TIMEOUT 3 +#define FAST_TIMER_STATUS_CLEARED 4 + +struct locked_timer_slot; +typedef struct locked_timer_entry { + int64_t expires; + struct fc_list_head dlink; + struct locked_timer_entry *next; //for timeout chain + uint32_t slot_index; //for slot lock + uint16_t lock_index; //for entry lock + uint8_t status; + bool rehash; +} LockedTimerEntry; + +typedef struct locked_timer_slot { + struct fc_list_head head; + pthread_mutex_t lock; +} LockedTimerSlot; + +typedef struct locked_timer_shared_locks { + uint16_t count; + pthread_mutex_t *locks; +} LockedTimerSharedLocks; + +typedef struct locked_timer { + int slot_count; //time wheel slot count + LockedTimerSharedLocks entry_shares; + int64_t base_time; //base time for slot 0 + int64_t current_time; + LockedTimerSlot *slots; +} LockedTimer; + +#ifdef __cplusplus +extern "C" { +#endif + +#define locked_timer_add(timer, entry) \ + locked_timer_add_ex(timer, entry, (entry)->expires, false) + +#define locked_timer_remove(timer, entry) \ + locked_timer_remove_ex(timer, entry, FAST_TIMER_STATUS_CLEARED) + +int locked_timer_init(LockedTimer *timer, const int slot_count, + const int64_t current_time, const int shared_lock_count); + +void locked_timer_destroy(LockedTimer *timer); + +void locked_timer_add_ex(LockedTimer *timer, LockedTimerEntry *entry, + const int64_t expires, const bool set_expires); + +int locked_timer_remove_ex(LockedTimer *timer, LockedTimerEntry *entry, + const int new_status); + +int locked_timer_modify(LockedTimer *timer, LockedTimerEntry *entry, + const int64_t new_expires); + +LockedTimerSlot *locked_timer_slot_get(LockedTimer *timer, const int64_t current_time); +int locked_timer_timeouts_get(LockedTimer *timer, const int64_t current_time, + LockedTimerEntry *head); + +#ifdef __cplusplus +} +#endif + +#endif