diff --git a/HISTORY b/HISTORY index 77590d2..66015e4 100644 --- a/HISTORY +++ b/HISTORY @@ -1,11 +1,12 @@ -Version 1.70 2023-09-20 +Version 1.70 2023-09-25 * get full mac address of infiniband NIC under Linux * struct fast_task_info add field conn for RDMA connection * server_id_func.[hc]: support communication type * connection_pool.[hc] support callbacks for RDMA * nio thread data support busy_polling_callback * connection_pool.[hc] support thread local for performance + * struct fast_task_info support send and recv double buffers Version 1.69 2023-08-05 * bugfixed: array_allocator_alloc MUST init the array diff --git a/src/fast_task_queue.c b/src/fast_task_queue.c index 1757cd2..a0550c3 100644 --- a/src/fast_task_queue.c +++ b/src/fast_task_queue.c @@ -25,656 +25,183 @@ #include "fc_memory.h" #include "fast_task_queue.h" -static struct fast_task_queue g_free_queue; - -struct mpool_node { - struct fast_task_info *blocks; - struct fast_task_info *last_block; //last block - struct mpool_node *next; -}; - -struct mpool_chain { - struct mpool_node *head; - struct mpool_node *tail; -}; - -static struct mpool_chain g_mpool = {NULL, NULL}; - -int task_queue_init(struct fast_task_queue *pQueue) +static int task_alloc_init(struct fast_task_info *task, + struct fast_task_queue *queue) { - int result; - - if ((result=init_pthread_lock(&(pQueue->lock))) != 0) - { - logError("file: "__FILE__", line: %d, " \ - "init_pthread_lock fail, errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - return result; - } - - pQueue->head = NULL; - pQueue->tail = NULL; - - return 0; -} - -static void free_mpool(struct mpool_node *mpool, char *end) -{ - char *pt; - for (pt=(char *)mpool->blocks; pt < end; pt += g_free_queue.block_size) - { - free(((struct fast_task_info *)pt)->data); + task->arg = (char *)task + ALIGNED_TASK_INFO_SIZE + queue->padding_size; + task->send.ptr = &task->send.holder; + task->send.ptr->size = queue->min_buff_size; + if (queue->malloc_whole_block) { + task->send.ptr->data = (char *)task->arg + queue->arg_size; + } else { + task->send.ptr->data = (char *)fc_malloc(task->send.ptr->size); + if (task->send.ptr->data == NULL) { + return ENOMEM; + } } - free(mpool->blocks); - free(mpool); -} - -static struct mpool_node *malloc_mpool(const int total_alloc_size) -{ - struct fast_task_info *pTask; - char *p; - char *pCharEnd; - struct mpool_node *mpool; - - mpool = (struct mpool_node *)fc_malloc(sizeof(struct mpool_node)); - if (mpool == NULL) - { - return NULL; - } - - mpool->next = NULL; - mpool->blocks = (struct fast_task_info *)fc_malloc(total_alloc_size); - if (mpool->blocks == NULL) - { - free(mpool); - return NULL; - } - memset(mpool->blocks, 0, total_alloc_size); - - pCharEnd = ((char *)mpool->blocks) + total_alloc_size; - for (p=(char *)mpool->blocks; psize = g_free_queue.min_buff_size; - - pTask->arg = p + ALIGNED_TASK_INFO_SIZE + g_free_queue.padding_size; - if (g_free_queue.malloc_whole_block) - { - pTask->data = (char *)pTask->arg + g_free_queue.arg_size; - } - else - { - pTask->data = (char *)fc_malloc(pTask->size); - if (pTask->data == NULL) - { - free_mpool(mpool, p); - return NULL; - } - } - - if (g_free_queue.init_callback != NULL) - { - if (g_free_queue.init_callback(pTask) != 0) - { - free_mpool(mpool, p); - return NULL; - } + if (queue->double_buffers) { + task->recv.ptr = &task->recv.holder; + task->recv.ptr->size = queue->min_buff_size; + task->recv.ptr->data = (char *)fc_malloc(task->recv.ptr->size); + if (task->recv.ptr->data == NULL) { + return ENOMEM; } - } + } else { + task->recv.ptr = &task->send.holder; + } - mpool->last_block = (struct fast_task_info *) - (pCharEnd - g_free_queue.block_size); - for (p=(char *)mpool->blocks; p<(char *)mpool->last_block; - p += g_free_queue.block_size) - { - pTask = (struct fast_task_info *)p; - pTask->next = (struct fast_task_info *)(p + g_free_queue.block_size); - } - mpool->last_block->next = NULL; - - return mpool; + task->free_queue = queue; + if (queue->init_callback != NULL) { + return queue->init_callback(task); + } + return 0; } -int free_queue_init_ex2(const int max_connections, const int init_connections, +int free_queue_init_ex2(struct fast_task_queue *queue, const char *name, + const bool double_buffers, const int max_connections, const int alloc_task_once, const int min_buff_size, const int max_buff_size, const int padding_size, const int arg_size, TaskInitCallback init_callback) { #define MAX_DATA_SIZE (256 * 1024 * 1024) - int64_t total_size; - struct mpool_node *mpool; - int alloc_size; int alloc_once; - int result; - int loop_count; int aligned_min_size; int aligned_max_size; int aligned_padding_size; int aligned_arg_size; rlim_t max_data_size; - - if ((result=init_pthread_lock(&(g_free_queue.lock))) != 0) - { - logError("file: "__FILE__", line: %d, " - "init_pthread_lock fail, errno: %d, error info: %s", - __LINE__, result, STRERROR(result)); - return result; - } + char aname[64]; aligned_min_size = MEM_ALIGN(min_buff_size); aligned_max_size = MEM_ALIGN(max_buff_size); aligned_padding_size = MEM_ALIGN(padding_size); aligned_arg_size = MEM_ALIGN(arg_size); - g_free_queue.block_size = ALIGNED_TASK_INFO_SIZE + + queue->block_size = ALIGNED_TASK_INFO_SIZE + aligned_padding_size + aligned_arg_size; - alloc_size = g_free_queue.block_size * init_connections; - if (aligned_max_size > aligned_min_size) - { - total_size = alloc_size; - g_free_queue.malloc_whole_block = false; + if (alloc_task_once <= 0) { + alloc_once = FC_MIN(MAX_DATA_SIZE / queue->block_size, 256); + if (alloc_once == 0) { + alloc_once = 1; + } + } else { + alloc_once = alloc_task_once; + } + + if (aligned_max_size > aligned_min_size) { + queue->malloc_whole_block = false; max_data_size = 0; - } - else - { - struct rlimit rlimit_data; + } else { + struct rlimit rlimit_data; - if (getrlimit(RLIMIT_DATA, &rlimit_data) < 0) - { - logError("file: "__FILE__", line: %d, " - "call getrlimit fail, " - "errno: %d, error info: %s", - __LINE__, errno, STRERROR(errno)); - return errno != 0 ? errno : EPERM; - } - if (rlimit_data.rlim_cur == RLIM_INFINITY) - { - max_data_size = MAX_DATA_SIZE; - } - else - { - max_data_size = rlimit_data.rlim_cur; - if (max_data_size > MAX_DATA_SIZE) - { - max_data_size = MAX_DATA_SIZE; - } - } + if (getrlimit(RLIMIT_DATA, &rlimit_data) < 0) { + logError("file: "__FILE__", line: %d, " + "call getrlimit fail, " + "errno: %d, error info: %s", + __LINE__, errno, STRERROR(errno)); + return errno != 0 ? errno : EPERM; + } + if (rlimit_data.rlim_cur == RLIM_INFINITY) { + max_data_size = MAX_DATA_SIZE; + } else { + max_data_size = rlimit_data.rlim_cur; + if (max_data_size > MAX_DATA_SIZE) { + max_data_size = MAX_DATA_SIZE; + } + } - if (max_data_size >= (int64_t)(g_free_queue.block_size + - aligned_min_size) * (int64_t)init_connections) - { - total_size = alloc_size + (int64_t)aligned_min_size * - init_connections; - g_free_queue.malloc_whole_block = true; - g_free_queue.block_size += aligned_min_size; - } - else - { - total_size = alloc_size; - g_free_queue.malloc_whole_block = false; - max_data_size = 0; - } - } - - g_free_queue.max_connections = max_connections; - g_free_queue.alloc_connections = init_connections; - if (alloc_task_once <= 0) - { - g_free_queue.alloc_task_once = 256; - alloc_once = MAX_DATA_SIZE / g_free_queue.block_size; - if (g_free_queue.alloc_task_once > alloc_once) + if (max_data_size >= (int64_t)(queue->block_size + + aligned_min_size) * (int64_t)alloc_once) { - g_free_queue.alloc_task_once = alloc_once > 0 ? alloc_once : 1; + queue->malloc_whole_block = true; + queue->block_size += aligned_min_size; + } else { + queue->malloc_whole_block = false; + max_data_size = 0; } } - else - { - g_free_queue.alloc_task_once = alloc_task_once; - } - g_free_queue.min_buff_size = aligned_min_size; - g_free_queue.max_buff_size = aligned_max_size; - g_free_queue.padding_size = aligned_padding_size; - g_free_queue.arg_size = aligned_arg_size; - g_free_queue.init_callback = init_callback; - logDebug("file: "__FILE__", line: %d, " - "max_connections: %d, init_connections: %d, alloc_task_once: %d, " + queue->double_buffers = double_buffers; + queue->min_buff_size = aligned_min_size; + queue->max_buff_size = aligned_max_size; + queue->padding_size = aligned_padding_size; + queue->arg_size = aligned_arg_size; + queue->init_callback = init_callback; + + /* + logInfo("file: "__FILE__", line: %d, [%s] double_buffers: %d, " + "max_connections: %d, alloc_once: %d, malloc_whole_block: %d, " "min_buff_size: %d, max_buff_size: %d, block_size: %d, " - "padding_size: %d, arg_size: %d, max_data_size: %"PRId64", " - "total_size: %"PRId64, __LINE__, max_connections, init_connections, - g_free_queue.alloc_task_once, aligned_min_size, aligned_max_size, - g_free_queue.block_size, aligned_padding_size, aligned_arg_size, - (int64_t)max_data_size, total_size); + "padding_size: %d, arg_size: %d, max_data_size: %"PRId64, + __LINE__, name, double_buffers, max_connections, alloc_once, + queue->malloc_whole_block, aligned_min_size, aligned_max_size, + queue->block_size, aligned_padding_size, aligned_arg_size, + (int64_t)max_data_size); + */ - if ((!g_free_queue.malloc_whole_block) || (total_size <= max_data_size)) - { - loop_count = 1; - mpool = malloc_mpool(total_size); - if (mpool == NULL) - { - return errno != 0 ? errno : ENOMEM; - } - g_mpool.head = mpool; - g_mpool.tail = mpool; - } - else - { - int remain_count; - int alloc_count; - int current_alloc_size; - - loop_count = 0; - remain_count = init_connections; - alloc_once = max_data_size / g_free_queue.block_size; - while (remain_count > 0) - { - alloc_count = (remain_count > alloc_once) ? - alloc_once : remain_count; - current_alloc_size = g_free_queue.block_size * alloc_count; - mpool = malloc_mpool(current_alloc_size); - if (mpool == NULL) - { - free_queue_destroy(); - return errno != 0 ? errno : ENOMEM; - } - - if (g_mpool.tail == NULL) - { - g_mpool.head = mpool; - } - else - { - g_mpool.tail->next = mpool; - g_mpool.tail->last_block->next = mpool->blocks; //link previous mpool to current - } - g_mpool.tail = mpool; - - remain_count -= alloc_count; - loop_count++; - } - - logDebug("file: "__FILE__", line: %d, " \ - "alloc_once: %d", __LINE__, alloc_once); - } - - logDebug("file: "__FILE__", line: %d, " \ - "malloc task info as whole: %d, malloc loop count: %d", \ - __LINE__, g_free_queue.malloc_whole_block, loop_count); - - if (g_mpool.head != NULL) - { - g_free_queue.head = g_mpool.head->blocks; - g_free_queue.tail = g_mpool.tail->last_block; - } - - return 0; + snprintf(aname, sizeof(aname), "%s-task", name); + return fast_mblock_init_ex1(&queue->allocator, aname, + queue->block_size, alloc_once, max_connections, + (fast_mblock_object_init_func)task_alloc_init, + queue, true); } -void free_queue_destroy() +void free_queue_destroy(struct fast_task_queue *queue) { - struct mpool_node *mpool; - struct mpool_node *mp; - - if (g_mpool.head == NULL) - { - return; - } - - if (!g_free_queue.malloc_whole_block) - { - char *p; - char *pCharEnd; - struct fast_task_info *pTask; - - mpool = g_mpool.head; - while (mpool != NULL) - { - pCharEnd = (char *)mpool->last_block + g_free_queue.block_size; - for (p=(char *)mpool->blocks; pdata != NULL) - { - free(pTask->data); - pTask->data = NULL; - } - } - mpool = mpool->next; - } - } - - mpool = g_mpool.head; - while (mpool != NULL) - { - mp = mpool; - mpool = mpool->next; - - free(mp->blocks); - free(mp); - } - g_mpool.head = g_mpool.tail = NULL; - - pthread_mutex_destroy(&(g_free_queue.lock)); + fast_mblock_destroy(&queue->allocator); } -static int free_queue_realloc() -{ - struct mpool_node *mpool; - struct fast_task_info *head; - struct fast_task_info *tail; - int remain_count; - int alloc_count; - int current_alloc_size; - - head = tail = NULL; - remain_count = g_free_queue.max_connections - - g_free_queue.alloc_connections; - alloc_count = (remain_count > g_free_queue.alloc_task_once) ? - g_free_queue.alloc_task_once : remain_count; - if (alloc_count > 0) - { - current_alloc_size = g_free_queue.block_size * alloc_count; - mpool = malloc_mpool(current_alloc_size); - if (mpool == NULL) - { - return ENOMEM; - } - - if (g_mpool.tail == NULL) - { - g_mpool.head = mpool; - } - else - { - g_mpool.tail->next = mpool; - } - g_mpool.tail = mpool; - - head = mpool->blocks; - tail = mpool->last_block; - - remain_count -= alloc_count; - } - else { - return ENOSPC; - } - - if (g_free_queue.head == NULL) - { - g_free_queue.head = head; - } - if (g_free_queue.tail != NULL) - { - g_free_queue.tail->next = head; - } - g_free_queue.tail = tail; - - g_free_queue.alloc_connections += alloc_count; - - logDebug("file: "__FILE__", line: %d, " - "alloc_connections: %d, realloc %d elements", __LINE__, - g_free_queue.alloc_connections, alloc_count); - - return 0; -} - -struct fast_task_info *free_queue_pop() -{ - struct fast_task_info *pTask; - int i; - - if ((pTask=task_queue_pop(&g_free_queue)) != NULL) - { - return pTask; - } - - if (g_free_queue.alloc_connections >= g_free_queue.max_connections) - { - return NULL; - } - - for (i=0; i<10; i++) - { - pthread_mutex_lock(&g_free_queue.lock); - if (g_free_queue.alloc_connections >= g_free_queue.max_connections) - { - if (g_free_queue.head == NULL) - { - pthread_mutex_unlock(&g_free_queue.lock); - return NULL; - } - } - else - { - if (g_free_queue.head == NULL && free_queue_realloc() != 0) - { - pthread_mutex_unlock(&g_free_queue.lock); - return NULL; - } - } - pthread_mutex_unlock(&g_free_queue.lock); - - if ((pTask=task_queue_pop(&g_free_queue)) != NULL) - { - return pTask; - } - } - - return NULL; -} - -static int _realloc_buffer(struct fast_task_info *pTask, const int new_size, - const bool copy_data) +static int _realloc_buffer(struct fast_net_buffer *buffer, + const int new_size, const bool copy_data) { char *new_buff; new_buff = (char *)fc_malloc(new_size); - if (new_buff == NULL) - { + if (new_buff == NULL) { return ENOMEM; } - else - { - if (copy_data && pTask->offset > 0) { - memcpy(new_buff, pTask->data, pTask->offset); - } - free(pTask->data); - pTask->size = new_size; - pTask->data = new_buff; - return 0; + + if (copy_data && buffer->offset > 0) { + memcpy(new_buff, buffer->data, buffer->offset); } + free(buffer->data); + buffer->size = new_size; + buffer->data = new_buff; + return 0; } -int free_queue_push(struct fast_task_info *pTask) +void free_queue_push(struct fast_task_info *task) { - int result; + *(task->client_ip) = '\0'; + task->send.ptr->length = 0; + task->send.ptr->offset = 0; + task->req_count = 0; + if (task->send.ptr->size > task->free_queue->min_buff_size) {//need thrink + _realloc_buffer(task->send.ptr, task->free_queue->min_buff_size, false); + } - *(pTask->client_ip) = '\0'; - pTask->length = 0; - pTask->offset = 0; - pTask->req_count = 0; + if (task->free_queue->double_buffers) { + task->recv.ptr->length = 0; + task->recv.ptr->offset = 0; + if (task->recv.ptr->size > task->free_queue->min_buff_size) { + _realloc_buffer(task->recv.ptr, task->free_queue-> + min_buff_size, false); + } + } - if (pTask->size > g_free_queue.min_buff_size) //need thrink - { - _realloc_buffer(pTask, g_free_queue.min_buff_size, false); - } - - if ((result=pthread_mutex_lock(&g_free_queue.lock)) != 0) - { - logError("file: "__FILE__", line: %d, " \ - "call pthread_mutex_lock fail, " \ - "errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - } - - pTask->next = g_free_queue.head; - g_free_queue.head = pTask; - if (g_free_queue.tail == NULL) - { - g_free_queue.tail = pTask; - } - - if ((result=pthread_mutex_unlock(&g_free_queue.lock)) != 0) - { - logError("file: "__FILE__", line: %d, " \ - "call pthread_mutex_unlock fail, " \ - "errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - } - - return result; + fast_mblock_free_object(&task->free_queue->allocator, task); } -int free_queue_count() -{ - return task_queue_count(&g_free_queue); -} - -int free_queue_alloc_connections() -{ - return g_free_queue.alloc_connections; -} - -int free_queue_set_buffer_size(struct fast_task_info *pTask, - const int expect_size) -{ - return task_queue_set_buffer_size(&g_free_queue, pTask, expect_size); -} - -int free_queue_realloc_buffer(struct fast_task_info *pTask, - const int expect_size) -{ - return task_queue_realloc_buffer(&g_free_queue, pTask, expect_size); -} - -int free_queue_set_max_buffer_size(struct fast_task_info *pTask) -{ - return task_queue_set_buffer_size(&g_free_queue, pTask, - g_free_queue.max_buff_size); -} - -int free_queue_realloc_max_buffer(struct fast_task_info *pTask) -{ - return task_queue_realloc_buffer(&g_free_queue, pTask, - g_free_queue.max_buff_size); -} -int task_queue_push(struct fast_task_queue *pQueue, \ - struct fast_task_info *pTask) -{ - int result; - - if ((result=pthread_mutex_lock(&(pQueue->lock))) != 0) - { - logError("file: "__FILE__", line: %d, " \ - "call pthread_mutex_lock fail, " \ - "errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - return result; - } - - pTask->next = NULL; - if (pQueue->tail == NULL) - { - pQueue->head = pTask; - } - else - { - pQueue->tail->next = pTask; - } - pQueue->tail = pTask; - - if ((result=pthread_mutex_unlock(&(pQueue->lock))) != 0) - { - logError("file: "__FILE__", line: %d, " \ - "call pthread_mutex_unlock fail, " \ - "errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - } - - return 0; -} - -struct fast_task_info *task_queue_pop(struct fast_task_queue *pQueue) -{ - struct fast_task_info *pTask; - int result; - - if ((result=pthread_mutex_lock(&(pQueue->lock))) != 0) - { - logError("file: "__FILE__", line: %d, " \ - "call pthread_mutex_lock fail, " \ - "errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - return NULL; - } - - pTask = pQueue->head; - if (pTask != NULL) - { - pQueue->head = pTask->next; - if (pQueue->head == NULL) - { - pQueue->tail = NULL; - } - } - - if ((result=pthread_mutex_unlock(&(pQueue->lock))) != 0) - { - logError("file: "__FILE__", line: %d, " \ - "call pthread_mutex_unlock fail, " \ - "errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - } - - return pTask; -} - -int task_queue_count(struct fast_task_queue *pQueue) -{ - struct fast_task_info *pTask; - int count; - int result; - - if ((result=pthread_mutex_lock(&(pQueue->lock))) != 0) - { - logError("file: "__FILE__", line: %d, " \ - "call pthread_mutex_lock fail, " \ - "errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - return 0; - } - - count = 0; - pTask = pQueue->head; - while (pTask != NULL) - { - pTask = pTask->next; - count++; - } - - if ((result=pthread_mutex_unlock(&(pQueue->lock))) != 0) - { - logError("file: "__FILE__", line: %d, " \ - "call pthread_mutex_unlock fail, " \ - "errno: %d, error info: %s", \ - __LINE__, result, STRERROR(result)); - } - - return count; -} - -int task_queue_get_new_buffer_size(const int min_buff_size, +int free_queue_get_new_buffer_size(const int min_buff_size, const int max_buff_size, const int expect_size, int *new_size) { - if (min_buff_size == max_buff_size) - { + if (min_buff_size == max_buff_size) { logError("file: "__FILE__", line: %d, " "can't change buffer size because NOT supported", __LINE__); return EOPNOTSUPP; } - if (expect_size > max_buff_size) - { + if (expect_size > max_buff_size) { logError("file: "__FILE__", line: %d, " "can't change buffer size because expect buffer size: %d " "exceeds max buffer size: %d", __LINE__, expect_size, @@ -683,14 +210,11 @@ int task_queue_get_new_buffer_size(const int min_buff_size, } *new_size = min_buff_size; - if (expect_size > min_buff_size) - { - while (*new_size < expect_size) - { + if (expect_size > min_buff_size) { + while (*new_size < expect_size) { *new_size *= 2; } - if (*new_size > max_buff_size) - { + if (*new_size > max_buff_size) { *new_size = max_buff_size; } } @@ -698,41 +222,43 @@ int task_queue_get_new_buffer_size(const int min_buff_size, return 0; } -#define _get_new_buffer_size(pQueue, expect_size, new_size) \ - task_queue_get_new_buffer_size(pQueue->min_buff_size, \ - pQueue->max_buff_size, expect_size, new_size) +#define _get_new_buffer_size(queue, expect_size, new_size) \ + free_queue_get_new_buffer_size(queue->min_buff_size, \ + queue->max_buff_size, expect_size, new_size) -int task_queue_set_buffer_size(struct fast_task_queue *pQueue, - struct fast_task_info *pTask, const int expect_size) +int free_queue_set_buffer_size(struct fast_task_info *task, + struct fast_net_buffer *buffer, const int expect_size) { int result; int new_size; - if ((result=_get_new_buffer_size(pQueue, expect_size, &new_size)) != 0) { + if ((result=_get_new_buffer_size(task->free_queue, + expect_size, &new_size)) != 0) + { return result; } - if (pTask->size == new_size) //do NOT need change buffer size - { + if (buffer->size == new_size) { //do NOT need change buffer size return 0; } - return _realloc_buffer(pTask, new_size, false); + return _realloc_buffer(buffer, new_size, false); } -int task_queue_realloc_buffer(struct fast_task_queue *pQueue, - struct fast_task_info *pTask, const int expect_size) +int free_queue_realloc_buffer(struct fast_task_info *task, + struct fast_net_buffer *buffer, const int expect_size) { int result; int new_size; - if (pTask->size >= expect_size) //do NOT need change buffer size - { + if (buffer->size >= expect_size) { //do NOT need change buffer size return 0; } - if ((result=_get_new_buffer_size(pQueue, expect_size, &new_size)) != 0) { + if ((result=_get_new_buffer_size(task->free_queue, + expect_size, &new_size)) != 0) + { return result; } - return _realloc_buffer(pTask, new_size, true); + return _realloc_buffer(buffer, new_size, true); } diff --git a/src/fast_task_queue.h b/src/fast_task_queue.h index 1f58f62..b5ef4f8 100644 --- a/src/fast_task_queue.h +++ b/src/fast_task_queue.h @@ -23,9 +23,10 @@ #include #include #include "common_define.h" +#include "fc_list.h" #include "ioevent.h" #include "fast_timer.h" -#include "fc_list.h" +#include "fast_mblock.h" #define FC_NOTIFY_READ_FD(tdata) (tdata)->pipe_fds[0] #define FC_NOTIFY_WRITE_FD(tdata) (tdata)->pipe_fds[1] @@ -36,9 +37,9 @@ struct nio_thread_data; struct fast_task_info; typedef int (*ThreadLoopCallback) (struct nio_thread_data *pThreadData); -typedef int (*TaskFinishCallback) (struct fast_task_info *pTask); -typedef void (*TaskCleanUpCallback) (struct fast_task_info *pTask); -typedef int (*TaskInitCallback)(struct fast_task_info *pTask); +typedef int (*TaskFinishCallback) (struct fast_task_info *task); +typedef void (*TaskCleanUpCallback) (struct fast_task_info *task); +typedef int (*TaskInitCallback)(struct fast_task_info *task); typedef void (*IOEventCallback) (int sock, short event, void *arg); typedef int (*TaskContinueCallback)(struct fast_task_info *task); @@ -83,6 +84,21 @@ struct ioevent_notify_entry struct nio_thread_data *thread_data; }; +struct fast_net_buffer +{ + int size; //alloc size + int length; //data length + int offset; //current offset + char *data; //buffer for write or read +}; + +struct fast_net_buffer_wrapper +{ + struct fast_net_buffer holder; + struct fast_net_buffer *ptr; +}; + +struct fast_task_queue; struct fast_task_info { IOEventEntry event; //must first @@ -91,7 +107,6 @@ struct fast_task_info char client_ip[IP_ADDRESS_SIZE]; }; void *arg; //extra argument pointer - char *data; //buffer for write or read char *recv_body; //for extra (dynamic) recv buffer struct { @@ -99,9 +114,9 @@ struct fast_task_info int count; } iovec_array; //for writev - int size; //alloc size - int length; //data length - int offset; //current offset + struct fast_net_buffer_wrapper send; //send buffer + struct fast_net_buffer_wrapper recv; //recv buffer + uint16_t port; //peer port struct { uint8_t current; @@ -125,23 +140,20 @@ struct fast_task_info struct sf_network_handler *handler; //network handler for libserverframe nio struct fast_task_info *next; //for free queue and deleted list struct fast_task_info *notify_next; //for nio notify queue + struct fast_task_queue *free_queue; //task allocator char conn[0]; //for RDMA connection }; struct fast_task_queue { - struct fast_task_info *head; - struct fast_task_info *tail; - pthread_mutex_t lock; - int max_connections; - int alloc_connections; - int alloc_task_once; - int min_buff_size; - int max_buff_size; + int min_buff_size; + int max_buff_size; int padding_size; //for last field: conn[0] - int arg_size; //for arg pointer - int block_size; - bool malloc_whole_block; + int arg_size; //for arg pointer + int block_size; + bool malloc_whole_block; + bool double_buffers; //if send buffer and recv buffer are independent + struct fast_mblock_man allocator; TaskInitCallback init_callback; }; @@ -149,57 +161,119 @@ struct fast_task_queue extern "C" { #endif -int free_queue_init_ex2(const int max_connections, const int init_connections, +int free_queue_init_ex2(struct fast_task_queue *queue, const char *name, + const bool double_buffers, const int max_connections, const int alloc_task_once, const int min_buff_size, const int max_buff_size, const int padding_size, const int arg_size, TaskInitCallback init_callback); -static inline int free_queue_init_ex(const int max_connections, - const int init_connections, const int alloc_task_once, - const int min_buff_size, const int max_buff_size, const int arg_size) +static inline int free_queue_init_ex(struct fast_task_queue *queue, + const char *name, const bool double_buffers, + const int max_connections, const int alloc_task_once, + const int min_buff_size, const int max_buff_size, + const int arg_size) { const int padding_size = 0; - return free_queue_init_ex2(max_connections, init_connections, - alloc_task_once, min_buff_size, max_buff_size, - padding_size, arg_size, NULL); + return free_queue_init_ex2(queue, name, double_buffers, max_connections, + alloc_task_once, min_buff_size, max_buff_size, padding_size, + arg_size, NULL); } -static inline int free_queue_init(const int max_connections, - const int min_buff_size, const int max_buff_size, const int arg_size) +void free_queue_destroy(struct fast_task_queue *queue); + +static inline struct fast_task_info *free_queue_pop( + struct fast_task_queue *queue) { - const int padding_size = 0; - return free_queue_init_ex2(max_connections, max_connections, 0, - min_buff_size, max_buff_size, padding_size, arg_size, NULL); + return fast_mblock_alloc_object(&queue->allocator); } -void free_queue_destroy(); +void free_queue_push(struct fast_task_info *task); -int free_queue_push(struct fast_task_info *pTask); -struct fast_task_info *free_queue_pop(); -int free_queue_count(); -int free_queue_alloc_connections(); -int free_queue_set_buffer_size(struct fast_task_info *pTask, - const int expect_size); -int free_queue_realloc_buffer(struct fast_task_info *pTask, - const int expect_size); +static inline int free_queue_count(struct fast_task_queue *queue) +{ + return queue->allocator.info.element_total_count - + queue->allocator.info.element_used_count; +} -int free_queue_set_max_buffer_size(struct fast_task_info *pTask); +static inline int free_queue_alloc_connections(struct fast_task_queue *queue) +{ + return queue->allocator.info.element_total_count; +} -int free_queue_realloc_max_buffer(struct fast_task_info *pTask); - -int task_queue_init(struct fast_task_queue *pQueue); -int task_queue_push(struct fast_task_queue *pQueue, \ - struct fast_task_info *pTask); -struct fast_task_info *task_queue_pop(struct fast_task_queue *pQueue); -int task_queue_count(struct fast_task_queue *pQueue); -int task_queue_set_buffer_size(struct fast_task_queue *pQueue, - struct fast_task_info *pTask, const int expect_size); -int task_queue_realloc_buffer(struct fast_task_queue *pQueue, - struct fast_task_info *pTask, const int expect_size); - -int task_queue_get_new_buffer_size(const int min_buff_size, +int free_queue_get_new_buffer_size(const int min_buff_size, const int max_buff_size, const int expect_size, int *new_size); +int free_queue_set_buffer_size(struct fast_task_info *task, + struct fast_net_buffer *buffer, const int expect_size); + +static inline int free_queue_set_max_buffer_size( + struct fast_task_info *task, + struct fast_net_buffer *buffer) +{ + return free_queue_set_buffer_size(task, buffer, + task->free_queue->max_buff_size); +} + +int free_queue_realloc_buffer(struct fast_task_info *task, + struct fast_net_buffer *buffer, const int expect_size); + +static inline int free_queue_realloc_max_buffer( + struct fast_task_info *task, + struct fast_net_buffer *buffer) +{ + return free_queue_realloc_buffer(task, buffer, + task->free_queue->max_buff_size); +} + +/* send and recv buffer operations */ +static inline int free_queue_set_send_buffer_size( + struct fast_task_info *task, const int expect_size) +{ + return free_queue_set_buffer_size(task, task->send.ptr, expect_size); +} + +static inline int free_queue_set_recv_buffer_size( + struct fast_task_info *task, const int expect_size) +{ + return free_queue_set_buffer_size(task, task->recv.ptr, expect_size); +} + +static inline int free_queue_set_send_max_buffer_size( + struct fast_task_info *task) +{ + return free_queue_set_max_buffer_size(task, task->send.ptr); +} + +static inline int free_queue_set_recv_max_buffer_size( + struct fast_task_info *task) +{ + return free_queue_set_max_buffer_size(task, task->recv.ptr); +} + +static inline int free_queue_realloc_send_buffer( + struct fast_task_info *task, const int expect_size) +{ + return free_queue_realloc_buffer(task, task->send.ptr, expect_size); +} + +static inline int free_queue_realloc_recv_buffer( + struct fast_task_info *task, const int expect_size) +{ + return free_queue_realloc_buffer(task, task->recv.ptr, expect_size); +} + +static inline int free_queue_realloc_send_max_buffer( + struct fast_task_info *task) +{ + return free_queue_realloc_max_buffer(task, task->send.ptr); +} + +static inline int free_queue_realloc_recv_max_buffer( + struct fast_task_info *task) +{ + return free_queue_realloc_max_buffer(task, task->recv.ptr); +} + #ifdef __cplusplus } #endif