diff --git a/src/tests/Makefile b/src/tests/Makefile index 0f68b46..17a6acc 100644 --- a/src/tests/Makefile +++ b/src/tests/Makefile @@ -4,7 +4,7 @@ COMPILE = $(CC) -g -O1 -Wall -D_FILE_OFFSET_BITS=64 -g -DDEBUG_FLAG INC_PATH = -I/usr/include/fastcommon LIB_PATH = -lfastcommon -ALL_PRGS = test_allocator test_skiplist test_multi_skiplist test_mblock +ALL_PRGS = test_allocator test_skiplist test_multi_skiplist test_mblock test_blocked_queue all: $(ALL_PRGS) .c: diff --git a/src/tests/test_blocked_queue.c b/src/tests/test_blocked_queue.c new file mode 100644 index 0000000..910d14b --- /dev/null +++ b/src/tests/test_blocked_queue.c @@ -0,0 +1,116 @@ +#include +#include +#include +#include +#include +#include +#include +#include "logger.h" +#include "shared_func.h" +#include "sched_thread.h" +#include "ini_file_reader.h" +#include "fast_task_queue.h" +#include "fast_blocked_queue.h" + +static bool g_continue_flag = true; +static int64_t produce_count = 0; +static int64_t consume_count = 0; +static struct fast_blocked_queue blocked_queue; + +#define MAX_USLEEP 10000 + +void *producer_thread(void *arg) +{ + int usleep_time; + int64_t count; + struct fast_task_info *pTask; + + while (g_continue_flag) { + usleep_time = (int64_t) MAX_USLEEP * (int64_t)rand() / RAND_MAX; + if (usleep_time > 0) { + usleep(usleep_time); + } + + count = __sync_add_and_fetch(&produce_count, 1); + if (count % 10000 == 0) { + printf("produce count: %"PRId64"\n", count); + } + + pTask = free_queue_pop(); + if (pTask != NULL) { + blocked_queue_push(&blocked_queue, pTask); + } + } + + return NULL; +} + +static void sigQuitHandler(int sig) +{ + g_continue_flag = false; + blocked_queue_terminate(&blocked_queue); + + logCrit("file: "__FILE__", line: %d, " \ + "catch signal %d, program exiting...", \ + __LINE__, sig); +} + +int main(int argc, char *argv[]) +{ + pthread_t tid; + struct sigaction act; + const int min_buff_size = 1024; + const int max_buff_size = 1024; + const int arg_size = 0; + int result; + int64_t count; + struct fast_task_info *pTask; + + srand(time(NULL)); + log_init(); + g_log_context.log_level = LOG_DEBUG; + + memset(&act, 0, sizeof(act)); + sigemptyset(&act.sa_mask); + act.sa_handler = sigQuitHandler; + if(sigaction(SIGINT, &act, NULL) < 0 || + sigaction(SIGTERM, &act, NULL) < 0 || + sigaction(SIGQUIT, &act, NULL) < 0) + { + logCrit("file: "__FILE__", line: %d, " \ + "call sigaction fail, errno: %d, error info: %s", \ + __LINE__, errno, STRERROR(errno)); + logCrit("exit abnormally!\n"); + return errno; + } + + result = free_queue_init(1024, min_buff_size, \ + max_buff_size, arg_size); + if (result != 0) { + return result; + } + + if ((result=blocked_queue_init(&blocked_queue)) != 0) { + return result; + } + + pthread_create(&tid, NULL, producer_thread, NULL); + pthread_create(&tid, NULL, producer_thread, NULL); + pthread_create(&tid, NULL, producer_thread, NULL); + pthread_create(&tid, NULL, producer_thread, NULL); + + while (g_continue_flag) { + pTask = blocked_queue_pop(&blocked_queue); + if (pTask != NULL) { + count = __sync_add_and_fetch(&consume_count, 1); + if (count % 10000 == 0) { + printf("consume count: %"PRId64"\n", count); + } + free_queue_push(pTask); + usleep(1000); + } + } + + return 0; +} +