Compare commits
12 Commits
8fbcc79c8c
...
a1da445899
| Author | SHA1 | Date |
|---|---|---|
|
|
a1da445899 | |
|
|
9acc202481 | |
|
|
dd0d4dbc19 | |
|
|
a1ae1cbcb0 | |
|
|
fda2679435 | |
|
|
f0484579e0 | |
|
|
6a18162a12 | |
|
|
8e834f7165 | |
|
|
a256976600 | |
|
|
62a29b55a5 | |
|
|
a6dc24e2f3 | |
|
|
a16fde8070 |
|
|
@ -62,6 +62,8 @@ src/tests/test_sorted_array
|
|||
src/tests/test_sorted_queue
|
||||
src/tests/test_thread_local
|
||||
src/tests/test_memcpy
|
||||
src/tests/mblock_benchmark
|
||||
src/tests/cpool_benchmark
|
||||
|
||||
# other
|
||||
*.swp
|
||||
|
|
|
|||
4
HISTORY
4
HISTORY
|
|
@ -1,6 +1,8 @@
|
|||
|
||||
Version 1.78 2025-06-19
|
||||
Version 1.78 2025-08-02
|
||||
* getIpaddrByName: normalize ip addr when input addr is IPv4 or IPv6
|
||||
* add files: spinlock.[hc]
|
||||
* connection_pool.[hc]: use CAS instead of pthread mutex lock
|
||||
|
||||
Version 1.77 2025-03-18
|
||||
* impl. shorten_path for /./ and /../
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ FAST_SHARED_OBJS = hash.lo chain.lo shared_func.lo ini_file_reader.lo \
|
|||
multi_socket_client.lo skiplist_set.lo uniq_skiplist.lo \
|
||||
json_parser.lo buffered_file_writer.lo server_id_func.lo \
|
||||
fc_queue.lo sorted_queue.lo fc_memory.lo shared_buffer.lo \
|
||||
thread_pool.lo array_allocator.lo sorted_array.lo
|
||||
thread_pool.lo array_allocator.lo sorted_array.lo spinlock.lo
|
||||
|
||||
FAST_STATIC_OBJS = hash.o chain.o shared_func.o ini_file_reader.o \
|
||||
logger.o sockopt.o base64.o sched_thread.o \
|
||||
|
|
@ -31,7 +31,7 @@ FAST_STATIC_OBJS = hash.o chain.o shared_func.o ini_file_reader.o \
|
|||
multi_socket_client.o skiplist_set.o uniq_skiplist.o \
|
||||
json_parser.o buffered_file_writer.o server_id_func.o \
|
||||
fc_queue.o sorted_queue.o fc_memory.o shared_buffer.o \
|
||||
thread_pool.o array_allocator.o sorted_array.o
|
||||
thread_pool.o array_allocator.o sorted_array.o spinlock.o
|
||||
|
||||
HEADER_FILES = common_define.h hash.h chain.h logger.h base64.h \
|
||||
shared_func.h pthread_func.h ini_file_reader.h _os_define.h \
|
||||
|
|
@ -47,7 +47,7 @@ HEADER_FILES = common_define.h hash.h chain.h logger.h base64.h \
|
|||
fc_list.h locked_list.h json_parser.h buffered_file_writer.h \
|
||||
server_id_func.h fc_queue.h sorted_queue.h fc_memory.h \
|
||||
shared_buffer.h thread_pool.h fc_atomic.h array_allocator.h \
|
||||
sorted_array.h
|
||||
sorted_array.h spinlock.h
|
||||
|
||||
ALL_OBJS = $(FAST_STATIC_OBJS) $(FAST_SHARED_OBJS)
|
||||
|
||||
|
|
@ -66,7 +66,7 @@ libfastcommon.a: $(FAST_STATIC_OBJS)
|
|||
.c:
|
||||
$(COMPILE) -o $@ $< $(FAST_STATIC_OBJS) $(LIB_PATH) $(INC_PATH)
|
||||
.c.o:
|
||||
$(COMPILE) -c -o $@ $< $(INC_PATH)
|
||||
$(COMPILE) -c -fPIC -o $@ $< $(INC_PATH)
|
||||
.c.lo:
|
||||
$(COMPILE) -c -fPIC -o $@ $< $(INC_PATH)
|
||||
install:
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@
|
|||
#include "shared_func.h"
|
||||
#include "sched_thread.h"
|
||||
#include "server_id_func.h"
|
||||
#include "fc_atomic.h"
|
||||
#include "connection_pool.h"
|
||||
|
||||
ConnectionCallbacks g_connection_callbacks = {
|
||||
|
|
@ -52,13 +53,19 @@ static int node_init_for_rdma(ConnectionNode *node,
|
|||
static inline void conn_pool_get_key(const ConnectionInfo *conn,
|
||||
char *key, int *key_len)
|
||||
{
|
||||
*key_len = sprintf(key, "%s-%u", conn->ip_addr, conn->port);
|
||||
*key_len = strlen(conn->ip_addr);
|
||||
memcpy(key, conn->ip_addr, *key_len);
|
||||
*(key + (*key_len)++) = '-';
|
||||
*key_len += fc_itoa(conn->port, key + (*key_len));
|
||||
}
|
||||
|
||||
static int close_conn(ConnectionPool *cp, ConnectionManager *cm,
|
||||
ConnectionInfo *conn, const bool bForce)
|
||||
{
|
||||
ConnectionNode *node;
|
||||
int64_t index;
|
||||
int64_t head;
|
||||
int64_t tail;
|
||||
char formatted_ip[FORMATTED_IP_SIZE];
|
||||
|
||||
node = (ConnectionNode *)((char *)conn - sizeof(ConnectionNode));
|
||||
|
|
@ -71,9 +78,11 @@ static int close_conn(ConnectionPool *cp, ConnectionManager *cm,
|
|||
return EINVAL;
|
||||
}
|
||||
|
||||
if (bForce)
|
||||
{
|
||||
cm->total_count--;
|
||||
while (1) {
|
||||
head = FC_ATOMIC_GET(cm->ring.head);
|
||||
tail = FC_ATOMIC_GET(cm->ring.tail);
|
||||
if (bForce || (head - tail) >= cp->ring_size - 1) {
|
||||
FC_ATOMIC_DEC(cm->total_count);
|
||||
|
||||
if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
|
||||
format_ip_address(conn->ip_addr, formatted_ip);
|
||||
|
|
@ -88,19 +97,25 @@ static int close_conn(ConnectionPool *cp, ConnectionManager *cm,
|
|||
close_connection(conn);
|
||||
fast_mblock_free_object(&cp->node_allocator, node);
|
||||
|
||||
node = cm->head;
|
||||
while (node != NULL)
|
||||
{
|
||||
if (bForce) {
|
||||
for (index=FC_ATOMIC_GET(cm->ring.tail); index<head; index++) {
|
||||
node = (ConnectionNode *)FC_ATOMIC_GET(
|
||||
cm->ring.nodes[index % cp->ring_size]);
|
||||
if (node != NULL) {
|
||||
node->conn->validate_flag = true;
|
||||
node = node->next;
|
||||
}
|
||||
}
|
||||
else
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
if (__sync_bool_compare_and_swap(&cm->ring.nodes[
|
||||
head % cp->ring_size], NULL, node))
|
||||
{
|
||||
if (__sync_bool_compare_and_swap(&cm->ring.head, head, head + 1)) {
|
||||
node->atime = get_current_time();
|
||||
node->next = cm->head;
|
||||
cm->head = node;
|
||||
cm->free_count++;
|
||||
FC_ATOMIC_INC(cm->free_count);
|
||||
|
||||
if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
|
||||
format_ip_address(conn->ip_addr, formatted_ip);
|
||||
|
|
@ -110,41 +125,62 @@ static int close_conn(ConnectionPool *cp, ConnectionManager *cm,
|
|||
__LINE__, formatted_ip, conn->port,
|
||||
conn->sock, cm->total_count, cm->free_count);
|
||||
}
|
||||
|
||||
break;
|
||||
} else { //rollback
|
||||
__sync_bool_compare_and_swap(&cm->ring.nodes[
|
||||
head % cp->ring_size], node, NULL);
|
||||
sched_yield();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static ConnectionManager *find_manager(ConnectionPool *cp,
|
||||
ConnectionBucket *bucket, const string_t *key,
|
||||
const bool need_create)
|
||||
static inline ConnectionManager *do_find_manager(
|
||||
ConnectionManager *head, const string_t *key)
|
||||
{
|
||||
ConnectionManager *cm;
|
||||
|
||||
if (bucket->head != NULL)
|
||||
if (fc_string_equal(&head->key, key)) //fast path
|
||||
{
|
||||
if (fc_string_equal(&bucket->head->key, key)) //fast path
|
||||
{
|
||||
return bucket->head;
|
||||
return head;
|
||||
}
|
||||
else
|
||||
{
|
||||
cm = bucket->head->next;
|
||||
|
||||
cm = (ConnectionManager *)FC_ATOMIC_GET(head->next);
|
||||
while (cm != NULL)
|
||||
{
|
||||
if (fc_string_equal(&cm->key, key))
|
||||
{
|
||||
return cm;
|
||||
}
|
||||
cm = cm->next;
|
||||
}
|
||||
cm = (ConnectionManager *)FC_ATOMIC_GET(cm->next);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (!need_create)
|
||||
static ConnectionManager *find_manager(ConnectionBucket *bucket,
|
||||
const string_t *key)
|
||||
{
|
||||
ConnectionManager *head;
|
||||
|
||||
head = (ConnectionManager *)FC_ATOMIC_GET(bucket->head);
|
||||
if (head == NULL)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
return do_find_manager(head, key);
|
||||
}
|
||||
|
||||
static ConnectionManager *create_manager(ConnectionPool *cp,
|
||||
ConnectionBucket *bucket, ConnectionManager *old_head,
|
||||
const string_t *key, int *result)
|
||||
{
|
||||
ConnectionManager *cm;
|
||||
char *buff;
|
||||
int aligned_key_len;
|
||||
int bytes;
|
||||
|
||||
cm = (ConnectionManager *)fast_mblock_alloc_object(
|
||||
&cp->manager_allocator);
|
||||
|
|
@ -153,24 +189,66 @@ static ConnectionManager *find_manager(ConnectionPool *cp,
|
|||
logError("file: "__FILE__", line: %d, "
|
||||
"malloc %d bytes fail", __LINE__,
|
||||
(int)sizeof(ConnectionManager));
|
||||
*result = ENOMEM;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
cm->head = NULL;
|
||||
cm->ring.head = cm->ring.tail = 0;
|
||||
cm->total_count = 0;
|
||||
cm->free_count = 0;
|
||||
if ((cm->key.str=fc_malloc(key->len + 1)) == NULL)
|
||||
|
||||
aligned_key_len = MEM_ALIGN(key->len);
|
||||
bytes = aligned_key_len + sizeof(ConnectionNode *) * cp->ring_size;
|
||||
if ((buff=fc_malloc(bytes)) == NULL)
|
||||
{
|
||||
*result = ENOMEM;
|
||||
return NULL;
|
||||
}
|
||||
memcpy(cm->key.str, key->str, key->len + 1);
|
||||
memset(buff, 0, bytes);
|
||||
cm->key.str = buff;
|
||||
cm->ring.nodes = (ConnectionNode **)(buff + aligned_key_len);
|
||||
memcpy(cm->key.str, key->str, key->len);
|
||||
cm->key.len = key->len;
|
||||
|
||||
//add to manager chain
|
||||
cm->next = bucket->head;
|
||||
bucket->head = cm;
|
||||
FC_ATOMIC_SET(cm->next, old_head);
|
||||
if (__sync_bool_compare_and_swap(&bucket->head, old_head, cm))
|
||||
{
|
||||
return cm;
|
||||
}
|
||||
else
|
||||
{
|
||||
fast_mblock_free_object(&cp->manager_allocator, cm);
|
||||
*result = EAGAIN;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static ConnectionManager *get_manager(ConnectionPool *cp,
|
||||
ConnectionBucket *bucket, const string_t *key)
|
||||
{
|
||||
ConnectionManager *head;
|
||||
ConnectionManager *cm;
|
||||
int result = 0;
|
||||
|
||||
do {
|
||||
head = (ConnectionManager *)FC_ATOMIC_GET(bucket->head);
|
||||
if (head != NULL)
|
||||
{
|
||||
if ((cm=do_find_manager(head, key)) != NULL)
|
||||
{
|
||||
return cm;
|
||||
}
|
||||
}
|
||||
|
||||
if ((cm=create_manager(cp, bucket, head, key, &result)) != NULL)
|
||||
{
|
||||
return cm;
|
||||
}
|
||||
} while (result == EAGAIN);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int close_connection(ConnectionPool *cp, ConnectionInfo *conn,
|
||||
const string_t *key, uint32_t hash_code, const bool bForce)
|
||||
|
|
@ -181,8 +259,7 @@ static int close_connection(ConnectionPool *cp, ConnectionInfo *conn,
|
|||
int result;
|
||||
|
||||
bucket = cp->hashtable.buckets + hash_code % cp->hashtable.capacity;
|
||||
pthread_mutex_lock(&bucket->lock);
|
||||
if ((cm=find_manager(cp, bucket, key, false)) != NULL)
|
||||
if ((cm=find_manager(bucket, key)) != NULL)
|
||||
{
|
||||
result = close_conn(cp, cm, conn, bForce);
|
||||
}
|
||||
|
|
@ -194,7 +271,6 @@ static int close_connection(ConnectionPool *cp, ConnectionInfo *conn,
|
|||
__LINE__, formatted_ip, conn->port);
|
||||
result = ENOENT;
|
||||
}
|
||||
pthread_mutex_unlock(&bucket->lock);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
|
@ -235,7 +311,6 @@ static void cp_tls_destroy(void *ptr)
|
|||
static int init_hashtable(ConnectionPool *cp, const int htable_capacity)
|
||||
{
|
||||
int bytes;
|
||||
int result;
|
||||
unsigned int *hash_capacity;
|
||||
ConnectionBucket *bucket;
|
||||
ConnectionBucket *end;
|
||||
|
|
@ -260,10 +335,6 @@ static int init_hashtable(ConnectionPool *cp, const int htable_capacity)
|
|||
for (bucket=cp->hashtable.buckets; bucket<end; bucket++)
|
||||
{
|
||||
bucket->head = NULL;
|
||||
if ((result=init_pthread_lock(&bucket->lock)) != 0)
|
||||
{
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
@ -284,12 +355,12 @@ int conn_pool_init_ex1(ConnectionPool *cp, const int connect_timeout,
|
|||
cp->connect_timeout_ms = connect_timeout * 1000;
|
||||
cp->max_count_per_entry = max_count_per_entry;
|
||||
cp->max_idle_time = max_idle_time;
|
||||
cp->ring_size = max_count_per_entry > 0 ? max_count_per_entry : 1024;
|
||||
cp->extra_data_size = extra_data_size;
|
||||
cp->connect_done_callback.func = connect_done_func;
|
||||
cp->connect_done_callback.args = connect_done_args;
|
||||
cp->validate_callback.func = validate_func;
|
||||
cp->validate_callback.args = validate_args;
|
||||
|
||||
if ((result=fast_mblock_init_ex1(&cp->manager_allocator, "cpool-manager",
|
||||
sizeof(ConnectionManager), 256, alloc_elements_limit,
|
||||
NULL, NULL, true)) != 0)
|
||||
|
|
@ -352,31 +423,31 @@ static void conn_pool_hash_walk(ConnectionPool *cp,
|
|||
end = cp->hashtable.buckets + cp->hashtable.capacity;
|
||||
for (bucket=cp->hashtable.buckets; bucket<end; bucket++)
|
||||
{
|
||||
pthread_mutex_lock(&bucket->lock);
|
||||
cm = bucket->head;
|
||||
cm = (ConnectionManager *)FC_ATOMIC_GET(bucket->head);
|
||||
while (cm != NULL)
|
||||
{
|
||||
current = cm;
|
||||
cm = cm->next;
|
||||
cm = (ConnectionManager *)FC_ATOMIC_GET(cm->next);
|
||||
callback(cp, current, args);
|
||||
}
|
||||
pthread_mutex_unlock(&bucket->lock);
|
||||
}
|
||||
}
|
||||
|
||||
static void cp_destroy_walk_callback(ConnectionPool *cp,
|
||||
ConnectionManager *cm, void *args)
|
||||
{
|
||||
int64_t index;
|
||||
int64_t head;
|
||||
ConnectionNode *node;
|
||||
ConnectionNode *deleted;
|
||||
|
||||
node = cm->head;
|
||||
while (node != NULL)
|
||||
{
|
||||
deleted = node;
|
||||
node = node->next;
|
||||
G_COMMON_CONNECTION_CALLBACKS[deleted->conn->comm_type].
|
||||
close_connection(deleted->conn);
|
||||
head = FC_ATOMIC_GET(cm->ring.head);
|
||||
for (index=FC_ATOMIC_GET(cm->ring.tail); index<head; index++) {
|
||||
node = (ConnectionNode *)FC_ATOMIC_GET(
|
||||
cm->ring.nodes[index % cp->ring_size]);
|
||||
if (node != NULL) {
|
||||
G_COMMON_CONNECTION_CALLBACKS[node->conn->comm_type].
|
||||
close_connection(node->conn);
|
||||
}
|
||||
}
|
||||
|
||||
free(cm->key.str);
|
||||
|
|
@ -384,19 +455,11 @@ static void cp_destroy_walk_callback(ConnectionPool *cp,
|
|||
|
||||
void conn_pool_destroy(ConnectionPool *cp)
|
||||
{
|
||||
ConnectionBucket *bucket;
|
||||
ConnectionBucket *end;
|
||||
|
||||
if (cp->hashtable.buckets == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
conn_pool_hash_walk(cp, cp_destroy_walk_callback, cp);
|
||||
|
||||
end = cp->hashtable.buckets + cp->hashtable.capacity;
|
||||
for (bucket=cp->hashtable.buckets; bucket<end; bucket++) {
|
||||
pthread_mutex_destroy(&bucket->lock);
|
||||
}
|
||||
free(cp->hashtable.buckets);
|
||||
cp->hashtable.buckets = NULL;
|
||||
|
||||
|
|
@ -494,10 +557,11 @@ int conn_pool_async_connect_server_ex(ConnectionInfo *conn,
|
|||
}
|
||||
|
||||
static ConnectionInfo *get_conn(ConnectionPool *cp,
|
||||
ConnectionManager *cm, pthread_mutex_t *lock,
|
||||
const ConnectionInfo *conn, const char *service_name,
|
||||
int *err_no)
|
||||
ConnectionManager *cm, const ConnectionInfo *conn,
|
||||
const char *service_name, int *err_no)
|
||||
{
|
||||
int64_t tail;
|
||||
int index;
|
||||
ConnectionNode *node;
|
||||
ConnectionInfo *ci;
|
||||
char formatted_ip[FORMATTED_IP_SIZE];
|
||||
|
|
@ -506,10 +570,11 @@ static ConnectionInfo *get_conn(ConnectionPool *cp,
|
|||
current_time = get_current_time();
|
||||
while (1)
|
||||
{
|
||||
if (cm->head == NULL)
|
||||
tail = FC_ATOMIC_GET(cm->ring.tail);
|
||||
if (tail == FC_ATOMIC_GET(cm->ring.head)) //empty
|
||||
{
|
||||
if ((cp->max_count_per_entry > 0) &&
|
||||
(cm->total_count >= cp->max_count_per_entry))
|
||||
if ((cp->max_count_per_entry > 0) && FC_ATOMIC_GET(
|
||||
cm->total_count) >= cp->max_count_per_entry)
|
||||
{
|
||||
format_ip_address(conn->ip_addr, formatted_ip);
|
||||
*err_no = ENOSPC;
|
||||
|
|
@ -535,9 +600,7 @@ static ConnectionInfo *get_conn(ConnectionPool *cp,
|
|||
node->manager = cm;
|
||||
node->next = NULL;
|
||||
node->atime = 0;
|
||||
|
||||
cm->total_count++;
|
||||
pthread_mutex_unlock(lock);
|
||||
FC_ATOMIC_INC(cm->total_count);
|
||||
|
||||
memcpy(node->conn->ip_addr, conn->ip_addr, sizeof(conn->ip_addr));
|
||||
node->conn->port = conn->port;
|
||||
|
|
@ -559,8 +622,7 @@ static ConnectionInfo *get_conn(ConnectionPool *cp,
|
|||
close_connection(node->conn);
|
||||
fast_mblock_free_object(&cp->node_allocator, node);
|
||||
|
||||
pthread_mutex_lock(lock);
|
||||
cm->total_count--; //rollback
|
||||
FC_ATOMIC_DEC(cm->total_count); //rollback
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
@ -573,19 +635,27 @@ static ConnectionInfo *get_conn(ConnectionPool *cp,
|
|||
node->conn->sock, cm->total_count,
|
||||
cm->free_count);
|
||||
}
|
||||
|
||||
pthread_mutex_lock(lock);
|
||||
return node->conn;
|
||||
}
|
||||
else
|
||||
{
|
||||
bool invalid;
|
||||
|
||||
node = cm->head;
|
||||
ci = node->conn;
|
||||
cm->head = node->next;
|
||||
cm->free_count--;
|
||||
if (!__sync_bool_compare_and_swap(&cm->ring.tail, tail, tail + 1)) {
|
||||
sched_yield();
|
||||
continue;
|
||||
}
|
||||
|
||||
index = tail % cp->ring_size;
|
||||
node = (ConnectionNode *)FC_ATOMIC_GET(
|
||||
cm->ring.nodes[index]);
|
||||
if (node == NULL) {
|
||||
continue;
|
||||
}
|
||||
__sync_bool_compare_and_swap(&cm->ring.nodes[index], node, NULL);
|
||||
|
||||
ci = node->conn;
|
||||
FC_ATOMIC_DEC(cm->free_count);
|
||||
if (current_time - node->atime > cp->max_idle_time)
|
||||
{
|
||||
if (cp->validate_callback.func != NULL)
|
||||
|
|
@ -615,8 +685,7 @@ static ConnectionInfo *get_conn(ConnectionPool *cp,
|
|||
|
||||
if (invalid)
|
||||
{
|
||||
cm->total_count--;
|
||||
|
||||
FC_ATOMIC_DEC(cm->total_count);
|
||||
if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
|
||||
format_ip_address(conn->ip_addr, formatted_ip);
|
||||
logDebug("file: "__FILE__", line: %d, "
|
||||
|
|
@ -659,10 +728,9 @@ static ConnectionInfo *get_connection(ConnectionPool *cp,
|
|||
ConnectionInfo *ci;
|
||||
|
||||
bucket = cp->hashtable.buckets + hash_code % cp->hashtable.capacity;
|
||||
pthread_mutex_lock(&bucket->lock);
|
||||
if ((cm=find_manager(cp, bucket, key, true)) != NULL)
|
||||
if ((cm=get_manager(cp, bucket, key)) != NULL)
|
||||
{
|
||||
ci = get_conn(cp, cm, &bucket->lock, conn, service_name, err_no);
|
||||
ci = get_conn(cp, cm, conn, service_name, err_no);
|
||||
if (ci != NULL)
|
||||
{
|
||||
ci->shared = shared;
|
||||
|
|
@ -673,7 +741,6 @@ static ConnectionInfo *get_connection(ConnectionPool *cp,
|
|||
*err_no = ENOMEM;
|
||||
ci = NULL;
|
||||
}
|
||||
pthread_mutex_unlock(&bucket->lock);
|
||||
return ci;
|
||||
}
|
||||
|
||||
|
|
@ -822,8 +889,8 @@ static void cp_stat_walk_callback(ConnectionPool *cp,
|
|||
|
||||
stat = args;
|
||||
stat->server_count++;
|
||||
stat->connection.total_count += cm->total_count;
|
||||
stat->connection.free_count += cm->free_count;
|
||||
stat->connection.total_count += FC_ATOMIC_GET(cm->total_count);
|
||||
stat->connection.free_count += FC_ATOMIC_GET(cm->free_count);
|
||||
}
|
||||
|
||||
void conn_pool_stat(ConnectionPool *cp, ConnectionPoolStat *stat)
|
||||
|
|
|
|||
|
|
@ -147,21 +147,24 @@ struct tagConnectionManager;
|
|||
typedef struct tagConnectionNode {
|
||||
ConnectionInfo *conn;
|
||||
struct tagConnectionManager *manager;
|
||||
struct tagConnectionNode *next;
|
||||
struct tagConnectionNode *next; //for thread local
|
||||
time_t atime; //last access time
|
||||
} ConnectionNode;
|
||||
|
||||
typedef struct tagConnectionManager {
|
||||
string_t key;
|
||||
ConnectionNode *head;
|
||||
int total_count; //total connections
|
||||
int free_count; //free connections
|
||||
struct tagConnectionManager *next;
|
||||
struct {
|
||||
ConnectionNode **nodes;
|
||||
volatile int64_t head; //producer
|
||||
volatile int64_t tail; //consumer
|
||||
} ring;
|
||||
volatile int total_count; //total connections
|
||||
volatile int free_count; //free connections
|
||||
volatile struct tagConnectionManager *next;
|
||||
} ConnectionManager;
|
||||
|
||||
typedef struct tagConnectionBucket {
|
||||
ConnectionManager *head;
|
||||
pthread_mutex_t lock;
|
||||
volatile ConnectionManager *head;
|
||||
} ConnectionBucket;
|
||||
|
||||
struct tagConnectionPool;
|
||||
|
|
@ -179,6 +182,7 @@ typedef struct tagConnectionPool {
|
|||
|
||||
int connect_timeout_ms;
|
||||
int max_count_per_entry; //0 means no limit
|
||||
int ring_size;
|
||||
|
||||
/*
|
||||
connections whose idle time exceeds this time will be closed
|
||||
|
|
|
|||
|
|
@ -477,8 +477,8 @@ static int fast_mblock_prealloc(struct fast_mblock_man *mblock)
|
|||
#endif
|
||||
}
|
||||
|
||||
((struct fast_mblock_node *)pLast)->next = mblock->free_chain_head;
|
||||
mblock->free_chain_head = (struct fast_mblock_node *)pTrunkStart;
|
||||
((struct fast_mblock_node *)pLast)->next = mblock->freelist.head;
|
||||
mblock->freelist.head = (struct fast_mblock_node *)pTrunkStart;
|
||||
|
||||
pMallocNode->ref_count = 0;
|
||||
pMallocNode->alloc_count = alloc_count;
|
||||
|
|
@ -558,7 +558,7 @@ int fast_mblock_init_ex2(struct fast_mblock_man *mblock, const char *name,
|
|||
mblock->info.trunk_total_count = 0;
|
||||
mblock->info.trunk_used_count = 0;
|
||||
mblock->info.delay_free_elements = 0;
|
||||
mblock->free_chain_head = NULL;
|
||||
mblock->freelist.head = NULL;
|
||||
mblock->delay_free_chain.head = NULL;
|
||||
mblock->delay_free_chain.tail = NULL;
|
||||
mblock->info.element_total_count = 0;
|
||||
|
|
@ -714,7 +714,7 @@ void fast_mblock_destroy(struct fast_mblock_man *mblock)
|
|||
INIT_HEAD(&mblock->trunks.head);
|
||||
mblock->info.trunk_total_count = 0;
|
||||
mblock->info.trunk_used_count = 0;
|
||||
mblock->free_chain_head = NULL;
|
||||
mblock->freelist.head = NULL;
|
||||
mblock->info.element_used_count = 0;
|
||||
mblock->info.delay_free_elements = 0;
|
||||
mblock->info.element_total_count = 0;
|
||||
|
|
@ -732,10 +732,10 @@ static inline struct fast_mblock_node *alloc_node(
|
|||
|
||||
while (1)
|
||||
{
|
||||
if (mblock->free_chain_head != NULL)
|
||||
if (mblock->freelist.head != NULL)
|
||||
{
|
||||
pNode = mblock->free_chain_head;
|
||||
mblock->free_chain_head = pNode->next;
|
||||
pNode = mblock->freelist.head;
|
||||
mblock->freelist.head = pNode->next;
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
@ -757,8 +757,8 @@ static inline struct fast_mblock_node *alloc_node(
|
|||
|
||||
if ((result=fast_mblock_prealloc(mblock)) == 0)
|
||||
{
|
||||
pNode = mblock->free_chain_head;
|
||||
mblock->free_chain_head = pNode->next;
|
||||
pNode = mblock->freelist.head;
|
||||
mblock->freelist.head = pNode->next;
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
@ -829,9 +829,9 @@ int fast_mblock_free(struct fast_mblock_man *mblock,
|
|||
return result;
|
||||
}
|
||||
|
||||
notify = (mblock->free_chain_head == NULL);
|
||||
pNode->next = mblock->free_chain_head;
|
||||
mblock->free_chain_head = pNode;
|
||||
notify = (mblock->freelist.head == NULL);
|
||||
pNode->next = mblock->freelist.head;
|
||||
mblock->freelist.head = pNode;
|
||||
fast_mblock_ref_counter_dec(mblock, pNode);
|
||||
|
||||
if (mblock->alloc_elements.need_wait && notify)
|
||||
|
|
@ -855,18 +855,21 @@ static inline void batch_free(struct fast_mblock_man *mblock,
|
|||
struct fast_mblock_chain *chain)
|
||||
{
|
||||
bool notify;
|
||||
int count;
|
||||
struct fast_mblock_node *pNode;
|
||||
|
||||
count = 0;
|
||||
pNode = chain->head;
|
||||
while (pNode != NULL)
|
||||
{
|
||||
++count;
|
||||
fast_mblock_ref_counter_dec(mblock, pNode);
|
||||
pNode = pNode->next;
|
||||
}
|
||||
|
||||
notify = (mblock->free_chain_head == NULL);
|
||||
chain->tail->next = mblock->free_chain_head;
|
||||
mblock->free_chain_head = chain->head;
|
||||
notify = (mblock->freelist.head == NULL);
|
||||
chain->tail->next = mblock->freelist.head;
|
||||
mblock->freelist.head = chain->head;
|
||||
if (mblock->alloc_elements.need_wait && notify)
|
||||
{
|
||||
pthread_cond_broadcast(&mblock->lcp.cond);
|
||||
|
|
@ -1066,7 +1069,7 @@ static int fast_mblock_chain_count(struct fast_mblock_man *mblock,
|
|||
|
||||
int fast_mblock_free_count(struct fast_mblock_man *mblock)
|
||||
{
|
||||
return fast_mblock_chain_count(mblock, mblock->free_chain_head);
|
||||
return fast_mblock_chain_count(mblock, mblock->freelist.head);
|
||||
}
|
||||
|
||||
int fast_mblock_delay_free_count(struct fast_mblock_man *mblock)
|
||||
|
|
@ -1088,8 +1091,8 @@ static int fast_mblock_do_reclaim(struct fast_mblock_man *mblock,
|
|||
*reclaim_count = 0;
|
||||
freelist = NULL;
|
||||
free_chain_prev = NULL;
|
||||
current = mblock->free_chain_head;
|
||||
mblock->free_chain_head = NULL;
|
||||
current = mblock->freelist.head;
|
||||
mblock->freelist.head = NULL;
|
||||
while (current != NULL)
|
||||
{
|
||||
malloc_node = FAST_MBLOCK_GET_TRUNK(current);
|
||||
|
|
@ -1103,7 +1106,7 @@ static int fast_mblock_do_reclaim(struct fast_mblock_man *mblock,
|
|||
}
|
||||
else
|
||||
{
|
||||
mblock->free_chain_head = current;
|
||||
mblock->freelist.head = current;
|
||||
}
|
||||
|
||||
free_chain_prev = current;
|
||||
|
|
@ -1160,7 +1163,6 @@ OUTER:
|
|||
free_chain_prev->next = NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
{
|
||||
bool old_need_lock;
|
||||
|
|
|
|||
|
|
@ -127,7 +127,11 @@ struct fast_mblock_man
|
|||
int64_t limit; //<= 0 for no limit
|
||||
bool *pcontinue_flag;
|
||||
} alloc_elements;
|
||||
struct fast_mblock_node *free_chain_head; //free node chain
|
||||
|
||||
struct {
|
||||
struct fast_mblock_node *head;
|
||||
} freelist; //free node chain
|
||||
|
||||
struct fast_mblock_trunks trunks;
|
||||
struct fast_mblock_chain delay_free_chain; //delay free node chain
|
||||
|
||||
|
|
@ -407,7 +411,7 @@ return the delay free node count of the mblock, return -1 if fail
|
|||
*/
|
||||
int fast_mblock_delay_free_count(struct fast_mblock_man *mblock);
|
||||
|
||||
#define fast_mblock_total_count(mblock) (mblock)->total_count
|
||||
#define fast_mblock_total_count(mblock) (mblock)->info.element_total_count
|
||||
|
||||
/**
|
||||
init mblock manager
|
||||
|
|
|
|||
|
|
@ -0,0 +1,116 @@
|
|||
/*
|
||||
* Copyright (c) 2025 YuQing <384681@qq.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the Lesser GNU General Public License, version 3
|
||||
* or later ("LGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the Lesser GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "common_define.h"
|
||||
#ifdef OS_LINUX
|
||||
#include <sys/syscall.h>
|
||||
#include <linux/futex.h>
|
||||
#define FUTEX(ptr, op, val) syscall(SYS_futex, ptr, op, val, NULL, NULL, 0)
|
||||
#else
|
||||
#include "pthread_func.h"
|
||||
#endif
|
||||
|
||||
#include "logger.h"
|
||||
#include "spinlock.h"
|
||||
|
||||
#ifdef OS_LINUX
|
||||
static inline int fc_futex(int *ptr, int op, int val)
|
||||
{
|
||||
if (FUTEX(ptr, op, val) == 0) {
|
||||
return 0;
|
||||
} else {
|
||||
return errno != 0 ? errno : EBUSY;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
int fc_spinlock_init(FCSpinlock *lock, int *cond)
|
||||
{
|
||||
#ifdef OS_LINUX
|
||||
lock->cond = cond;
|
||||
return pthread_spin_init(&lock->mutex, 0);
|
||||
#else
|
||||
return init_pthread_lock_cond_pair(&lock->lcp);
|
||||
#endif
|
||||
}
|
||||
|
||||
void fc_spinlock_destroy(FCSpinlock *lock)
|
||||
{
|
||||
#ifdef OS_LINUX
|
||||
#else
|
||||
destroy_pthread_lock_cond_pair(&lock->lcp);
|
||||
#endif
|
||||
}
|
||||
|
||||
int fc_spinlock_lock(FCSpinlock *lock)
|
||||
{
|
||||
#ifdef OS_LINUX
|
||||
return pthread_spin_lock(&lock->mutex);
|
||||
#else
|
||||
return pthread_mutex_lock(&lock->lcp.lock);
|
||||
#endif
|
||||
}
|
||||
|
||||
int fc_spinlock_trylock(FCSpinlock *lock)
|
||||
{
|
||||
#ifdef OS_LINUX
|
||||
return pthread_spin_trylock(&lock->mutex);
|
||||
#else
|
||||
return pthread_mutex_trylock(&lock->lcp.lock);
|
||||
#endif
|
||||
}
|
||||
|
||||
int fc_spinlock_unlock(FCSpinlock *lock)
|
||||
{
|
||||
#ifdef OS_LINUX
|
||||
return pthread_spin_unlock(&lock->mutex);
|
||||
#else
|
||||
return pthread_mutex_unlock(&lock->lcp.lock);
|
||||
#endif
|
||||
}
|
||||
|
||||
int fc_spinlock_wait(FCSpinlock *lock, const int expected)
|
||||
{
|
||||
#ifdef OS_LINUX
|
||||
int result;
|
||||
int lock_ret;
|
||||
|
||||
if ((result=pthread_spin_unlock(&lock->mutex)) != 0) {
|
||||
return result;
|
||||
}
|
||||
result = fc_futex(lock->cond, FUTEX_WAIT_PRIVATE, expected);
|
||||
lock_ret = pthread_spin_lock(&lock->mutex);
|
||||
return result == 0 ? lock_ret : result;
|
||||
#else
|
||||
return pthread_cond_wait(&lock->lcp.cond, &lock->lcp.lock);
|
||||
#endif
|
||||
}
|
||||
|
||||
int fc_spinlock_wake_ex(FCSpinlock *lock, const int count)
|
||||
{
|
||||
#ifdef OS_LINUX
|
||||
if (FUTEX(lock->cond, FUTEX_WAKE_PRIVATE, count) >= 0) {
|
||||
return 0;
|
||||
} else {
|
||||
return errno != 0 ? errno : EBUSY;
|
||||
}
|
||||
#else
|
||||
if (count == 1) {
|
||||
return pthread_cond_signal(&lock->lcp.cond);
|
||||
} else {
|
||||
return pthread_cond_broadcast(&lock->lcp.cond);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
|
@ -0,0 +1,59 @@
|
|||
/*
|
||||
* Copyright (c) 2025 YuQing <384681@qq.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the Lesser GNU General Public License, version 3
|
||||
* or later ("LGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the Lesser GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef FC_SPINLOCK_H
|
||||
#define FC_SPINLOCK_H
|
||||
|
||||
#include <pthread.h>
|
||||
#include "common_define.h"
|
||||
|
||||
typedef struct fc_spinlock_t {
|
||||
#ifdef OS_LINUX
|
||||
pthread_spinlock_t mutex;
|
||||
int *cond;
|
||||
#else
|
||||
pthread_lock_cond_pair_t lcp;
|
||||
#endif
|
||||
} FCSpinlock;
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
int fc_spinlock_init(FCSpinlock *lock, int *cond);
|
||||
|
||||
void fc_spinlock_destroy(FCSpinlock *lock);
|
||||
|
||||
int fc_spinlock_lock(FCSpinlock *lock);
|
||||
|
||||
int fc_spinlock_trylock(FCSpinlock *lock);
|
||||
|
||||
int fc_spinlock_unlock(FCSpinlock *lock);
|
||||
|
||||
int fc_spinlock_wait(FCSpinlock *lock, const int expected);
|
||||
|
||||
int fc_spinlock_wake_ex(FCSpinlock *lock, const int count);
|
||||
|
||||
static inline int fc_spinlock_wake(FCSpinlock *lock)
|
||||
{
|
||||
const int count = 1;
|
||||
return fc_spinlock_wake_ex(lock, count);
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif
|
||||
|
|
@ -11,7 +11,7 @@ ALL_PRGS = test_allocator test_skiplist test_multi_skiplist test_mblock test_blo
|
|||
test_server_id_func test_pipe test_atomic test_file_write_hole test_file_lock \
|
||||
test_pthread_wait test_thread_pool test_data_visible test_mutex_lock_perf \
|
||||
test_queue_perf test_normalize_path test_sorted_array test_sorted_queue \
|
||||
test_thread_local test_memcpy
|
||||
test_thread_local test_memcpy mblock_benchmark cpool_benchmark
|
||||
|
||||
all: $(ALL_PRGS)
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,211 @@
|
|||
/*
|
||||
* Copyright (c) 2025 YuQing <384681@qq.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the Lesser GNU General Public License, version 3
|
||||
* or later ("LGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the Lesser GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <math.h>
|
||||
#include <time.h>
|
||||
#include <inttypes.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/time.h>
|
||||
#include <sys/stat.h>
|
||||
#include "fastcommon/logger.h"
|
||||
#include "fastcommon/shared_func.h"
|
||||
#include "fastcommon/pthread_func.h"
|
||||
#include "fastcommon/connection_pool.h"
|
||||
|
||||
#define USE_CONN_POOL 1
|
||||
//#define USE_CAS_LOCK 1
|
||||
|
||||
static int thread_count = 24;
|
||||
static int64_t loop_count = 10000000;
|
||||
static ConnectionPool cpool;
|
||||
static char buff[1024];
|
||||
static pthread_mutex_t lock;
|
||||
volatile int mutex = 0;
|
||||
|
||||
#ifndef USE_CONN_POOL
|
||||
static int64_t test_var1 = 0;
|
||||
static int64_t test_var2 = 0;
|
||||
#endif
|
||||
|
||||
static inline void conn_pool_get_key(const ConnectionInfo *conn,
|
||||
char *key, int *key_len)
|
||||
{
|
||||
*key_len = strlen(conn->ip_addr);
|
||||
memcpy(key, conn->ip_addr, *key_len);
|
||||
*(key + (*key_len)++) = '-';
|
||||
*(key + (*key_len)++) = (conn->port >> 8) & 0xFF;
|
||||
*(key + (*key_len)++) = conn->port & 0xFF;
|
||||
}
|
||||
|
||||
static void *thread_run(void *args)
|
||||
{
|
||||
int thread_index;
|
||||
int result;
|
||||
int64_t i;
|
||||
ConnectionInfo cinfo;
|
||||
#ifdef USE_CONN_POOL
|
||||
ConnectionInfo *conn;
|
||||
#endif
|
||||
|
||||
thread_index = (long)args;
|
||||
printf("thread #%d start\n", thread_index);
|
||||
|
||||
if ((result=conn_pool_parse_server_info("127.0.0.1", &cinfo, 23000)) != 0) {
|
||||
return NULL;
|
||||
}
|
||||
for (i=0; i<loop_count; i++) {
|
||||
#ifdef USE_CONN_POOL
|
||||
if ((conn=conn_pool_get_connection_ex(&cpool, &cinfo,
|
||||
NULL, true, &result)) == NULL)
|
||||
{
|
||||
break;
|
||||
}
|
||||
//fc_sleep_us(1);
|
||||
conn_pool_close_connection(&cpool, conn);
|
||||
#else
|
||||
char key_buff[INET6_ADDRSTRLEN + 8];
|
||||
string_t key;
|
||||
uint32_t hash_code;
|
||||
for (int j=0; j<2; j++) {
|
||||
key.str = key_buff;
|
||||
conn_pool_get_key(&cinfo, key.str, &key.len);
|
||||
hash_code = fc_simple_hash(key.str, key.len);
|
||||
#ifdef USE_CAS_LOCK
|
||||
while (!__sync_bool_compare_and_swap(&mutex, 0, 1)) {
|
||||
sched_yield();
|
||||
}
|
||||
__sync_fetch_and_add(&test_var1, 1);
|
||||
__sync_fetch_and_add(&test_var2, 1);
|
||||
#else
|
||||
PTHREAD_MUTEX_LOCK(&lock);
|
||||
test_var1++;
|
||||
test_var2++;
|
||||
#endif
|
||||
#ifdef USE_CAS_LOCK
|
||||
__sync_bool_compare_and_swap(&mutex, 1, 0);
|
||||
#else
|
||||
PTHREAD_MUTEX_UNLOCK(&lock);
|
||||
#endif
|
||||
}
|
||||
#endif
|
||||
|
||||
}
|
||||
|
||||
if (i == loop_count) {
|
||||
printf("thread #%d done\n", thread_index);
|
||||
} else {
|
||||
printf("thread #%d loop count: %"PRId64"\n", thread_index, i);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
const int stack_size = 64 * 1024;
|
||||
const int connect_timeout = 2;
|
||||
const int max_count_per_entry = 0;
|
||||
const int max_idle_time = 3600;
|
||||
const int htable_capacity = 163;
|
||||
const int extra_data_size = 0;
|
||||
int result;
|
||||
int i;
|
||||
int64_t qps;
|
||||
pthread_t *tids;
|
||||
pthread_attr_t thread_attr;
|
||||
int64_t start_time;
|
||||
int64_t end_time;
|
||||
int64_t time_used;
|
||||
ConnectionExtraParams params;
|
||||
|
||||
log_init();
|
||||
g_log_context.log_level = LOG_INFO;
|
||||
g_schedule_flag = true;
|
||||
g_current_time = time(NULL);
|
||||
|
||||
memset(¶ms, 0, sizeof(params));
|
||||
params.tls.enabled = false;
|
||||
params.tls.htable_capacity = 13;
|
||||
if ((result=conn_pool_init_ex1(&cpool, connect_timeout, max_count_per_entry,
|
||||
max_idle_time, htable_capacity, NULL, NULL, NULL, NULL,
|
||||
extra_data_size, ¶ms)) != 0)
|
||||
{
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
memset(buff, 0, sizeof(buff));
|
||||
if ((result=init_pthread_lock(&lock)) != 0) {
|
||||
return result;
|
||||
}
|
||||
|
||||
if ((result=pthread_attr_init(&thread_attr)) != 0) {
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
"call pthread_attr_init fail, "
|
||||
"errno: %d, error info: %s",
|
||||
__LINE__, result, STRERROR(result));
|
||||
return result;
|
||||
}
|
||||
if ((result=pthread_attr_setstacksize(&thread_attr,
|
||||
stack_size)) != 0)
|
||||
{
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
"call pthread_attr_setstacksize to %d fail, "
|
||||
"errno: %d, error info: %s", __LINE__,
|
||||
stack_size, result, STRERROR(result));
|
||||
return result;
|
||||
}
|
||||
|
||||
tids = fc_malloc(sizeof(pthread_t) * thread_count);
|
||||
|
||||
start_time = get_current_time_us();
|
||||
for (i=0; i<thread_count; i++) {
|
||||
if ((result=pthread_create(tids + i, &thread_attr,
|
||||
thread_run, (void *)((long)i))) != 0)
|
||||
{
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
"create thread failed, "
|
||||
"errno: %d, error info: %s",
|
||||
__LINE__, result, STRERROR(result));
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
for (i=0; i<thread_count; i++) {
|
||||
pthread_join(tids[i], NULL);
|
||||
}
|
||||
|
||||
end_time = get_current_time_us();
|
||||
time_used = end_time - start_time;
|
||||
qps = (thread_count * loop_count * 1000 * 1000) / time_used;
|
||||
printf("time used: %"PRId64" ms, QPS: %"PRId64"\n", time_used / 1000, qps);
|
||||
|
||||
{
|
||||
ConnectionPoolStat stat;
|
||||
conn_pool_stat(&cpool, &stat);
|
||||
printf("htable_capacity: %d, bucket_used: %d, server_count: %d, "
|
||||
"connection {total_count: %d, free_count: %d}\n",
|
||||
stat.htable_capacity, stat.bucket_used, stat.server_count,
|
||||
stat.connection.total_count, stat.connection.free_count);
|
||||
}
|
||||
|
||||
free(tids);
|
||||
conn_pool_destroy(&cpool);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -0,0 +1,125 @@
|
|||
/*
|
||||
* Copyright (c) 2025 YuQing <384681@qq.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the Lesser GNU General Public License, version 3
|
||||
* or later ("LGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the Lesser GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <math.h>
|
||||
#include <time.h>
|
||||
#include <inttypes.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/time.h>
|
||||
#include <sys/stat.h>
|
||||
#include "fastcommon/logger.h"
|
||||
#include "fastcommon/shared_func.h"
|
||||
#include "fastcommon/pthread_func.h"
|
||||
#include "fastcommon/fast_mblock.h"
|
||||
|
||||
static int thread_count = 4;
|
||||
static int64_t loop_count = 10000000;
|
||||
static struct fast_mblock_man mblock;
|
||||
|
||||
static void *thread_run(void *args)
|
||||
{
|
||||
int thread_index;
|
||||
int i;
|
||||
void *obj;
|
||||
|
||||
thread_index = (long)args;
|
||||
printf("thread #%d start\n", thread_index);
|
||||
|
||||
for (i=0; i<loop_count; i++) {
|
||||
obj = fast_mblock_alloc_object(&mblock);
|
||||
fast_mblock_free_object(&mblock, obj);
|
||||
}
|
||||
|
||||
printf("thread #%d done\n", thread_index);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
const int stack_size = 64 * 1024;
|
||||
int result;
|
||||
int limit;
|
||||
int i;
|
||||
bool continue_flag = true;
|
||||
int64_t qps;
|
||||
pthread_t *tids;
|
||||
pthread_attr_t thread_attr;
|
||||
int64_t start_time;
|
||||
int64_t end_time;
|
||||
int64_t time_used;
|
||||
|
||||
log_init();
|
||||
g_log_context.log_level = LOG_DEBUG;
|
||||
|
||||
limit = (thread_count + 1) / 2;
|
||||
fast_mblock_manager_init();
|
||||
if ((result=fast_mblock_init_ex1(&mblock, "mblock", 1024,
|
||||
limit, limit, NULL, NULL, true)) != 0)
|
||||
{
|
||||
return result;
|
||||
}
|
||||
fast_mblock_set_need_wait(&mblock, true, &continue_flag);
|
||||
|
||||
if ((result=pthread_attr_init(&thread_attr)) != 0) {
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
"call pthread_attr_init fail, "
|
||||
"errno: %d, error info: %s",
|
||||
__LINE__, result, STRERROR(result));
|
||||
return result;
|
||||
}
|
||||
if ((result=pthread_attr_setstacksize(&thread_attr,
|
||||
stack_size)) != 0)
|
||||
{
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
"call pthread_attr_setstacksize to %d fail, "
|
||||
"errno: %d, error info: %s", __LINE__,
|
||||
stack_size, result, STRERROR(result));
|
||||
return result;
|
||||
}
|
||||
|
||||
tids = fc_malloc(sizeof(pthread_t) * thread_count);
|
||||
|
||||
start_time = get_current_time_us();
|
||||
for (i=0; i<thread_count; i++) {
|
||||
if ((result=pthread_create(tids + i, &thread_attr,
|
||||
thread_run, (void *)((long)i))) != 0)
|
||||
{
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
"create thread failed, "
|
||||
"errno: %d, error info: %s",
|
||||
__LINE__, result, STRERROR(result));
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
for (i=0; i<thread_count; i++) {
|
||||
pthread_join(tids[i], NULL);
|
||||
}
|
||||
|
||||
end_time = get_current_time_us();
|
||||
time_used = end_time - start_time;
|
||||
qps = (thread_count * loop_count * 1000 * 1000) / time_used;
|
||||
printf("time used: %"PRId64" ms, QPS: %"PRId64"\n", time_used / 1000, qps);
|
||||
|
||||
free(tids);
|
||||
fast_mblock_manager_stat_print(false);
|
||||
fast_mblock_destroy(&mblock);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -25,6 +25,7 @@
|
|||
#include <sys/file.h>
|
||||
#include "fastcommon/logger.h"
|
||||
#include "fastcommon/shared_func.h"
|
||||
#include "fastcommon/pthread_func.h"
|
||||
|
||||
#define OneArgument(a) printf("One Argument func is called!\n")
|
||||
#define TwoArguments(a, b) printf("Two Arguments func is called!\n")
|
||||
|
|
@ -36,7 +37,6 @@ static inline int get_lock_info(int fd, struct flock *lock)
|
|||
{
|
||||
int result;
|
||||
|
||||
memset(lock, 0, sizeof(struct flock));
|
||||
lock->l_whence = SEEK_SET;
|
||||
lock->l_type = F_WRLCK;
|
||||
lock->l_pid = getpid();
|
||||
|
|
@ -53,6 +53,69 @@ static inline int get_lock_info(int fd, struct flock *lock)
|
|||
return result;
|
||||
}
|
||||
|
||||
static inline int set_lock(int fd, const int operation,
|
||||
const int start, const int length)
|
||||
{
|
||||
int result;
|
||||
struct flock lock;
|
||||
|
||||
memset(&lock, 0, sizeof(struct flock));
|
||||
lock.l_whence = SEEK_SET;
|
||||
lock.l_type = operation;
|
||||
lock.l_start = start;
|
||||
lock.l_len = length;
|
||||
lock.l_pid = getpid();
|
||||
do {
|
||||
if ((result=fcntl(fd, F_SETLKW, &lock)) != 0)
|
||||
{
|
||||
result = errno != 0 ? errno : ENOMEM;
|
||||
fprintf(stderr, "line: %d, call fcntl fail, "
|
||||
"errno: %d, error info: %s\n", __LINE__,
|
||||
result, STRERROR(result));
|
||||
} else {
|
||||
printf("line: %d, call fcntl %d result: %d\n",
|
||||
__LINE__, operation, result);
|
||||
}
|
||||
} while (result == EINTR);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
static void *unlock_thread(void *args)
|
||||
{
|
||||
char *filename;
|
||||
int result;
|
||||
int fd;
|
||||
struct flock lock;
|
||||
|
||||
filename = (char *)args;
|
||||
fd = open(filename, O_RDWR | O_CREAT, 0644);
|
||||
if (fd < 0) {
|
||||
result = errno != 0 ? errno : EIO;
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
"open file %s fail, "
|
||||
"errno: %d, error info: %s",
|
||||
__LINE__, filename,
|
||||
result, STRERROR(result));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
memset(&lock, 0, sizeof(struct flock));
|
||||
lock.l_start = 100;
|
||||
if ((result=get_lock_info(fd, &lock)) == 0) {
|
||||
logInfo("lock info: { type: %d, whence: %d, start: %"PRId64", "
|
||||
"len: %"PRId64", pid: %d }",
|
||||
lock.l_type, lock.l_whence, (int64_t)lock.l_start,
|
||||
(int64_t)lock.l_len, lock.l_pid);
|
||||
}
|
||||
|
||||
//set_lock(fd, F_WRLCK, 0, 0);
|
||||
sleep(5);
|
||||
//set_lock(fd, F_UNLCK, 0, 0);
|
||||
close(fd);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
#define SEEK_POS (2 * 1024)
|
||||
|
|
@ -61,12 +124,15 @@ int main(int argc, char *argv[])
|
|||
int result;
|
||||
int sleep_seconds;
|
||||
int n = 0;
|
||||
pthread_t tid;
|
||||
char buf[1024];
|
||||
struct flock lock;
|
||||
|
||||
Macro(1);
|
||||
Macro(1, 2);
|
||||
Macro(1, 2, 3);
|
||||
|
||||
printf("%0*d\n", 3, 1);
|
||||
if (argc < 2) {
|
||||
fprintf(stderr, "Usage: %s <filename>\n", argv[0]);
|
||||
return 1;
|
||||
|
|
@ -80,7 +146,7 @@ int main(int argc, char *argv[])
|
|||
sleep_seconds = 1;
|
||||
}
|
||||
|
||||
fd = open(filename, O_RDWR | O_CREAT, 0644);
|
||||
fd = open(filename, O_RDWR | O_CREAT | O_CLOEXEC, 0644);
|
||||
if (fd < 0) {
|
||||
result = errno != 0 ? errno : EIO;
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
|
|
@ -91,6 +157,61 @@ int main(int argc, char *argv[])
|
|||
return result;
|
||||
}
|
||||
|
||||
{
|
||||
int flags;
|
||||
flags = fcntl(fd, F_GETFD, 0);
|
||||
if (flags < 0)
|
||||
{
|
||||
logError("file: "__FILE__", line: %d, " \
|
||||
"fcntl failed, errno: %d, error info: %s.", \
|
||||
__LINE__, errno, STRERROR(errno));
|
||||
return errno != 0 ? errno : EACCES;
|
||||
}
|
||||
|
||||
printf("flags: %d, on: %d\n", flags, (flags & FD_CLOEXEC));
|
||||
|
||||
if (fcntl(fd, F_SETFD, flags | FD_CLOEXEC) < 0)
|
||||
{
|
||||
logError("file: "__FILE__", line: %d, " \
|
||||
"fcntl failed, errno: %d, error info: %s.", \
|
||||
__LINE__, errno, STRERROR(errno));
|
||||
return errno != 0 ? errno : EACCES;
|
||||
}
|
||||
flags = fcntl(fd, F_GETFD, 0);
|
||||
printf("flags: %d, on: %d\n", flags, (flags & FD_CLOEXEC));
|
||||
}
|
||||
|
||||
|
||||
fork();
|
||||
|
||||
memset(&lock, 0, sizeof(struct flock));
|
||||
lock.l_start = 1024;
|
||||
if ((result=get_lock_info(fd, &lock)) == 0) {
|
||||
logInfo("pid: %d, lock info: { type: %d, whence: %d, "
|
||||
"start: %"PRId64", len: %"PRId64", pid: %d }", getpid(),
|
||||
lock.l_type, lock.l_whence, (int64_t)lock.l_start,
|
||||
(int64_t)lock.l_len, lock.l_pid);
|
||||
}
|
||||
|
||||
set_lock(fd, F_WRLCK, 0, 10);
|
||||
set_lock(fd, F_WRLCK, 10, 10);
|
||||
set_lock(fd, F_WRLCK, 30, 10);
|
||||
//set_lock(fd, F_WRLCK, 0, 0);
|
||||
//set_lock(fd, F_UNLCK, 0, 10);
|
||||
//set_lock(fd, F_UNLCK, 5, 35);
|
||||
|
||||
fc_create_thread(&tid, unlock_thread, filename, 64 * 1024);
|
||||
|
||||
sleep(100);
|
||||
memset(&lock, 0, sizeof(struct flock));
|
||||
lock.l_start = 100;
|
||||
if ((result=get_lock_info(fd, &lock)) == 0) {
|
||||
logInfo("lock info: { type: %d, whence: %d, start: %"PRId64", "
|
||||
"len: %"PRId64", pid: %d }",
|
||||
lock.l_type, lock.l_whence, (int64_t)lock.l_start,
|
||||
(int64_t)lock.l_len, lock.l_pid);
|
||||
}
|
||||
|
||||
if (flock(fd, LOCK_EX) < 0) {
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
"flock file %s fail, "
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ int main(int argc, char *argv[])
|
|||
*/
|
||||
|
||||
//fd = open(filename, O_RDWR | O_CREAT | O_TRUNC, 0644);
|
||||
fd = open(filename, O_RDWR | O_CREAT, 0644);
|
||||
fd = open(filename, O_RDWR | O_CREAT | O_APPEND, 0644);
|
||||
if (fd < 0) {
|
||||
result = errno != 0 ? errno : EIO;
|
||||
logError("file: "__FILE__", line: %d, " \
|
||||
|
|
@ -69,7 +69,8 @@ int main(int argc, char *argv[])
|
|||
return errno != 0 ? errno : EIO;
|
||||
}
|
||||
|
||||
if (lseek(fd, SEEK_POS, SEEK_SET) < 0) {
|
||||
for (int i=0; i<5; i++) {
|
||||
if (lseek(fd, 0, SEEK_SET) < 0) {
|
||||
logError("file: "__FILE__", line: %d, " \
|
||||
"lseek file %s fail, " \
|
||||
"errno: %d, error info: %s", __LINE__, \
|
||||
|
|
@ -86,6 +87,7 @@ int main(int argc, char *argv[])
|
|||
return errno != 0 ? errno : EIO;
|
||||
}
|
||||
printf("write bytes: %d\n", n);
|
||||
}
|
||||
|
||||
if (lseek(fd, 0, SEEK_SET) < 0) {
|
||||
logError("file: "__FILE__", line: %d, " \
|
||||
|
|
|
|||
|
|
@ -162,7 +162,7 @@ int main(int argc, char *argv[])
|
|||
printf("cpu count: %d\n", get_sys_cpu_count());
|
||||
|
||||
end_time = get_current_time_ms();
|
||||
logInfo("time used: %d ms", (int)(end_time - start_time));
|
||||
logInfo("time used: %d ms\n", (int)(end_time - start_time));
|
||||
|
||||
fast_mblock_manager_init();
|
||||
|
||||
|
|
@ -182,19 +182,33 @@ int main(int argc, char *argv[])
|
|||
sched_add_delay_task(test_delay, objs + i, delay, false);
|
||||
}
|
||||
|
||||
printf("mblock1 total count: %"PRId64", free count: %u\n",
|
||||
fast_mblock_total_count(&mblock1),
|
||||
fast_mblock_free_count(&mblock1));
|
||||
|
||||
/*
|
||||
for (i=0; i<count; i++)
|
||||
{
|
||||
fast_mblock_free_object(&mblock1, objs[i]);
|
||||
fast_mblock_free_object(objs[i].mblock, objs[i].obj);
|
||||
}
|
||||
*/
|
||||
|
||||
obj1 = fast_mblock_alloc_object(&mblock1);
|
||||
obj2 = fast_mblock_alloc_object(&mblock1);
|
||||
fast_mblock_free_object(&mblock1, obj1);
|
||||
|
||||
|
||||
printf("mblock1 total count: %"PRId64", free count: %u\n",
|
||||
fast_mblock_total_count(&mblock1),
|
||||
fast_mblock_free_count(&mblock1));
|
||||
|
||||
//fast_mblock_delay_free_object(&mblock1, obj2, 10);
|
||||
fast_mblock_free_object(&mblock1, obj2);
|
||||
|
||||
printf("mblock1 total count: %"PRId64", free count: %u\n\n",
|
||||
fast_mblock_total_count(&mblock1),
|
||||
fast_mblock_free_count(&mblock1));
|
||||
|
||||
obj1 = fast_mblock_alloc_object(&mblock2);
|
||||
obj2 = fast_mblock_alloc_object(&mblock2);
|
||||
fast_mblock_delay_free_object(&mblock2, obj1, 20);
|
||||
|
|
@ -206,6 +220,13 @@ int main(int argc, char *argv[])
|
|||
fast_mblock_reclaim(&mblock1, reclaim_target, &reclaim_count, NULL);
|
||||
fast_mblock_reclaim(&mblock2, reclaim_target, &reclaim_count, NULL);
|
||||
|
||||
printf("\nmblock1 total count: %"PRId64", free count: %u, "
|
||||
"mblock2 total count: %"PRId64", free count: %u\n\n",
|
||||
fast_mblock_total_count(&mblock1),
|
||||
fast_mblock_free_count(&mblock1),
|
||||
fast_mblock_total_count(&mblock2),
|
||||
fast_mblock_free_count(&mblock2));
|
||||
|
||||
fast_mblock_manager_stat_print(false);
|
||||
|
||||
sleep(31);
|
||||
|
|
@ -217,11 +238,25 @@ int main(int argc, char *argv[])
|
|||
fast_mblock_reclaim(&mblock1, reclaim_target, &reclaim_count, NULL);
|
||||
fast_mblock_reclaim(&mblock2, reclaim_target, &reclaim_count, NULL);
|
||||
|
||||
printf("\nmblock1 total count: %"PRId64", free count: %u, "
|
||||
"mblock2 total count: %"PRId64", free count: %u\n\n",
|
||||
fast_mblock_total_count(&mblock1),
|
||||
fast_mblock_free_count(&mblock1),
|
||||
fast_mblock_total_count(&mblock2),
|
||||
fast_mblock_free_count(&mblock2));
|
||||
|
||||
fast_mblock_manager_stat_print(false);
|
||||
|
||||
obj1 = fast_mblock_alloc_object(&mblock1);
|
||||
obj2 = fast_mblock_alloc_object(&mblock2);
|
||||
|
||||
printf("mblock1 total count: %"PRId64", free count: %u, "
|
||||
"mblock2 total count: %"PRId64", free count: %u\n\n",
|
||||
fast_mblock_total_count(&mblock1),
|
||||
fast_mblock_free_count(&mblock1),
|
||||
fast_mblock_total_count(&mblock2),
|
||||
fast_mblock_free_count(&mblock2));
|
||||
|
||||
fast_mblock_manager_stat_print(false);
|
||||
|
||||
fast_mblock_destroy(&mblock1);
|
||||
|
|
|
|||
|
|
@ -84,6 +84,56 @@ static void sigQuitHandler(int sig)
|
|||
__LINE__, sig);
|
||||
}
|
||||
|
||||
static void test_remove()
|
||||
{
|
||||
#define COUNT 8
|
||||
typedef struct node {
|
||||
int n;
|
||||
struct node *next;
|
||||
} Node;
|
||||
struct fc_queue queue;
|
||||
Node nodes[COUNT];
|
||||
Node *node;
|
||||
Node *end;
|
||||
int result;
|
||||
|
||||
if ((result=fc_queue_init(&queue, (long)&((Node *)NULL)->next)) != 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
fc_queue_push(&queue, nodes);
|
||||
printf("remove: %d\n", fc_queue_remove(&queue, nodes + 1));
|
||||
printf("remove: %d\n", fc_queue_remove(&queue, nodes + 2));
|
||||
printf("remove: %d\n", fc_queue_remove(&queue, nodes));
|
||||
printf("count: %d\n", fc_queue_count(&queue));
|
||||
|
||||
end = nodes + COUNT / 2;
|
||||
for (node=nodes; node<end; node++) {
|
||||
node->n = (node - nodes) + 1;
|
||||
fc_queue_push(&queue, node);
|
||||
}
|
||||
|
||||
printf("remove: %d\n", fc_queue_remove(&queue, node));
|
||||
printf("remove: %d\n", fc_queue_remove(&queue, node - 1));
|
||||
printf("remove: %d\n", fc_queue_remove(&queue, node - 3));
|
||||
printf("remove: %d\n", fc_queue_remove(&queue, nodes));
|
||||
|
||||
end = nodes + COUNT;
|
||||
for (; node<end; node++) {
|
||||
node->n = (node - nodes) + 1;
|
||||
fc_queue_push(&queue, node);
|
||||
}
|
||||
printf("count: %d\n\n", fc_queue_count(&queue));
|
||||
|
||||
for (node=end-1; node>=nodes; node--) {
|
||||
printf("remove: %d\n", fc_queue_remove(&queue, node));
|
||||
fc_queue_push(&queue, node);
|
||||
}
|
||||
|
||||
printf("count: %d\n", fc_queue_count(&queue));
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
const int alloc_elements_once = 8 * 1024;
|
||||
|
|
@ -107,6 +157,9 @@ int main(int argc, char *argv[])
|
|||
log_init();
|
||||
g_log_context.log_level = LOG_DEBUG;
|
||||
|
||||
test_remove();
|
||||
return 0;
|
||||
|
||||
memset(&act, 0, sizeof(act));
|
||||
sigemptyset(&act.sa_mask);
|
||||
act.sa_handler = sigQuitHandler;
|
||||
|
|
|
|||
Loading…
Reference in New Issue