From cb820096568a3a592dc30705ec35179629c547bb Mon Sep 17 00:00:00 2001 From: yuqing Date: Tue, 26 Jan 2016 09:42:53 +0800 Subject: [PATCH] add fast_blocked_queue.[hc] --- HISTORY | 3 +- src/Makefile.in | 6 +- src/fast_blocked_queue.c | 123 +++++++++++++++++++++++++++++++++++++++ src/fast_blocked_queue.h | 43 ++++++++++++++ 4 files changed, 171 insertions(+), 4 deletions(-) create mode 100644 src/fast_blocked_queue.c create mode 100644 src/fast_blocked_queue.h diff --git a/HISTORY b/HISTORY index 0376c69..5858244 100644 --- a/HISTORY +++ b/HISTORY @@ -1,5 +1,5 @@ -Version 1.24 2016-01-19 +Version 1.24 2016-01-26 * php extension compiled on PHP 7 * add skiplist which support stable sort * make.sh: use sed to replace perl @@ -8,6 +8,7 @@ Version 1.24 2016-01-19 * add function get_mounted_filesystems * add function get_processes for Linux * ini_file_reader add iniGetSectionNames and iniGetSectionItems + * add fast_blocked_queue.[hc] Version 1.23 2015-11-16 * sched_thread.c: task can execute in a new thread diff --git a/src/Makefile.in b/src/Makefile.in index 4c4841b..1c9f473 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -11,7 +11,7 @@ FAST_SHARED_OBJS = hash.lo chain.lo shared_func.lo ini_file_reader.lo \ fast_timer.lo process_ctrl.lo fast_mblock.lo \ connection_pool.lo fast_mpool.lo fast_allocator.lo \ fast_buffer.lo multi_skiplist.lo flat_skiplist.lo \ - system_info.lo + system_info.lo fast_blocked_queue.lo FAST_STATIC_OBJS = hash.o chain.o shared_func.o ini_file_reader.o \ logger.o sockopt.o base64.o sched_thread.o \ @@ -20,7 +20,7 @@ FAST_STATIC_OBJS = hash.o chain.o shared_func.o ini_file_reader.o \ fast_timer.o process_ctrl.o fast_mblock.o \ connection_pool.o fast_mpool.o fast_allocator.o \ fast_buffer.o multi_skiplist.o flat_skiplist.o \ - system_info.o + system_info.o fast_blocked_queue.o HEADER_FILES = common_define.h hash.h chain.h logger.h base64.h \ shared_func.h pthread_func.h ini_file_reader.h _os_define.h \ @@ -29,7 +29,7 @@ HEADER_FILES = common_define.h hash.h chain.h logger.h base64.h \ fast_timer.h process_ctrl.h fast_mblock.h \ connection_pool.h fast_mpool.h fast_allocator.h \ fast_buffer.h skiplist.h multi_skiplist.h flat_skiplist.h \ - skiplist_common.h system_info.h + skiplist_common.h system_info.h fast_blocked_queue.h ALL_OBJS = $(FAST_STATIC_OBJS) $(FAST_SHARED_OBJS) diff --git a/src/fast_blocked_queue.c b/src/fast_blocked_queue.c new file mode 100644 index 0000000..1d89aab --- /dev/null +++ b/src/fast_blocked_queue.c @@ -0,0 +1,123 @@ +//fast_blocked_queue.c + +#include +#include +#include +#include "logger.h" +#include "shared_func.h" +#include "pthread_func.h" +#include "fast_blocked_queue.h" + +int blocked_queue_init(struct fast_blocked_queue *pQueue) +{ + int result; + + if ((result=init_pthread_lock(&(pQueue->lock))) != 0) + { + logError("file: "__FILE__", line: %d, " + "init_pthread_lock fail, errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + return result; + } + + result = pthread_cond_init(&(pQueue->cond), NULL); + if (result != 0) + { + logError("file: "__FILE__", line: %d, " + "pthread_cond_init fail, " + "errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + return result; + } + + pQueue->head = NULL; + pQueue->tail = NULL; + + return 0; +} + +int blocked_queue_push(struct fast_blocked_queue *pQueue, + struct fast_task_info *pTask) +{ + int result; + bool notify; + + if ((result=pthread_mutex_lock(&(pQueue->lock))) != 0) + { + logError("file: "__FILE__", line: %d, " \ + "call pthread_mutex_lock fail, " \ + "errno: %d, error info: %s", \ + __LINE__, result, STRERROR(result)); + return result; + } + + pTask->next = NULL; + if (pQueue->tail == NULL) + { + pQueue->head = pTask; + notify = true; + } + else + { + pQueue->tail->next = pTask; + notify = false; + } + pQueue->tail = pTask; + + if ((result=pthread_mutex_unlock(&(pQueue->lock))) != 0) + { + logError("file: "__FILE__", line: %d, " \ + "call pthread_mutex_unlock fail, " \ + "errno: %d, error info: %s", \ + __LINE__, result, STRERROR(result)); + } + + if (notify) + { + pthread_cond_signal(&(pQueue->cond)); + } + + return 0; +} + +struct fast_task_info *blocked_queue_pop(struct fast_blocked_queue *pQueue) +{ + struct fast_task_info *pTask; + int result; + + if ((result=pthread_mutex_lock(&(pQueue->lock))) != 0) + { + logError("file: "__FILE__", line: %d, " \ + "call pthread_mutex_lock fail, " \ + "errno: %d, error info: %s", \ + __LINE__, result, STRERROR(result)); + return NULL; + } + + pTask = pQueue->head; + if (pTask == NULL) + { + pthread_cond_wait(&(pQueue->cond), &(pQueue->lock)); + pTask = pQueue->head; + } + + if (pTask != NULL) + { + pQueue->head = pTask->next; + if (pQueue->head == NULL) + { + pQueue->tail = NULL; + } + } + + if ((result=pthread_mutex_unlock(&(pQueue->lock))) != 0) + { + logError("file: "__FILE__", line: %d, " \ + "call pthread_mutex_unlock fail, " \ + "errno: %d, error info: %s", \ + __LINE__, result, STRERROR(result)); + } + + return pTask; +} + diff --git a/src/fast_blocked_queue.h b/src/fast_blocked_queue.h new file mode 100644 index 0000000..e4ef303 --- /dev/null +++ b/src/fast_blocked_queue.h @@ -0,0 +1,43 @@ +/** +* Copyright (C) 2008 Happy Fish / YuQing +* +* FastDFS may be copied only under the terms of the GNU General +* Public License V3, which may be found in the FastDFS source kit. +* Please visit the FastDFS Home Page http://www.csource.org/ for more detail. +**/ + +//fast_blocked_queue.h + +#ifndef _FAST_BLOCKED_QUEUE_H +#define _FAST_BLOCKED_QUEUE_H + +#include +#include +#include +#include +#include "common_define.h" +#include "fast_task_queue.h" + +struct fast_blocked_queue +{ + struct fast_task_info *head; + struct fast_task_info *tail; + pthread_mutex_t lock; + pthread_cond_t cond; +}; + +#ifdef __cplusplus +extern "C" { +#endif + +int blocked_queue_init(struct fast_blocked_queue *pQueue); +int blocked_queue_push(struct fast_blocked_queue *pQueue, + struct fast_task_info *pTask); +struct fast_task_info *blocked_queue_pop(struct fast_blocked_queue *pQueue); + +#ifdef __cplusplus +} +#endif + +#endif +