sched_thread.c: sched_add_entries gracefully

pull/37/head
YuQing 2020-09-29 22:18:30 +08:00
parent 118f1e2e2e
commit dc40977500
3 changed files with 89 additions and 84 deletions

View File

@ -1,5 +1,5 @@
Version 1.44 2020-09-27 Version 1.44 2020-09-29
* add test file src/tests/test_pthread_lock.c * add test file src/tests/test_pthread_lock.c
* add uniq_skiplist.[hc] * add uniq_skiplist.[hc]
* add function split_string_ex * add function split_string_ex

View File

@ -19,7 +19,7 @@
volatile int g_schedule_flag = false; volatile int g_schedule_flag = false;
volatile time_t g_current_time = 0; volatile time_t g_current_time = 0;
static volatile ScheduleArray waiting_schedule_array = {NULL, 0}; static ScheduleArray waiting_schedule_array = {NULL, 0};
static int waiting_del_id = -1; static int waiting_del_id = -1;
static ScheduleContext *schedule_context = NULL; static ScheduleContext *schedule_context = NULL;
@ -38,7 +38,7 @@ static int sched_cmp_by_next_call_time(const void *p1, const void *p2)
((ScheduleEntry *)p2)->next_call_time; ((ScheduleEntry *)p2)->next_call_time;
} }
static int sched_init_entries(ScheduleArray *pScheduleArray) static int sched_init_entries(ScheduleEntry *entries, const int count)
{ {
ScheduleEntry *pEntry; ScheduleEntry *pEntry;
ScheduleEntry *pEnd; ScheduleEntry *pEnd;
@ -49,22 +49,22 @@ static int sched_init_entries(ScheduleArray *pScheduleArray)
int remain; int remain;
int interval; int interval;
if (pScheduleArray->count < 0) if (count < 0)
{ {
logError("file: "__FILE__", line: %d, " \ logError("file: "__FILE__", line: %d, " \
"schedule count %d < 0", \ "schedule count %d < 0", \
__LINE__, pScheduleArray->count); __LINE__, count);
return EINVAL; return EINVAL;
} }
if (pScheduleArray->count == 0) if (count == 0)
{ {
return 0; return 0;
} }
current_time = time(NULL); current_time = time(NULL);
localtime_r((time_t *)&current_time, &tm_current); localtime_r((time_t *)&current_time, &tm_current);
pEnd = pScheduleArray->entries + pScheduleArray->count; pEnd = entries + count;
for (pEntry=pScheduleArray->entries; pEntry<pEnd; pEntry++) for (pEntry=entries; pEntry<pEnd; pEntry++)
{ {
if (next_id < pEntry->id) if (next_id < pEntry->id)
{ {
@ -273,8 +273,16 @@ static int do_check_waiting(ScheduleContext *pContext)
waiting_del_id = -1; waiting_del_id = -1;
} }
waitingCount = __sync_add_and_fetch( PTHREAD_MUTEX_LOCK(&schedule_context->lock);
&waiting_schedule_array.count, 0); waitingCount = waiting_schedule_array.count;
waitingEntries = waiting_schedule_array.entries;
if (waiting_schedule_array.entries != NULL)
{
waiting_schedule_array.count = 0;
waiting_schedule_array.entries = NULL;
}
PTHREAD_MUTEX_UNLOCK(&schedule_context->lock);
if (waitingCount == 0) if (waitingCount == 0)
{ {
if (deleteCount > 0) if (deleteCount > 0)
@ -286,8 +294,6 @@ static int do_check_waiting(ScheduleContext *pContext)
return ENOENT; return ENOENT;
} }
waitingEntries = __sync_add_and_fetch(
&waiting_schedule_array.entries, 0);
allocCount = pScheduleArray->count + waitingCount; allocCount = pScheduleArray->count + waitingCount;
newEntries = (ScheduleEntry *)fc_malloc(sizeof(ScheduleEntry) * allocCount); newEntries = (ScheduleEntry *)fc_malloc(sizeof(ScheduleEntry) * allocCount);
if (newEntries == NULL) if (newEntries == NULL)
@ -338,11 +344,6 @@ static int do_check_waiting(ScheduleContext *pContext)
} }
pScheduleArray->entries = newEntries; pScheduleArray->entries = newEntries;
pScheduleArray->count = newCount; pScheduleArray->count = newCount;
__sync_bool_compare_and_swap(&waiting_schedule_array.count,
waitingCount, 0);
__sync_bool_compare_and_swap(&waiting_schedule_array.entries,
waitingEntries, NULL);
free(waitingEntries); free(waitingEntries);
sched_make_chain(pContext); sched_make_chain(pContext);
@ -398,7 +399,8 @@ static void *sched_thread_entrance(void *args)
int i; int i;
pContext = (ScheduleContext *)args; pContext = (ScheduleContext *)args;
if (sched_init_entries(&(pContext->scheduleArray)) != 0) if (sched_init_entries(pContext->scheduleArray.entries,
pContext->scheduleArray.count) != 0)
{ {
free(pContext); free(pContext);
return NULL; return NULL;
@ -567,19 +569,12 @@ static int sched_dup_array(const ScheduleArray *pSrcArray, \
return 0; return 0;
} }
/* static int sched_append_array(const ScheduleArray *pSrcArray,
static int sched_append_array(const ScheduleArray *pSrcArray, \
ScheduleArray *pDestArray) ScheduleArray *pDestArray)
{ {
int result;
int bytes; int bytes;
ScheduleEntry *new_entries; ScheduleEntry *new_entries;
if (pSrcArray->count == 0)
{
return 0;
}
bytes = sizeof(ScheduleEntry) * (pDestArray->count + pSrcArray->count); bytes = sizeof(ScheduleEntry) * (pDestArray->count + pSrcArray->count);
new_entries = (ScheduleEntry *)fc_malloc(bytes); new_entries = (ScheduleEntry *)fc_malloc(bytes);
if (new_entries == NULL) if (new_entries == NULL)
@ -600,65 +595,67 @@ static int sched_append_array(const ScheduleArray *pSrcArray, \
pDestArray->count += pSrcArray->count; pDestArray->count += pSrcArray->count;
return 0; return 0;
} }
*/
int sched_thread_init_ex(ScheduleContext **ppContext)
{
int result;
*ppContext = (ScheduleContext *)fc_malloc(sizeof(ScheduleContext));
if (*ppContext == NULL)
{
return ENOMEM;
}
memset(*ppContext, 0, sizeof(ScheduleContext));
if ((result=init_pthread_lock(&(*ppContext)->lock)) != 0)
{
return result;
}
return 0;
}
int sched_add_entries(const ScheduleArray *pScheduleArray) int sched_add_entries(const ScheduleArray *pScheduleArray)
{ {
ScheduleEntry *newStart;
int old_count;
int result; int result;
int i;
ScheduleArray temp_schedule_array;
if (pScheduleArray->count == 0) if (pScheduleArray->count <= 0)
{ {
logDebug("file: "__FILE__", line: %d, " logWarning("file: "__FILE__", line: %d, "
"no schedule entry", __LINE__); "no schedule entry", __LINE__);
return ENOENT; return ENOENT;
} }
if ((result=sched_dup_array(pScheduleArray, if (schedule_context == NULL)
&temp_schedule_array)) != 0) {
if ((result=sched_thread_init_ex(&schedule_context)) != 0)
{ {
return result; return result;
} }
if ((result=sched_init_entries(&temp_schedule_array)) != 0)
{
return result;
} }
if (__sync_add_and_fetch(&waiting_schedule_array.entries, 0) != NULL) PTHREAD_MUTEX_LOCK(&schedule_context->lock);
do {
old_count = waiting_schedule_array.count;
if ((result=sched_append_array(pScheduleArray,
&waiting_schedule_array)) != 0)
{ {
if (!__sync_add_and_fetch(&g_schedule_flag, 0))
{
logDebug("file: "__FILE__", line: %d, "
"schedule thread not ready yet", __LINE__);
}
i = 0;
while (__sync_add_and_fetch(&waiting_schedule_array.entries, 0) != NULL)
{
if (i == 0)
{
logDebug("file: "__FILE__", line: %d, "
"waiting for schedule array ready, "
"schedule_flag: %d ...", __LINE__,
__sync_add_and_fetch(&g_schedule_flag, 0));
}
else if (++i == 300)
{
logError("file: "__FILE__", line: %d, "
"waiting for schedule array ready timeout, "
"schedule_flag: %d, wait count: %d", __LINE__,
__sync_add_and_fetch(&g_schedule_flag, 0), i);
break; break;
} }
fc_sleep_ms(10);
}
}
__sync_bool_compare_and_swap(&waiting_schedule_array.entries, newStart = waiting_schedule_array.entries + old_count;
NULL, temp_schedule_array.entries); if ((result=sched_init_entries(newStart, pScheduleArray->count)) != 0)
__sync_bool_compare_and_swap(&waiting_schedule_array.count, {
0, temp_schedule_array.count); waiting_schedule_array.count = newStart -
return 0; waiting_schedule_array.entries; //rollback
break;
}
} while (0);
PTHREAD_MUTEX_UNLOCK(&schedule_context->lock);
return result;
} }
int sched_del_entry(const int id) int sched_del_entry(const int id)
@ -683,18 +680,10 @@ int sched_del_entry(const int id)
int sched_start_ex(ScheduleArray *pScheduleArray, pthread_t *ptid, int sched_start_ex(ScheduleArray *pScheduleArray, pthread_t *ptid,
const int stack_size, bool * volatile pcontinue_flag, const int stack_size, bool * volatile pcontinue_flag,
ScheduleContext **ppContext) ScheduleContext *pContext)
{ {
int result; int result;
pthread_attr_t thread_attr; pthread_attr_t thread_attr;
ScheduleContext *pContext;
pContext = (ScheduleContext *)fc_malloc(sizeof(ScheduleContext));
if (pContext == NULL)
{
return ENOMEM;
}
memset(pContext, 0, sizeof(ScheduleContext));
if ((result=init_pthread_attr(&thread_attr, stack_size)) != 0) if ((result=init_pthread_attr(&thread_attr, stack_size)) != 0)
{ {
@ -702,7 +691,7 @@ int sched_start_ex(ScheduleArray *pScheduleArray, pthread_t *ptid,
return result; return result;
} }
if ((result=sched_dup_array(pScheduleArray, \ if ((result=sched_dup_array(pScheduleArray,
&(pContext->scheduleArray))) != 0) &(pContext->scheduleArray))) != 0)
{ {
free(pContext); free(pContext);
@ -747,7 +736,6 @@ int sched_start_ex(ScheduleArray *pScheduleArray, pthread_t *ptid,
__LINE__, result, STRERROR(result)); __LINE__, result, STRERROR(result));
} }
*ppContext = pContext;
pthread_attr_destroy(&thread_attr); pthread_attr_destroy(&thread_attr);
return result; return result;
} }
@ -755,8 +743,16 @@ int sched_start_ex(ScheduleArray *pScheduleArray, pthread_t *ptid,
int sched_start(ScheduleArray *pScheduleArray, pthread_t *ptid, int sched_start(ScheduleArray *pScheduleArray, pthread_t *ptid,
const int stack_size, bool * volatile pcontinue_flag) const int stack_size, bool * volatile pcontinue_flag)
{ {
int result;
if (schedule_context == NULL)
{
if ((result=sched_thread_init_ex(&schedule_context)) != 0)
{
return result;
}
}
return sched_start_ex(pScheduleArray, ptid, stack_size, return sched_start_ex(pScheduleArray, ptid, stack_size,
pcontinue_flag, &schedule_context); pcontinue_flag, schedule_context);
} }
void sched_set_delay_params(const int slot_count, const int alloc_once) void sched_set_delay_params(const int slot_count, const int alloc_once)

View File

@ -69,6 +69,7 @@ typedef struct
FastTimer timer; //for delay task FastTimer timer; //for delay task
bool timer_init; bool timer_init;
struct fc_queue delay_queue; struct fc_queue delay_queue;
pthread_mutex_t lock;
bool *pcontinue_flag; bool *pcontinue_flag;
} ScheduleContext; } ScheduleContext;
@ -148,18 +149,26 @@ int sched_add_delay_task_ex(ScheduleContext *pContext, TaskFunc task_func,
int sched_add_delay_task(TaskFunc task_func, void *func_args, int sched_add_delay_task(TaskFunc task_func, void *func_args,
const int delay_seconds, const bool new_thread); const int delay_seconds, const bool new_thread);
/** init the schedule context
* parameters:
* pContext: store the ScheduleContext pointer
* return: error no, 0 for success, != 0 fail
*/
int sched_thread_init_ex(ScheduleContext **ppContext);
/** execute the schedule thread /** execute the schedule thread
* parameters: * parameters:
* pScheduleArray: the schedule tasks * pScheduleArray: the schedule tasks
* ptid: store the schedule thread id * ptid: store the schedule thread id
* stack_size: set thread stack size (byes) * stack_size: set thread stack size (byes)
* pcontinue_flag: main process continue running flag * pcontinue_flag: main process continue running flag
* ppContext: store the ScheduleContext pointer * pContext: the ScheduleContext pointer
* return: error no, 0 for success, != 0 fail * return: error no, 0 for success, != 0 fail
*/ */
int sched_start_ex(ScheduleArray *pScheduleArray, pthread_t *ptid, int sched_start_ex(ScheduleArray *pScheduleArray, pthread_t *ptid,
const int stack_size, bool * volatile pcontinue_flag, const int stack_size, bool * volatile pcontinue_flag,
ScheduleContext **ppContext); ScheduleContext *pContext);
int sched_start(ScheduleArray *pScheduleArray, pthread_t *ptid, \ int sched_start(ScheduleArray *pScheduleArray, pthread_t *ptid, \
const int stack_size, bool * volatile pcontinue_flag); const int stack_size, bool * volatile pcontinue_flag);