diff --git a/src/Makefile.in b/src/Makefile.in index 23993ec..10f595e 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -43,7 +43,7 @@ HEADER_FILES = common_define.h hash.h chain.h logger.h base64.h \ char_convert_loader.h common_blocked_queue.h \ multi_socket_client.h skiplist_set.h uniq_skiplist.h \ fc_list.h json_parser.h buffered_file_writer.h server_id_func.h \ - fc_queue.h fc_memory.h shared_buffer.h thread_pool.h + fc_queue.h fc_memory.h shared_buffer.h thread_pool.h fc_atomic.h ALL_OBJS = $(FAST_STATIC_OBJS) $(FAST_SHARED_OBJS) diff --git a/src/common_blocked_queue.c b/src/common_blocked_queue.c index e36725f..2b96f2e 100644 --- a/src/common_blocked_queue.c +++ b/src/common_blocked_queue.c @@ -14,23 +14,11 @@ int common_blocked_queue_init_ex(struct common_blocked_queue *queue, const int64_t alloc_elements_limit = 0; int result; - if ((result=init_pthread_lock(&queue->lock)) != 0) + if ((result=init_pthread_lock_cond_pair(&queue->lc_pair)) != 0) { - logError("file: "__FILE__", line: %d, " - "init_pthread_lock fail, errno: %d, error info: %s", - __LINE__, result, STRERROR(result)); return result; } - if ((result=pthread_cond_init(&queue->cond, NULL)) != 0) - { - logError("file: "__FILE__", line: %d, " - "pthread_cond_init fail, " - "errno: %d, error info: %s", - __LINE__, result, STRERROR(result)); - return result; - } - if ((result=fast_mblock_init_ex1(&queue->mblock, "queue_node", sizeof(struct common_blocked_node), alloc_elements_once, alloc_elements_limit, @@ -47,8 +35,7 @@ int common_blocked_queue_init_ex(struct common_blocked_queue *queue, void common_blocked_queue_destroy(struct common_blocked_queue *queue) { - pthread_cond_destroy(&queue->cond); - pthread_mutex_destroy(&queue->lock); + destroy_pthread_lock_cond_pair(&queue->lc_pair); fast_mblock_destroy(&queue->mblock); } @@ -58,7 +45,7 @@ int common_blocked_queue_push_ex(struct common_blocked_queue *queue, int result; struct common_blocked_node *node; - if ((result=pthread_mutex_lock(&(queue->lock))) != 0) + if ((result=pthread_mutex_lock(&(queue->lc_pair.lock))) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_lock fail, " \ @@ -71,7 +58,7 @@ int common_blocked_queue_push_ex(struct common_blocked_queue *queue, &queue->mblock); if (node == NULL) { - pthread_mutex_unlock(&(queue->lock)); + pthread_mutex_unlock(&(queue->lc_pair.lock)); return ENOMEM; } @@ -89,7 +76,7 @@ int common_blocked_queue_push_ex(struct common_blocked_queue *queue, } queue->tail = node; - if ((result=pthread_mutex_unlock(&(queue->lock))) != 0) + if ((result=pthread_mutex_unlock(&(queue->lc_pair.lock))) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_unlock fail, " \ @@ -115,14 +102,14 @@ void common_blocked_queue_return_nodes(struct common_blocked_queue *queue, last = last->next; } - pthread_mutex_lock(&(queue->lock)); + pthread_mutex_lock(&(queue->lc_pair.lock)); last->next = queue->head; queue->head = node; if (queue->tail == NULL) { queue->tail = last; } - pthread_mutex_unlock(&(queue->lock)); + pthread_mutex_unlock(&(queue->lc_pair.lock)); } void *common_blocked_queue_pop_ex(struct common_blocked_queue *queue, @@ -132,7 +119,7 @@ void *common_blocked_queue_pop_ex(struct common_blocked_queue *queue, void *data; int result; - if ((result=pthread_mutex_lock(&(queue->lock))) != 0) + if ((result=pthread_mutex_lock(&(queue->lc_pair.lock))) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_lock fail, " \ @@ -151,7 +138,7 @@ void *common_blocked_queue_pop_ex(struct common_blocked_queue *queue, break; } - pthread_cond_wait(&(queue->cond), &(queue->lock)); + pthread_cond_wait(&(queue->lc_pair.cond), &(queue->lc_pair.lock)); node = queue->head; } @@ -172,7 +159,7 @@ void *common_blocked_queue_pop_ex(struct common_blocked_queue *queue, } } while (0); - if ((result=pthread_mutex_unlock(&(queue->lock))) != 0) + if ((result=pthread_mutex_unlock(&(queue->lc_pair.lock))) != 0) { logError("file: "__FILE__", line: %d, " \ "call pthread_mutex_unlock fail, " \ @@ -189,7 +176,7 @@ struct common_blocked_node *common_blocked_queue_pop_all_nodes_ex( struct common_blocked_node *node; int result; - if ((result=pthread_mutex_lock(&(queue->lock))) != 0) + if ((result=pthread_mutex_lock(&(queue->lc_pair.lock))) != 0) { logError("file: "__FILE__", line: %d, " "call pthread_mutex_lock fail, " @@ -202,13 +189,13 @@ struct common_blocked_node *common_blocked_queue_pop_all_nodes_ex( { if (blocked) { - pthread_cond_wait(&(queue->cond), &(queue->lock)); + pthread_cond_wait(&(queue->lc_pair.cond), &(queue->lc_pair.lock)); } } node = queue->head; queue->head = queue->tail = NULL; - if ((result=pthread_mutex_unlock(&(queue->lock))) != 0) + if ((result=pthread_mutex_unlock(&(queue->lc_pair.lock))) != 0) { logError("file: "__FILE__", line: %d, " "call pthread_mutex_unlock fail, " @@ -224,11 +211,11 @@ void common_blocked_queue_free_all_nodes(struct common_blocked_queue *queue, { struct common_blocked_node *deleted; - pthread_mutex_lock(&(queue->lock)); + pthread_mutex_lock(&(queue->lc_pair.lock)); while (node != NULL) { deleted = node; node = node->next; fast_mblock_free_object(&queue->mblock, deleted); } - pthread_mutex_unlock(&(queue->lock)); + pthread_mutex_unlock(&(queue->lc_pair.lock)); } diff --git a/src/common_blocked_queue.h b/src/common_blocked_queue.h index bb8bb43..f23c92c 100644 --- a/src/common_blocked_queue.h +++ b/src/common_blocked_queue.h @@ -29,8 +29,7 @@ struct common_blocked_queue struct common_blocked_node *head; struct common_blocked_node *tail; struct fast_mblock_man mblock; - pthread_mutex_t lock; - pthread_cond_t cond; + pthread_lock_cond_pair_t lc_pair; }; #ifdef __cplusplus @@ -48,7 +47,7 @@ void common_blocked_queue_destroy(struct common_blocked_queue *queue); static inline void common_blocked_queue_terminate( struct common_blocked_queue *queue) { - pthread_cond_signal(&(queue->cond)); + pthread_cond_signal(&(queue->lc_pair.cond)); } static inline void common_blocked_queue_terminate_all( @@ -57,7 +56,7 @@ static inline void common_blocked_queue_terminate_all( int i; for (i=0; icond)); + pthread_cond_signal(&(queue->lc_pair.cond)); } } @@ -75,7 +74,7 @@ static inline int common_blocked_queue_push(struct common_blocked_queue { if (notify) { - pthread_cond_signal(&(queue->cond)); + pthread_cond_signal(&(queue->lc_pair.cond)); } } diff --git a/src/common_define.h b/src/common_define.h index 9643431..a66f1fd 100644 --- a/src/common_define.h +++ b/src/common_define.h @@ -204,6 +204,12 @@ typedef struct int count; } key_value_array_t; +typedef struct +{ + pthread_mutex_t lock; + pthread_cond_t cond; +} pthread_lock_cond_pair_t; + typedef void (*FreeDataFunc)(void *ptr); typedef int (*CompareFunc)(void *p1, void *p2); typedef void* (*MallocFunc)(size_t size); diff --git a/src/fc_atomic.h b/src/fc_atomic.h new file mode 100644 index 0000000..9043033 --- /dev/null +++ b/src/fc_atomic.h @@ -0,0 +1,20 @@ +#ifndef _FC_ATOMIC_H +#define _FC_ATOMIC_H + +#ifdef __cplusplus +extern "C" { +#endif + +#define FC_ATOMIC_CAS(var, old_value, new_value) \ + do { \ + if (__sync_bool_compare_and_swap(&var, old_value, new_value)) { \ + break; \ + } \ + old_value = __sync_add_and_fetch(&var, 0); \ + } while (new_value != old_value) + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/fc_queue.c b/src/fc_queue.c index 9100efd..b68e80a 100644 --- a/src/fc_queue.c +++ b/src/fc_queue.c @@ -15,21 +15,11 @@ int fc_queue_init(struct fc_queue *queue, const int next_ptr_offset) { int result; - if ((result=init_pthread_lock(&queue->lock)) != 0) { - logError("file: "__FILE__", line: %d, " - "init_pthread_lock fail, errno: %d, error info: %s", - __LINE__, result, STRERROR(result)); + if ((result=init_pthread_lock_cond_pair(&queue->lc_pair)) != 0) + { return result; } - if ((result=pthread_cond_init(&queue->cond, NULL)) != 0) { - logError("file: "__FILE__", line: %d, " - "pthread_cond_init fail, " - "errno: %d, error info: %s", - __LINE__, result, STRERROR(result)); - return result; - } - queue->head = NULL; queue->tail = NULL; queue->next_ptr_offset = next_ptr_offset; @@ -38,13 +28,12 @@ int fc_queue_init(struct fc_queue *queue, const int next_ptr_offset) void fc_queue_destroy(struct fc_queue *queue) { - pthread_cond_destroy(&queue->cond); - pthread_mutex_destroy(&queue->lock); + destroy_pthread_lock_cond_pair(&queue->lc_pair); } void fc_queue_push_ex(struct fc_queue *queue, void *data, bool *notify) { - PTHREAD_MUTEX_LOCK(&queue->lock); + PTHREAD_MUTEX_LOCK(&queue->lc_pair.lock); FC_QUEUE_NEXT_PTR(queue, data) = NULL; if (queue->tail == NULL) { queue->head = data; @@ -55,14 +44,14 @@ void fc_queue_push_ex(struct fc_queue *queue, void *data, bool *notify) } queue->tail = data; - PTHREAD_MUTEX_UNLOCK(&queue->lock); + PTHREAD_MUTEX_UNLOCK(&queue->lc_pair.lock); } void *fc_queue_pop_ex(struct fc_queue *queue, const bool blocked) { void *data; - PTHREAD_MUTEX_LOCK(&queue->lock); + PTHREAD_MUTEX_LOCK(&queue->lc_pair.lock); do { data = queue->head; if (data == NULL) { @@ -70,7 +59,7 @@ void *fc_queue_pop_ex(struct fc_queue *queue, const bool blocked) break; } - pthread_cond_wait(&queue->cond, &queue->lock); + pthread_cond_wait(&queue->lc_pair.cond, &queue->lc_pair.lock); data = queue->head; } @@ -82,7 +71,7 @@ void *fc_queue_pop_ex(struct fc_queue *queue, const bool blocked) } } while (0); - PTHREAD_MUTEX_UNLOCK(&queue->lock); + PTHREAD_MUTEX_UNLOCK(&queue->lc_pair.lock); return data; } @@ -90,7 +79,7 @@ void *fc_queue_pop_all_ex(struct fc_queue *queue, const bool blocked) { void *data; - PTHREAD_MUTEX_LOCK(&queue->lock); + PTHREAD_MUTEX_LOCK(&queue->lc_pair.lock); do { data = queue->head; if (data == NULL) { @@ -98,7 +87,7 @@ void *fc_queue_pop_all_ex(struct fc_queue *queue, const bool blocked) break; } - pthread_cond_wait(&queue->cond, &queue->lock); + pthread_cond_wait(&queue->lc_pair.cond, &queue->lc_pair.lock); data = queue->head; } @@ -107,7 +96,7 @@ void *fc_queue_pop_all_ex(struct fc_queue *queue, const bool blocked) } } while (0); - PTHREAD_MUTEX_UNLOCK(&queue->lock); + PTHREAD_MUTEX_UNLOCK(&queue->lc_pair.lock); return data; } @@ -119,7 +108,7 @@ void fc_queue_push_queue_to_head_ex(struct fc_queue *queue, return; } - PTHREAD_MUTEX_LOCK(&queue->lock); + PTHREAD_MUTEX_LOCK(&queue->lc_pair.lock); FC_QUEUE_NEXT_PTR(queue, qinfo->tail) = queue->head; queue->head = qinfo->head; if (queue->tail == NULL) { @@ -128,13 +117,13 @@ void fc_queue_push_queue_to_head_ex(struct fc_queue *queue, } else { *notify = false; } - PTHREAD_MUTEX_UNLOCK(&queue->lock); + PTHREAD_MUTEX_UNLOCK(&queue->lc_pair.lock); } void fc_queue_pop_to_queue(struct fc_queue *queue, struct fc_queue_info *qinfo) { - PTHREAD_MUTEX_LOCK(&queue->lock); + PTHREAD_MUTEX_LOCK(&queue->lc_pair.lock); if (queue->head != NULL) { qinfo->head = queue->head; qinfo->tail = queue->tail; @@ -142,5 +131,5 @@ void fc_queue_pop_to_queue(struct fc_queue *queue, } else { qinfo->head = qinfo->tail = NULL; } - PTHREAD_MUTEX_UNLOCK(&queue->lock); + PTHREAD_MUTEX_UNLOCK(&queue->lc_pair.lock); } diff --git a/src/fc_queue.h b/src/fc_queue.h index 08888ca..7b3f85c 100644 --- a/src/fc_queue.h +++ b/src/fc_queue.h @@ -20,8 +20,7 @@ struct fc_queue { void *head; void *tail; - pthread_mutex_t lock; - pthread_cond_t cond; + pthread_lock_cond_pair_t lc_pair; int next_ptr_offset; }; @@ -35,7 +34,7 @@ void fc_queue_destroy(struct fc_queue *queue); static inline void fc_queue_terminate(struct fc_queue *queue) { - pthread_cond_signal(&queue->cond); + pthread_cond_signal(&queue->lc_pair.cond); } static inline void fc_queue_terminate_all( @@ -43,7 +42,7 @@ static inline void fc_queue_terminate_all( { int i; for (i=0; icond)); + pthread_cond_signal(&(queue->lc_pair.cond)); } } @@ -56,7 +55,7 @@ static inline void fc_queue_push(struct fc_queue *queue, void *data) fc_queue_push_ex(queue, data, ¬ify); if (notify) { - pthread_cond_signal(&(queue->cond)); + pthread_cond_signal(&(queue->lc_pair.cond)); } } @@ -70,7 +69,7 @@ static inline void fc_queue_push_queue_to_head(struct fc_queue *queue, fc_queue_push_queue_to_head_ex(queue, qinfo, ¬ify); if (notify) { - pthread_cond_signal(&(queue->cond)); + pthread_cond_signal(&(queue->lc_pair.cond)); } } diff --git a/src/ini_file_reader.c b/src/ini_file_reader.c index 336b1c9..2208b94 100644 --- a/src/ini_file_reader.c +++ b/src/ini_file_reader.c @@ -116,7 +116,7 @@ static SetDirectiveVars *iniGetVars(IniContext *pContext); #define RETRY_FETCH_GLOBAL(szSectionName, bRetryGlobal) \ - ((szSectionName != NULL && szSectionName != '\0') && bRetryGlobal) + ((szSectionName != NULL && *szSectionName != '\0') && bRetryGlobal) static void iniDoSetAnnotations(AnnotationEntry *src, const int src_count, AnnotationEntry *dest, int *dest_count) diff --git a/src/ioevent_loop.c b/src/ioevent_loop.c index f8e9591..aff8a3c 100644 --- a/src/ioevent_loop.c +++ b/src/ioevent_loop.c @@ -89,7 +89,7 @@ int ioevent_loop(struct nio_thread_data *pThreadData, int count; memset(&ev_notify, 0, sizeof(ev_notify)); - ev_notify.fd = pThreadData->pipe_fds[0]; + ev_notify.fd = FC_NOTIFY_READ_FD(pThreadData); ev_notify.callback = recv_notify_callback; ev_notify.timer.data = pThreadData; if (ioevent_attach(&pThreadData->ev_puller, diff --git a/src/pthread_func.c b/src/pthread_func.c index 224495b..6de66d0 100644 --- a/src/pthread_func.c +++ b/src/pthread_func.c @@ -257,3 +257,33 @@ int fc_create_thread(pthread_t *tid, void *(*start_func)(void *), pthread_attr_destroy(&thread_attr); return result; } + +int init_pthread_lock_cond_pair(pthread_lock_cond_pair_t *lcp) +{ + int result; + + if ((result=init_pthread_lock(&lcp->lock)) != 0) + { + logError("file: "__FILE__", line: %d, " + "init_pthread_lock fail, errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + return result; + } + + if ((result=pthread_cond_init(&lcp->cond, NULL)) != 0) + { + logError("file: "__FILE__", line: %d, " + "pthread_cond_init fail, " + "errno: %d, error info: %s", + __LINE__, result, STRERROR(result)); + return result; + } + + return 0; +} + +void destroy_pthread_lock_cond_pair(pthread_lock_cond_pair_t *lcp) +{ + pthread_cond_destroy(&lcp->cond); + pthread_mutex_destroy(&lcp->lock); +} diff --git a/src/pthread_func.h b/src/pthread_func.h index a5c78ef..c9b72b2 100644 --- a/src/pthread_func.h +++ b/src/pthread_func.h @@ -24,6 +24,9 @@ extern "C" { int init_pthread_lock(pthread_mutex_t *pthread_lock); int init_pthread_attr(pthread_attr_t *pattr, const int stack_size); +int init_pthread_lock_cond_pair(pthread_lock_cond_pair_t *lcp); +void destroy_pthread_lock_cond_pair(pthread_lock_cond_pair_t *lcp); + #define PTHREAD_MUTEX_LOCK(lock) \ do { \ int lock_res; \