struct fast_task_info support send and recv double buffers

support_rdma
YuQing 2023-09-25 18:36:15 +08:00
parent 2e176a9d1b
commit acaf94db0c
3 changed files with 268 additions and 667 deletions

View File

@ -1,11 +1,12 @@
Version 1.70 2023-09-20 Version 1.70 2023-09-25
* get full mac address of infiniband NIC under Linux * get full mac address of infiniband NIC under Linux
* struct fast_task_info add field conn for RDMA connection * struct fast_task_info add field conn for RDMA connection
* server_id_func.[hc]: support communication type * server_id_func.[hc]: support communication type
* connection_pool.[hc] support callbacks for RDMA * connection_pool.[hc] support callbacks for RDMA
* nio thread data support busy_polling_callback * nio thread data support busy_polling_callback
* connection_pool.[hc] support thread local for performance * connection_pool.[hc] support thread local for performance
* struct fast_task_info support send and recv double buffers
Version 1.69 2023-08-05 Version 1.69 2023-08-05
* bugfixed: array_allocator_alloc MUST init the array * bugfixed: array_allocator_alloc MUST init the array

View File

@ -25,656 +25,183 @@
#include "fc_memory.h" #include "fc_memory.h"
#include "fast_task_queue.h" #include "fast_task_queue.h"
static struct fast_task_queue g_free_queue; static int task_alloc_init(struct fast_task_info *task,
struct fast_task_queue *queue)
struct mpool_node {
struct fast_task_info *blocks;
struct fast_task_info *last_block; //last block
struct mpool_node *next;
};
struct mpool_chain {
struct mpool_node *head;
struct mpool_node *tail;
};
static struct mpool_chain g_mpool = {NULL, NULL};
int task_queue_init(struct fast_task_queue *pQueue)
{ {
int result; task->arg = (char *)task + ALIGNED_TASK_INFO_SIZE + queue->padding_size;
task->send.ptr = &task->send.holder;
if ((result=init_pthread_lock(&(pQueue->lock))) != 0) task->send.ptr->size = queue->min_buff_size;
{ if (queue->malloc_whole_block) {
logError("file: "__FILE__", line: %d, " \ task->send.ptr->data = (char *)task->arg + queue->arg_size;
"init_pthread_lock fail, errno: %d, error info: %s", \ } else {
__LINE__, result, STRERROR(result)); task->send.ptr->data = (char *)fc_malloc(task->send.ptr->size);
return result; if (task->send.ptr->data == NULL) {
return ENOMEM;
}
} }
pQueue->head = NULL; if (queue->double_buffers) {
pQueue->tail = NULL; task->recv.ptr = &task->recv.holder;
task->recv.ptr->size = queue->min_buff_size;
task->recv.ptr->data = (char *)fc_malloc(task->recv.ptr->size);
if (task->recv.ptr->data == NULL) {
return ENOMEM;
}
} else {
task->recv.ptr = &task->send.holder;
}
task->free_queue = queue;
if (queue->init_callback != NULL) {
return queue->init_callback(task);
}
return 0; return 0;
} }
static void free_mpool(struct mpool_node *mpool, char *end) int free_queue_init_ex2(struct fast_task_queue *queue, const char *name,
{ const bool double_buffers, const int max_connections,
char *pt;
for (pt=(char *)mpool->blocks; pt < end; pt += g_free_queue.block_size)
{
free(((struct fast_task_info *)pt)->data);
}
free(mpool->blocks);
free(mpool);
}
static struct mpool_node *malloc_mpool(const int total_alloc_size)
{
struct fast_task_info *pTask;
char *p;
char *pCharEnd;
struct mpool_node *mpool;
mpool = (struct mpool_node *)fc_malloc(sizeof(struct mpool_node));
if (mpool == NULL)
{
return NULL;
}
mpool->next = NULL;
mpool->blocks = (struct fast_task_info *)fc_malloc(total_alloc_size);
if (mpool->blocks == NULL)
{
free(mpool);
return NULL;
}
memset(mpool->blocks, 0, total_alloc_size);
pCharEnd = ((char *)mpool->blocks) + total_alloc_size;
for (p=(char *)mpool->blocks; p<pCharEnd; p += g_free_queue.block_size)
{
pTask = (struct fast_task_info *)p;
pTask->size = g_free_queue.min_buff_size;
pTask->arg = p + ALIGNED_TASK_INFO_SIZE + g_free_queue.padding_size;
if (g_free_queue.malloc_whole_block)
{
pTask->data = (char *)pTask->arg + g_free_queue.arg_size;
}
else
{
pTask->data = (char *)fc_malloc(pTask->size);
if (pTask->data == NULL)
{
free_mpool(mpool, p);
return NULL;
}
}
if (g_free_queue.init_callback != NULL)
{
if (g_free_queue.init_callback(pTask) != 0)
{
free_mpool(mpool, p);
return NULL;
}
}
}
mpool->last_block = (struct fast_task_info *)
(pCharEnd - g_free_queue.block_size);
for (p=(char *)mpool->blocks; p<(char *)mpool->last_block;
p += g_free_queue.block_size)
{
pTask = (struct fast_task_info *)p;
pTask->next = (struct fast_task_info *)(p + g_free_queue.block_size);
}
mpool->last_block->next = NULL;
return mpool;
}
int free_queue_init_ex2(const int max_connections, const int init_connections,
const int alloc_task_once, const int min_buff_size, const int alloc_task_once, const int min_buff_size,
const int max_buff_size, const int padding_size, const int max_buff_size, const int padding_size,
const int arg_size, TaskInitCallback init_callback) const int arg_size, TaskInitCallback init_callback)
{ {
#define MAX_DATA_SIZE (256 * 1024 * 1024) #define MAX_DATA_SIZE (256 * 1024 * 1024)
int64_t total_size;
struct mpool_node *mpool;
int alloc_size;
int alloc_once; int alloc_once;
int result;
int loop_count;
int aligned_min_size; int aligned_min_size;
int aligned_max_size; int aligned_max_size;
int aligned_padding_size; int aligned_padding_size;
int aligned_arg_size; int aligned_arg_size;
rlim_t max_data_size; rlim_t max_data_size;
char aname[64];
if ((result=init_pthread_lock(&(g_free_queue.lock))) != 0)
{
logError("file: "__FILE__", line: %d, "
"init_pthread_lock fail, errno: %d, error info: %s",
__LINE__, result, STRERROR(result));
return result;
}
aligned_min_size = MEM_ALIGN(min_buff_size); aligned_min_size = MEM_ALIGN(min_buff_size);
aligned_max_size = MEM_ALIGN(max_buff_size); aligned_max_size = MEM_ALIGN(max_buff_size);
aligned_padding_size = MEM_ALIGN(padding_size); aligned_padding_size = MEM_ALIGN(padding_size);
aligned_arg_size = MEM_ALIGN(arg_size); aligned_arg_size = MEM_ALIGN(arg_size);
g_free_queue.block_size = ALIGNED_TASK_INFO_SIZE + queue->block_size = ALIGNED_TASK_INFO_SIZE +
aligned_padding_size + aligned_arg_size; aligned_padding_size + aligned_arg_size;
alloc_size = g_free_queue.block_size * init_connections; if (alloc_task_once <= 0) {
if (aligned_max_size > aligned_min_size) alloc_once = FC_MIN(MAX_DATA_SIZE / queue->block_size, 256);
{ if (alloc_once == 0) {
total_size = alloc_size; alloc_once = 1;
g_free_queue.malloc_whole_block = false;
max_data_size = 0;
} }
else } else {
{ alloc_once = alloc_task_once;
}
if (aligned_max_size > aligned_min_size) {
queue->malloc_whole_block = false;
max_data_size = 0;
} else {
struct rlimit rlimit_data; struct rlimit rlimit_data;
if (getrlimit(RLIMIT_DATA, &rlimit_data) < 0) if (getrlimit(RLIMIT_DATA, &rlimit_data) < 0) {
{
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"call getrlimit fail, " "call getrlimit fail, "
"errno: %d, error info: %s", "errno: %d, error info: %s",
__LINE__, errno, STRERROR(errno)); __LINE__, errno, STRERROR(errno));
return errno != 0 ? errno : EPERM; return errno != 0 ? errno : EPERM;
} }
if (rlimit_data.rlim_cur == RLIM_INFINITY) if (rlimit_data.rlim_cur == RLIM_INFINITY) {
{
max_data_size = MAX_DATA_SIZE; max_data_size = MAX_DATA_SIZE;
} } else {
else
{
max_data_size = rlimit_data.rlim_cur; max_data_size = rlimit_data.rlim_cur;
if (max_data_size > MAX_DATA_SIZE) if (max_data_size > MAX_DATA_SIZE) {
{
max_data_size = MAX_DATA_SIZE; max_data_size = MAX_DATA_SIZE;
} }
} }
if (max_data_size >= (int64_t)(g_free_queue.block_size + if (max_data_size >= (int64_t)(queue->block_size +
aligned_min_size) * (int64_t)init_connections) aligned_min_size) * (int64_t)alloc_once)
{ {
total_size = alloc_size + (int64_t)aligned_min_size * queue->malloc_whole_block = true;
init_connections; queue->block_size += aligned_min_size;
g_free_queue.malloc_whole_block = true; } else {
g_free_queue.block_size += aligned_min_size; queue->malloc_whole_block = false;
}
else
{
total_size = alloc_size;
g_free_queue.malloc_whole_block = false;
max_data_size = 0; max_data_size = 0;
} }
} }
g_free_queue.max_connections = max_connections; queue->double_buffers = double_buffers;
g_free_queue.alloc_connections = init_connections; queue->min_buff_size = aligned_min_size;
if (alloc_task_once <= 0) queue->max_buff_size = aligned_max_size;
{ queue->padding_size = aligned_padding_size;
g_free_queue.alloc_task_once = 256; queue->arg_size = aligned_arg_size;
alloc_once = MAX_DATA_SIZE / g_free_queue.block_size; queue->init_callback = init_callback;
if (g_free_queue.alloc_task_once > alloc_once)
{
g_free_queue.alloc_task_once = alloc_once > 0 ? alloc_once : 1;
}
}
else
{
g_free_queue.alloc_task_once = alloc_task_once;
}
g_free_queue.min_buff_size = aligned_min_size;
g_free_queue.max_buff_size = aligned_max_size;
g_free_queue.padding_size = aligned_padding_size;
g_free_queue.arg_size = aligned_arg_size;
g_free_queue.init_callback = init_callback;
logDebug("file: "__FILE__", line: %d, " /*
"max_connections: %d, init_connections: %d, alloc_task_once: %d, " logInfo("file: "__FILE__", line: %d, [%s] double_buffers: %d, "
"max_connections: %d, alloc_once: %d, malloc_whole_block: %d, "
"min_buff_size: %d, max_buff_size: %d, block_size: %d, " "min_buff_size: %d, max_buff_size: %d, block_size: %d, "
"padding_size: %d, arg_size: %d, max_data_size: %"PRId64", " "padding_size: %d, arg_size: %d, max_data_size: %"PRId64,
"total_size: %"PRId64, __LINE__, max_connections, init_connections, __LINE__, name, double_buffers, max_connections, alloc_once,
g_free_queue.alloc_task_once, aligned_min_size, aligned_max_size, queue->malloc_whole_block, aligned_min_size, aligned_max_size,
g_free_queue.block_size, aligned_padding_size, aligned_arg_size, queue->block_size, aligned_padding_size, aligned_arg_size,
(int64_t)max_data_size, total_size); (int64_t)max_data_size);
*/
if ((!g_free_queue.malloc_whole_block) || (total_size <= max_data_size)) snprintf(aname, sizeof(aname), "%s-task", name);
{ return fast_mblock_init_ex1(&queue->allocator, aname,
loop_count = 1; queue->block_size, alloc_once, max_connections,
mpool = malloc_mpool(total_size); (fast_mblock_object_init_func)task_alloc_init,
if (mpool == NULL) queue, true);
{
return errno != 0 ? errno : ENOMEM;
}
g_mpool.head = mpool;
g_mpool.tail = mpool;
}
else
{
int remain_count;
int alloc_count;
int current_alloc_size;
loop_count = 0;
remain_count = init_connections;
alloc_once = max_data_size / g_free_queue.block_size;
while (remain_count > 0)
{
alloc_count = (remain_count > alloc_once) ?
alloc_once : remain_count;
current_alloc_size = g_free_queue.block_size * alloc_count;
mpool = malloc_mpool(current_alloc_size);
if (mpool == NULL)
{
free_queue_destroy();
return errno != 0 ? errno : ENOMEM;
}
if (g_mpool.tail == NULL)
{
g_mpool.head = mpool;
}
else
{
g_mpool.tail->next = mpool;
g_mpool.tail->last_block->next = mpool->blocks; //link previous mpool to current
}
g_mpool.tail = mpool;
remain_count -= alloc_count;
loop_count++;
}
logDebug("file: "__FILE__", line: %d, " \
"alloc_once: %d", __LINE__, alloc_once);
}
logDebug("file: "__FILE__", line: %d, " \
"malloc task info as whole: %d, malloc loop count: %d", \
__LINE__, g_free_queue.malloc_whole_block, loop_count);
if (g_mpool.head != NULL)
{
g_free_queue.head = g_mpool.head->blocks;
g_free_queue.tail = g_mpool.tail->last_block;
}
return 0;
} }
void free_queue_destroy() void free_queue_destroy(struct fast_task_queue *queue)
{ {
struct mpool_node *mpool; fast_mblock_destroy(&queue->allocator);
struct mpool_node *mp;
if (g_mpool.head == NULL)
{
return;
}
if (!g_free_queue.malloc_whole_block)
{
char *p;
char *pCharEnd;
struct fast_task_info *pTask;
mpool = g_mpool.head;
while (mpool != NULL)
{
pCharEnd = (char *)mpool->last_block + g_free_queue.block_size;
for (p=(char *)mpool->blocks; p<pCharEnd; p += g_free_queue.block_size)
{
pTask = (struct fast_task_info *)p;
if (pTask->data != NULL)
{
free(pTask->data);
pTask->data = NULL;
}
}
mpool = mpool->next;
}
}
mpool = g_mpool.head;
while (mpool != NULL)
{
mp = mpool;
mpool = mpool->next;
free(mp->blocks);
free(mp);
}
g_mpool.head = g_mpool.tail = NULL;
pthread_mutex_destroy(&(g_free_queue.lock));
} }
static int free_queue_realloc() static int _realloc_buffer(struct fast_net_buffer *buffer,
{ const int new_size, const bool copy_data)
struct mpool_node *mpool;
struct fast_task_info *head;
struct fast_task_info *tail;
int remain_count;
int alloc_count;
int current_alloc_size;
head = tail = NULL;
remain_count = g_free_queue.max_connections -
g_free_queue.alloc_connections;
alloc_count = (remain_count > g_free_queue.alloc_task_once) ?
g_free_queue.alloc_task_once : remain_count;
if (alloc_count > 0)
{
current_alloc_size = g_free_queue.block_size * alloc_count;
mpool = malloc_mpool(current_alloc_size);
if (mpool == NULL)
{
return ENOMEM;
}
if (g_mpool.tail == NULL)
{
g_mpool.head = mpool;
}
else
{
g_mpool.tail->next = mpool;
}
g_mpool.tail = mpool;
head = mpool->blocks;
tail = mpool->last_block;
remain_count -= alloc_count;
}
else {
return ENOSPC;
}
if (g_free_queue.head == NULL)
{
g_free_queue.head = head;
}
if (g_free_queue.tail != NULL)
{
g_free_queue.tail->next = head;
}
g_free_queue.tail = tail;
g_free_queue.alloc_connections += alloc_count;
logDebug("file: "__FILE__", line: %d, "
"alloc_connections: %d, realloc %d elements", __LINE__,
g_free_queue.alloc_connections, alloc_count);
return 0;
}
struct fast_task_info *free_queue_pop()
{
struct fast_task_info *pTask;
int i;
if ((pTask=task_queue_pop(&g_free_queue)) != NULL)
{
return pTask;
}
if (g_free_queue.alloc_connections >= g_free_queue.max_connections)
{
return NULL;
}
for (i=0; i<10; i++)
{
pthread_mutex_lock(&g_free_queue.lock);
if (g_free_queue.alloc_connections >= g_free_queue.max_connections)
{
if (g_free_queue.head == NULL)
{
pthread_mutex_unlock(&g_free_queue.lock);
return NULL;
}
}
else
{
if (g_free_queue.head == NULL && free_queue_realloc() != 0)
{
pthread_mutex_unlock(&g_free_queue.lock);
return NULL;
}
}
pthread_mutex_unlock(&g_free_queue.lock);
if ((pTask=task_queue_pop(&g_free_queue)) != NULL)
{
return pTask;
}
}
return NULL;
}
static int _realloc_buffer(struct fast_task_info *pTask, const int new_size,
const bool copy_data)
{ {
char *new_buff; char *new_buff;
new_buff = (char *)fc_malloc(new_size); new_buff = (char *)fc_malloc(new_size);
if (new_buff == NULL) if (new_buff == NULL) {
{
return ENOMEM; return ENOMEM;
} }
else
{ if (copy_data && buffer->offset > 0) {
if (copy_data && pTask->offset > 0) { memcpy(new_buff, buffer->data, buffer->offset);
memcpy(new_buff, pTask->data, pTask->offset);
} }
free(pTask->data); free(buffer->data);
pTask->size = new_size; buffer->size = new_size;
pTask->data = new_buff; buffer->data = new_buff;
return 0;
}
}
int free_queue_push(struct fast_task_info *pTask)
{
int result;
*(pTask->client_ip) = '\0';
pTask->length = 0;
pTask->offset = 0;
pTask->req_count = 0;
if (pTask->size > g_free_queue.min_buff_size) //need thrink
{
_realloc_buffer(pTask, g_free_queue.min_buff_size, false);
}
if ((result=pthread_mutex_lock(&g_free_queue.lock)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_lock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
}
pTask->next = g_free_queue.head;
g_free_queue.head = pTask;
if (g_free_queue.tail == NULL)
{
g_free_queue.tail = pTask;
}
if ((result=pthread_mutex_unlock(&g_free_queue.lock)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_unlock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
}
return result;
}
int free_queue_count()
{
return task_queue_count(&g_free_queue);
}
int free_queue_alloc_connections()
{
return g_free_queue.alloc_connections;
}
int free_queue_set_buffer_size(struct fast_task_info *pTask,
const int expect_size)
{
return task_queue_set_buffer_size(&g_free_queue, pTask, expect_size);
}
int free_queue_realloc_buffer(struct fast_task_info *pTask,
const int expect_size)
{
return task_queue_realloc_buffer(&g_free_queue, pTask, expect_size);
}
int free_queue_set_max_buffer_size(struct fast_task_info *pTask)
{
return task_queue_set_buffer_size(&g_free_queue, pTask,
g_free_queue.max_buff_size);
}
int free_queue_realloc_max_buffer(struct fast_task_info *pTask)
{
return task_queue_realloc_buffer(&g_free_queue, pTask,
g_free_queue.max_buff_size);
}
int task_queue_push(struct fast_task_queue *pQueue, \
struct fast_task_info *pTask)
{
int result;
if ((result=pthread_mutex_lock(&(pQueue->lock))) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_lock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
return result;
}
pTask->next = NULL;
if (pQueue->tail == NULL)
{
pQueue->head = pTask;
}
else
{
pQueue->tail->next = pTask;
}
pQueue->tail = pTask;
if ((result=pthread_mutex_unlock(&(pQueue->lock))) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_unlock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
}
return 0; return 0;
} }
struct fast_task_info *task_queue_pop(struct fast_task_queue *pQueue) void free_queue_push(struct fast_task_info *task)
{ {
struct fast_task_info *pTask; *(task->client_ip) = '\0';
int result; task->send.ptr->length = 0;
task->send.ptr->offset = 0;
if ((result=pthread_mutex_lock(&(pQueue->lock))) != 0) task->req_count = 0;
{ if (task->send.ptr->size > task->free_queue->min_buff_size) {//need thrink
logError("file: "__FILE__", line: %d, " \ _realloc_buffer(task->send.ptr, task->free_queue->min_buff_size, false);
"call pthread_mutex_lock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
return NULL;
} }
pTask = pQueue->head; if (task->free_queue->double_buffers) {
if (pTask != NULL) task->recv.ptr->length = 0;
{ task->recv.ptr->offset = 0;
pQueue->head = pTask->next; if (task->recv.ptr->size > task->free_queue->min_buff_size) {
if (pQueue->head == NULL) _realloc_buffer(task->recv.ptr, task->free_queue->
{ min_buff_size, false);
pQueue->tail = NULL;
} }
} }
if ((result=pthread_mutex_unlock(&(pQueue->lock))) != 0) fast_mblock_free_object(&task->free_queue->allocator, task);
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_unlock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
}
return pTask;
} }
int task_queue_count(struct fast_task_queue *pQueue) int free_queue_get_new_buffer_size(const int min_buff_size,
{
struct fast_task_info *pTask;
int count;
int result;
if ((result=pthread_mutex_lock(&(pQueue->lock))) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_lock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
return 0;
}
count = 0;
pTask = pQueue->head;
while (pTask != NULL)
{
pTask = pTask->next;
count++;
}
if ((result=pthread_mutex_unlock(&(pQueue->lock))) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_unlock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
}
return count;
}
int task_queue_get_new_buffer_size(const int min_buff_size,
const int max_buff_size, const int expect_size, int *new_size) const int max_buff_size, const int expect_size, int *new_size)
{ {
if (min_buff_size == max_buff_size) if (min_buff_size == max_buff_size) {
{
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"can't change buffer size because NOT supported", __LINE__); "can't change buffer size because NOT supported", __LINE__);
return EOPNOTSUPP; return EOPNOTSUPP;
} }
if (expect_size > max_buff_size) if (expect_size > max_buff_size) {
{
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"can't change buffer size because expect buffer size: %d " "can't change buffer size because expect buffer size: %d "
"exceeds max buffer size: %d", __LINE__, expect_size, "exceeds max buffer size: %d", __LINE__, expect_size,
@ -683,14 +210,11 @@ int task_queue_get_new_buffer_size(const int min_buff_size,
} }
*new_size = min_buff_size; *new_size = min_buff_size;
if (expect_size > min_buff_size) if (expect_size > min_buff_size) {
{ while (*new_size < expect_size) {
while (*new_size < expect_size)
{
*new_size *= 2; *new_size *= 2;
} }
if (*new_size > max_buff_size) if (*new_size > max_buff_size) {
{
*new_size = max_buff_size; *new_size = max_buff_size;
} }
} }
@ -698,41 +222,43 @@ int task_queue_get_new_buffer_size(const int min_buff_size,
return 0; return 0;
} }
#define _get_new_buffer_size(pQueue, expect_size, new_size) \ #define _get_new_buffer_size(queue, expect_size, new_size) \
task_queue_get_new_buffer_size(pQueue->min_buff_size, \ free_queue_get_new_buffer_size(queue->min_buff_size, \
pQueue->max_buff_size, expect_size, new_size) queue->max_buff_size, expect_size, new_size)
int task_queue_set_buffer_size(struct fast_task_queue *pQueue, int free_queue_set_buffer_size(struct fast_task_info *task,
struct fast_task_info *pTask, const int expect_size) struct fast_net_buffer *buffer, const int expect_size)
{ {
int result; int result;
int new_size; int new_size;
if ((result=_get_new_buffer_size(pQueue, expect_size, &new_size)) != 0) { if ((result=_get_new_buffer_size(task->free_queue,
expect_size, &new_size)) != 0)
{
return result; return result;
} }
if (pTask->size == new_size) //do NOT need change buffer size if (buffer->size == new_size) { //do NOT need change buffer size
{
return 0; return 0;
} }
return _realloc_buffer(pTask, new_size, false); return _realloc_buffer(buffer, new_size, false);
} }
int task_queue_realloc_buffer(struct fast_task_queue *pQueue, int free_queue_realloc_buffer(struct fast_task_info *task,
struct fast_task_info *pTask, const int expect_size) struct fast_net_buffer *buffer, const int expect_size)
{ {
int result; int result;
int new_size; int new_size;
if (pTask->size >= expect_size) //do NOT need change buffer size if (buffer->size >= expect_size) { //do NOT need change buffer size
{
return 0; return 0;
} }
if ((result=_get_new_buffer_size(pQueue, expect_size, &new_size)) != 0) { if ((result=_get_new_buffer_size(task->free_queue,
expect_size, &new_size)) != 0)
{
return result; return result;
} }
return _realloc_buffer(pTask, new_size, true); return _realloc_buffer(buffer, new_size, true);
} }

View File

@ -23,9 +23,10 @@
#include <string.h> #include <string.h>
#include <pthread.h> #include <pthread.h>
#include "common_define.h" #include "common_define.h"
#include "fc_list.h"
#include "ioevent.h" #include "ioevent.h"
#include "fast_timer.h" #include "fast_timer.h"
#include "fc_list.h" #include "fast_mblock.h"
#define FC_NOTIFY_READ_FD(tdata) (tdata)->pipe_fds[0] #define FC_NOTIFY_READ_FD(tdata) (tdata)->pipe_fds[0]
#define FC_NOTIFY_WRITE_FD(tdata) (tdata)->pipe_fds[1] #define FC_NOTIFY_WRITE_FD(tdata) (tdata)->pipe_fds[1]
@ -36,9 +37,9 @@ struct nio_thread_data;
struct fast_task_info; struct fast_task_info;
typedef int (*ThreadLoopCallback) (struct nio_thread_data *pThreadData); typedef int (*ThreadLoopCallback) (struct nio_thread_data *pThreadData);
typedef int (*TaskFinishCallback) (struct fast_task_info *pTask); typedef int (*TaskFinishCallback) (struct fast_task_info *task);
typedef void (*TaskCleanUpCallback) (struct fast_task_info *pTask); typedef void (*TaskCleanUpCallback) (struct fast_task_info *task);
typedef int (*TaskInitCallback)(struct fast_task_info *pTask); typedef int (*TaskInitCallback)(struct fast_task_info *task);
typedef void (*IOEventCallback) (int sock, short event, void *arg); typedef void (*IOEventCallback) (int sock, short event, void *arg);
typedef int (*TaskContinueCallback)(struct fast_task_info *task); typedef int (*TaskContinueCallback)(struct fast_task_info *task);
@ -83,6 +84,21 @@ struct ioevent_notify_entry
struct nio_thread_data *thread_data; struct nio_thread_data *thread_data;
}; };
struct fast_net_buffer
{
int size; //alloc size
int length; //data length
int offset; //current offset
char *data; //buffer for write or read
};
struct fast_net_buffer_wrapper
{
struct fast_net_buffer holder;
struct fast_net_buffer *ptr;
};
struct fast_task_queue;
struct fast_task_info struct fast_task_info
{ {
IOEventEntry event; //must first IOEventEntry event; //must first
@ -91,7 +107,6 @@ struct fast_task_info
char client_ip[IP_ADDRESS_SIZE]; char client_ip[IP_ADDRESS_SIZE];
}; };
void *arg; //extra argument pointer void *arg; //extra argument pointer
char *data; //buffer for write or read
char *recv_body; //for extra (dynamic) recv buffer char *recv_body; //for extra (dynamic) recv buffer
struct { struct {
@ -99,9 +114,9 @@ struct fast_task_info
int count; int count;
} iovec_array; //for writev } iovec_array; //for writev
int size; //alloc size struct fast_net_buffer_wrapper send; //send buffer
int length; //data length struct fast_net_buffer_wrapper recv; //recv buffer
int offset; //current offset
uint16_t port; //peer port uint16_t port; //peer port
struct { struct {
uint8_t current; uint8_t current;
@ -125,23 +140,20 @@ struct fast_task_info
struct sf_network_handler *handler; //network handler for libserverframe nio struct sf_network_handler *handler; //network handler for libserverframe nio
struct fast_task_info *next; //for free queue and deleted list struct fast_task_info *next; //for free queue and deleted list
struct fast_task_info *notify_next; //for nio notify queue struct fast_task_info *notify_next; //for nio notify queue
struct fast_task_queue *free_queue; //task allocator
char conn[0]; //for RDMA connection char conn[0]; //for RDMA connection
}; };
struct fast_task_queue struct fast_task_queue
{ {
struct fast_task_info *head;
struct fast_task_info *tail;
pthread_mutex_t lock;
int max_connections;
int alloc_connections;
int alloc_task_once;
int min_buff_size; int min_buff_size;
int max_buff_size; int max_buff_size;
int padding_size; //for last field: conn[0] int padding_size; //for last field: conn[0]
int arg_size; //for arg pointer int arg_size; //for arg pointer
int block_size; int block_size;
bool malloc_whole_block; bool malloc_whole_block;
bool double_buffers; //if send buffer and recv buffer are independent
struct fast_mblock_man allocator;
TaskInitCallback init_callback; TaskInitCallback init_callback;
}; };
@ -149,57 +161,119 @@ struct fast_task_queue
extern "C" { extern "C" {
#endif #endif
int free_queue_init_ex2(const int max_connections, const int init_connections, int free_queue_init_ex2(struct fast_task_queue *queue, const char *name,
const bool double_buffers, const int max_connections,
const int alloc_task_once, const int min_buff_size, const int alloc_task_once, const int min_buff_size,
const int max_buff_size, const int padding_size, const int max_buff_size, const int padding_size,
const int arg_size, TaskInitCallback init_callback); const int arg_size, TaskInitCallback init_callback);
static inline int free_queue_init_ex(const int max_connections, static inline int free_queue_init_ex(struct fast_task_queue *queue,
const int init_connections, const int alloc_task_once, const char *name, const bool double_buffers,
const int min_buff_size, const int max_buff_size, const int arg_size) const int max_connections, const int alloc_task_once,
const int min_buff_size, const int max_buff_size,
const int arg_size)
{ {
const int padding_size = 0; const int padding_size = 0;
return free_queue_init_ex2(max_connections, init_connections, return free_queue_init_ex2(queue, name, double_buffers, max_connections,
alloc_task_once, min_buff_size, max_buff_size, alloc_task_once, min_buff_size, max_buff_size, padding_size,
padding_size, arg_size, NULL); arg_size, NULL);
} }
static inline int free_queue_init(const int max_connections, void free_queue_destroy(struct fast_task_queue *queue);
const int min_buff_size, const int max_buff_size, const int arg_size)
static inline struct fast_task_info *free_queue_pop(
struct fast_task_queue *queue)
{ {
const int padding_size = 0; return fast_mblock_alloc_object(&queue->allocator);
return free_queue_init_ex2(max_connections, max_connections, 0,
min_buff_size, max_buff_size, padding_size, arg_size, NULL);
} }
void free_queue_destroy(); void free_queue_push(struct fast_task_info *task);
int free_queue_push(struct fast_task_info *pTask); static inline int free_queue_count(struct fast_task_queue *queue)
struct fast_task_info *free_queue_pop(); {
int free_queue_count(); return queue->allocator.info.element_total_count -
int free_queue_alloc_connections(); queue->allocator.info.element_used_count;
int free_queue_set_buffer_size(struct fast_task_info *pTask, }
const int expect_size);
int free_queue_realloc_buffer(struct fast_task_info *pTask,
const int expect_size);
int free_queue_set_max_buffer_size(struct fast_task_info *pTask); static inline int free_queue_alloc_connections(struct fast_task_queue *queue)
{
return queue->allocator.info.element_total_count;
}
int free_queue_realloc_max_buffer(struct fast_task_info *pTask); int free_queue_get_new_buffer_size(const int min_buff_size,
int task_queue_init(struct fast_task_queue *pQueue);
int task_queue_push(struct fast_task_queue *pQueue, \
struct fast_task_info *pTask);
struct fast_task_info *task_queue_pop(struct fast_task_queue *pQueue);
int task_queue_count(struct fast_task_queue *pQueue);
int task_queue_set_buffer_size(struct fast_task_queue *pQueue,
struct fast_task_info *pTask, const int expect_size);
int task_queue_realloc_buffer(struct fast_task_queue *pQueue,
struct fast_task_info *pTask, const int expect_size);
int task_queue_get_new_buffer_size(const int min_buff_size,
const int max_buff_size, const int expect_size, int *new_size); const int max_buff_size, const int expect_size, int *new_size);
int free_queue_set_buffer_size(struct fast_task_info *task,
struct fast_net_buffer *buffer, const int expect_size);
static inline int free_queue_set_max_buffer_size(
struct fast_task_info *task,
struct fast_net_buffer *buffer)
{
return free_queue_set_buffer_size(task, buffer,
task->free_queue->max_buff_size);
}
int free_queue_realloc_buffer(struct fast_task_info *task,
struct fast_net_buffer *buffer, const int expect_size);
static inline int free_queue_realloc_max_buffer(
struct fast_task_info *task,
struct fast_net_buffer *buffer)
{
return free_queue_realloc_buffer(task, buffer,
task->free_queue->max_buff_size);
}
/* send and recv buffer operations */
static inline int free_queue_set_send_buffer_size(
struct fast_task_info *task, const int expect_size)
{
return free_queue_set_buffer_size(task, task->send.ptr, expect_size);
}
static inline int free_queue_set_recv_buffer_size(
struct fast_task_info *task, const int expect_size)
{
return free_queue_set_buffer_size(task, task->recv.ptr, expect_size);
}
static inline int free_queue_set_send_max_buffer_size(
struct fast_task_info *task)
{
return free_queue_set_max_buffer_size(task, task->send.ptr);
}
static inline int free_queue_set_recv_max_buffer_size(
struct fast_task_info *task)
{
return free_queue_set_max_buffer_size(task, task->recv.ptr);
}
static inline int free_queue_realloc_send_buffer(
struct fast_task_info *task, const int expect_size)
{
return free_queue_realloc_buffer(task, task->send.ptr, expect_size);
}
static inline int free_queue_realloc_recv_buffer(
struct fast_task_info *task, const int expect_size)
{
return free_queue_realloc_buffer(task, task->recv.ptr, expect_size);
}
static inline int free_queue_realloc_send_max_buffer(
struct fast_task_info *task)
{
return free_queue_realloc_max_buffer(task, task->send.ptr);
}
static inline int free_queue_realloc_recv_max_buffer(
struct fast_task_info *task)
{
return free_queue_realloc_max_buffer(task, task->recv.ptr);
}
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif