schedule waiting_schedule_array use atomic opt

pull/37/head
YuQing 2020-09-08 18:53:08 +08:00
parent 130d7fe110
commit 488e483d22
2 changed files with 68 additions and 62 deletions

View File

@ -16,10 +16,10 @@
#include "fc_memory.h"
#include "sched_thread.h"
volatile bool g_schedule_flag = false;
volatile int g_schedule_flag = false;
volatile time_t g_current_time = 0;
static ScheduleArray waiting_schedule_array = {NULL, 0};
static volatile ScheduleArray waiting_schedule_array = {NULL, 0};
static int waiting_del_id = -1;
static ScheduleContext *schedule_context = NULL;
@ -226,6 +226,7 @@ static int print_all_sched_entries(ScheduleArray *pScheduleArray)
static int do_check_waiting(ScheduleContext *pContext)
{
ScheduleArray *pScheduleArray;
ScheduleEntry *waitingEntries;
ScheduleEntry *newEntries;
ScheduleEntry *pWaitingEntry;
ScheduleEntry *pWaitingEnd;
@ -234,6 +235,7 @@ static int do_check_waiting(ScheduleContext *pContext)
int allocCount;
int newCount;
int deleteCount;
int waitingCount;
pScheduleArray = &(pContext->scheduleArray);
deleteCount = 0;
@ -271,7 +273,9 @@ static int do_check_waiting(ScheduleContext *pContext)
waiting_del_id = -1;
}
if (waiting_schedule_array.count == 0)
waitingCount = __sync_add_and_fetch(
&waiting_schedule_array.count, 0);
if (waitingCount == 0)
{
if (deleteCount > 0)
{
@ -282,7 +286,9 @@ static int do_check_waiting(ScheduleContext *pContext)
return ENOENT;
}
allocCount = pScheduleArray->count + waiting_schedule_array.count;
waitingEntries = __sync_add_and_fetch(
&waiting_schedule_array.entries, 0);
allocCount = pScheduleArray->count + waitingCount;
newEntries = (ScheduleEntry *)fc_malloc(sizeof(ScheduleEntry) * allocCount);
if (newEntries == NULL)
{
@ -295,13 +301,13 @@ static int do_check_waiting(ScheduleContext *pContext)
if (pScheduleArray->count > 0)
{
memcpy(newEntries, pScheduleArray->entries, \
memcpy(newEntries, pScheduleArray->entries,
sizeof(ScheduleEntry) * pScheduleArray->count);
}
newCount = pScheduleArray->count;
pWaitingEnd = waiting_schedule_array.entries + waiting_schedule_array.count;
for (pWaitingEntry=waiting_schedule_array.entries; \
pWaitingEntry<pWaitingEnd; pWaitingEntry++)
pWaitingEnd = waitingEntries + waitingCount;
for (pWaitingEntry=waitingEntries; pWaitingEntry<pWaitingEnd;
pWaitingEntry++)
{
pSchedEnd = newEntries + newCount;
for (pSchedEntry=newEntries; pSchedEntry<pSchedEnd; \
@ -324,7 +330,7 @@ static int do_check_waiting(ScheduleContext *pContext)
logDebug("file: "__FILE__", line: %d, " \
"schedule add entries: %d, replace entries: %d",
__LINE__, newCount - pScheduleArray->count, \
waiting_schedule_array.count - (newCount - pScheduleArray->count));
waitingCount - (newCount - pScheduleArray->count));
if (pScheduleArray->entries != NULL)
{
@ -333,12 +339,13 @@ static int do_check_waiting(ScheduleContext *pContext)
pScheduleArray->entries = newEntries;
pScheduleArray->count = newCount;
free(waiting_schedule_array.entries);
waiting_schedule_array.count = 0;
waiting_schedule_array.entries = NULL;
__sync_bool_compare_and_swap(&waiting_schedule_array.count,
waitingCount, 0);
__sync_bool_compare_and_swap(&waiting_schedule_array.entries,
waitingEntries, NULL);
free(waitingEntries);
sched_make_chain(pContext);
return 0;
}
@ -398,7 +405,7 @@ static void *sched_thread_entrance(void *args)
}
sched_make_chain(pContext);
g_schedule_flag = true;
__sync_bool_compare_and_swap(&g_schedule_flag, 0, 1);
while (*(pContext->pcontinue_flag))
{
g_current_time = time(NULL);
@ -527,7 +534,7 @@ static void *sched_thread_entrance(void *args)
}
}
g_schedule_flag = false;
__sync_bool_compare_and_swap(&g_schedule_flag, 1, 0);
logDebug("file: "__FILE__", line: %d, " \
"schedule thread exit", __LINE__);
@ -598,6 +605,7 @@ static int sched_append_array(const ScheduleArray *pSrcArray, \
int sched_add_entries(const ScheduleArray *pScheduleArray)
{
int result;
int i;
ScheduleArray temp_schedule_array;
if (pScheduleArray->count == 0)
@ -617,21 +625,39 @@ int sched_add_entries(const ScheduleArray *pScheduleArray)
return result;
}
if (waiting_schedule_array.entries != NULL)
if (__sync_add_and_fetch(&waiting_schedule_array.entries, 0) != NULL)
{
if (g_schedule_flag)
{
while (waiting_schedule_array.entries != NULL)
if (!__sync_add_and_fetch(&g_schedule_flag, 0))
{
logDebug("file: "__FILE__", line: %d, "
"waiting for schedule array ready ...", __LINE__);
"schedule thread not ready yet", __LINE__);
}
i = 0;
while (__sync_add_and_fetch(&waiting_schedule_array.entries, 0) != NULL)
{
if (i == 0)
{
logDebug("file: "__FILE__", line: %d, "
"waiting for schedule array ready, "
"schedule_flag: %d ...", __LINE__,
__sync_add_and_fetch(&g_schedule_flag, 0));
}
else if (++i == 10)
{
logError("file: "__FILE__", line: %d, "
"waiting for schedule array ready timeout, "
"schedule_flag: %d, wait count: %d", __LINE__,
__sync_add_and_fetch(&g_schedule_flag, 0), i);
break;
}
sleep(1);
}
}
}
waiting_schedule_array.entries = temp_schedule_array.entries;
waiting_schedule_array.count = temp_schedule_array.count;
__sync_bool_compare_and_swap(&waiting_schedule_array.entries,
NULL, temp_schedule_array.entries);
__sync_bool_compare_and_swap(&waiting_schedule_array.count,
0, temp_schedule_array.count);
return 0;
}
@ -685,8 +711,9 @@ int sched_start_ex(ScheduleArray *pScheduleArray, pthread_t *ptid,
if (timer_slot_count > 0)
{
if ((result=fast_mblock_init(&pContext->mblock,
sizeof(FastDelayTask), mblock_alloc_once)) != 0)
if ((result=fast_mblock_init_ex1(&pContext->delay_task_allocator,
"sched_delay_task", sizeof(FastDelayTask),
mblock_alloc_once, 0, NULL, NULL, true)) != 0)
{
free(pContext);
return result;
@ -699,7 +726,9 @@ int sched_start_ex(ScheduleArray *pScheduleArray, pthread_t *ptid,
free(pContext);
return result;
}
if ((result=init_pthread_lock(&pContext->delay_queue.lock)) != 0)
if ((result=fc_queue_init(&pContext->delay_queue, (long)
(&((FastDelayTask *)NULL)->next))) != 0)
{
free(pContext);
return result;
@ -755,6 +784,8 @@ int sched_add_delay_task_ex(ScheduleContext *pContext, TaskFunc task_func,
void *func_args, const int delay_seconds, const bool new_thread)
{
FastDelayTask *task;
bool notify;
if (!pContext->timer_init)
{
logError("file: "__FILE__", line: %d, "
@ -763,7 +794,8 @@ int sched_add_delay_task_ex(ScheduleContext *pContext, TaskFunc task_func,
return EOPNOTSUPP;
}
task = (FastDelayTask *)fast_mblock_alloc_object(&pContext->mblock);
task = (FastDelayTask *)fast_mblock_alloc_object(
&pContext->delay_task_allocator);
if (task == NULL)
{
return ENOMEM;
@ -781,18 +813,7 @@ int sched_add_delay_task_ex(ScheduleContext *pContext, TaskFunc task_func,
task->timer.expires = g_current_time;
}
pthread_mutex_lock(&pContext->delay_queue.lock);
if (pContext->delay_queue.head == NULL)
{
pContext->delay_queue.head = task;
}
else
{
pContext->delay_queue.tail->next = task;
}
pContext->delay_queue.tail = task;
pthread_mutex_unlock(&pContext->delay_queue.lock);
fc_queue_push_ex(&pContext->delay_queue, task, &notify);
return 0;
}
@ -806,18 +827,10 @@ int sched_add_delay_task(TaskFunc task_func, void *func_args,
static void sched_deal_task_queue(ScheduleContext *pContext)
{
FastDelayTask *task;
struct fc_queue_info qinfo;
pthread_mutex_lock(&pContext->delay_queue.lock);
if (pContext->delay_queue.head == NULL)
{
pthread_mutex_unlock(&pContext->delay_queue.lock);
return;
}
task = pContext->delay_queue.head;
pContext->delay_queue.head = NULL;
pContext->delay_queue.tail = NULL;
pthread_mutex_unlock(&pContext->delay_queue.lock);
fc_queue_pop_to_queue(&pContext->delay_queue, &qinfo);
task = qinfo.head;
while (task != NULL)
{
fast_timer_add(&pContext->timer, (FastTimerEntry *)task);
@ -849,7 +862,7 @@ static void *sched_call_delay_func(void *args)
logDebug("file: "__FILE__", line: %d, " \
"delay thread exit, task args: %p", __LINE__, task->func_args);
fast_mblock_free_object(&pContext->mblock, task);
fast_mblock_free_object(&pContext->delay_task_allocator, task);
pthread_detach(pthread_self());
return NULL;
}
@ -873,7 +886,7 @@ static void deal_timeout_tasks(ScheduleContext *pContext, FastTimerEntry *head)
if (!task->new_thread)
{
task->task_func(task->func_args);
fast_mblock_free_object(&pContext->mblock, task);
fast_mblock_free_object(&pContext->delay_task_allocator, task);
}
else
{

View File

@ -15,6 +15,7 @@
#include "common_define.h"
#include "fast_timer.h"
#include "fast_mblock.h"
#include "fc_queue.h"
typedef int (*TaskFunc) (void *args);
@ -58,23 +59,16 @@ typedef struct fast_delay_task {
struct fast_delay_task *next;
} FastDelayTask;
typedef struct
{
FastDelayTask *head;
FastDelayTask *tail;
pthread_mutex_t lock;
} FastDelayQueue;
typedef struct
{
ScheduleArray scheduleArray;
ScheduleEntry *head; //schedule chain head
ScheduleEntry *tail; //schedule chain tail
struct fast_mblock_man mblock; //for timer entry
struct fast_mblock_man delay_task_allocator; //for FastDelayTask
FastTimer timer; //for delay task
bool timer_init;
FastDelayQueue delay_queue;
struct fc_queue delay_queue;
bool *pcontinue_flag;
} ScheduleContext;
@ -103,10 +97,9 @@ typedef struct
extern "C" {
#endif
extern volatile bool g_schedule_flag; //schedule continue running flag
extern volatile int g_schedule_flag; //schedule continue running flag
extern volatile time_t g_current_time; //the current time
#define get_current_time() (g_schedule_flag ? g_current_time: time(NULL))
/** generate next id