diff --git a/HISTORY b/HISTORY index 5be34f3..c835420 100644 --- a/HISTORY +++ b/HISTORY @@ -1,5 +1,5 @@ -Version 1.44 2020-05-21 +Version 1.44 2020-06-01 * add test file src/tests/test_pthread_lock.c * add uniq_skiplist.[hc] * add function split_string_ex @@ -30,6 +30,7 @@ Version 1.44 2020-05-21 * fast_task_queue.[hc]: free_queue support init_callback * ini_file_reader.c: use mutex lock when access dynamic content array * uniq_skiplist add function uniq_skiplist_replace_ex + * add fc_queue.[hc] Version 1.43 2019-12-25 * replace function call system to getExecResult, diff --git a/src/Makefile.in b/src/Makefile.in index d2b2f51..511edc5 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -15,7 +15,8 @@ FAST_SHARED_OBJS = hash.lo chain.lo shared_func.lo ini_file_reader.lo \ system_info.lo fast_blocked_queue.lo id_generator.lo \ char_converter.lo char_convert_loader.lo common_blocked_queue.lo \ multi_socket_client.lo skiplist_set.lo uniq_skiplist.lo \ - json_parser.lo buffered_file_writer.lo server_id_func.lo + json_parser.lo buffered_file_writer.lo server_id_func.lo \ + fc_queue.lo FAST_STATIC_OBJS = hash.o chain.o shared_func.o ini_file_reader.o \ logger.o sockopt.o base64.o sched_thread.o \ @@ -27,7 +28,8 @@ FAST_STATIC_OBJS = hash.o chain.o shared_func.o ini_file_reader.o \ system_info.o fast_blocked_queue.o id_generator.o \ char_converter.o char_convert_loader.o common_blocked_queue.o \ multi_socket_client.o skiplist_set.o uniq_skiplist.o \ - json_parser.o buffered_file_writer.o server_id_func.lo + json_parser.o buffered_file_writer.o server_id_func.o \ + fc_queue.o HEADER_FILES = common_define.h hash.h chain.h logger.h base64.h \ shared_func.h pthread_func.h ini_file_reader.h _os_define.h \ @@ -40,7 +42,8 @@ HEADER_FILES = common_define.h hash.h chain.h logger.h base64.h \ php7_ext_wrapper.h id_generator.h char_converter.h \ char_convert_loader.h common_blocked_queue.h \ multi_socket_client.h skiplist_set.h uniq_skiplist.h \ - fc_list.h json_parser.h buffered_file_writer.h server_id_func.h + fc_list.h json_parser.h buffered_file_writer.h server_id_func.h \ + fc_queue.h ALL_OBJS = $(FAST_STATIC_OBJS) $(FAST_SHARED_OBJS) diff --git a/src/fc_queue.c b/src/fc_queue.c new file mode 100644 index 0000000..d523d83 --- /dev/null +++ b/src/fc_queue.c @@ -0,0 +1,112 @@ +//fc_queue.c + +#include +#include +#include +#include "logger.h" +#include "shared_func.h" +#include "pthread_func.h" +#include "fc_queue.h" + +#define FC_QUEUE_NEXT_PTR(queue, data) \ + *((void **)(((char *)data) + queue->next_ptr_offset)) + +int fc_queue_init(struct fc_queue *queue, const int next_ptr_offset) +{ + int result; + + if ((result=init_pthread_lock(&queue->lock)) != 0) { + logError("file: "__FILE__", line: %d, " + "init_pthread_lock fail, errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + return result; + } + + if ((result=pthread_cond_init(&queue->cond, NULL)) != 0) { + logError("file: "__FILE__", line: %d, " + "pthread_cond_init fail, " + "errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + return result; + } + + queue->head = NULL; + queue->tail = NULL; + queue->next_ptr_offset = next_ptr_offset; + return 0; +} + +void fc_queue_destroy(struct fc_queue *queue) +{ + pthread_cond_destroy(&queue->cond); + pthread_mutex_destroy(&queue->lock); +} + +void fc_queue_push_ex(struct fc_queue *queue, void *data, bool *notify) +{ + PTHREAD_MUTEX_LOCK(&queue->lock); + FC_QUEUE_NEXT_PTR(queue, data) = NULL; + if (queue->tail == NULL) { + queue->head = data; + *notify = true; + } else { + FC_QUEUE_NEXT_PTR(queue, queue->tail) = data; + *notify = false; + } + queue->tail = data; + + PTHREAD_MUTEX_UNLOCK(&queue->lock); +} + +void *fc_queue_pop_ex(struct fc_queue *queue, const bool blocked) +{ + void *data; + + PTHREAD_MUTEX_LOCK(&queue->lock); + do { + data = queue->head; + if (data == NULL) { + if (!blocked) { + break; + } + + pthread_cond_wait(&queue->cond, &queue->lock); + data = queue->head; + } + + if (data != NULL) { + queue->head = FC_QUEUE_NEXT_PTR(queue, data); + if (queue->head == NULL) { + queue->tail = NULL; + } + } + } while (0); + + PTHREAD_MUTEX_UNLOCK(&queue->lock); + return data; +} + +void *fc_queue_pop_all_ex(struct fc_queue *queue, const bool blocked) +{ + void *data; + + PTHREAD_MUTEX_LOCK(&queue->lock); + do { + data = queue->head; + if (data == NULL) { + if (!blocked) { + break; + } + + pthread_cond_wait(&queue->cond, &queue->lock); + data = queue->head; + } + + if (data != NULL) { + queue->head = queue->tail = NULL; + } + } while (0); + + PTHREAD_MUTEX_UNLOCK(&queue->lock); + return data; +} diff --git a/src/fc_queue.h b/src/fc_queue.h new file mode 100644 index 0000000..7d60c8a --- /dev/null +++ b/src/fc_queue.h @@ -0,0 +1,69 @@ +//fc_queue.h + +#ifndef _FC_QUEUE_H +#define _FC_QUEUE_H + +#include +#include +#include +#include +#include "common_define.h" +#include "fast_mblock.h" + +struct fc_queue +{ + void *head; + void *tail; + pthread_mutex_t lock; + pthread_cond_t cond; + int next_ptr_offset; +}; + +#ifdef __cplusplus +extern "C" { +#endif + +int fc_queue_init(struct fc_queue *queue, const int next_ptr_offset); + +void fc_queue_destroy(struct fc_queue *queue); + +static inline void fc_queue_terminate(struct fc_queue *queue) +{ + pthread_cond_signal(&queue->cond); +} + +static inline void fc_queue_terminate_all( + struct fc_queue *queue, const int count) +{ + int i; + for (i=0; icond)); + } +} + +//notify by the caller +void fc_queue_push_ex(struct fc_queue *queue, void *data, bool *notify); + +static inline void fc_queue_push(struct fc_queue *queue, void *data) +{ + bool notify; + + fc_queue_push_ex(queue, data, ¬ify); + if (notify) { + pthread_cond_signal(&(queue->cond)); + } +} + +void *fc_queue_pop_ex(struct fc_queue *queue, const bool blocked); +#define fc_queue_pop(queue) fc_queue_pop_ex(queue, true) +#define fc_queue_try_pop(queue) fc_queue_pop_ex(queue, false) + +void *fc_queue_pop_all_ex(struct fc_queue *queue, const bool blocked); +#define fc_queue_pop_all(queue) fc_queue_pop_all_ex(queue, true) +#define fc_queue_try_pop_all(queue) fc_queue_pop_all_ex(queue, false) + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/shared_func.c b/src/shared_func.c index be676cb..a478079 100644 --- a/src/shared_func.c +++ b/src/shared_func.c @@ -3102,3 +3102,32 @@ void fc_free_buffer(BufferInfo *buffer) buffer->alloc_size = buffer->length = 0; } } + +int fc_check_mkdir_ex(const char *path, const mode_t mode, bool *create) +{ + int result; + + *create = false; + if (access(path, F_OK) == 0) { + return 0; + } + + result = errno != 0 ? errno : EPERM; + if (result != ENOENT) { + logError("file: "__FILE__", line: %d, " + "access %s fail, errno: %d, error info: %s", + __LINE__, path, result, STRERROR(result)); + return result; + } + + if (mkdir(path, mode) != 0) { + result = errno != 0 ? errno : EPERM; + logError("file: "__FILE__", line: %d, " + "mkdir %s fail, errno: %d, error info: %s", + __LINE__, path, result, STRERROR(result)); + return result; + } + + *create = true; + return 0; +} diff --git a/src/shared_func.h b/src/shared_func.h index 251c47c..8c54d12 100644 --- a/src/shared_func.h +++ b/src/shared_func.h @@ -963,6 +963,14 @@ static inline int fc_get_umask() return mode; } +int fc_check_mkdir_ex(const char *path, const mode_t mode, bool *create); + +static inline int fc_check_mkdir(const char *path, const mode_t mode) +{ + bool create; + return fc_check_mkdir_ex(path, mode, &create); +} + #ifdef __cplusplus } #endif