From 9acc202481ee9e27175ae7717db006cea0481c98 Mon Sep 17 00:00:00 2001
From: YuQing <384681@qq.com>
Date: Sat, 2 Aug 2025 19:16:03 +0800
Subject: [PATCH] connection_pool.[hc]: use CAS instead of pthread mutex lock
---
HISTORY | 3 +-
src/connection_pool.c | 313 +++++++++++++++++++------------
src/connection_pool.h | 18 +-
src/tests/cpool_benchmark.c | 71 ++++++-
src/tests/test_file_lock.c | 125 +++++++++++-
src/tests/test_file_write_hole.c | 6 +-
src/tests/test_mblock.c | 39 +++-
src/tests/test_queue_perf.c | 53 ++++++
8 files changed, 487 insertions(+), 141 deletions(-)
diff --git a/HISTORY b/HISTORY
index 0d9a01d..e0e69cd 100644
--- a/HISTORY
+++ b/HISTORY
@@ -1,7 +1,8 @@
-Version 1.78 2025-07-22
+Version 1.78 2025-08-02
* getIpaddrByName: normalize ip addr when input addr is IPv4 or IPv6
* add files: spinlock.[hc]
+ * connection_pool.[hc]: use CAS instead of pthread mutex lock
Version 1.77 2025-03-18
* impl. shorten_path for /./ and /../
diff --git a/src/connection_pool.c b/src/connection_pool.c
index 9490dd5..10ecb6d 100644
--- a/src/connection_pool.c
+++ b/src/connection_pool.c
@@ -23,6 +23,7 @@
#include "shared_func.h"
#include "sched_thread.h"
#include "server_id_func.h"
+#include "fc_atomic.h"
#include "connection_pool.h"
ConnectionCallbacks g_connection_callbacks = {
@@ -55,14 +56,16 @@ static inline void conn_pool_get_key(const ConnectionInfo *conn,
*key_len = strlen(conn->ip_addr);
memcpy(key, conn->ip_addr, *key_len);
*(key + (*key_len)++) = '-';
- *(key + (*key_len)++) = (conn->port >> 8) & 0xFF;
- *(key + (*key_len)++) = conn->port & 0xFF;
+ *key_len += fc_itoa(conn->port, key + (*key_len));
}
static int close_conn(ConnectionPool *cp, ConnectionManager *cm,
ConnectionInfo *conn, const bool bForce)
{
ConnectionNode *node;
+ int64_t index;
+ int64_t head;
+ int64_t tail;
char formatted_ip[FORMATTED_IP_SIZE];
node = (ConnectionNode *)((char *)conn - sizeof(ConnectionNode));
@@ -75,80 +78,109 @@ static int close_conn(ConnectionPool *cp, ConnectionManager *cm,
return EINVAL;
}
- if (bForce)
- {
- cm->total_count--;
+ while (1) {
+ head = FC_ATOMIC_GET(cm->ring.head);
+ tail = FC_ATOMIC_GET(cm->ring.tail);
+ if (bForce || (head - tail) >= cp->ring_size - 1) {
+ FC_ATOMIC_DEC(cm->total_count);
- if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
- format_ip_address(conn->ip_addr, formatted_ip);
- logDebug("file: "__FILE__", line: %d, "
- "server %s:%u, release connection: %d, "
- "total_count: %d, free_count: %d",
- __LINE__, formatted_ip, conn->port,
- conn->sock, cm->total_count, cm->free_count);
- }
+ if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
+ format_ip_address(conn->ip_addr, formatted_ip);
+ logDebug("file: "__FILE__", line: %d, "
+ "server %s:%u, release connection: %d, "
+ "total_count: %d, free_count: %d",
+ __LINE__, formatted_ip, conn->port,
+ conn->sock, cm->total_count, cm->free_count);
+ }
- G_COMMON_CONNECTION_CALLBACKS[conn->comm_type].
- close_connection(conn);
- fast_mblock_free_object(&cp->node_allocator, node);
+ G_COMMON_CONNECTION_CALLBACKS[conn->comm_type].
+ close_connection(conn);
+ fast_mblock_free_object(&cp->node_allocator, node);
- node = cm->head;
- while (node != NULL)
- {
- node->conn->validate_flag = true;
- node = node->next;
- }
- }
- else
- {
- node->atime = get_current_time();
- node->next = cm->head;
- cm->head = node;
- cm->free_count++;
-
- if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
- format_ip_address(conn->ip_addr, formatted_ip);
- logDebug("file: "__FILE__", line: %d, "
- "server %s:%u, free connection: %d, "
- "total_count: %d, free_count: %d",
- __LINE__, formatted_ip, conn->port,
- conn->sock, cm->total_count, cm->free_count);
- }
- }
-
- return 0;
-}
-
-static ConnectionManager *find_manager(ConnectionPool *cp,
- ConnectionBucket *bucket, const string_t *key,
- const bool need_create)
-{
- ConnectionManager *cm;
-
- if (bucket->head != NULL)
- {
- if (fc_string_equal(&bucket->head->key, key)) //fast path
- {
- return bucket->head;
- }
- else
- {
- cm = bucket->head->next;
- while (cm != NULL)
- {
- if (fc_string_equal(&cm->key, key))
- {
- return cm;
+ if (bForce) {
+ for (index=FC_ATOMIC_GET(cm->ring.tail); index
ring.nodes[index % cp->ring_size]);
+ if (node != NULL) {
+ node->conn->validate_flag = true;
+ }
}
- cm = cm->next;
+ }
+
+ break;
+ }
+
+ if (__sync_bool_compare_and_swap(&cm->ring.nodes[
+ head % cp->ring_size], NULL, node))
+ {
+ if (__sync_bool_compare_and_swap(&cm->ring.head, head, head + 1)) {
+ node->atime = get_current_time();
+ FC_ATOMIC_INC(cm->free_count);
+
+ if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
+ format_ip_address(conn->ip_addr, formatted_ip);
+ logDebug("file: "__FILE__", line: %d, "
+ "server %s:%u, free connection: %d, "
+ "total_count: %d, free_count: %d",
+ __LINE__, formatted_ip, conn->port,
+ conn->sock, cm->total_count, cm->free_count);
+ }
+
+ break;
+ } else { //rollback
+ __sync_bool_compare_and_swap(&cm->ring.nodes[
+ head % cp->ring_size], node, NULL);
+ sched_yield();
}
}
}
- if (!need_create)
+ return 0;
+}
+
+static inline ConnectionManager *do_find_manager(
+ ConnectionManager *head, const string_t *key)
+{
+ ConnectionManager *cm;
+
+ if (fc_string_equal(&head->key, key)) //fast path
+ {
+ return head;
+ }
+
+ cm = (ConnectionManager *)FC_ATOMIC_GET(head->next);
+ while (cm != NULL)
+ {
+ if (fc_string_equal(&cm->key, key))
+ {
+ return cm;
+ }
+ cm = (ConnectionManager *)FC_ATOMIC_GET(cm->next);
+ }
+ return NULL;
+}
+
+static ConnectionManager *find_manager(ConnectionBucket *bucket,
+ const string_t *key)
+{
+ ConnectionManager *head;
+
+ head = (ConnectionManager *)FC_ATOMIC_GET(bucket->head);
+ if (head == NULL)
{
return NULL;
}
+ return do_find_manager(head, key);
+}
+
+static ConnectionManager *create_manager(ConnectionPool *cp,
+ ConnectionBucket *bucket, ConnectionManager *old_head,
+ const string_t *key, int *result)
+{
+ ConnectionManager *cm;
+ char *buff;
+ int aligned_key_len;
+ int bytes;
cm = (ConnectionManager *)fast_mblock_alloc_object(
&cp->manager_allocator);
@@ -157,23 +189,65 @@ static ConnectionManager *find_manager(ConnectionPool *cp,
logError("file: "__FILE__", line: %d, "
"malloc %d bytes fail", __LINE__,
(int)sizeof(ConnectionManager));
+ *result = ENOMEM;
return NULL;
}
- cm->head = NULL;
+ cm->ring.head = cm->ring.tail = 0;
cm->total_count = 0;
cm->free_count = 0;
- if ((cm->key.str=fc_malloc(key->len + 1)) == NULL)
+
+ aligned_key_len = MEM_ALIGN(key->len);
+ bytes = aligned_key_len + sizeof(ConnectionNode *) * cp->ring_size;
+ if ((buff=fc_malloc(bytes)) == NULL)
{
+ *result = ENOMEM;
return NULL;
}
- memcpy(cm->key.str, key->str, key->len + 1);
+ memset(buff, 0, bytes);
+ cm->key.str = buff;
+ cm->ring.nodes = (ConnectionNode **)(buff + aligned_key_len);
+ memcpy(cm->key.str, key->str, key->len);
cm->key.len = key->len;
//add to manager chain
- cm->next = bucket->head;
- bucket->head = cm;
- return cm;
+ FC_ATOMIC_SET(cm->next, old_head);
+ if (__sync_bool_compare_and_swap(&bucket->head, old_head, cm))
+ {
+ return cm;
+ }
+ else
+ {
+ fast_mblock_free_object(&cp->manager_allocator, cm);
+ *result = EAGAIN;
+ return NULL;
+ }
+}
+
+static ConnectionManager *get_manager(ConnectionPool *cp,
+ ConnectionBucket *bucket, const string_t *key)
+{
+ ConnectionManager *head;
+ ConnectionManager *cm;
+ int result = 0;
+
+ do {
+ head = (ConnectionManager *)FC_ATOMIC_GET(bucket->head);
+ if (head != NULL)
+ {
+ if ((cm=do_find_manager(head, key)) != NULL)
+ {
+ return cm;
+ }
+ }
+
+ if ((cm=create_manager(cp, bucket, head, key, &result)) != NULL)
+ {
+ return cm;
+ }
+ } while (result == EAGAIN);
+
+ return NULL;
}
static int close_connection(ConnectionPool *cp, ConnectionInfo *conn,
@@ -185,8 +259,7 @@ static int close_connection(ConnectionPool *cp, ConnectionInfo *conn,
int result;
bucket = cp->hashtable.buckets + hash_code % cp->hashtable.capacity;
- pthread_mutex_lock(&bucket->lock);
- if ((cm=find_manager(cp, bucket, key, false)) != NULL)
+ if ((cm=find_manager(bucket, key)) != NULL)
{
result = close_conn(cp, cm, conn, bForce);
}
@@ -198,7 +271,6 @@ static int close_connection(ConnectionPool *cp, ConnectionInfo *conn,
__LINE__, formatted_ip, conn->port);
result = ENOENT;
}
- pthread_mutex_unlock(&bucket->lock);
return result;
}
@@ -239,7 +311,6 @@ static void cp_tls_destroy(void *ptr)
static int init_hashtable(ConnectionPool *cp, const int htable_capacity)
{
int bytes;
- int result;
unsigned int *hash_capacity;
ConnectionBucket *bucket;
ConnectionBucket *end;
@@ -264,10 +335,6 @@ static int init_hashtable(ConnectionPool *cp, const int htable_capacity)
for (bucket=cp->hashtable.buckets; buckethead = NULL;
- if ((result=init_pthread_lock(&bucket->lock)) != 0)
- {
- return result;
- }
}
return 0;
@@ -288,12 +355,12 @@ int conn_pool_init_ex1(ConnectionPool *cp, const int connect_timeout,
cp->connect_timeout_ms = connect_timeout * 1000;
cp->max_count_per_entry = max_count_per_entry;
cp->max_idle_time = max_idle_time;
+ cp->ring_size = max_count_per_entry > 0 ? max_count_per_entry : 1024;
cp->extra_data_size = extra_data_size;
cp->connect_done_callback.func = connect_done_func;
cp->connect_done_callback.args = connect_done_args;
cp->validate_callback.func = validate_func;
cp->validate_callback.args = validate_args;
-
if ((result=fast_mblock_init_ex1(&cp->manager_allocator, "cpool-manager",
sizeof(ConnectionManager), 256, alloc_elements_limit,
NULL, NULL, true)) != 0)
@@ -356,31 +423,31 @@ static void conn_pool_hash_walk(ConnectionPool *cp,
end = cp->hashtable.buckets + cp->hashtable.capacity;
for (bucket=cp->hashtable.buckets; bucketlock);
- cm = bucket->head;
+ cm = (ConnectionManager *)FC_ATOMIC_GET(bucket->head);
while (cm != NULL)
{
current = cm;
- cm = cm->next;
+ cm = (ConnectionManager *)FC_ATOMIC_GET(cm->next);
callback(cp, current, args);
}
- pthread_mutex_unlock(&bucket->lock);
}
}
static void cp_destroy_walk_callback(ConnectionPool *cp,
ConnectionManager *cm, void *args)
{
+ int64_t index;
+ int64_t head;
ConnectionNode *node;
- ConnectionNode *deleted;
- node = cm->head;
- while (node != NULL)
- {
- deleted = node;
- node = node->next;
- G_COMMON_CONNECTION_CALLBACKS[deleted->conn->comm_type].
- close_connection(deleted->conn);
+ head = FC_ATOMIC_GET(cm->ring.head);
+ for (index=FC_ATOMIC_GET(cm->ring.tail); indexring.nodes[index % cp->ring_size]);
+ if (node != NULL) {
+ G_COMMON_CONNECTION_CALLBACKS[node->conn->comm_type].
+ close_connection(node->conn);
+ }
}
free(cm->key.str);
@@ -388,19 +455,11 @@ static void cp_destroy_walk_callback(ConnectionPool *cp,
void conn_pool_destroy(ConnectionPool *cp)
{
- ConnectionBucket *bucket;
- ConnectionBucket *end;
-
if (cp->hashtable.buckets == NULL) {
return;
}
conn_pool_hash_walk(cp, cp_destroy_walk_callback, cp);
-
- end = cp->hashtable.buckets + cp->hashtable.capacity;
- for (bucket=cp->hashtable.buckets; bucketlock);
- }
free(cp->hashtable.buckets);
cp->hashtable.buckets = NULL;
@@ -498,10 +557,11 @@ int conn_pool_async_connect_server_ex(ConnectionInfo *conn,
}
static ConnectionInfo *get_conn(ConnectionPool *cp,
- ConnectionManager *cm, pthread_mutex_t *lock,
- const ConnectionInfo *conn, const char *service_name,
- int *err_no)
+ ConnectionManager *cm, const ConnectionInfo *conn,
+ const char *service_name, int *err_no)
{
+ int64_t tail;
+ int index;
ConnectionNode *node;
ConnectionInfo *ci;
char formatted_ip[FORMATTED_IP_SIZE];
@@ -510,10 +570,11 @@ static ConnectionInfo *get_conn(ConnectionPool *cp,
current_time = get_current_time();
while (1)
{
- if (cm->head == NULL)
+ tail = FC_ATOMIC_GET(cm->ring.tail);
+ if (tail == FC_ATOMIC_GET(cm->ring.head)) //empty
{
- if ((cp->max_count_per_entry > 0) &&
- (cm->total_count >= cp->max_count_per_entry))
+ if ((cp->max_count_per_entry > 0) && FC_ATOMIC_GET(
+ cm->total_count) >= cp->max_count_per_entry)
{
format_ip_address(conn->ip_addr, formatted_ip);
*err_no = ENOSPC;
@@ -539,9 +600,7 @@ static ConnectionInfo *get_conn(ConnectionPool *cp,
node->manager = cm;
node->next = NULL;
node->atime = 0;
-
- cm->total_count++;
- pthread_mutex_unlock(lock);
+ FC_ATOMIC_INC(cm->total_count);
memcpy(node->conn->ip_addr, conn->ip_addr, sizeof(conn->ip_addr));
node->conn->port = conn->port;
@@ -563,8 +622,7 @@ static ConnectionInfo *get_conn(ConnectionPool *cp,
close_connection(node->conn);
fast_mblock_free_object(&cp->node_allocator, node);
- pthread_mutex_lock(lock);
- cm->total_count--; //rollback
+ FC_ATOMIC_DEC(cm->total_count); //rollback
return NULL;
}
@@ -577,19 +635,27 @@ static ConnectionInfo *get_conn(ConnectionPool *cp,
node->conn->sock, cm->total_count,
cm->free_count);
}
-
- pthread_mutex_lock(lock);
return node->conn;
}
else
{
bool invalid;
- node = cm->head;
- ci = node->conn;
- cm->head = node->next;
- cm->free_count--;
+ if (!__sync_bool_compare_and_swap(&cm->ring.tail, tail, tail + 1)) {
+ sched_yield();
+ continue;
+ }
+ index = tail % cp->ring_size;
+ node = (ConnectionNode *)FC_ATOMIC_GET(
+ cm->ring.nodes[index]);
+ if (node == NULL) {
+ continue;
+ }
+ __sync_bool_compare_and_swap(&cm->ring.nodes[index], node, NULL);
+
+ ci = node->conn;
+ FC_ATOMIC_DEC(cm->free_count);
if (current_time - node->atime > cp->max_idle_time)
{
if (cp->validate_callback.func != NULL)
@@ -619,8 +685,7 @@ static ConnectionInfo *get_conn(ConnectionPool *cp,
if (invalid)
{
- cm->total_count--;
-
+ FC_ATOMIC_DEC(cm->total_count);
if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
format_ip_address(conn->ip_addr, formatted_ip);
logDebug("file: "__FILE__", line: %d, "
@@ -663,10 +728,9 @@ static ConnectionInfo *get_connection(ConnectionPool *cp,
ConnectionInfo *ci;
bucket = cp->hashtable.buckets + hash_code % cp->hashtable.capacity;
- pthread_mutex_lock(&bucket->lock);
- if ((cm=find_manager(cp, bucket, key, true)) != NULL)
+ if ((cm=get_manager(cp, bucket, key)) != NULL)
{
- ci = get_conn(cp, cm, &bucket->lock, conn, service_name, err_no);
+ ci = get_conn(cp, cm, conn, service_name, err_no);
if (ci != NULL)
{
ci->shared = shared;
@@ -677,7 +741,6 @@ static ConnectionInfo *get_connection(ConnectionPool *cp,
*err_no = ENOMEM;
ci = NULL;
}
- pthread_mutex_unlock(&bucket->lock);
return ci;
}
@@ -826,8 +889,8 @@ static void cp_stat_walk_callback(ConnectionPool *cp,
stat = args;
stat->server_count++;
- stat->connection.total_count += cm->total_count;
- stat->connection.free_count += cm->free_count;
+ stat->connection.total_count += FC_ATOMIC_GET(cm->total_count);
+ stat->connection.free_count += FC_ATOMIC_GET(cm->free_count);
}
void conn_pool_stat(ConnectionPool *cp, ConnectionPoolStat *stat)
diff --git a/src/connection_pool.h b/src/connection_pool.h
index b3f670c..ae2c649 100644
--- a/src/connection_pool.h
+++ b/src/connection_pool.h
@@ -147,21 +147,24 @@ struct tagConnectionManager;
typedef struct tagConnectionNode {
ConnectionInfo *conn;
struct tagConnectionManager *manager;
- struct tagConnectionNode *next;
+ struct tagConnectionNode *next; //for thread local
time_t atime; //last access time
} ConnectionNode;
typedef struct tagConnectionManager {
string_t key;
- ConnectionNode *head;
- int total_count; //total connections
- int free_count; //free connections
- struct tagConnectionManager *next;
+ struct {
+ ConnectionNode **nodes;
+ volatile int64_t head; //producer
+ volatile int64_t tail; //consumer
+ } ring;
+ volatile int total_count; //total connections
+ volatile int free_count; //free connections
+ volatile struct tagConnectionManager *next;
} ConnectionManager;
typedef struct tagConnectionBucket {
- ConnectionManager *head;
- pthread_mutex_t lock;
+ volatile ConnectionManager *head;
} ConnectionBucket;
struct tagConnectionPool;
@@ -179,6 +182,7 @@ typedef struct tagConnectionPool {
int connect_timeout_ms;
int max_count_per_entry; //0 means no limit
+ int ring_size;
/*
connections whose idle time exceeds this time will be closed
diff --git a/src/tests/cpool_benchmark.c b/src/tests/cpool_benchmark.c
index b2c49d7..4b88929 100644
--- a/src/tests/cpool_benchmark.c
+++ b/src/tests/cpool_benchmark.c
@@ -27,9 +27,30 @@
#include "fastcommon/pthread_func.h"
#include "fastcommon/connection_pool.h"
-static int thread_count = 2;
+#define USE_CONN_POOL 1
+//#define USE_CAS_LOCK 1
+
+static int thread_count = 24;
static int64_t loop_count = 10000000;
static ConnectionPool cpool;
+static char buff[1024];
+static pthread_mutex_t lock;
+volatile int mutex = 0;
+
+#ifndef USE_CONN_POOL
+static int64_t test_var1 = 0;
+static int64_t test_var2 = 0;
+#endif
+
+static inline void conn_pool_get_key(const ConnectionInfo *conn,
+ char *key, int *key_len)
+{
+ *key_len = strlen(conn->ip_addr);
+ memcpy(key, conn->ip_addr, *key_len);
+ *(key + (*key_len)++) = '-';
+ *(key + (*key_len)++) = (conn->port >> 8) & 0xFF;
+ *(key + (*key_len)++) = conn->port & 0xFF;
+}
static void *thread_run(void *args)
{
@@ -37,21 +58,52 @@ static void *thread_run(void *args)
int result;
int64_t i;
ConnectionInfo cinfo;
+#ifdef USE_CONN_POOL
ConnectionInfo *conn;
+#endif
thread_index = (long)args;
printf("thread #%d start\n", thread_index);
- if ((result=conn_pool_parse_server_info("127.0.0.1:23000", &cinfo, 23000)) != 0) {
+ if ((result=conn_pool_parse_server_info("127.0.0.1", &cinfo, 23000)) != 0) {
return NULL;
}
for (i=0; i
#include "fastcommon/logger.h"
#include "fastcommon/shared_func.h"
+#include "fastcommon/pthread_func.h"
#define OneArgument(a) printf("One Argument func is called!\n")
#define TwoArguments(a, b) printf("Two Arguments func is called!\n")
@@ -36,7 +37,6 @@ static inline int get_lock_info(int fd, struct flock *lock)
{
int result;
- memset(lock, 0, sizeof(struct flock));
lock->l_whence = SEEK_SET;
lock->l_type = F_WRLCK;
lock->l_pid = getpid();
@@ -53,6 +53,69 @@ static inline int get_lock_info(int fd, struct flock *lock)
return result;
}
+static inline int set_lock(int fd, const int operation,
+ const int start, const int length)
+{
+ int result;
+ struct flock lock;
+
+ memset(&lock, 0, sizeof(struct flock));
+ lock.l_whence = SEEK_SET;
+ lock.l_type = operation;
+ lock.l_start = start;
+ lock.l_len = length;
+ lock.l_pid = getpid();
+ do {
+ if ((result=fcntl(fd, F_SETLKW, &lock)) != 0)
+ {
+ result = errno != 0 ? errno : ENOMEM;
+ fprintf(stderr, "line: %d, call fcntl fail, "
+ "errno: %d, error info: %s\n", __LINE__,
+ result, STRERROR(result));
+ } else {
+ printf("line: %d, call fcntl %d result: %d\n",
+ __LINE__, operation, result);
+ }
+ } while (result == EINTR);
+
+ return result;
+}
+
+static void *unlock_thread(void *args)
+{
+ char *filename;
+ int result;
+ int fd;
+ struct flock lock;
+
+ filename = (char *)args;
+ fd = open(filename, O_RDWR | O_CREAT, 0644);
+ if (fd < 0) {
+ result = errno != 0 ? errno : EIO;
+ logError("file: "__FILE__", line: %d, "
+ "open file %s fail, "
+ "errno: %d, error info: %s",
+ __LINE__, filename,
+ result, STRERROR(result));
+ return NULL;
+ }
+
+ memset(&lock, 0, sizeof(struct flock));
+ lock.l_start = 100;
+ if ((result=get_lock_info(fd, &lock)) == 0) {
+ logInfo("lock info: { type: %d, whence: %d, start: %"PRId64", "
+ "len: %"PRId64", pid: %d }",
+ lock.l_type, lock.l_whence, (int64_t)lock.l_start,
+ (int64_t)lock.l_len, lock.l_pid);
+ }
+
+ //set_lock(fd, F_WRLCK, 0, 0);
+ sleep(5);
+ //set_lock(fd, F_UNLCK, 0, 0);
+ close(fd);
+ return NULL;
+}
+
int main(int argc, char *argv[])
{
#define SEEK_POS (2 * 1024)
@@ -61,12 +124,15 @@ int main(int argc, char *argv[])
int result;
int sleep_seconds;
int n = 0;
+ pthread_t tid;
char buf[1024];
struct flock lock;
Macro(1);
Macro(1, 2);
Macro(1, 2, 3);
+
+ printf("%0*d\n", 3, 1);
if (argc < 2) {
fprintf(stderr, "Usage: %s \n", argv[0]);
return 1;
@@ -80,7 +146,7 @@ int main(int argc, char *argv[])
sleep_seconds = 1;
}
- fd = open(filename, O_RDWR | O_CREAT, 0644);
+ fd = open(filename, O_RDWR | O_CREAT | O_CLOEXEC, 0644);
if (fd < 0) {
result = errno != 0 ? errno : EIO;
logError("file: "__FILE__", line: %d, "
@@ -91,6 +157,61 @@ int main(int argc, char *argv[])
return result;
}
+ {
+ int flags;
+ flags = fcntl(fd, F_GETFD, 0);
+ if (flags < 0)
+ {
+ logError("file: "__FILE__", line: %d, " \
+ "fcntl failed, errno: %d, error info: %s.", \
+ __LINE__, errno, STRERROR(errno));
+ return errno != 0 ? errno : EACCES;
+ }
+
+ printf("flags: %d, on: %d\n", flags, (flags & FD_CLOEXEC));
+
+ if (fcntl(fd, F_SETFD, flags | FD_CLOEXEC) < 0)
+ {
+ logError("file: "__FILE__", line: %d, " \
+ "fcntl failed, errno: %d, error info: %s.", \
+ __LINE__, errno, STRERROR(errno));
+ return errno != 0 ? errno : EACCES;
+ }
+ flags = fcntl(fd, F_GETFD, 0);
+ printf("flags: %d, on: %d\n", flags, (flags & FD_CLOEXEC));
+ }
+
+
+ fork();
+
+ memset(&lock, 0, sizeof(struct flock));
+ lock.l_start = 1024;
+ if ((result=get_lock_info(fd, &lock)) == 0) {
+ logInfo("pid: %d, lock info: { type: %d, whence: %d, "
+ "start: %"PRId64", len: %"PRId64", pid: %d }", getpid(),
+ lock.l_type, lock.l_whence, (int64_t)lock.l_start,
+ (int64_t)lock.l_len, lock.l_pid);
+ }
+
+ set_lock(fd, F_WRLCK, 0, 10);
+ set_lock(fd, F_WRLCK, 10, 10);
+ set_lock(fd, F_WRLCK, 30, 10);
+ //set_lock(fd, F_WRLCK, 0, 0);
+ //set_lock(fd, F_UNLCK, 0, 10);
+ //set_lock(fd, F_UNLCK, 5, 35);
+
+ fc_create_thread(&tid, unlock_thread, filename, 64 * 1024);
+
+ sleep(100);
+ memset(&lock, 0, sizeof(struct flock));
+ lock.l_start = 100;
+ if ((result=get_lock_info(fd, &lock)) == 0) {
+ logInfo("lock info: { type: %d, whence: %d, start: %"PRId64", "
+ "len: %"PRId64", pid: %d }",
+ lock.l_type, lock.l_whence, (int64_t)lock.l_start,
+ (int64_t)lock.l_len, lock.l_pid);
+ }
+
if (flock(fd, LOCK_EX) < 0) {
logError("file: "__FILE__", line: %d, "
"flock file %s fail, "
diff --git a/src/tests/test_file_write_hole.c b/src/tests/test_file_write_hole.c
index dca3366..202c2ab 100644
--- a/src/tests/test_file_write_hole.c
+++ b/src/tests/test_file_write_hole.c
@@ -50,7 +50,7 @@ int main(int argc, char *argv[])
*/
//fd = open(filename, O_RDWR | O_CREAT | O_TRUNC, 0644);
- fd = open(filename, O_RDWR | O_CREAT, 0644);
+ fd = open(filename, O_RDWR | O_CREAT | O_APPEND, 0644);
if (fd < 0) {
result = errno != 0 ? errno : EIO;
logError("file: "__FILE__", line: %d, " \
@@ -69,7 +69,8 @@ int main(int argc, char *argv[])
return errno != 0 ? errno : EIO;
}
- if (lseek(fd, SEEK_POS, SEEK_SET) < 0) {
+ for (int i=0; i<5; i++) {
+ if (lseek(fd, 0, SEEK_SET) < 0) {
logError("file: "__FILE__", line: %d, " \
"lseek file %s fail, " \
"errno: %d, error info: %s", __LINE__, \
@@ -86,6 +87,7 @@ int main(int argc, char *argv[])
return errno != 0 ? errno : EIO;
}
printf("write bytes: %d\n", n);
+ }
if (lseek(fd, 0, SEEK_SET) < 0) {
logError("file: "__FILE__", line: %d, " \
diff --git a/src/tests/test_mblock.c b/src/tests/test_mblock.c
index 970ab6d..ea9bf0b 100644
--- a/src/tests/test_mblock.c
+++ b/src/tests/test_mblock.c
@@ -162,7 +162,7 @@ int main(int argc, char *argv[])
printf("cpu count: %d\n", get_sys_cpu_count());
end_time = get_current_time_ms();
- logInfo("time used: %d ms", (int)(end_time - start_time));
+ logInfo("time used: %d ms\n", (int)(end_time - start_time));
fast_mblock_manager_init();
@@ -182,19 +182,33 @@ int main(int argc, char *argv[])
sched_add_delay_task(test_delay, objs + i, delay, false);
}
+ printf("mblock1 total count: %"PRId64", free count: %u\n",
+ fast_mblock_total_count(&mblock1),
+ fast_mblock_free_count(&mblock1));
+
/*
for (i=0; inext)) != 0)
+ {
+ return;
+ }
+
+ fc_queue_push(&queue, nodes);
+ printf("remove: %d\n", fc_queue_remove(&queue, nodes + 1));
+ printf("remove: %d\n", fc_queue_remove(&queue, nodes + 2));
+ printf("remove: %d\n", fc_queue_remove(&queue, nodes));
+ printf("count: %d\n", fc_queue_count(&queue));
+
+ end = nodes + COUNT / 2;
+ for (node=nodes; noden = (node - nodes) + 1;
+ fc_queue_push(&queue, node);
+ }
+
+ printf("remove: %d\n", fc_queue_remove(&queue, node));
+ printf("remove: %d\n", fc_queue_remove(&queue, node - 1));
+ printf("remove: %d\n", fc_queue_remove(&queue, node - 3));
+ printf("remove: %d\n", fc_queue_remove(&queue, nodes));
+
+ end = nodes + COUNT;
+ for (; noden = (node - nodes) + 1;
+ fc_queue_push(&queue, node);
+ }
+ printf("count: %d\n\n", fc_queue_count(&queue));
+
+ for (node=end-1; node>=nodes; node--) {
+ printf("remove: %d\n", fc_queue_remove(&queue, node));
+ fc_queue_push(&queue, node);
+ }
+
+ printf("count: %d\n", fc_queue_count(&queue));
+}
+
int main(int argc, char *argv[])
{
const int alloc_elements_once = 8 * 1024;
@@ -107,6 +157,9 @@ int main(int argc, char *argv[])
log_init();
g_log_context.log_level = LOG_DEBUG;
+ test_remove();
+ return 0;
+
memset(&act, 0, sizeof(act));
sigemptyset(&act.sa_mask);
act.sa_handler = sigQuitHandler;