sched_add_entries use temp ScheduleArray for rare case

pull/37/head
YuQing 2020-03-21 09:50:03 +08:00
parent 21336eee3e
commit edb8b2c4dd
3 changed files with 127 additions and 137 deletions

View File

@ -1,5 +1,5 @@
Version 1.44 2020-03-12 Version 1.44 2020-03-21
* add test file src/tests/test_pthread_lock.c * add test file src/tests/test_pthread_lock.c
* add uniq_skiplist.[hc] * add uniq_skiplist.[hc]
* add function split_string_ex * add function split_string_ex
@ -20,6 +20,7 @@ Version 1.44 2020-03-12
* struct fast_task_info add canceled field for complicated thread model * struct fast_task_info add canceled field for complicated thread model
* nio_thread_data support thread notify * nio_thread_data support thread notify
* pthread_func.[hc] add functions: create_work_threads_ex and fc_create_thread * pthread_func.[hc] add functions: create_work_threads_ex and fc_create_thread
* sched_add_entries use temp ScheduleArray for rare case
Version 1.43 2019-12-25 Version 1.43 2019-12-25
* replace function call system to getExecResult, * replace function call system to getExecResult,

View File

@ -33,7 +33,7 @@ static int sched_dup_array(const ScheduleArray *pSrcArray,
static int sched_cmp_by_next_call_time(const void *p1, const void *p2) static int sched_cmp_by_next_call_time(const void *p1, const void *p2)
{ {
return ((ScheduleEntry *)p1)->next_call_time - \ return ((ScheduleEntry *)p1)->next_call_time -
((ScheduleEntry *)p2)->next_call_time; ((ScheduleEntry *)p2)->next_call_time;
} }
@ -44,6 +44,7 @@ static int sched_init_entries(ScheduleArray *pScheduleArray)
time_t time_base; time_t time_base;
struct tm tm_current; struct tm tm_current;
struct tm tm_base; struct tm tm_base;
time_t current_time;
int remain; int remain;
int interval; int interval;
@ -59,8 +60,8 @@ static int sched_init_entries(ScheduleArray *pScheduleArray)
return 0; return 0;
} }
g_current_time = time(NULL); current_time = time(NULL);
localtime_r((time_t *)&g_current_time, &tm_current); localtime_r((time_t *)&current_time, &tm_current);
pEnd = pScheduleArray->entries + pScheduleArray->count; pEnd = pScheduleArray->entries + pScheduleArray->count;
for (pEntry=pScheduleArray->entries; pEntry<pEnd; pEntry++) for (pEntry=pScheduleArray->entries; pEntry<pEnd; pEntry++)
{ {
@ -79,20 +80,20 @@ static int sched_init_entries(ScheduleArray *pScheduleArray)
if (pEntry->time_base.hour == TIME_NONE) if (pEntry->time_base.hour == TIME_NONE)
{ {
pEntry->next_call_time = g_current_time + \ pEntry->next_call_time = current_time +
pEntry->interval; pEntry->interval;
} }
else else
{ {
if (tm_current.tm_hour > pEntry->time_base.hour || \ if (tm_current.tm_hour > pEntry->time_base.hour ||
(tm_current.tm_hour == pEntry->time_base.hour \ (tm_current.tm_hour == pEntry->time_base.hour
&& tm_current.tm_min >= pEntry->time_base.minute)) && tm_current.tm_min >= pEntry->time_base.minute))
{ {
memcpy(&tm_base, &tm_current, sizeof(struct tm)); tm_base = tm_current;
} }
else else
{ {
time_base = g_current_time - 24 * 3600; time_base = current_time - 24 * 3600;
localtime_r(&time_base, &tm_base); localtime_r(&time_base, &tm_base);
} }
@ -107,7 +108,7 @@ static int sched_init_entries(ScheduleArray *pScheduleArray)
tm_base.tm_sec = 0; tm_base.tm_sec = 0;
} }
time_base = mktime(&tm_base); time_base = mktime(&tm_base);
remain = g_current_time - time_base; remain = current_time - time_base;
if (remain > 0) if (remain > 0)
{ {
interval = pEntry->interval - remain % pEntry->interval; interval = pEntry->interval - remain % pEntry->interval;
@ -121,17 +122,17 @@ static int sched_init_entries(ScheduleArray *pScheduleArray)
interval = 0; interval = 0;
} }
pEntry->next_call_time = g_current_time + interval; pEntry->next_call_time = current_time + interval;
} }
/* /*
{ {
char buff1[32]; char buff1[32];
char buff2[32]; char buff2[32];
logInfo("id=%d, current time=%s, first call time=%s\n", \ logInfo("id=%d, current time=%s, first call time=%s",
pEntry->id, formatDatetime(g_current_time, \ pEntry->id, formatDatetime(current_time,
"%Y-%m-%d %H:%M:%S", buff1, sizeof(buff1)), \ "%Y-%m-%d %H:%M:%S", buff1, sizeof(buff1)),
formatDatetime(pEntry->next_call_time, \ formatDatetime(pEntry->next_call_time,
"%Y-%m-%d %H:%M:%S", buff2, sizeof(buff2))); "%Y-%m-%d %H:%M:%S", buff2, sizeof(buff2)));
} }
*/ */
@ -153,7 +154,7 @@ static void sched_make_chain(ScheduleContext *pContext)
return; return;
} }
qsort(pScheduleArray->entries, pScheduleArray->count, \ qsort(pScheduleArray->entries, pScheduleArray->count,
sizeof(ScheduleEntry), sched_cmp_by_next_call_time); sizeof(ScheduleEntry), sched_cmp_by_next_call_time);
pContext->head = pScheduleArray->entries; pContext->head = pScheduleArray->entries;
@ -210,10 +211,11 @@ static int print_all_sched_entries(ScheduleArray *pScheduleArray)
pEntry->time_base.minute, pEntry->time_base.second); pEntry->time_base.minute, pEntry->time_base.second);
} }
logInfo("id: %u, time_base: %s, interval: %d, " logInfo("id: %u, time_base: %s, interval: %d, "
"new_thread: %s, task_func: %p, args: %p", "new_thread: %s, task_func: %p, args: %p, "
pEntry->id, timebase, pEntry->interval, "next_call_time: %d", pEntry->id, timebase,
pEntry->new_thread ? "true" : "false", pEntry->interval, pEntry->new_thread ? "true" : "false",
pEntry->task_func, pEntry->func_args); pEntry->task_func, pEntry->func_args,
(int)pEntry->next_call_time);
} }
free(sortedByIdArray.entries); free(sortedByIdArray.entries);
@ -314,16 +316,14 @@ static int do_check_waiting(ScheduleContext *pContext)
{ {
if (pWaitingEntry->id == pSchedEntry->id) if (pWaitingEntry->id == pSchedEntry->id)
{ {
memcpy(pSchedEntry, pWaitingEntry, \ *pSchedEntry = *pWaitingEntry;
sizeof(ScheduleEntry));
break; break;
} }
} }
if (pSchedEntry == pSchedEnd) if (pSchedEntry == pSchedEnd)
{ {
memcpy(pSchedEntry, pWaitingEntry, \ *pSchedEntry = *pWaitingEntry;
sizeof(ScheduleEntry));
newCount++; newCount++;
} }
} }
@ -418,6 +418,11 @@ static void *sched_thread_entrance(void *args)
continue; continue;
} }
/*
logInfo("task count: %d, next_call_time: %d, g_current_time: %d",
pContext->scheduleArray.count,
(int)pContext->head->next_call_time, (int)g_current_time);
*/
while (pContext->head->next_call_time > g_current_time && while (pContext->head->next_call_time > g_current_time &&
*(pContext->pcontinue_flag)) *(pContext->pcontinue_flag))
{ {
@ -568,6 +573,7 @@ static int sched_dup_array(const ScheduleArray *pSrcArray, \
return 0; return 0;
} }
/*
static int sched_append_array(const ScheduleArray *pSrcArray, \ static int sched_append_array(const ScheduleArray *pSrcArray, \
ScheduleArray *pDestArray) ScheduleArray *pDestArray)
{ {
@ -605,38 +611,46 @@ static int sched_append_array(const ScheduleArray *pSrcArray, \
pDestArray->count += pSrcArray->count; pDestArray->count += pSrcArray->count;
return 0; return 0;
} }
*/
int sched_add_entries(const ScheduleArray *pScheduleArray) int sched_add_entries(const ScheduleArray *pScheduleArray)
{ {
int result; int result;
ScheduleArray temp_schedule_array;
if (pScheduleArray->count == 0) if (pScheduleArray->count == 0)
{ {
logDebug("file: "__FILE__", line: %d, " \ logDebug("file: "__FILE__", line: %d, "
"no schedule entry", __LINE__); "no schedule entry", __LINE__);
return ENOENT; return ENOENT;
} }
if ((result=sched_dup_array(pScheduleArray,
&temp_schedule_array)) != 0)
{
return result;
}
if ((result=sched_init_entries(&temp_schedule_array)) != 0)
{
return result;
}
if (waiting_schedule_array.entries != NULL) if (waiting_schedule_array.entries != NULL)
{ {
if (g_schedule_flag) if (g_schedule_flag)
{ {
while (waiting_schedule_array.entries != NULL) while (waiting_schedule_array.entries != NULL)
{ {
logDebug("file: "__FILE__", line: %d, " \ logDebug("file: "__FILE__", line: %d, "
"waiting for schedule array ready ...", __LINE__); "waiting for schedule array ready ...", __LINE__);
sleep(1); sleep(1);
} }
} }
} }
if ((result=sched_append_array(pScheduleArray, waiting_schedule_array.entries = temp_schedule_array.entries;
&waiting_schedule_array)) != 0) waiting_schedule_array.count = temp_schedule_array.count;
{ return 0;
return result;
}
return sched_init_entries(&waiting_schedule_array);
} }
int sched_del_entry(const int id) int sched_del_entry(const int id)

View File

@ -10,56 +10,52 @@
#include <sys/stat.h> #include <sys/stat.h>
#include "fastcommon/logger.h" #include "fastcommon/logger.h"
#include "fastcommon/shared_func.h" #include "fastcommon/shared_func.h"
#include "fastcommon/sched_thread.h"
#include "fastcommon/server_id_func.h" #include "fastcommon/server_id_func.h"
static int test_open_lseek(const char *filename) static int mblock_stat_task_func1(void *args)
{ {
int result; logInfo("file: "__FILE__", line: %d, func: %s",
int fd; __LINE__, __FUNCTION__);
int bytes;
char buff[1024];
int64_t offset = 1024 * 1024;
fd = open(filename, O_RDONLY);
if (fd < 0) {
result = errno != 0 ? errno : EACCES;
logError("file: "__FILE__", line: %d, "
"open file \"%s\" fail, "
"errno: %d, error info: %s",
__LINE__, filename,
result, STRERROR(result));
return result;
}
if (offset > 0) {
if (lseek(fd, offset, SEEK_SET) < 0) {
result = errno != 0 ? errno : EACCES;
logError("file: "__FILE__", line: %d, "
"lseek file \"%s\" fail, offset: %"PRId64", "
"errno: %d, error info: %s", __LINE__,
filename, offset,
result, STRERROR(result));
return result;
} else {
logInfo("lseek %"PRId64" successfully.", offset);
}
}
if ((bytes=read(fd, buff, sizeof(buff))) < 0) {
result = errno != 0 ? errno : EACCES;
logError("file: "__FILE__", line: %d, "
"read file \"%s\" fail, offset: %"PRId64", "
"errno: %d, error info: %s", __LINE__,
filename, offset, result, STRERROR(result));
return result;
}
printf("read bytes: %d\n", bytes);
close(fd);
return 0; return 0;
} }
static int mblock_stat_task_func2(void *args)
{
sched_print_all_entries();
logInfo("file: "__FILE__", line: %d, func: %s",
__LINE__, __FUNCTION__);
return 0;
}
volatile bool continue_flag = true;
static pthread_t tid;
static int setup_mblock_stat_task()
{
ScheduleEntry schedule_entry[2];
ScheduleArray schedule_array;
INIT_SCHEDULE_ENTRY(schedule_entry[1], sched_generate_next_id(),
TIME_NONE, TIME_NONE, TIME_NONE, 1, mblock_stat_task_func1, NULL);
INIT_SCHEDULE_ENTRY(schedule_entry[0], sched_generate_next_id(),
0, 0, 0, 1, mblock_stat_task_func2, NULL);
schedule_array.count = 2;
schedule_array.entries = schedule_entry;
return sched_start(&schedule_array, &tid,
64 * 1024, (bool *)&continue_flag);
}
static void sigQuitHandler(int sig)
{
if (continue_flag) {
continue_flag = false;
logCrit("file: "__FILE__", line: %d, "
"catch signal %d, program exiting...",
__LINE__, sig);
}
}
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
int result; int result;
@ -69,54 +65,16 @@ int main(int argc, char *argv[])
const int min_hosts_each_group = 1; const int min_hosts_each_group = 1;
const bool share_between_groups = true; const bool share_between_groups = true;
FastBuffer buffer; FastBuffer buffer;
struct sigaction act;
memset(&act, 0, sizeof(act));
sigemptyset(&act.sa_mask);
if (argc > 1) { if (argc > 1) {
config_filename = argv[1]; config_filename = argv[1];
} }
log_init(); log_init();
{
union {
int64_t flags;
struct {
union {
int flags: 4;
struct {
bool ns: 1; //namespace
bool pt: 1; //path
bool hc: 1; //hash code
};
} path_info;
bool user_data : 1;
bool extra_data: 1;
bool mode : 1;
bool ctime: 1;
bool mtime: 1;
bool size : 1;
};
} options;
char *endptr;
int64_t n;
endptr = NULL;
n = strtoll(argv[1], &endptr, 10);
printf("sizeof(mode_t): %d\n", (int)sizeof(mode_t));
printf("sizeof(options): %d\n", (int)sizeof(options));
options.path_info.ns = options.path_info.pt = options.path_info.hc = 1;
printf("union flags: %d\n", options.path_info.flags);
printf("n: %"PRId64", endptr: %s(%d)\n", n, endptr, (int)strlen(endptr));
n = snprintf(NULL, 0, "%"PRId64, n);
printf("expect len: %d\n", (int)n);
test_open_lseek(config_filename);
return 1;
}
if ((result=fc_server_load_from_file_ex(&ctx, config_filename, if ((result=fc_server_load_from_file_ex(&ctx, config_filename,
default_port, min_hosts_each_group, default_port, min_hosts_each_group,
share_between_groups)) != 0) share_between_groups)) != 0)
@ -124,6 +82,21 @@ int main(int argc, char *argv[])
return result; return result;
} }
act.sa_handler = sigQuitHandler;
if(sigaction(SIGINT, &act, NULL) < 0 ||
sigaction(SIGTERM, &act, NULL) < 0 ||
sigaction(SIGQUIT, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, "
"call sigaction fail, errno: %d, error info: %s",
__LINE__, errno, strerror(errno));
logCrit("exit abnormally!\n");
return errno;
}
setup_mblock_stat_task();
if ((result=fast_buffer_init_ex(&buffer, 1024)) != 0) { if ((result=fast_buffer_init_ex(&buffer, 1024)) != 0) {
return result; return result;
} }
@ -131,6 +104,8 @@ int main(int argc, char *argv[])
printf("%.*s", buffer.length, buffer.data); printf("%.*s", buffer.length, buffer.data);
//printf("%.*s\n(%d)", buffer.length, buffer.data, buffer.length); //printf("%.*s\n(%d)", buffer.length, buffer.data, buffer.length);
sleep(10);
fast_buffer_destroy(&buffer); fast_buffer_destroy(&buffer);
//fc_server_to_log(&ctx); //fc_server_to_log(&ctx);