Compare commits

...

4 Commits

Author SHA1 Message Date
Hongcai Deng a73cb05a5c
Merge a16fde8070 into cf16c41054 2025-08-03 21:38:18 +08:00
YuQing cf16c41054 src/connection_pool.[hc]: restore pthread mutex lock 2025-08-03 15:18:23 +08:00
YuQing 7fbb5c620b change int2buff, buff2int etc. functions to static inline 2025-08-03 15:10:28 +08:00
Hongcai Deng a16fde8070 fix: compile error when build .so using .a
```
src/libfastcommon.a(hash.o): relocation R_X86_64_32 against `.data' can not be used when making a shared object; recompile with -fPIC
```

env: Ubuntu 14.04, gcc 4.8
2017-01-29 14:20:18 +08:00
7 changed files with 178 additions and 259 deletions

View File

@ -1,8 +1,8 @@
Version 1.78 2025-08-02 Version 1.78 2025-08-03
* getIpaddrByName: normalize ip addr when input addr is IPv4 or IPv6 * getIpaddrByName: normalize ip addr when input addr is IPv4 or IPv6
* add files: spinlock.[hc] * add files: spinlock.[hc]
* connection_pool.[hc]: use CAS instead of pthread mutex lock * shared_func.[hc]: change int2buff, buff2int etc. functions to static inline
Version 1.77 2025-03-18 Version 1.77 2025-03-18
* impl. shorten_path for /./ and /../ * impl. shorten_path for /./ and /../

View File

@ -66,7 +66,7 @@ libfastcommon.a: $(FAST_STATIC_OBJS)
.c: .c:
$(COMPILE) -o $@ $< $(FAST_STATIC_OBJS) $(LIB_PATH) $(INC_PATH) $(COMPILE) -o $@ $< $(FAST_STATIC_OBJS) $(LIB_PATH) $(INC_PATH)
.c.o: .c.o:
$(COMPILE) -c -o $@ $< $(INC_PATH) $(COMPILE) -c -fPIC -o $@ $< $(INC_PATH)
.c.lo: .c.lo:
$(COMPILE) -c -fPIC -o $@ $< $(INC_PATH) $(COMPILE) -c -fPIC -o $@ $< $(INC_PATH)
install: install:

View File

@ -23,7 +23,6 @@
#include "shared_func.h" #include "shared_func.h"
#include "sched_thread.h" #include "sched_thread.h"
#include "server_id_func.h" #include "server_id_func.h"
#include "fc_atomic.h"
#include "connection_pool.h" #include "connection_pool.h"
ConnectionCallbacks g_connection_callbacks = { ConnectionCallbacks g_connection_callbacks = {
@ -63,9 +62,6 @@ static int close_conn(ConnectionPool *cp, ConnectionManager *cm,
ConnectionInfo *conn, const bool bForce) ConnectionInfo *conn, const bool bForce)
{ {
ConnectionNode *node; ConnectionNode *node;
int64_t index;
int64_t head;
int64_t tail;
char formatted_ip[FORMATTED_IP_SIZE]; char formatted_ip[FORMATTED_IP_SIZE];
node = (ConnectionNode *)((char *)conn - sizeof(ConnectionNode)); node = (ConnectionNode *)((char *)conn - sizeof(ConnectionNode));
@ -78,11 +74,9 @@ static int close_conn(ConnectionPool *cp, ConnectionManager *cm,
return EINVAL; return EINVAL;
} }
while (1) { if (bForce)
head = FC_ATOMIC_GET(cm->ring.head); {
tail = FC_ATOMIC_GET(cm->ring.tail); cm->total_count--;
if (bForce || (head - tail) >= cp->ring_size - 1) {
FC_ATOMIC_DEC(cm->total_count);
if (FC_LOG_BY_LEVEL(LOG_DEBUG)) { if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
format_ip_address(conn->ip_addr, formatted_ip); format_ip_address(conn->ip_addr, formatted_ip);
@ -97,25 +91,19 @@ static int close_conn(ConnectionPool *cp, ConnectionManager *cm,
close_connection(conn); close_connection(conn);
fast_mblock_free_object(&cp->node_allocator, node); fast_mblock_free_object(&cp->node_allocator, node);
if (bForce) { node = cm->head;
for (index=FC_ATOMIC_GET(cm->ring.tail); index<head; index++) { while (node != NULL)
node = (ConnectionNode *)FC_ATOMIC_GET( {
cm->ring.nodes[index % cp->ring_size]); node->conn->validate_flag = true;
if (node != NULL) { node = node->next;
node->conn->validate_flag = true; }
} }
} else
}
break;
}
if (__sync_bool_compare_and_swap(&cm->ring.nodes[
head % cp->ring_size], NULL, node))
{ {
if (__sync_bool_compare_and_swap(&cm->ring.head, head, head + 1)) {
node->atime = get_current_time(); node->atime = get_current_time();
FC_ATOMIC_INC(cm->free_count); node->next = cm->head;
cm->head = node;
cm->free_count++;
if (FC_LOG_BY_LEVEL(LOG_DEBUG)) { if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
format_ip_address(conn->ip_addr, formatted_ip); format_ip_address(conn->ip_addr, formatted_ip);
@ -125,62 +113,41 @@ static int close_conn(ConnectionPool *cp, ConnectionManager *cm,
__LINE__, formatted_ip, conn->port, __LINE__, formatted_ip, conn->port,
conn->sock, cm->total_count, cm->free_count); conn->sock, cm->total_count, cm->free_count);
} }
break;
} else { //rollback
__sync_bool_compare_and_swap(&cm->ring.nodes[
head % cp->ring_size], node, NULL);
sched_yield();
}
}
} }
return 0; return 0;
} }
static inline ConnectionManager *do_find_manager( static ConnectionManager *find_manager(ConnectionPool *cp,
ConnectionManager *head, const string_t *key) ConnectionBucket *bucket, const string_t *key,
const bool need_create)
{ {
ConnectionManager *cm; ConnectionManager *cm;
if (fc_string_equal(&head->key, key)) //fast path if (bucket->head != NULL)
{ {
return head; if (fc_string_equal(&bucket->head->key, key)) //fast path
{
return bucket->head;
} }
else
cm = (ConnectionManager *)FC_ATOMIC_GET(head->next); {
cm = bucket->head->next;
while (cm != NULL) while (cm != NULL)
{ {
if (fc_string_equal(&cm->key, key)) if (fc_string_equal(&cm->key, key))
{ {
return cm; return cm;
} }
cm = (ConnectionManager *)FC_ATOMIC_GET(cm->next); cm = cm->next;
}
} }
return NULL;
} }
static ConnectionManager *find_manager(ConnectionBucket *bucket, if (!need_create)
const string_t *key)
{
ConnectionManager *head;
head = (ConnectionManager *)FC_ATOMIC_GET(bucket->head);
if (head == NULL)
{ {
return NULL; return NULL;
} }
return do_find_manager(head, key);
}
static ConnectionManager *create_manager(ConnectionPool *cp,
ConnectionBucket *bucket, ConnectionManager *old_head,
const string_t *key, int *result)
{
ConnectionManager *cm;
char *buff;
int aligned_key_len;
int bytes;
cm = (ConnectionManager *)fast_mblock_alloc_object( cm = (ConnectionManager *)fast_mblock_alloc_object(
&cp->manager_allocator); &cp->manager_allocator);
@ -189,66 +156,24 @@ static ConnectionManager *create_manager(ConnectionPool *cp,
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"malloc %d bytes fail", __LINE__, "malloc %d bytes fail", __LINE__,
(int)sizeof(ConnectionManager)); (int)sizeof(ConnectionManager));
*result = ENOMEM;
return NULL; return NULL;
} }
cm->ring.head = cm->ring.tail = 0; cm->head = NULL;
cm->total_count = 0; cm->total_count = 0;
cm->free_count = 0; cm->free_count = 0;
if ((cm->key.str=fc_malloc(key->len + 1)) == NULL)
aligned_key_len = MEM_ALIGN(key->len);
bytes = aligned_key_len + sizeof(ConnectionNode *) * cp->ring_size;
if ((buff=fc_malloc(bytes)) == NULL)
{ {
*result = ENOMEM;
return NULL; return NULL;
} }
memset(buff, 0, bytes); memcpy(cm->key.str, key->str, key->len + 1);
cm->key.str = buff;
cm->ring.nodes = (ConnectionNode **)(buff + aligned_key_len);
memcpy(cm->key.str, key->str, key->len);
cm->key.len = key->len; cm->key.len = key->len;
//add to manager chain //add to manager chain
FC_ATOMIC_SET(cm->next, old_head); cm->next = bucket->head;
if (__sync_bool_compare_and_swap(&bucket->head, old_head, cm)) bucket->head = cm;
{
return cm; return cm;
} }
else
{
fast_mblock_free_object(&cp->manager_allocator, cm);
*result = EAGAIN;
return NULL;
}
}
static ConnectionManager *get_manager(ConnectionPool *cp,
ConnectionBucket *bucket, const string_t *key)
{
ConnectionManager *head;
ConnectionManager *cm;
int result = 0;
do {
head = (ConnectionManager *)FC_ATOMIC_GET(bucket->head);
if (head != NULL)
{
if ((cm=do_find_manager(head, key)) != NULL)
{
return cm;
}
}
if ((cm=create_manager(cp, bucket, head, key, &result)) != NULL)
{
return cm;
}
} while (result == EAGAIN);
return NULL;
}
static int close_connection(ConnectionPool *cp, ConnectionInfo *conn, static int close_connection(ConnectionPool *cp, ConnectionInfo *conn,
const string_t *key, uint32_t hash_code, const bool bForce) const string_t *key, uint32_t hash_code, const bool bForce)
@ -259,7 +184,8 @@ static int close_connection(ConnectionPool *cp, ConnectionInfo *conn,
int result; int result;
bucket = cp->hashtable.buckets + hash_code % cp->hashtable.capacity; bucket = cp->hashtable.buckets + hash_code % cp->hashtable.capacity;
if ((cm=find_manager(bucket, key)) != NULL) pthread_mutex_lock(&bucket->lock);
if ((cm=find_manager(cp, bucket, key, false)) != NULL)
{ {
result = close_conn(cp, cm, conn, bForce); result = close_conn(cp, cm, conn, bForce);
} }
@ -271,6 +197,7 @@ static int close_connection(ConnectionPool *cp, ConnectionInfo *conn,
__LINE__, formatted_ip, conn->port); __LINE__, formatted_ip, conn->port);
result = ENOENT; result = ENOENT;
} }
pthread_mutex_unlock(&bucket->lock);
return result; return result;
} }
@ -311,6 +238,7 @@ static void cp_tls_destroy(void *ptr)
static int init_hashtable(ConnectionPool *cp, const int htable_capacity) static int init_hashtable(ConnectionPool *cp, const int htable_capacity)
{ {
int bytes; int bytes;
int result;
unsigned int *hash_capacity; unsigned int *hash_capacity;
ConnectionBucket *bucket; ConnectionBucket *bucket;
ConnectionBucket *end; ConnectionBucket *end;
@ -335,6 +263,10 @@ static int init_hashtable(ConnectionPool *cp, const int htable_capacity)
for (bucket=cp->hashtable.buckets; bucket<end; bucket++) for (bucket=cp->hashtable.buckets; bucket<end; bucket++)
{ {
bucket->head = NULL; bucket->head = NULL;
if ((result=init_pthread_lock(&bucket->lock)) != 0)
{
return result;
}
} }
return 0; return 0;
@ -355,12 +287,12 @@ int conn_pool_init_ex1(ConnectionPool *cp, const int connect_timeout,
cp->connect_timeout_ms = connect_timeout * 1000; cp->connect_timeout_ms = connect_timeout * 1000;
cp->max_count_per_entry = max_count_per_entry; cp->max_count_per_entry = max_count_per_entry;
cp->max_idle_time = max_idle_time; cp->max_idle_time = max_idle_time;
cp->ring_size = max_count_per_entry > 0 ? max_count_per_entry : 1024;
cp->extra_data_size = extra_data_size; cp->extra_data_size = extra_data_size;
cp->connect_done_callback.func = connect_done_func; cp->connect_done_callback.func = connect_done_func;
cp->connect_done_callback.args = connect_done_args; cp->connect_done_callback.args = connect_done_args;
cp->validate_callback.func = validate_func; cp->validate_callback.func = validate_func;
cp->validate_callback.args = validate_args; cp->validate_callback.args = validate_args;
if ((result=fast_mblock_init_ex1(&cp->manager_allocator, "cpool-manager", if ((result=fast_mblock_init_ex1(&cp->manager_allocator, "cpool-manager",
sizeof(ConnectionManager), 256, alloc_elements_limit, sizeof(ConnectionManager), 256, alloc_elements_limit,
NULL, NULL, true)) != 0) NULL, NULL, true)) != 0)
@ -423,31 +355,31 @@ static void conn_pool_hash_walk(ConnectionPool *cp,
end = cp->hashtable.buckets + cp->hashtable.capacity; end = cp->hashtable.buckets + cp->hashtable.capacity;
for (bucket=cp->hashtable.buckets; bucket<end; bucket++) for (bucket=cp->hashtable.buckets; bucket<end; bucket++)
{ {
cm = (ConnectionManager *)FC_ATOMIC_GET(bucket->head); pthread_mutex_lock(&bucket->lock);
cm = bucket->head;
while (cm != NULL) while (cm != NULL)
{ {
current = cm; current = cm;
cm = (ConnectionManager *)FC_ATOMIC_GET(cm->next); cm = cm->next;
callback(cp, current, args); callback(cp, current, args);
} }
pthread_mutex_unlock(&bucket->lock);
} }
} }
static void cp_destroy_walk_callback(ConnectionPool *cp, static void cp_destroy_walk_callback(ConnectionPool *cp,
ConnectionManager *cm, void *args) ConnectionManager *cm, void *args)
{ {
int64_t index;
int64_t head;
ConnectionNode *node; ConnectionNode *node;
ConnectionNode *deleted;
head = FC_ATOMIC_GET(cm->ring.head); node = cm->head;
for (index=FC_ATOMIC_GET(cm->ring.tail); index<head; index++) { while (node != NULL)
node = (ConnectionNode *)FC_ATOMIC_GET( {
cm->ring.nodes[index % cp->ring_size]); deleted = node;
if (node != NULL) { node = node->next;
G_COMMON_CONNECTION_CALLBACKS[node->conn->comm_type]. G_COMMON_CONNECTION_CALLBACKS[deleted->conn->comm_type].
close_connection(node->conn); close_connection(deleted->conn);
}
} }
free(cm->key.str); free(cm->key.str);
@ -455,11 +387,19 @@ static void cp_destroy_walk_callback(ConnectionPool *cp,
void conn_pool_destroy(ConnectionPool *cp) void conn_pool_destroy(ConnectionPool *cp)
{ {
ConnectionBucket *bucket;
ConnectionBucket *end;
if (cp->hashtable.buckets == NULL) { if (cp->hashtable.buckets == NULL) {
return; return;
} }
conn_pool_hash_walk(cp, cp_destroy_walk_callback, cp); conn_pool_hash_walk(cp, cp_destroy_walk_callback, cp);
end = cp->hashtable.buckets + cp->hashtable.capacity;
for (bucket=cp->hashtable.buckets; bucket<end; bucket++) {
pthread_mutex_destroy(&bucket->lock);
}
free(cp->hashtable.buckets); free(cp->hashtable.buckets);
cp->hashtable.buckets = NULL; cp->hashtable.buckets = NULL;
@ -557,11 +497,10 @@ int conn_pool_async_connect_server_ex(ConnectionInfo *conn,
} }
static ConnectionInfo *get_conn(ConnectionPool *cp, static ConnectionInfo *get_conn(ConnectionPool *cp,
ConnectionManager *cm, const ConnectionInfo *conn, ConnectionManager *cm, pthread_mutex_t *lock,
const char *service_name, int *err_no) const ConnectionInfo *conn, const char *service_name,
int *err_no)
{ {
int64_t tail;
int index;
ConnectionNode *node; ConnectionNode *node;
ConnectionInfo *ci; ConnectionInfo *ci;
char formatted_ip[FORMATTED_IP_SIZE]; char formatted_ip[FORMATTED_IP_SIZE];
@ -570,11 +509,10 @@ static ConnectionInfo *get_conn(ConnectionPool *cp,
current_time = get_current_time(); current_time = get_current_time();
while (1) while (1)
{ {
tail = FC_ATOMIC_GET(cm->ring.tail); if (cm->head == NULL)
if (tail == FC_ATOMIC_GET(cm->ring.head)) //empty
{ {
if ((cp->max_count_per_entry > 0) && FC_ATOMIC_GET( if ((cp->max_count_per_entry > 0) &&
cm->total_count) >= cp->max_count_per_entry) (cm->total_count >= cp->max_count_per_entry))
{ {
format_ip_address(conn->ip_addr, formatted_ip); format_ip_address(conn->ip_addr, formatted_ip);
*err_no = ENOSPC; *err_no = ENOSPC;
@ -600,7 +538,9 @@ static ConnectionInfo *get_conn(ConnectionPool *cp,
node->manager = cm; node->manager = cm;
node->next = NULL; node->next = NULL;
node->atime = 0; node->atime = 0;
FC_ATOMIC_INC(cm->total_count);
cm->total_count++;
pthread_mutex_unlock(lock);
memcpy(node->conn->ip_addr, conn->ip_addr, sizeof(conn->ip_addr)); memcpy(node->conn->ip_addr, conn->ip_addr, sizeof(conn->ip_addr));
node->conn->port = conn->port; node->conn->port = conn->port;
@ -622,7 +562,8 @@ static ConnectionInfo *get_conn(ConnectionPool *cp,
close_connection(node->conn); close_connection(node->conn);
fast_mblock_free_object(&cp->node_allocator, node); fast_mblock_free_object(&cp->node_allocator, node);
FC_ATOMIC_DEC(cm->total_count); //rollback pthread_mutex_lock(lock);
cm->total_count--; //rollback
return NULL; return NULL;
} }
@ -635,27 +576,19 @@ static ConnectionInfo *get_conn(ConnectionPool *cp,
node->conn->sock, cm->total_count, node->conn->sock, cm->total_count,
cm->free_count); cm->free_count);
} }
pthread_mutex_lock(lock);
return node->conn; return node->conn;
} }
else else
{ {
bool invalid; bool invalid;
if (!__sync_bool_compare_and_swap(&cm->ring.tail, tail, tail + 1)) { node = cm->head;
sched_yield();
continue;
}
index = tail % cp->ring_size;
node = (ConnectionNode *)FC_ATOMIC_GET(
cm->ring.nodes[index]);
if (node == NULL) {
continue;
}
__sync_bool_compare_and_swap(&cm->ring.nodes[index], node, NULL);
ci = node->conn; ci = node->conn;
FC_ATOMIC_DEC(cm->free_count); cm->head = node->next;
cm->free_count--;
if (current_time - node->atime > cp->max_idle_time) if (current_time - node->atime > cp->max_idle_time)
{ {
if (cp->validate_callback.func != NULL) if (cp->validate_callback.func != NULL)
@ -685,7 +618,8 @@ static ConnectionInfo *get_conn(ConnectionPool *cp,
if (invalid) if (invalid)
{ {
FC_ATOMIC_DEC(cm->total_count); cm->total_count--;
if (FC_LOG_BY_LEVEL(LOG_DEBUG)) { if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
format_ip_address(conn->ip_addr, formatted_ip); format_ip_address(conn->ip_addr, formatted_ip);
logDebug("file: "__FILE__", line: %d, " logDebug("file: "__FILE__", line: %d, "
@ -728,9 +662,10 @@ static ConnectionInfo *get_connection(ConnectionPool *cp,
ConnectionInfo *ci; ConnectionInfo *ci;
bucket = cp->hashtable.buckets + hash_code % cp->hashtable.capacity; bucket = cp->hashtable.buckets + hash_code % cp->hashtable.capacity;
if ((cm=get_manager(cp, bucket, key)) != NULL) pthread_mutex_lock(&bucket->lock);
if ((cm=find_manager(cp, bucket, key, true)) != NULL)
{ {
ci = get_conn(cp, cm, conn, service_name, err_no); ci = get_conn(cp, cm, &bucket->lock, conn, service_name, err_no);
if (ci != NULL) if (ci != NULL)
{ {
ci->shared = shared; ci->shared = shared;
@ -741,6 +676,7 @@ static ConnectionInfo *get_connection(ConnectionPool *cp,
*err_no = ENOMEM; *err_no = ENOMEM;
ci = NULL; ci = NULL;
} }
pthread_mutex_unlock(&bucket->lock);
return ci; return ci;
} }
@ -889,8 +825,8 @@ static void cp_stat_walk_callback(ConnectionPool *cp,
stat = args; stat = args;
stat->server_count++; stat->server_count++;
stat->connection.total_count += FC_ATOMIC_GET(cm->total_count); stat->connection.total_count += cm->total_count;
stat->connection.free_count += FC_ATOMIC_GET(cm->free_count); stat->connection.free_count += cm->free_count;
} }
void conn_pool_stat(ConnectionPool *cp, ConnectionPoolStat *stat) void conn_pool_stat(ConnectionPool *cp, ConnectionPoolStat *stat)

View File

@ -147,24 +147,21 @@ struct tagConnectionManager;
typedef struct tagConnectionNode { typedef struct tagConnectionNode {
ConnectionInfo *conn; ConnectionInfo *conn;
struct tagConnectionManager *manager; struct tagConnectionManager *manager;
struct tagConnectionNode *next; //for thread local struct tagConnectionNode *next;
time_t atime; //last access time time_t atime; //last access time
} ConnectionNode; } ConnectionNode;
typedef struct tagConnectionManager { typedef struct tagConnectionManager {
string_t key; string_t key;
struct { ConnectionNode *head;
ConnectionNode **nodes; int total_count; //total connections
volatile int64_t head; //producer int free_count; //free connections
volatile int64_t tail; //consumer struct tagConnectionManager *next;
} ring;
volatile int total_count; //total connections
volatile int free_count; //free connections
volatile struct tagConnectionManager *next;
} ConnectionManager; } ConnectionManager;
typedef struct tagConnectionBucket { typedef struct tagConnectionBucket {
volatile ConnectionManager *head; ConnectionManager *head;
pthread_mutex_t lock;
} ConnectionBucket; } ConnectionBucket;
struct tagConnectionPool; struct tagConnectionPool;
@ -182,7 +179,6 @@ typedef struct tagConnectionPool {
int connect_timeout_ms; int connect_timeout_ms;
int max_count_per_entry; //0 means no limit int max_count_per_entry; //0 means no limit
int ring_size;
/* /*
connections whose idle time exceeds this time will be closed connections whose idle time exceeds this time will be closed

View File

@ -1565,66 +1565,6 @@ int fc_copy_to_path(const char *src_filename, const char *dest_path)
return fc_copy_file(src_filename, dest_filename); return fc_copy_file(src_filename, dest_filename);
} }
void short2buff(const short n, char *buff)
{
unsigned char *p;
p = (unsigned char *)buff;
*p++ = (n >> 8) & 0xFF;
*p++ = n & 0xFF;
}
short buff2short(const char *buff)
{
return (short)((((unsigned char)(*(buff))) << 8) | \
((unsigned char)(*(buff+1))));
}
void int2buff(const int n, char *buff)
{
unsigned char *p;
p = (unsigned char *)buff;
*p++ = (n >> 24) & 0xFF;
*p++ = (n >> 16) & 0xFF;
*p++ = (n >> 8) & 0xFF;
*p++ = n & 0xFF;
}
int buff2int(const char *buff)
{
return (((unsigned char)(*buff)) << 24) | \
(((unsigned char)(*(buff+1))) << 16) | \
(((unsigned char)(*(buff+2))) << 8) | \
((unsigned char)(*(buff+3)));
}
void long2buff(int64_t n, char *buff)
{
unsigned char *p;
p = (unsigned char *)buff;
*p++ = (n >> 56) & 0xFF;
*p++ = (n >> 48) & 0xFF;
*p++ = (n >> 40) & 0xFF;
*p++ = (n >> 32) & 0xFF;
*p++ = (n >> 24) & 0xFF;
*p++ = (n >> 16) & 0xFF;
*p++ = (n >> 8) & 0xFF;
*p++ = n & 0xFF;
}
int64_t buff2long(const char *buff)
{
unsigned char *p;
p = (unsigned char *)buff;
return (((int64_t)(*p)) << 56) | \
(((int64_t)(*(p+1))) << 48) | \
(((int64_t)(*(p+2))) << 40) | \
(((int64_t)(*(p+3))) << 32) | \
(((int64_t)(*(p+4))) << 24) | \
(((int64_t)(*(p+5))) << 16) | \
(((int64_t)(*(p+6))) << 8) | \
((int64_t)(*(p+7)));
}
int fd_gets(int fd, char *buff, const int size, int once_bytes) int fd_gets(int fd, char *buff, const int size, int once_bytes)
{ {
char *pDest; char *pDest;

View File

@ -196,14 +196,24 @@ void printBuffHex(const char *s, const int len);
* buff: the buffer, at least 2 bytes space, no tail \0 * buff: the buffer, at least 2 bytes space, no tail \0
* return: none * return: none
*/ */
void short2buff(const short n, char *buff); static inline void short2buff(const short n, char *buff)
{
unsigned char *p;
p = (unsigned char *)buff;
*p++ = (n >> 8) & 0xFF;
*p++ = n & 0xFF;
}
/** buffer convert to 16 bits int /** buffer convert to 16 bits int
* parameters: * parameters:
* buff: big-endian 2 bytes buffer * buff: big-endian 2 bytes buffer
* return: 16 bits int value * return: 16 bits int value
*/ */
short buff2short(const char *buff); static inline short buff2short(const char *buff)
{
return (short)((((unsigned char)(*(buff))) << 8) |
((unsigned char)(*(buff+1))));
}
/** 32 bits int convert to buffer (big-endian) /** 32 bits int convert to buffer (big-endian)
* parameters: * parameters:
@ -211,14 +221,28 @@ short buff2short(const char *buff);
* buff: the buffer, at least 4 bytes space, no tail \0 * buff: the buffer, at least 4 bytes space, no tail \0
* return: none * return: none
*/ */
void int2buff(const int n, char *buff); static inline void int2buff(const int n, char *buff)
{
unsigned char *p;
p = (unsigned char *)buff;
*p++ = (n >> 24) & 0xFF;
*p++ = (n >> 16) & 0xFF;
*p++ = (n >> 8) & 0xFF;
*p++ = n & 0xFF;
}
/** buffer convert to 32 bits int /** buffer convert to 32 bits int
* parameters: * parameters:
* buff: big-endian 4 bytes buffer * buff: big-endian 4 bytes buffer
* return: 32 bits int value * return: 32 bits int value
*/ */
int buff2int(const char *buff); static inline int buff2int(const char *buff)
{
return (((unsigned char)(*buff)) << 24) |
(((unsigned char)(*(buff+1))) << 16) |
(((unsigned char)(*(buff+2))) << 8) |
((unsigned char)(*(buff+3)));
}
/** long (64 bits) convert to buffer (big-endian) /** long (64 bits) convert to buffer (big-endian)
* parameters: * parameters:
@ -226,15 +250,38 @@ int buff2int(const char *buff);
* buff: the buffer, at least 8 bytes space, no tail \0 * buff: the buffer, at least 8 bytes space, no tail \0
* return: none * return: none
*/ */
void long2buff(int64_t n, char *buff); static inline void long2buff(int64_t n, char *buff)
{
unsigned char *p;
p = (unsigned char *)buff;
*p++ = (n >> 56) & 0xFF;
*p++ = (n >> 48) & 0xFF;
*p++ = (n >> 40) & 0xFF;
*p++ = (n >> 32) & 0xFF;
*p++ = (n >> 24) & 0xFF;
*p++ = (n >> 16) & 0xFF;
*p++ = (n >> 8) & 0xFF;
*p++ = n & 0xFF;
}
/** buffer convert to 64 bits int /** buffer convert to 64 bits int
* parameters: * parameters:
* buff: big-endian 8 bytes buffer * buff: big-endian 8 bytes buffer
* return: 64 bits int value * return: 64 bits int value
*/ */
int64_t buff2long(const char *buff); static inline int64_t buff2long(const char *buff)
{
unsigned char *p;
p = (unsigned char *)buff;
return (((int64_t)(*p)) << 56) |
(((int64_t)(*(p+1))) << 48) |
(((int64_t)(*(p+2))) << 40) |
(((int64_t)(*(p+3))) << 32) |
(((int64_t)(*(p+4))) << 24) |
(((int64_t)(*(p+5))) << 16) |
(((int64_t)(*(p+6))) << 8) |
((int64_t)(*(p+7)));
}
/** 32 bits float convert to buffer (big-endian) /** 32 bits float convert to buffer (big-endian)
* parameters: * parameters:

View File

@ -30,7 +30,7 @@
#define USE_CONN_POOL 1 #define USE_CONN_POOL 1
//#define USE_CAS_LOCK 1 //#define USE_CAS_LOCK 1
static int thread_count = 24; static int thread_count = 16;
static int64_t loop_count = 10000000; static int64_t loop_count = 10000000;
static ConnectionPool cpool; static ConnectionPool cpool;
static char buff[1024]; static char buff[1024];