diff --git a/HISTORY b/HISTORY index 0e2cd87..a90f7fd 100644 --- a/HISTORY +++ b/HISTORY @@ -1,5 +1,5 @@ -Version 1.44 2020-09-27 +Version 1.44 2020-09-29 * add test file src/tests/test_pthread_lock.c * add uniq_skiplist.[hc] * add function split_string_ex diff --git a/src/sched_thread.c b/src/sched_thread.c index c6546da..d5f8c3d 100644 --- a/src/sched_thread.c +++ b/src/sched_thread.c @@ -19,7 +19,7 @@ volatile int g_schedule_flag = false; volatile time_t g_current_time = 0; -static volatile ScheduleArray waiting_schedule_array = {NULL, 0}; +static ScheduleArray waiting_schedule_array = {NULL, 0}; static int waiting_del_id = -1; static ScheduleContext *schedule_context = NULL; @@ -38,7 +38,7 @@ static int sched_cmp_by_next_call_time(const void *p1, const void *p2) ((ScheduleEntry *)p2)->next_call_time; } -static int sched_init_entries(ScheduleArray *pScheduleArray) +static int sched_init_entries(ScheduleEntry *entries, const int count) { ScheduleEntry *pEntry; ScheduleEntry *pEnd; @@ -49,22 +49,22 @@ static int sched_init_entries(ScheduleArray *pScheduleArray) int remain; int interval; - if (pScheduleArray->count < 0) + if (count < 0) { logError("file: "__FILE__", line: %d, " \ "schedule count %d < 0", \ - __LINE__, pScheduleArray->count); + __LINE__, count); return EINVAL; } - if (pScheduleArray->count == 0) + if (count == 0) { return 0; } current_time = time(NULL); localtime_r((time_t *)¤t_time, &tm_current); - pEnd = pScheduleArray->entries + pScheduleArray->count; - for (pEntry=pScheduleArray->entries; pEntryid) { @@ -273,8 +273,16 @@ static int do_check_waiting(ScheduleContext *pContext) waiting_del_id = -1; } - waitingCount = __sync_add_and_fetch( - &waiting_schedule_array.count, 0); + PTHREAD_MUTEX_LOCK(&schedule_context->lock); + waitingCount = waiting_schedule_array.count; + waitingEntries = waiting_schedule_array.entries; + if (waiting_schedule_array.entries != NULL) + { + waiting_schedule_array.count = 0; + waiting_schedule_array.entries = NULL; + } + PTHREAD_MUTEX_UNLOCK(&schedule_context->lock); + if (waitingCount == 0) { if (deleteCount > 0) @@ -286,8 +294,6 @@ static int do_check_waiting(ScheduleContext *pContext) return ENOENT; } - waitingEntries = __sync_add_and_fetch( - &waiting_schedule_array.entries, 0); allocCount = pScheduleArray->count + waitingCount; newEntries = (ScheduleEntry *)fc_malloc(sizeof(ScheduleEntry) * allocCount); if (newEntries == NULL) @@ -338,11 +344,6 @@ static int do_check_waiting(ScheduleContext *pContext) } pScheduleArray->entries = newEntries; pScheduleArray->count = newCount; - - __sync_bool_compare_and_swap(&waiting_schedule_array.count, - waitingCount, 0); - __sync_bool_compare_and_swap(&waiting_schedule_array.entries, - waitingEntries, NULL); free(waitingEntries); sched_make_chain(pContext); @@ -398,7 +399,8 @@ static void *sched_thread_entrance(void *args) int i; pContext = (ScheduleContext *)args; - if (sched_init_entries(&(pContext->scheduleArray)) != 0) + if (sched_init_entries(pContext->scheduleArray.entries, + pContext->scheduleArray.count) != 0) { free(pContext); return NULL; @@ -567,19 +569,12 @@ static int sched_dup_array(const ScheduleArray *pSrcArray, \ return 0; } -/* -static int sched_append_array(const ScheduleArray *pSrcArray, \ +static int sched_append_array(const ScheduleArray *pSrcArray, ScheduleArray *pDestArray) { - int result; int bytes; ScheduleEntry *new_entries; - if (pSrcArray->count == 0) - { - return 0; - } - bytes = sizeof(ScheduleEntry) * (pDestArray->count + pSrcArray->count); new_entries = (ScheduleEntry *)fc_malloc(bytes); if (new_entries == NULL) @@ -600,65 +595,67 @@ static int sched_append_array(const ScheduleArray *pSrcArray, \ pDestArray->count += pSrcArray->count; return 0; } -*/ + +int sched_thread_init_ex(ScheduleContext **ppContext) +{ + int result; + + *ppContext = (ScheduleContext *)fc_malloc(sizeof(ScheduleContext)); + if (*ppContext == NULL) + { + return ENOMEM; + } + memset(*ppContext, 0, sizeof(ScheduleContext)); + + if ((result=init_pthread_lock(&(*ppContext)->lock)) != 0) + { + return result; + } + + return 0; +} int sched_add_entries(const ScheduleArray *pScheduleArray) { + ScheduleEntry *newStart; + int old_count; int result; - int i; - ScheduleArray temp_schedule_array; - if (pScheduleArray->count == 0) + if (pScheduleArray->count <= 0) { - logDebug("file: "__FILE__", line: %d, " + logWarning("file: "__FILE__", line: %d, " "no schedule entry", __LINE__); return ENOENT; } - if ((result=sched_dup_array(pScheduleArray, - &temp_schedule_array)) != 0) + if (schedule_context == NULL) { - return result; - } - if ((result=sched_init_entries(&temp_schedule_array)) != 0) - { - return result; - } - - if (__sync_add_and_fetch(&waiting_schedule_array.entries, 0) != NULL) - { - if (!__sync_add_and_fetch(&g_schedule_flag, 0)) + if ((result=sched_thread_init_ex(&schedule_context)) != 0) { - logDebug("file: "__FILE__", line: %d, " - "schedule thread not ready yet", __LINE__); - } - i = 0; - while (__sync_add_and_fetch(&waiting_schedule_array.entries, 0) != NULL) - { - if (i == 0) - { - logDebug("file: "__FILE__", line: %d, " - "waiting for schedule array ready, " - "schedule_flag: %d ...", __LINE__, - __sync_add_and_fetch(&g_schedule_flag, 0)); - } - else if (++i == 300) - { - logError("file: "__FILE__", line: %d, " - "waiting for schedule array ready timeout, " - "schedule_flag: %d, wait count: %d", __LINE__, - __sync_add_and_fetch(&g_schedule_flag, 0), i); - break; - } - fc_sleep_ms(10); + return result; } } - __sync_bool_compare_and_swap(&waiting_schedule_array.entries, - NULL, temp_schedule_array.entries); - __sync_bool_compare_and_swap(&waiting_schedule_array.count, - 0, temp_schedule_array.count); - return 0; + PTHREAD_MUTEX_LOCK(&schedule_context->lock); + do { + old_count = waiting_schedule_array.count; + if ((result=sched_append_array(pScheduleArray, + &waiting_schedule_array)) != 0) + { + break; + } + + newStart = waiting_schedule_array.entries + old_count; + if ((result=sched_init_entries(newStart, pScheduleArray->count)) != 0) + { + waiting_schedule_array.count = newStart - + waiting_schedule_array.entries; //rollback + break; + } + } while (0); + PTHREAD_MUTEX_UNLOCK(&schedule_context->lock); + + return result; } int sched_del_entry(const int id) @@ -683,18 +680,10 @@ int sched_del_entry(const int id) int sched_start_ex(ScheduleArray *pScheduleArray, pthread_t *ptid, const int stack_size, bool * volatile pcontinue_flag, - ScheduleContext **ppContext) + ScheduleContext *pContext) { int result; pthread_attr_t thread_attr; - ScheduleContext *pContext; - - pContext = (ScheduleContext *)fc_malloc(sizeof(ScheduleContext)); - if (pContext == NULL) - { - return ENOMEM; - } - memset(pContext, 0, sizeof(ScheduleContext)); if ((result=init_pthread_attr(&thread_attr, stack_size)) != 0) { @@ -702,7 +691,7 @@ int sched_start_ex(ScheduleArray *pScheduleArray, pthread_t *ptid, return result; } - if ((result=sched_dup_array(pScheduleArray, \ + if ((result=sched_dup_array(pScheduleArray, &(pContext->scheduleArray))) != 0) { free(pContext); @@ -747,7 +736,6 @@ int sched_start_ex(ScheduleArray *pScheduleArray, pthread_t *ptid, __LINE__, result, STRERROR(result)); } - *ppContext = pContext; pthread_attr_destroy(&thread_attr); return result; } @@ -755,8 +743,16 @@ int sched_start_ex(ScheduleArray *pScheduleArray, pthread_t *ptid, int sched_start(ScheduleArray *pScheduleArray, pthread_t *ptid, const int stack_size, bool * volatile pcontinue_flag) { + int result; + if (schedule_context == NULL) + { + if ((result=sched_thread_init_ex(&schedule_context)) != 0) + { + return result; + } + } return sched_start_ex(pScheduleArray, ptid, stack_size, - pcontinue_flag, &schedule_context); + pcontinue_flag, schedule_context); } void sched_set_delay_params(const int slot_count, const int alloc_once) diff --git a/src/sched_thread.h b/src/sched_thread.h index ef364f1..6bd9085 100644 --- a/src/sched_thread.h +++ b/src/sched_thread.h @@ -69,6 +69,7 @@ typedef struct FastTimer timer; //for delay task bool timer_init; struct fc_queue delay_queue; + pthread_mutex_t lock; bool *pcontinue_flag; } ScheduleContext; @@ -148,18 +149,26 @@ int sched_add_delay_task_ex(ScheduleContext *pContext, TaskFunc task_func, int sched_add_delay_task(TaskFunc task_func, void *func_args, const int delay_seconds, const bool new_thread); + +/** init the schedule context + * parameters: + * pContext: store the ScheduleContext pointer + * return: error no, 0 for success, != 0 fail +*/ +int sched_thread_init_ex(ScheduleContext **ppContext); + /** execute the schedule thread * parameters: * pScheduleArray: the schedule tasks * ptid: store the schedule thread id * stack_size: set thread stack size (byes) * pcontinue_flag: main process continue running flag - * ppContext: store the ScheduleContext pointer + * pContext: the ScheduleContext pointer * return: error no, 0 for success, != 0 fail */ int sched_start_ex(ScheduleArray *pScheduleArray, pthread_t *ptid, const int stack_size, bool * volatile pcontinue_flag, - ScheduleContext **ppContext); + ScheduleContext *pContext); int sched_start(ScheduleArray *pScheduleArray, pthread_t *ptid, \ const int stack_size, bool * volatile pcontinue_flag);