add files: sorted_queue.[hc]

pull/37/merge
YuQing 2021-09-02 21:05:09 +08:00
parent e1ef38d6a4
commit a66370d0f8
6 changed files with 257 additions and 20 deletions

View File

@ -1,9 +1,10 @@
Version 1.54 2021-08-17
Version 1.54 2021-09-02
* fast_allocator.[hc]: correct reclaim_interval logic
* shared_func.[hc]: add functions getFileContentEx1 and getFileContent1
* fc_queue.[hc]: add function fc_queue_timedpeek
* pthread_func.[hc]: add function init_pthread_rwlock
* add files: sorted_queue.[hc]
Version 1.53 2021-06-30
* process_action support action status

View File

@ -14,9 +14,10 @@ FAST_SHARED_OBJS = hash.lo chain.lo shared_func.lo ini_file_reader.lo \
fast_buffer.lo multi_skiplist.lo flat_skiplist.lo \
system_info.lo fast_blocked_queue.lo id_generator.lo \
char_converter.lo char_convert_loader.lo common_blocked_queue.lo \
multi_socket_client.lo skiplist_set.lo uniq_skiplist.lo \
json_parser.lo buffered_file_writer.lo server_id_func.lo \
fc_queue.lo fc_memory.lo shared_buffer.lo thread_pool.lo
multi_socket_client.lo skiplist_set.lo uniq_skiplist.lo \
json_parser.lo buffered_file_writer.lo server_id_func.lo \
fc_queue.lo sorted_queue.lo fc_memory.lo shared_buffer.lo \
thread_pool.lo
FAST_STATIC_OBJS = hash.o chain.o shared_func.o ini_file_reader.o \
logger.o sockopt.o base64.o sched_thread.o \
@ -27,9 +28,10 @@ FAST_STATIC_OBJS = hash.o chain.o shared_func.o ini_file_reader.o \
fast_buffer.o multi_skiplist.o flat_skiplist.o \
system_info.o fast_blocked_queue.o id_generator.o \
char_converter.o char_convert_loader.o common_blocked_queue.o \
multi_socket_client.o skiplist_set.o uniq_skiplist.o \
multi_socket_client.o skiplist_set.o uniq_skiplist.o \
json_parser.o buffered_file_writer.o server_id_func.o \
fc_queue.o fc_memory.o shared_buffer.o thread_pool.o
fc_queue.o sorted_queue.o fc_memory.o shared_buffer.o \
thread_pool.o
HEADER_FILES = common_define.h hash.h chain.h logger.h base64.h \
shared_func.h pthread_func.h ini_file_reader.h _os_define.h \
@ -43,8 +45,8 @@ HEADER_FILES = common_define.h hash.h chain.h logger.h base64.h \
char_convert_loader.h common_blocked_queue.h \
multi_socket_client.h skiplist_set.h uniq_skiplist.h \
fc_list.h locked_list.h json_parser.h buffered_file_writer.h \
server_id_func.h fc_queue.h fc_memory.h shared_buffer.h \
thread_pool.h fc_atomic.h
server_id_func.h fc_queue.h sorted_queue.h fc_memory.h \
shared_buffer.h thread_pool.h fc_atomic.h
ALL_OBJS = $(FAST_STATIC_OBJS) $(FAST_SHARED_OBJS)

View File

@ -15,17 +15,9 @@
//fc_queue.c
#include <errno.h>
#include <pthread.h>
#include <inttypes.h>
#include "logger.h"
#include "shared_func.h"
#include "pthread_func.h"
#include "fc_queue.h"
#define FC_QUEUE_NEXT_PTR(queue, data) \
*((void **)(((char *)data) + queue->next_ptr_offset))
int fc_queue_init(struct fc_queue *queue, const int next_ptr_offset)
{
int result;

View File

@ -18,10 +18,6 @@
#ifndef _FC_QUEUE_H
#define _FC_QUEUE_H
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include "common_define.h"
struct fc_queue_info
@ -38,6 +34,10 @@ struct fc_queue
int next_ptr_offset;
};
#define FC_QUEUE_NEXT_PTR(queue, data) \
*((void **)(((char *)data) + (queue)->next_ptr_offset))
#ifdef __cplusplus
extern "C" {
#endif

146
src/sorted_queue.c Normal file
View File

@ -0,0 +1,146 @@
/*
* Copyright (c) 2020 YuQing <384681@qq.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the Lesser GNU General Public License, version 3
* or later ("LGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the Lesser GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
//sorted_queue.c
#include "pthread_func.h"
#include "sorted_queue.h"
int sorted_queue_init(struct sorted_queue *sq, const int next_ptr_offset,
int (*compare_func)(const void *, const void *))
{
sq->compare_func = compare_func;
return fc_queue_init(&sq->queue, next_ptr_offset);
}
void sorted_queue_destroy(struct sorted_queue *sq)
{
fc_queue_destroy(&sq->queue);
}
void sorted_queue_push_ex(struct sorted_queue *sq, void *data, bool *notify)
{
void *previous;
void *current;
PTHREAD_MUTEX_LOCK(&sq->queue.lc_pair.lock);
if (sq->queue.tail == NULL) {
FC_QUEUE_NEXT_PTR(&sq->queue, data) = NULL;
sq->queue.head = sq->queue.tail = data;
*notify = true;
} else {
if (sq->compare_func(data, sq->queue.tail) >= 0) {
FC_QUEUE_NEXT_PTR(&sq->queue, data) = NULL;
FC_QUEUE_NEXT_PTR(&sq->queue, sq->queue.tail) = data;
sq->queue.tail = data;
} else if (sq->compare_func(data, sq->queue.head) < 0) {
FC_QUEUE_NEXT_PTR(&sq->queue, data) = sq->queue.head;
sq->queue.head = data;
} else {
previous = sq->queue.head;
current = FC_QUEUE_NEXT_PTR(&sq->queue, previous);
while (sq->compare_func(data, current) >= 0) {
previous = current;
current = FC_QUEUE_NEXT_PTR(&sq->queue, previous);
}
FC_QUEUE_NEXT_PTR(&sq->queue, data) = FC_QUEUE_NEXT_PTR(
&sq->queue, previous);
FC_QUEUE_NEXT_PTR(&sq->queue, previous) = data;
}
*notify = false;
}
PTHREAD_MUTEX_UNLOCK(&sq->queue.lc_pair.lock);
}
void *sorted_queue_pop_ex(struct sorted_queue *sq,
void *less_equal, const bool blocked)
{
void *data;
PTHREAD_MUTEX_LOCK(&sq->queue.lc_pair.lock);
do {
if (sq->queue.head == NULL) {
if (!blocked) {
data = NULL;
break;
}
pthread_cond_wait(&sq->queue.lc_pair.cond,
&sq->queue.lc_pair.lock);
}
if (sq->queue.head == NULL) {
data = NULL;
} else {
if (sq->compare_func(sq->queue.head, less_equal) <= 0) {
data = sq->queue.head;
sq->queue.head = FC_QUEUE_NEXT_PTR(&sq->queue, data);
if (sq->queue.head == NULL) {
sq->queue.tail = NULL;
}
} else {
data = NULL;
}
}
} while (0);
PTHREAD_MUTEX_UNLOCK(&sq->queue.lc_pair.lock);
return data;
}
void *sorted_queue_pop_all_ex(struct sorted_queue *sq,
void *less_equal, const bool blocked)
{
struct fc_queue_info chain;
PTHREAD_MUTEX_LOCK(&sq->queue.lc_pair.lock);
do {
if (sq->queue.head == NULL) {
if (!blocked) {
chain.head = NULL;
break;
}
pthread_cond_wait(&sq->queue.lc_pair.cond,
&sq->queue.lc_pair.lock);
}
if (sq->queue.head == NULL) {
chain.head = NULL;
} else {
if (sq->compare_func(sq->queue.head, less_equal) <= 0) {
chain.head = chain.tail = sq->queue.head;
sq->queue.head = FC_QUEUE_NEXT_PTR(&sq->queue, sq->queue.head);
while (sq->compare_func(sq->queue.head, less_equal) <= 0) {
chain.tail = sq->queue.head;
sq->queue.head = FC_QUEUE_NEXT_PTR(&sq->queue, sq->queue.head);
}
if (sq->queue.head == NULL) {
sq->queue.tail = NULL;
} else {
FC_QUEUE_NEXT_PTR(&sq->queue, chain.tail) = NULL;
}
} else {
chain.head = NULL;
}
}
} while (0);
PTHREAD_MUTEX_UNLOCK(&sq->queue.lc_pair.lock);
return chain.head;
}

96
src/sorted_queue.h Normal file
View File

@ -0,0 +1,96 @@
/*
* Copyright (c) 2020 YuQing <384681@qq.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the Lesser GNU General Public License, version 3
* or later ("LGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the Lesser GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
//sorted_queue.h
#ifndef _FC_SORTED_QUEUE_H
#define _FC_SORTED_QUEUE_H
#include "fc_queue.h"
struct sorted_queue
{
struct fc_queue queue;
int next_ptr_offset;
int (*compare_func)(const void *, const void *);
};
#ifdef __cplusplus
extern "C" {
#endif
int sorted_queue_init(struct sorted_queue *sq, const int next_ptr_offset,
int (*compare_func)(const void *, const void *));
void sorted_queue_destroy(struct sorted_queue *sq);
static inline void sorted_queue_terminate(struct sorted_queue *sq)
{
fc_queue_terminate(&sq->queue);
}
static inline void sorted_queue_terminate_all(
struct sorted_queue *sq, const int count)
{
fc_queue_terminate_all(&sq->queue, count);
}
//notify by the caller
void sorted_queue_push_ex(struct sorted_queue *sq, void *data, bool *notify);
static inline void sorted_queue_push(struct sorted_queue *sq, void *data)
{
bool notify;
sorted_queue_push_ex(sq, data, &notify);
if (notify) {
pthread_cond_signal(&(sq->queue.lc_pair.cond));
}
}
static inline void sorted_queue_push_silence(struct sorted_queue *sq, void *data)
{
bool notify;
sorted_queue_push_ex(sq, data, &notify);
}
void *sorted_queue_pop_ex(struct sorted_queue *sq,
void *less_equal, const bool blocked);
#define sorted_queue_pop(sq, less_equal) \
sorted_queue_pop_ex(sq, less_equal, true)
#define sorted_queue_try_pop(sq, less_equal) \
sorted_queue_pop_ex(sq, less_equal, false)
void *sorted_queue_pop_all_ex(struct sorted_queue *sq,
void *less_equal, const bool blocked);
#define sorted_queue_pop_all(sq, less_equal) \
sorted_queue_pop_all_ex(sq, less_equal, true)
#define sorted_queue_try_pop_all(sq, less_equal) \
sorted_queue_pop_all_ex(sq, less_equal, false)
static inline bool sorted_queue_empty(struct sorted_queue *sq)
{
return fc_queue_empty(&sq->queue);
}
#ifdef __cplusplus
}
#endif
#endif