sorted queue use double link chain for quick push

fstore_storage_engine
YuQing 2023-05-04 17:08:51 +08:00
parent f1691b7480
commit 6dbc8b8937
7 changed files with 306 additions and 149 deletions

1
.gitignore vendored
View File

@ -59,6 +59,7 @@ src/tests/test_mutex_lock_perf
src/tests/test_queue_perf
src/tests/test_normalize_path
src/tests/test_sorted_array
src/tests/test_sorted_queue
# other
*.swp

View File

@ -1,7 +1,8 @@
Version 1.66 2023-03-27
Version 1.66 2023-05-04
* struct fast_task_info add field: notify_next for nio notify queue
* lc_pair in struct fc_queue change to lcp
* sorted queue use double link chain for quick push
Version 1.65 2023-01-09
* locked_list.h: add functions locked_list_move and locked_list_move_tail

View File

@ -45,6 +45,16 @@ fc_list_add_before (struct fc_list_head *_new, struct fc_list_head *current)
_new->next->prev = _new;
}
static inline void
fc_list_add_after (struct fc_list_head *_new, struct fc_list_head *current)
{
_new->prev = current;
_new->next = current->next;
current->next->prev = _new;
current->next = _new;
}
static inline void
fc_list_add_internal (struct fc_list_head *_new, struct fc_list_head *prev,
struct fc_list_head *next)

View File

@ -18,135 +18,143 @@
#include "pthread_func.h"
#include "sorted_queue.h"
int sorted_queue_init(struct sorted_queue *sq, const int next_ptr_offset,
int sorted_queue_init(struct sorted_queue *sq, const int dlink_offset,
int (*compare_func)(const void *, const void *))
{
int result;
if ((result=init_pthread_lock_cond_pair(&sq->lcp)) != 0) {
return result;
}
FC_INIT_LIST_HEAD(&sq->head);
sq->dlink_offset = dlink_offset;
sq->compare_func = compare_func;
return fc_queue_init(&sq->queue, next_ptr_offset);
return 0;
}
void sorted_queue_destroy(struct sorted_queue *sq)
{
fc_queue_destroy(&sq->queue);
destroy_pthread_lock_cond_pair(&sq->lcp);
}
void sorted_queue_push_ex(struct sorted_queue *sq, void *data, bool *notify)
{
void *previous;
void *current;
struct fc_list_head *dlink;
struct fc_list_head *current;
int count = 0;
PTHREAD_MUTEX_LOCK(&sq->queue.lcp.lock);
if (sq->queue.tail == NULL) {
FC_QUEUE_NEXT_PTR(&sq->queue, data) = NULL;
sq->queue.head = sq->queue.tail = data;
dlink = FC_SORTED_QUEUE_DLINK_PTR(sq, data);
PTHREAD_MUTEX_LOCK(&sq->lcp.lock);
if (fc_list_empty(&sq->head)) {
fc_list_add(dlink, &sq->head);
*notify = true;
} else {
if (sq->compare_func(data, sq->queue.tail) >= 0) {
FC_QUEUE_NEXT_PTR(&sq->queue, data) = NULL;
FC_QUEUE_NEXT_PTR(&sq->queue, sq->queue.tail) = data;
sq->queue.tail = data;
if (sq->compare_func(data, FC_SORTED_QUEUE_DATA_PTR(
sq, sq->head.prev)) >= 0)
{
fc_list_add_tail(dlink, &sq->head);
*notify = false;
} else if (sq->compare_func(data, sq->queue.head) < 0) {
FC_QUEUE_NEXT_PTR(&sq->queue, data) = sq->queue.head;
sq->queue.head = data;
} else if (sq->compare_func(data, FC_SORTED_QUEUE_DATA_PTR(
sq, sq->head.next)) < 0)
{
fc_list_add(dlink, &sq->head);
*notify = true;
} else {
previous = sq->queue.head;
current = FC_QUEUE_NEXT_PTR(&sq->queue, previous);
while (sq->compare_func(data, current) >= 0) {
previous = current;
current = FC_QUEUE_NEXT_PTR(&sq->queue, previous);
current = sq->head.prev->prev;
while (sq->compare_func(data, FC_SORTED_QUEUE_DATA_PTR(
sq, current)) < 0)
{
current = current->prev;
++count;
}
FC_QUEUE_NEXT_PTR(&sq->queue, data) = FC_QUEUE_NEXT_PTR(
&sq->queue, previous);
FC_QUEUE_NEXT_PTR(&sq->queue, previous) = data;
fc_list_add_after(dlink, current);
*notify = false;
if (count >= 10) {
logInfo("================== compare count: %d ==========", count);
}
}
}
PTHREAD_MUTEX_UNLOCK(&sq->queue.lcp.lock);
PTHREAD_MUTEX_UNLOCK(&sq->lcp.lock);
}
void *sorted_queue_pop_ex(struct sorted_queue *sq,
void *less_equal, const bool blocked)
void sorted_queue_pop_ex(struct sorted_queue *sq, void *less_equal,
struct fc_list_head *head, const bool blocked)
{
void *data;
struct fc_list_head *current;
PTHREAD_MUTEX_LOCK(&sq->queue.lcp.lock);
PTHREAD_MUTEX_LOCK(&sq->lcp.lock);
do {
if (sq->queue.head == NULL || sq->compare_func(
sq->queue.head, less_equal) > 0)
{
if (fc_list_empty(&sq->head)) {
if (!blocked) {
data = NULL;
FC_INIT_LIST_HEAD(head);
break;
}
pthread_cond_wait(&sq->queue.lcp.cond,
&sq->queue.lcp.lock);
pthread_cond_wait(&sq->lcp.cond,
&sq->lcp.lock);
}
if (sq->queue.head == NULL) {
data = NULL;
if (fc_list_empty(&sq->head)) {
FC_INIT_LIST_HEAD(head);
} else {
if (sq->compare_func(sq->queue.head, less_equal) <= 0) {
data = sq->queue.head;
sq->queue.head = FC_QUEUE_NEXT_PTR(&sq->queue, data);
if (sq->queue.head == NULL) {
sq->queue.tail = NULL;
}
} else {
data = NULL;
}
}
} while (0);
PTHREAD_MUTEX_UNLOCK(&sq->queue.lcp.lock);
return data;
}
void sorted_queue_pop_to_queue_ex(struct sorted_queue *sq,
void *less_equal, struct fc_queue_info *qinfo,
const bool blocked)
{
PTHREAD_MUTEX_LOCK(&sq->queue.lcp.lock);
do {
if (sq->queue.head == NULL) {
if (!blocked) {
qinfo->head = qinfo->tail = NULL;
break;
}
pthread_cond_wait(&sq->queue.lcp.cond,
&sq->queue.lcp.lock);
}
if (sq->queue.head == NULL) {
qinfo->head = qinfo->tail = NULL;
} else {
if (sq->compare_func(sq->queue.head, less_equal) <= 0) {
qinfo->head = qinfo->tail = sq->queue.head;
sq->queue.head = FC_QUEUE_NEXT_PTR(&sq->queue,
sq->queue.head);
while (sq->queue.head != NULL && sq->compare_func(
sq->queue.head, less_equal) <= 0)
current = sq->head.next;
if (sq->compare_func(FC_SORTED_QUEUE_DATA_PTR(
sq, current), less_equal) <= 0)
{
head->next = current;
current->prev = head;
current = current->next;
while (current != &sq->head && sq->compare_func(
FC_SORTED_QUEUE_DATA_PTR(sq, current),
less_equal) <= 0)
{
qinfo->tail = sq->queue.head;
sq->queue.head = FC_QUEUE_NEXT_PTR(
&sq->queue, sq->queue.head);
current = current->next;
}
if (sq->queue.head == NULL) {
sq->queue.tail = NULL;
head->prev = current->prev;
current->prev->next = head;
if (current == &sq->head) {
FC_INIT_LIST_HEAD(&sq->head);
} else {
FC_QUEUE_NEXT_PTR(&sq->queue, qinfo->tail) = NULL;
sq->head.next = current;
current->prev = &sq->head;
}
} else {
qinfo->head = qinfo->tail = NULL;
FC_INIT_LIST_HEAD(head);
}
}
} while (0);
PTHREAD_MUTEX_UNLOCK(&sq->queue.lcp.lock);
PTHREAD_MUTEX_UNLOCK(&sq->lcp.lock);
}
int sorted_queue_free_chain(struct sorted_queue *sq,
struct fast_mblock_man *mblock, struct fc_list_head *head)
{
struct fast_mblock_node *previous;
struct fast_mblock_node *current;
struct fast_mblock_chain chain;
struct fc_list_head *node;
if (fc_list_empty(&sq->head)) {
return 0;
}
node = head->next;
chain.head = previous = fast_mblock_to_node_ptr(
FC_SORTED_QUEUE_DATA_PTR(sq, node));
node = node->next;
while (node != head) {
current = fast_mblock_to_node_ptr(FC_SORTED_QUEUE_DATA_PTR(sq, node));
previous->next = current;
previous = current;
node = node->next;
}
previous->next = NULL;
chain.tail = previous;
return fast_mblock_batch_free(mblock, &chain);
}

View File

@ -18,32 +18,44 @@
#ifndef _FC_SORTED_QUEUE_H
#define _FC_SORTED_QUEUE_H
#include "fc_queue.h"
#include "fast_mblock.h"
#include "fc_list.h"
struct sorted_queue
{
struct fc_queue queue;
struct fc_list_head head;
pthread_lock_cond_pair_t lcp;
int dlink_offset;
int (*compare_func)(const void *, const void *);
};
#define FC_SORTED_QUEUE_DLINK_PTR(sq, data) \
((void *)(((char *)data) + (sq)->dlink_offset))
#define FC_SORTED_QUEUE_DATA_PTR(sq, dlink) \
((void *)((char *)(dlink) - (sq)->dlink_offset))
#ifdef __cplusplus
extern "C" {
#endif
int sorted_queue_init(struct sorted_queue *sq, const int next_ptr_offset,
int sorted_queue_init(struct sorted_queue *sq, const int dlink_offset,
int (*compare_func)(const void *, const void *));
void sorted_queue_destroy(struct sorted_queue *sq);
static inline void sorted_queue_terminate(struct sorted_queue *sq)
{
fc_queue_terminate(&sq->queue);
pthread_cond_signal(&sq->lcp.cond);
}
static inline void sorted_queue_terminate_all(
struct sorted_queue *sq, const int count)
{
fc_queue_terminate_all(&sq->queue, count);
int i;
for (i=0; i<count; i++) {
pthread_cond_signal(&(sq->lcp.cond));
}
}
//notify by the caller
@ -55,76 +67,33 @@ static inline void sorted_queue_push(struct sorted_queue *sq, void *data)
sorted_queue_push_ex(sq, data, &notify);
if (notify) {
pthread_cond_signal(&(sq->queue.lcp.cond));
pthread_cond_signal(&(sq->lcp.cond));
}
}
static inline void sorted_queue_push_silence(struct sorted_queue *sq, void *data)
static inline void sorted_queue_push_silence(
struct sorted_queue *sq, void *data)
{
bool notify;
sorted_queue_push_ex(sq, data, &notify);
}
void *sorted_queue_pop_ex(struct sorted_queue *sq,
void *less_equal, const bool blocked);
void sorted_queue_pop_ex(struct sorted_queue *sq, void *less_equal,
struct fc_list_head *head, const bool blocked);
#define sorted_queue_pop(sq, less_equal) \
sorted_queue_pop_ex(sq, less_equal, true)
#define sorted_queue_pop(sq, less_equal, head) \
sorted_queue_pop_ex(sq, less_equal, head, true)
#define sorted_queue_try_pop(sq, less_equal) \
sorted_queue_pop_ex(sq, less_equal, false)
void sorted_queue_pop_to_queue_ex(struct sorted_queue *sq,
void *less_equal, struct fc_queue_info *qinfo,
const bool blocked);
#define sorted_queue_pop_to_queue(sq, less_equal, qinfo) \
sorted_queue_pop_to_queue_ex(sq, less_equal, qinfo, true)
#define sorted_queue_try_pop_to_queue(sq, less_equal, qinfo) \
sorted_queue_pop_to_queue_ex(sq, less_equal, qinfo, false)
static inline void *sorted_queue_pop_all_ex(struct sorted_queue *sq,
void *less_equal, const bool blocked)
{
struct fc_queue_info chain;
sorted_queue_pop_to_queue_ex(sq, less_equal, &chain, blocked);
return chain.head;
}
#define sorted_queue_pop_all(sq, less_equal) \
sorted_queue_pop_all_ex(sq, less_equal, true)
#define sorted_queue_try_pop_all(sq, less_equal) \
sorted_queue_pop_all_ex(sq, less_equal, false)
#define sorted_queue_try_pop(sq, less_equal, head) \
sorted_queue_pop_ex(sq, less_equal, head, false)
static inline bool sorted_queue_empty(struct sorted_queue *sq)
{
return fc_queue_empty(&sq->queue);
return fc_list_empty(&sq->head);
}
static inline void *sorted_queue_timedpeek(struct sorted_queue *sq,
const int timeout, const int time_unit)
{
return fc_queue_timedpeek(&sq->queue, timeout, time_unit);
}
#define sorted_queue_timedpeek_sec(sq, timeout) \
sorted_queue_timedpeek(sq, timeout, FC_TIME_UNIT_SECOND)
#define sorted_queue_timedpeek_ms(sq, timeout_ms) \
sorted_queue_timedpeek(sq, timeout_ms, FC_TIME_UNIT_MSECOND)
#define sorted_queue_timedpeek_us(sq, timeout_us) \
sorted_queue_timedpeek(sq, timeout_us, FC_TIME_UNIT_USECOND)
static inline int sorted_queue_free_chain(struct sorted_queue *sq,
struct fast_mblock_man *mblock, struct fc_queue_info *qinfo)
{
return fc_queue_free_chain(&sq->queue, mblock, qinfo);
}
int sorted_queue_free_chain(struct sorted_queue *sq,
struct fast_mblock_man *mblock, struct fc_list_head *head);
#ifdef __cplusplus
}

View File

@ -10,7 +10,7 @@ ALL_PRGS = test_allocator test_skiplist test_multi_skiplist test_mblock test_blo
test_json_parser test_pthread_lock test_uniq_skiplist test_split_string \
test_server_id_func test_pipe test_atomic test_file_write_hole test_file_lock \
test_pthread_wait test_thread_pool test_data_visible test_mutex_lock_perf \
test_queue_perf test_normalize_path test_sorted_array
test_queue_perf test_normalize_path test_sorted_array test_sorted_queue
all: $(ALL_PRGS)
.c:

View File

@ -0,0 +1,168 @@
/*
* Copyright (c) 2020 YuQing <384681@qq.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the Lesser GNU General Public License, version 3
* or later ("LGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the Lesser GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include <time.h>
#include <assert.h>
#include <inttypes.h>
#include <sys/time.h>
#include "fastcommon/sorted_queue.h"
#include "fastcommon/logger.h"
#include "fastcommon/shared_func.h"
#define COUNT 100
#define LAST_INDEX (COUNT - 1)
typedef struct {
int n;
struct fc_list_head dlink;
} DoubleLinkNumber;
static DoubleLinkNumber *numbers;
static struct sorted_queue sq;
static int compare_func(const void *p1, const void *p2)
{
return ((DoubleLinkNumber *)p1)->n - ((DoubleLinkNumber *)p2)->n;
}
void set_rand_numbers(const int multiple)
{
int i;
int tmp;
int index1;
int index2;
for (i=0; i<COUNT; i++) {
numbers[i].n = multiple * i + 1;
}
for (i=0; i<COUNT; i++) {
index1 = (LAST_INDEX * (int64_t)rand()) / (int64_t)RAND_MAX;
index2 = (LAST_INDEX * (int64_t)rand()) / (int64_t)RAND_MAX;
if (index1 == index2) {
continue;
}
tmp = numbers[index1].n;
numbers[index1].n = numbers[index2].n;
numbers[index2].n = tmp;
}
}
static void test1()
{
int i;
DoubleLinkNumber less_equal;
DoubleLinkNumber *number;
struct fc_list_head head;
set_rand_numbers(1);
for (i=0; i<COUNT; i++) {
sorted_queue_push_silence(&sq, numbers + i);
}
less_equal.n = COUNT;
sorted_queue_try_pop(&sq, &less_equal, &head);
assert(sorted_queue_empty(&sq));
i = 0;
fc_list_for_each_entry (number, &head, dlink) {
i++;
if (i != number->n) {
fprintf(stderr, "i: %d != value: %d\n", i, number->n);
break;
}
}
assert(i == COUNT);
sorted_queue_try_pop(&sq, &less_equal, &head);
assert(fc_list_empty(&head));
}
static void test2()
{
#define MULTIPLE 2
int i;
int n;
DoubleLinkNumber less_equal;
DoubleLinkNumber *number;
struct fc_list_head head;
set_rand_numbers(MULTIPLE);
for (i=0; i<COUNT; i++) {
sorted_queue_push_silence(&sq, numbers + i);
}
less_equal.n = 0;
sorted_queue_try_pop(&sq, &less_equal, &head);
assert(fc_list_empty(&head));
less_equal.n = COUNT;
sorted_queue_try_pop(&sq, &less_equal, &head);
assert(!sorted_queue_empty(&sq));
i = 0;
fc_list_for_each_entry (number, &head, dlink) {
n = i++ * MULTIPLE + 1;
if (n != number->n) {
fprintf(stderr, "%d. n: %d != value: %d\n", i, n, number->n);
break;
}
}
less_equal.n = 2 * COUNT + 1;
sorted_queue_try_pop(&sq, &less_equal, &head);
assert(sorted_queue_empty(&sq));
fc_list_for_each_entry (number, &head, dlink) {
n = i++ * MULTIPLE + 1;
if (n != number->n) {
fprintf(stderr, "%d. n: %d != value: %d\n", i, n, number->n);
break;
}
}
assert(i == COUNT);
}
int main(int argc, char *argv[])
{
int result;
int64_t start_time;
int64_t end_time;
start_time = get_current_time_ms();
log_init();
numbers = (DoubleLinkNumber *)malloc(sizeof(DoubleLinkNumber) * COUNT);
srand(time(NULL));
if ((result=sorted_queue_init(&sq, (long)(&((DoubleLinkNumber *)
NULL)->dlink), compare_func)) != 0)
{
return result;
}
test1();
test2();
end_time = get_current_time_ms();
printf("pass OK, time used: %"PRId64" ms\n", end_time - start_time);
return 0;
}