connection_pool.[hc]: use CAS instead of pthread mutex lock

use_iouring
YuQing 2025-08-02 19:16:03 +08:00
parent dd0d4dbc19
commit 9acc202481
8 changed files with 487 additions and 141 deletions

View File

@ -1,7 +1,8 @@
Version 1.78 2025-07-22
Version 1.78 2025-08-02
* getIpaddrByName: normalize ip addr when input addr is IPv4 or IPv6
* add files: spinlock.[hc]
* connection_pool.[hc]: use CAS instead of pthread mutex lock
Version 1.77 2025-03-18
* impl. shorten_path for /./ and /../

View File

@ -23,6 +23,7 @@
#include "shared_func.h"
#include "sched_thread.h"
#include "server_id_func.h"
#include "fc_atomic.h"
#include "connection_pool.h"
ConnectionCallbacks g_connection_callbacks = {
@ -55,14 +56,16 @@ static inline void conn_pool_get_key(const ConnectionInfo *conn,
*key_len = strlen(conn->ip_addr);
memcpy(key, conn->ip_addr, *key_len);
*(key + (*key_len)++) = '-';
*(key + (*key_len)++) = (conn->port >> 8) & 0xFF;
*(key + (*key_len)++) = conn->port & 0xFF;
*key_len += fc_itoa(conn->port, key + (*key_len));
}
static int close_conn(ConnectionPool *cp, ConnectionManager *cm,
ConnectionInfo *conn, const bool bForce)
{
ConnectionNode *node;
int64_t index;
int64_t head;
int64_t tail;
char formatted_ip[FORMATTED_IP_SIZE];
node = (ConnectionNode *)((char *)conn - sizeof(ConnectionNode));
@ -75,80 +78,109 @@ static int close_conn(ConnectionPool *cp, ConnectionManager *cm,
return EINVAL;
}
if (bForce)
{
cm->total_count--;
while (1) {
head = FC_ATOMIC_GET(cm->ring.head);
tail = FC_ATOMIC_GET(cm->ring.tail);
if (bForce || (head - tail) >= cp->ring_size - 1) {
FC_ATOMIC_DEC(cm->total_count);
if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
format_ip_address(conn->ip_addr, formatted_ip);
logDebug("file: "__FILE__", line: %d, "
"server %s:%u, release connection: %d, "
"total_count: %d, free_count: %d",
__LINE__, formatted_ip, conn->port,
conn->sock, cm->total_count, cm->free_count);
}
if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
format_ip_address(conn->ip_addr, formatted_ip);
logDebug("file: "__FILE__", line: %d, "
"server %s:%u, release connection: %d, "
"total_count: %d, free_count: %d",
__LINE__, formatted_ip, conn->port,
conn->sock, cm->total_count, cm->free_count);
}
G_COMMON_CONNECTION_CALLBACKS[conn->comm_type].
close_connection(conn);
fast_mblock_free_object(&cp->node_allocator, node);
G_COMMON_CONNECTION_CALLBACKS[conn->comm_type].
close_connection(conn);
fast_mblock_free_object(&cp->node_allocator, node);
node = cm->head;
while (node != NULL)
{
node->conn->validate_flag = true;
node = node->next;
}
}
else
{
node->atime = get_current_time();
node->next = cm->head;
cm->head = node;
cm->free_count++;
if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
format_ip_address(conn->ip_addr, formatted_ip);
logDebug("file: "__FILE__", line: %d, "
"server %s:%u, free connection: %d, "
"total_count: %d, free_count: %d",
__LINE__, formatted_ip, conn->port,
conn->sock, cm->total_count, cm->free_count);
}
}
return 0;
}
static ConnectionManager *find_manager(ConnectionPool *cp,
ConnectionBucket *bucket, const string_t *key,
const bool need_create)
{
ConnectionManager *cm;
if (bucket->head != NULL)
{
if (fc_string_equal(&bucket->head->key, key)) //fast path
{
return bucket->head;
}
else
{
cm = bucket->head->next;
while (cm != NULL)
{
if (fc_string_equal(&cm->key, key))
{
return cm;
if (bForce) {
for (index=FC_ATOMIC_GET(cm->ring.tail); index<head; index++) {
node = (ConnectionNode *)FC_ATOMIC_GET(
cm->ring.nodes[index % cp->ring_size]);
if (node != NULL) {
node->conn->validate_flag = true;
}
}
cm = cm->next;
}
break;
}
if (__sync_bool_compare_and_swap(&cm->ring.nodes[
head % cp->ring_size], NULL, node))
{
if (__sync_bool_compare_and_swap(&cm->ring.head, head, head + 1)) {
node->atime = get_current_time();
FC_ATOMIC_INC(cm->free_count);
if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
format_ip_address(conn->ip_addr, formatted_ip);
logDebug("file: "__FILE__", line: %d, "
"server %s:%u, free connection: %d, "
"total_count: %d, free_count: %d",
__LINE__, formatted_ip, conn->port,
conn->sock, cm->total_count, cm->free_count);
}
break;
} else { //rollback
__sync_bool_compare_and_swap(&cm->ring.nodes[
head % cp->ring_size], node, NULL);
sched_yield();
}
}
}
if (!need_create)
return 0;
}
static inline ConnectionManager *do_find_manager(
ConnectionManager *head, const string_t *key)
{
ConnectionManager *cm;
if (fc_string_equal(&head->key, key)) //fast path
{
return head;
}
cm = (ConnectionManager *)FC_ATOMIC_GET(head->next);
while (cm != NULL)
{
if (fc_string_equal(&cm->key, key))
{
return cm;
}
cm = (ConnectionManager *)FC_ATOMIC_GET(cm->next);
}
return NULL;
}
static ConnectionManager *find_manager(ConnectionBucket *bucket,
const string_t *key)
{
ConnectionManager *head;
head = (ConnectionManager *)FC_ATOMIC_GET(bucket->head);
if (head == NULL)
{
return NULL;
}
return do_find_manager(head, key);
}
static ConnectionManager *create_manager(ConnectionPool *cp,
ConnectionBucket *bucket, ConnectionManager *old_head,
const string_t *key, int *result)
{
ConnectionManager *cm;
char *buff;
int aligned_key_len;
int bytes;
cm = (ConnectionManager *)fast_mblock_alloc_object(
&cp->manager_allocator);
@ -157,23 +189,65 @@ static ConnectionManager *find_manager(ConnectionPool *cp,
logError("file: "__FILE__", line: %d, "
"malloc %d bytes fail", __LINE__,
(int)sizeof(ConnectionManager));
*result = ENOMEM;
return NULL;
}
cm->head = NULL;
cm->ring.head = cm->ring.tail = 0;
cm->total_count = 0;
cm->free_count = 0;
if ((cm->key.str=fc_malloc(key->len + 1)) == NULL)
aligned_key_len = MEM_ALIGN(key->len);
bytes = aligned_key_len + sizeof(ConnectionNode *) * cp->ring_size;
if ((buff=fc_malloc(bytes)) == NULL)
{
*result = ENOMEM;
return NULL;
}
memcpy(cm->key.str, key->str, key->len + 1);
memset(buff, 0, bytes);
cm->key.str = buff;
cm->ring.nodes = (ConnectionNode **)(buff + aligned_key_len);
memcpy(cm->key.str, key->str, key->len);
cm->key.len = key->len;
//add to manager chain
cm->next = bucket->head;
bucket->head = cm;
return cm;
FC_ATOMIC_SET(cm->next, old_head);
if (__sync_bool_compare_and_swap(&bucket->head, old_head, cm))
{
return cm;
}
else
{
fast_mblock_free_object(&cp->manager_allocator, cm);
*result = EAGAIN;
return NULL;
}
}
static ConnectionManager *get_manager(ConnectionPool *cp,
ConnectionBucket *bucket, const string_t *key)
{
ConnectionManager *head;
ConnectionManager *cm;
int result = 0;
do {
head = (ConnectionManager *)FC_ATOMIC_GET(bucket->head);
if (head != NULL)
{
if ((cm=do_find_manager(head, key)) != NULL)
{
return cm;
}
}
if ((cm=create_manager(cp, bucket, head, key, &result)) != NULL)
{
return cm;
}
} while (result == EAGAIN);
return NULL;
}
static int close_connection(ConnectionPool *cp, ConnectionInfo *conn,
@ -185,8 +259,7 @@ static int close_connection(ConnectionPool *cp, ConnectionInfo *conn,
int result;
bucket = cp->hashtable.buckets + hash_code % cp->hashtable.capacity;
pthread_mutex_lock(&bucket->lock);
if ((cm=find_manager(cp, bucket, key, false)) != NULL)
if ((cm=find_manager(bucket, key)) != NULL)
{
result = close_conn(cp, cm, conn, bForce);
}
@ -198,7 +271,6 @@ static int close_connection(ConnectionPool *cp, ConnectionInfo *conn,
__LINE__, formatted_ip, conn->port);
result = ENOENT;
}
pthread_mutex_unlock(&bucket->lock);
return result;
}
@ -239,7 +311,6 @@ static void cp_tls_destroy(void *ptr)
static int init_hashtable(ConnectionPool *cp, const int htable_capacity)
{
int bytes;
int result;
unsigned int *hash_capacity;
ConnectionBucket *bucket;
ConnectionBucket *end;
@ -264,10 +335,6 @@ static int init_hashtable(ConnectionPool *cp, const int htable_capacity)
for (bucket=cp->hashtable.buckets; bucket<end; bucket++)
{
bucket->head = NULL;
if ((result=init_pthread_lock(&bucket->lock)) != 0)
{
return result;
}
}
return 0;
@ -288,12 +355,12 @@ int conn_pool_init_ex1(ConnectionPool *cp, const int connect_timeout,
cp->connect_timeout_ms = connect_timeout * 1000;
cp->max_count_per_entry = max_count_per_entry;
cp->max_idle_time = max_idle_time;
cp->ring_size = max_count_per_entry > 0 ? max_count_per_entry : 1024;
cp->extra_data_size = extra_data_size;
cp->connect_done_callback.func = connect_done_func;
cp->connect_done_callback.args = connect_done_args;
cp->validate_callback.func = validate_func;
cp->validate_callback.args = validate_args;
if ((result=fast_mblock_init_ex1(&cp->manager_allocator, "cpool-manager",
sizeof(ConnectionManager), 256, alloc_elements_limit,
NULL, NULL, true)) != 0)
@ -356,31 +423,31 @@ static void conn_pool_hash_walk(ConnectionPool *cp,
end = cp->hashtable.buckets + cp->hashtable.capacity;
for (bucket=cp->hashtable.buckets; bucket<end; bucket++)
{
pthread_mutex_lock(&bucket->lock);
cm = bucket->head;
cm = (ConnectionManager *)FC_ATOMIC_GET(bucket->head);
while (cm != NULL)
{
current = cm;
cm = cm->next;
cm = (ConnectionManager *)FC_ATOMIC_GET(cm->next);
callback(cp, current, args);
}
pthread_mutex_unlock(&bucket->lock);
}
}
static void cp_destroy_walk_callback(ConnectionPool *cp,
ConnectionManager *cm, void *args)
{
int64_t index;
int64_t head;
ConnectionNode *node;
ConnectionNode *deleted;
node = cm->head;
while (node != NULL)
{
deleted = node;
node = node->next;
G_COMMON_CONNECTION_CALLBACKS[deleted->conn->comm_type].
close_connection(deleted->conn);
head = FC_ATOMIC_GET(cm->ring.head);
for (index=FC_ATOMIC_GET(cm->ring.tail); index<head; index++) {
node = (ConnectionNode *)FC_ATOMIC_GET(
cm->ring.nodes[index % cp->ring_size]);
if (node != NULL) {
G_COMMON_CONNECTION_CALLBACKS[node->conn->comm_type].
close_connection(node->conn);
}
}
free(cm->key.str);
@ -388,19 +455,11 @@ static void cp_destroy_walk_callback(ConnectionPool *cp,
void conn_pool_destroy(ConnectionPool *cp)
{
ConnectionBucket *bucket;
ConnectionBucket *end;
if (cp->hashtable.buckets == NULL) {
return;
}
conn_pool_hash_walk(cp, cp_destroy_walk_callback, cp);
end = cp->hashtable.buckets + cp->hashtable.capacity;
for (bucket=cp->hashtable.buckets; bucket<end; bucket++) {
pthread_mutex_destroy(&bucket->lock);
}
free(cp->hashtable.buckets);
cp->hashtable.buckets = NULL;
@ -498,10 +557,11 @@ int conn_pool_async_connect_server_ex(ConnectionInfo *conn,
}
static ConnectionInfo *get_conn(ConnectionPool *cp,
ConnectionManager *cm, pthread_mutex_t *lock,
const ConnectionInfo *conn, const char *service_name,
int *err_no)
ConnectionManager *cm, const ConnectionInfo *conn,
const char *service_name, int *err_no)
{
int64_t tail;
int index;
ConnectionNode *node;
ConnectionInfo *ci;
char formatted_ip[FORMATTED_IP_SIZE];
@ -510,10 +570,11 @@ static ConnectionInfo *get_conn(ConnectionPool *cp,
current_time = get_current_time();
while (1)
{
if (cm->head == NULL)
tail = FC_ATOMIC_GET(cm->ring.tail);
if (tail == FC_ATOMIC_GET(cm->ring.head)) //empty
{
if ((cp->max_count_per_entry > 0) &&
(cm->total_count >= cp->max_count_per_entry))
if ((cp->max_count_per_entry > 0) && FC_ATOMIC_GET(
cm->total_count) >= cp->max_count_per_entry)
{
format_ip_address(conn->ip_addr, formatted_ip);
*err_no = ENOSPC;
@ -539,9 +600,7 @@ static ConnectionInfo *get_conn(ConnectionPool *cp,
node->manager = cm;
node->next = NULL;
node->atime = 0;
cm->total_count++;
pthread_mutex_unlock(lock);
FC_ATOMIC_INC(cm->total_count);
memcpy(node->conn->ip_addr, conn->ip_addr, sizeof(conn->ip_addr));
node->conn->port = conn->port;
@ -563,8 +622,7 @@ static ConnectionInfo *get_conn(ConnectionPool *cp,
close_connection(node->conn);
fast_mblock_free_object(&cp->node_allocator, node);
pthread_mutex_lock(lock);
cm->total_count--; //rollback
FC_ATOMIC_DEC(cm->total_count); //rollback
return NULL;
}
@ -577,19 +635,27 @@ static ConnectionInfo *get_conn(ConnectionPool *cp,
node->conn->sock, cm->total_count,
cm->free_count);
}
pthread_mutex_lock(lock);
return node->conn;
}
else
{
bool invalid;
node = cm->head;
ci = node->conn;
cm->head = node->next;
cm->free_count--;
if (!__sync_bool_compare_and_swap(&cm->ring.tail, tail, tail + 1)) {
sched_yield();
continue;
}
index = tail % cp->ring_size;
node = (ConnectionNode *)FC_ATOMIC_GET(
cm->ring.nodes[index]);
if (node == NULL) {
continue;
}
__sync_bool_compare_and_swap(&cm->ring.nodes[index], node, NULL);
ci = node->conn;
FC_ATOMIC_DEC(cm->free_count);
if (current_time - node->atime > cp->max_idle_time)
{
if (cp->validate_callback.func != NULL)
@ -619,8 +685,7 @@ static ConnectionInfo *get_conn(ConnectionPool *cp,
if (invalid)
{
cm->total_count--;
FC_ATOMIC_DEC(cm->total_count);
if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
format_ip_address(conn->ip_addr, formatted_ip);
logDebug("file: "__FILE__", line: %d, "
@ -663,10 +728,9 @@ static ConnectionInfo *get_connection(ConnectionPool *cp,
ConnectionInfo *ci;
bucket = cp->hashtable.buckets + hash_code % cp->hashtable.capacity;
pthread_mutex_lock(&bucket->lock);
if ((cm=find_manager(cp, bucket, key, true)) != NULL)
if ((cm=get_manager(cp, bucket, key)) != NULL)
{
ci = get_conn(cp, cm, &bucket->lock, conn, service_name, err_no);
ci = get_conn(cp, cm, conn, service_name, err_no);
if (ci != NULL)
{
ci->shared = shared;
@ -677,7 +741,6 @@ static ConnectionInfo *get_connection(ConnectionPool *cp,
*err_no = ENOMEM;
ci = NULL;
}
pthread_mutex_unlock(&bucket->lock);
return ci;
}
@ -826,8 +889,8 @@ static void cp_stat_walk_callback(ConnectionPool *cp,
stat = args;
stat->server_count++;
stat->connection.total_count += cm->total_count;
stat->connection.free_count += cm->free_count;
stat->connection.total_count += FC_ATOMIC_GET(cm->total_count);
stat->connection.free_count += FC_ATOMIC_GET(cm->free_count);
}
void conn_pool_stat(ConnectionPool *cp, ConnectionPoolStat *stat)

View File

@ -147,21 +147,24 @@ struct tagConnectionManager;
typedef struct tagConnectionNode {
ConnectionInfo *conn;
struct tagConnectionManager *manager;
struct tagConnectionNode *next;
struct tagConnectionNode *next; //for thread local
time_t atime; //last access time
} ConnectionNode;
typedef struct tagConnectionManager {
string_t key;
ConnectionNode *head;
int total_count; //total connections
int free_count; //free connections
struct tagConnectionManager *next;
struct {
ConnectionNode **nodes;
volatile int64_t head; //producer
volatile int64_t tail; //consumer
} ring;
volatile int total_count; //total connections
volatile int free_count; //free connections
volatile struct tagConnectionManager *next;
} ConnectionManager;
typedef struct tagConnectionBucket {
ConnectionManager *head;
pthread_mutex_t lock;
volatile ConnectionManager *head;
} ConnectionBucket;
struct tagConnectionPool;
@ -179,6 +182,7 @@ typedef struct tagConnectionPool {
int connect_timeout_ms;
int max_count_per_entry; //0 means no limit
int ring_size;
/*
connections whose idle time exceeds this time will be closed

View File

@ -27,9 +27,30 @@
#include "fastcommon/pthread_func.h"
#include "fastcommon/connection_pool.h"
static int thread_count = 2;
#define USE_CONN_POOL 1
//#define USE_CAS_LOCK 1
static int thread_count = 24;
static int64_t loop_count = 10000000;
static ConnectionPool cpool;
static char buff[1024];
static pthread_mutex_t lock;
volatile int mutex = 0;
#ifndef USE_CONN_POOL
static int64_t test_var1 = 0;
static int64_t test_var2 = 0;
#endif
static inline void conn_pool_get_key(const ConnectionInfo *conn,
char *key, int *key_len)
{
*key_len = strlen(conn->ip_addr);
memcpy(key, conn->ip_addr, *key_len);
*(key + (*key_len)++) = '-';
*(key + (*key_len)++) = (conn->port >> 8) & 0xFF;
*(key + (*key_len)++) = conn->port & 0xFF;
}
static void *thread_run(void *args)
{
@ -37,21 +58,52 @@ static void *thread_run(void *args)
int result;
int64_t i;
ConnectionInfo cinfo;
#ifdef USE_CONN_POOL
ConnectionInfo *conn;
#endif
thread_index = (long)args;
printf("thread #%d start\n", thread_index);
if ((result=conn_pool_parse_server_info("127.0.0.1:23000", &cinfo, 23000)) != 0) {
if ((result=conn_pool_parse_server_info("127.0.0.1", &cinfo, 23000)) != 0) {
return NULL;
}
for (i=0; i<loop_count; i++) {
#ifdef USE_CONN_POOL
if ((conn=conn_pool_get_connection_ex(&cpool, &cinfo,
NULL, true, &result)) == NULL)
{
break;
}
//fc_sleep_us(1);
conn_pool_close_connection(&cpool, conn);
#else
char key_buff[INET6_ADDRSTRLEN + 8];
string_t key;
uint32_t hash_code;
for (int j=0; j<2; j++) {
key.str = key_buff;
conn_pool_get_key(&cinfo, key.str, &key.len);
hash_code = fc_simple_hash(key.str, key.len);
#ifdef USE_CAS_LOCK
while (!__sync_bool_compare_and_swap(&mutex, 0, 1)) {
sched_yield();
}
__sync_fetch_and_add(&test_var1, 1);
__sync_fetch_and_add(&test_var2, 1);
#else
PTHREAD_MUTEX_LOCK(&lock);
test_var1++;
test_var2++;
#endif
#ifdef USE_CAS_LOCK
__sync_bool_compare_and_swap(&mutex, 1, 0);
#else
PTHREAD_MUTEX_UNLOCK(&lock);
#endif
}
#endif
}
if (i == loop_count) {
@ -96,6 +148,12 @@ int main(int argc, char *argv[])
return result;
}
memset(buff, 0, sizeof(buff));
if ((result=init_pthread_lock(&lock)) != 0) {
return result;
}
if ((result=pthread_attr_init(&thread_attr)) != 0) {
logError("file: "__FILE__", line: %d, "
"call pthread_attr_init fail, "
@ -137,6 +195,15 @@ int main(int argc, char *argv[])
qps = (thread_count * loop_count * 1000 * 1000) / time_used;
printf("time used: %"PRId64" ms, QPS: %"PRId64"\n", time_used / 1000, qps);
{
ConnectionPoolStat stat;
conn_pool_stat(&cpool, &stat);
printf("htable_capacity: %d, bucket_used: %d, server_count: %d, "
"connection {total_count: %d, free_count: %d}\n",
stat.htable_capacity, stat.bucket_used, stat.server_count,
stat.connection.total_count, stat.connection.free_count);
}
free(tids);
conn_pool_destroy(&cpool);

View File

@ -25,6 +25,7 @@
#include <sys/file.h>
#include "fastcommon/logger.h"
#include "fastcommon/shared_func.h"
#include "fastcommon/pthread_func.h"
#define OneArgument(a) printf("One Argument func is called!\n")
#define TwoArguments(a, b) printf("Two Arguments func is called!\n")
@ -36,7 +37,6 @@ static inline int get_lock_info(int fd, struct flock *lock)
{
int result;
memset(lock, 0, sizeof(struct flock));
lock->l_whence = SEEK_SET;
lock->l_type = F_WRLCK;
lock->l_pid = getpid();
@ -53,6 +53,69 @@ static inline int get_lock_info(int fd, struct flock *lock)
return result;
}
static inline int set_lock(int fd, const int operation,
const int start, const int length)
{
int result;
struct flock lock;
memset(&lock, 0, sizeof(struct flock));
lock.l_whence = SEEK_SET;
lock.l_type = operation;
lock.l_start = start;
lock.l_len = length;
lock.l_pid = getpid();
do {
if ((result=fcntl(fd, F_SETLKW, &lock)) != 0)
{
result = errno != 0 ? errno : ENOMEM;
fprintf(stderr, "line: %d, call fcntl fail, "
"errno: %d, error info: %s\n", __LINE__,
result, STRERROR(result));
} else {
printf("line: %d, call fcntl %d result: %d\n",
__LINE__, operation, result);
}
} while (result == EINTR);
return result;
}
static void *unlock_thread(void *args)
{
char *filename;
int result;
int fd;
struct flock lock;
filename = (char *)args;
fd = open(filename, O_RDWR | O_CREAT, 0644);
if (fd < 0) {
result = errno != 0 ? errno : EIO;
logError("file: "__FILE__", line: %d, "
"open file %s fail, "
"errno: %d, error info: %s",
__LINE__, filename,
result, STRERROR(result));
return NULL;
}
memset(&lock, 0, sizeof(struct flock));
lock.l_start = 100;
if ((result=get_lock_info(fd, &lock)) == 0) {
logInfo("lock info: { type: %d, whence: %d, start: %"PRId64", "
"len: %"PRId64", pid: %d }",
lock.l_type, lock.l_whence, (int64_t)lock.l_start,
(int64_t)lock.l_len, lock.l_pid);
}
//set_lock(fd, F_WRLCK, 0, 0);
sleep(5);
//set_lock(fd, F_UNLCK, 0, 0);
close(fd);
return NULL;
}
int main(int argc, char *argv[])
{
#define SEEK_POS (2 * 1024)
@ -61,12 +124,15 @@ int main(int argc, char *argv[])
int result;
int sleep_seconds;
int n = 0;
pthread_t tid;
char buf[1024];
struct flock lock;
Macro(1);
Macro(1, 2);
Macro(1, 2, 3);
printf("%0*d\n", 3, 1);
if (argc < 2) {
fprintf(stderr, "Usage: %s <filename>\n", argv[0]);
return 1;
@ -80,7 +146,7 @@ int main(int argc, char *argv[])
sleep_seconds = 1;
}
fd = open(filename, O_RDWR | O_CREAT, 0644);
fd = open(filename, O_RDWR | O_CREAT | O_CLOEXEC, 0644);
if (fd < 0) {
result = errno != 0 ? errno : EIO;
logError("file: "__FILE__", line: %d, "
@ -91,6 +157,61 @@ int main(int argc, char *argv[])
return result;
}
{
int flags;
flags = fcntl(fd, F_GETFD, 0);
if (flags < 0)
{
logError("file: "__FILE__", line: %d, " \
"fcntl failed, errno: %d, error info: %s.", \
__LINE__, errno, STRERROR(errno));
return errno != 0 ? errno : EACCES;
}
printf("flags: %d, on: %d\n", flags, (flags & FD_CLOEXEC));
if (fcntl(fd, F_SETFD, flags | FD_CLOEXEC) < 0)
{
logError("file: "__FILE__", line: %d, " \
"fcntl failed, errno: %d, error info: %s.", \
__LINE__, errno, STRERROR(errno));
return errno != 0 ? errno : EACCES;
}
flags = fcntl(fd, F_GETFD, 0);
printf("flags: %d, on: %d\n", flags, (flags & FD_CLOEXEC));
}
fork();
memset(&lock, 0, sizeof(struct flock));
lock.l_start = 1024;
if ((result=get_lock_info(fd, &lock)) == 0) {
logInfo("pid: %d, lock info: { type: %d, whence: %d, "
"start: %"PRId64", len: %"PRId64", pid: %d }", getpid(),
lock.l_type, lock.l_whence, (int64_t)lock.l_start,
(int64_t)lock.l_len, lock.l_pid);
}
set_lock(fd, F_WRLCK, 0, 10);
set_lock(fd, F_WRLCK, 10, 10);
set_lock(fd, F_WRLCK, 30, 10);
//set_lock(fd, F_WRLCK, 0, 0);
//set_lock(fd, F_UNLCK, 0, 10);
//set_lock(fd, F_UNLCK, 5, 35);
fc_create_thread(&tid, unlock_thread, filename, 64 * 1024);
sleep(100);
memset(&lock, 0, sizeof(struct flock));
lock.l_start = 100;
if ((result=get_lock_info(fd, &lock)) == 0) {
logInfo("lock info: { type: %d, whence: %d, start: %"PRId64", "
"len: %"PRId64", pid: %d }",
lock.l_type, lock.l_whence, (int64_t)lock.l_start,
(int64_t)lock.l_len, lock.l_pid);
}
if (flock(fd, LOCK_EX) < 0) {
logError("file: "__FILE__", line: %d, "
"flock file %s fail, "

View File

@ -50,7 +50,7 @@ int main(int argc, char *argv[])
*/
//fd = open(filename, O_RDWR | O_CREAT | O_TRUNC, 0644);
fd = open(filename, O_RDWR | O_CREAT, 0644);
fd = open(filename, O_RDWR | O_CREAT | O_APPEND, 0644);
if (fd < 0) {
result = errno != 0 ? errno : EIO;
logError("file: "__FILE__", line: %d, " \
@ -69,7 +69,8 @@ int main(int argc, char *argv[])
return errno != 0 ? errno : EIO;
}
if (lseek(fd, SEEK_POS, SEEK_SET) < 0) {
for (int i=0; i<5; i++) {
if (lseek(fd, 0, SEEK_SET) < 0) {
logError("file: "__FILE__", line: %d, " \
"lseek file %s fail, " \
"errno: %d, error info: %s", __LINE__, \
@ -86,6 +87,7 @@ int main(int argc, char *argv[])
return errno != 0 ? errno : EIO;
}
printf("write bytes: %d\n", n);
}
if (lseek(fd, 0, SEEK_SET) < 0) {
logError("file: "__FILE__", line: %d, " \

View File

@ -162,7 +162,7 @@ int main(int argc, char *argv[])
printf("cpu count: %d\n", get_sys_cpu_count());
end_time = get_current_time_ms();
logInfo("time used: %d ms", (int)(end_time - start_time));
logInfo("time used: %d ms\n", (int)(end_time - start_time));
fast_mblock_manager_init();
@ -182,19 +182,33 @@ int main(int argc, char *argv[])
sched_add_delay_task(test_delay, objs + i, delay, false);
}
printf("mblock1 total count: %"PRId64", free count: %u\n",
fast_mblock_total_count(&mblock1),
fast_mblock_free_count(&mblock1));
/*
for (i=0; i<count; i++)
{
fast_mblock_free_object(&mblock1, objs[i]);
fast_mblock_free_object(objs[i].mblock, objs[i].obj);
}
*/
obj1 = fast_mblock_alloc_object(&mblock1);
obj2 = fast_mblock_alloc_object(&mblock1);
fast_mblock_free_object(&mblock1, obj1);
printf("mblock1 total count: %"PRId64", free count: %u\n",
fast_mblock_total_count(&mblock1),
fast_mblock_free_count(&mblock1));
//fast_mblock_delay_free_object(&mblock1, obj2, 10);
fast_mblock_free_object(&mblock1, obj2);
printf("mblock1 total count: %"PRId64", free count: %u\n\n",
fast_mblock_total_count(&mblock1),
fast_mblock_free_count(&mblock1));
obj1 = fast_mblock_alloc_object(&mblock2);
obj2 = fast_mblock_alloc_object(&mblock2);
fast_mblock_delay_free_object(&mblock2, obj1, 20);
@ -206,6 +220,13 @@ int main(int argc, char *argv[])
fast_mblock_reclaim(&mblock1, reclaim_target, &reclaim_count, NULL);
fast_mblock_reclaim(&mblock2, reclaim_target, &reclaim_count, NULL);
printf("\nmblock1 total count: %"PRId64", free count: %u, "
"mblock2 total count: %"PRId64", free count: %u\n\n",
fast_mblock_total_count(&mblock1),
fast_mblock_free_count(&mblock1),
fast_mblock_total_count(&mblock2),
fast_mblock_free_count(&mblock2));
fast_mblock_manager_stat_print(false);
sleep(31);
@ -217,11 +238,25 @@ int main(int argc, char *argv[])
fast_mblock_reclaim(&mblock1, reclaim_target, &reclaim_count, NULL);
fast_mblock_reclaim(&mblock2, reclaim_target, &reclaim_count, NULL);
printf("\nmblock1 total count: %"PRId64", free count: %u, "
"mblock2 total count: %"PRId64", free count: %u\n\n",
fast_mblock_total_count(&mblock1),
fast_mblock_free_count(&mblock1),
fast_mblock_total_count(&mblock2),
fast_mblock_free_count(&mblock2));
fast_mblock_manager_stat_print(false);
obj1 = fast_mblock_alloc_object(&mblock1);
obj2 = fast_mblock_alloc_object(&mblock2);
printf("mblock1 total count: %"PRId64", free count: %u, "
"mblock2 total count: %"PRId64", free count: %u\n\n",
fast_mblock_total_count(&mblock1),
fast_mblock_free_count(&mblock1),
fast_mblock_total_count(&mblock2),
fast_mblock_free_count(&mblock2));
fast_mblock_manager_stat_print(false);
fast_mblock_destroy(&mblock1);

View File

@ -84,6 +84,56 @@ static void sigQuitHandler(int sig)
__LINE__, sig);
}
static void test_remove()
{
#define COUNT 8
typedef struct node {
int n;
struct node *next;
} Node;
struct fc_queue queue;
Node nodes[COUNT];
Node *node;
Node *end;
int result;
if ((result=fc_queue_init(&queue, (long)&((Node *)NULL)->next)) != 0)
{
return;
}
fc_queue_push(&queue, nodes);
printf("remove: %d\n", fc_queue_remove(&queue, nodes + 1));
printf("remove: %d\n", fc_queue_remove(&queue, nodes + 2));
printf("remove: %d\n", fc_queue_remove(&queue, nodes));
printf("count: %d\n", fc_queue_count(&queue));
end = nodes + COUNT / 2;
for (node=nodes; node<end; node++) {
node->n = (node - nodes) + 1;
fc_queue_push(&queue, node);
}
printf("remove: %d\n", fc_queue_remove(&queue, node));
printf("remove: %d\n", fc_queue_remove(&queue, node - 1));
printf("remove: %d\n", fc_queue_remove(&queue, node - 3));
printf("remove: %d\n", fc_queue_remove(&queue, nodes));
end = nodes + COUNT;
for (; node<end; node++) {
node->n = (node - nodes) + 1;
fc_queue_push(&queue, node);
}
printf("count: %d\n\n", fc_queue_count(&queue));
for (node=end-1; node>=nodes; node--) {
printf("remove: %d\n", fc_queue_remove(&queue, node));
fc_queue_push(&queue, node);
}
printf("count: %d\n", fc_queue_count(&queue));
}
int main(int argc, char *argv[])
{
const int alloc_elements_once = 8 * 1024;
@ -107,6 +157,9 @@ int main(int argc, char *argv[])
log_init();
g_log_context.log_level = LOG_DEBUG;
test_remove();
return 0;
memset(&act, 0, sizeof(act));
sigemptyset(&act.sa_mask);
act.sa_handler = sigQuitHandler;