nio threads support busy_polling_callback

support_rdma
YuQing 2023-09-18 16:19:10 +08:00
parent 9731e736df
commit 9fad04f3f9
6 changed files with 141 additions and 3 deletions

View File

@ -47,8 +47,9 @@ SFGlobalVariables g_sf_global_vars = {
SFContext g_sf_context = {{'\0'}, NULL, 0, SFContext g_sf_context = {{'\0'}, NULL, 0,
{{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}},
1, DEFAULT_WORK_THREADS, {'\0'}, {'\0'}, 0, true, true, NULL, 1, DEFAULT_WORK_THREADS, {'\0'}, {'\0'}, 0, true, true,
NULL, NULL, NULL, NULL, sf_task_finish_clean_up, NULL {false, 0, 0}, NULL, NULL, NULL, NULL, NULL,
sf_task_finish_clean_up, NULL
}; };
static inline void set_config_str_value(const char *value, static inline void set_config_str_value(const char *value,

View File

@ -507,7 +507,8 @@ static inline int check_task(struct fast_task_info *task,
return -1; return -1;
} }
} else { } else {
return EAGAIN; //TODO: for streaming should return EAGAIN
return 0;
} }
} }
@ -713,6 +714,114 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action)
return bytes; return bytes;
} }
static int calc_iops_and_trigger_polling(struct fast_task_info *task)
{
int time_distance;
int result = 0;
time_distance = g_current_time - task->polling.last_calc_time;
if (time_distance > 0) {
if ((task->req_count - task->polling.last_req_count) /
time_distance >= SF_CTX->smart_polling.switch_on_iops)
{
task->polling.continuous_count++;
if (task->polling.continuous_count >= SF_CTX->
smart_polling.switch_on_count)
{
task->polling.continuous_count = 0;
task->polling.in_queue = true;
result = ioevent_detach(&task->thread_data->
ev_puller, task->event.fd);
fc_list_add_tail(&task->polling.dlink,
&task->thread_data->polling_queue);
}
} else {
if (task->polling.continuous_count > 0) {
task->polling.continuous_count = 0;
}
}
logInfo("====== trigger_polling iops: %"PRId64, (task->req_count -
task->polling.last_req_count) / time_distance);
task->polling.last_calc_time = g_current_time;
task->polling.last_req_count = task->req_count;
}
return result;
}
static int calc_iops_and_remove_polling(struct fast_task_info *task)
{
int time_distance;
int result = 0;
time_distance = g_current_time - task->polling.last_calc_time;
if (time_distance > 0) {
if ((task->req_count - task->polling.last_req_count) /
time_distance < SF_CTX->smart_polling.switch_on_iops)
{
task->polling.continuous_count++;
if (task->polling.continuous_count >= SF_CTX->
smart_polling.switch_on_count)
{
task->polling.continuous_count = 0;
task->polling.in_queue = false;
fc_list_del_init(&task->polling.dlink);
result = sf_ioevent_add(task, (IOEventCallback)
sf_client_sock_read, task->network_timeout);
}
} else {
if (task->polling.continuous_count > 0) {
task->polling.continuous_count = 0;
}
}
logInfo("@@@@@ remove_polling iops: %"PRId64, (task->req_count -
task->polling.last_req_count) / time_distance);
task->polling.last_calc_time = g_current_time;
task->polling.last_req_count = task->req_count;
}
return result;
}
int sf_rdma_busy_polling_callback(struct nio_thread_data *thread_data)
{
struct fast_task_info *task;
struct fast_task_info *tmp;
int bytes;
SFCommAction action;
fc_list_for_each_entry_safe(task, tmp, &thread_data->
polling_queue, polling.dlink)
{
if ((bytes=task->handler->recv_data(task, &action)) < 0) {
ioevent_add_to_deleted_list(task);
continue;
}
if (action == sf_comm_action_finish) {
fast_timer_modify(&task->thread_data->timer,
&task->event.timer, g_current_time +
task->network_timeout);
task->req_count++;
task->nio_stages.current = SF_NIO_STAGE_SEND;
if (SF_CTX->deal_task(task, SF_NIO_STAGE_SEND) < 0) { //fatal error
ioevent_add_to_deleted_list(task);
}
} else {
if (calc_iops_and_remove_polling(task) != 0) {
ioevent_add_to_deleted_list(task);
}
}
}
return 0;
}
int sf_client_sock_read(int sock, short event, void *arg) int sf_client_sock_read(int sock, short event, void *arg)
{ {
int result; int result;
@ -779,6 +888,14 @@ int sf_client_sock_read(int sock, short event, void *arg)
ioevent_add_to_deleted_list(task); ioevent_add_to_deleted_list(task);
return -1; return -1;
} }
if (SF_CTX->smart_polling.enabled) {
if (calc_iops_and_trigger_polling(task) != 0) {
ioevent_add_to_deleted_list(task);
return -1;
}
}
break; break;
} else if (action == sf_comm_action_break) { } else if (action == sf_comm_action_break) {
break; break;

View File

@ -125,6 +125,8 @@ int sf_socket_connect_server_done(struct fast_task_info *task);
ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action); ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action);
ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action); ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action);
int sf_rdma_busy_polling_callback(struct nio_thread_data *thread_data);
static inline int sf_nio_forward_request(struct fast_task_info *task, static inline int sf_nio_forward_request(struct fast_task_info *task,
const int new_thread_index) const int new_thread_index)
{ {

View File

@ -187,6 +187,14 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name,
for (thread_data=sf_context->thread_data,thread_ctx=thread_contexts; for (thread_data=sf_context->thread_data,thread_ctx=thread_contexts;
thread_data<data_end; thread_data++,thread_ctx++) thread_data<data_end; thread_data++,thread_ctx++)
{ {
FC_INIT_LIST_HEAD(&thread_data->polling_queue);
if (sf_context->smart_polling.enabled) {
thread_data->busy_polling_callback =
sf_rdma_busy_polling_callback;
} else {
thread_data->busy_polling_callback = NULL;
}
thread_data->thread_loop_callback = thread_loop_callback; thread_data->thread_loop_callback = thread_loop_callback;
if (alloc_thread_extra_data_callback != NULL) { if (alloc_thread_extra_data_callback != NULL) {
thread_data->arg = alloc_thread_extra_data_callback( thread_data->arg = alloc_thread_extra_data_callback(

View File

@ -76,6 +76,15 @@ void sf_service_set_thread_loop_callback_ex(SFContext *sf_context,
#define sf_service_set_thread_loop_callback(thread_loop_callback) \ #define sf_service_set_thread_loop_callback(thread_loop_callback) \
sf_service_set_thread_loop_callback_ex(&g_sf_context, thread_loop_callback) sf_service_set_thread_loop_callback_ex(&g_sf_context, thread_loop_callback)
static inline void sf_service_set_smart_polling_ex(SFContext *sf_context,
const FCSmartPollingConfig *smart_polling)
{
sf_context->smart_polling = *smart_polling;
}
#define sf_service_set_smart_polling(smart_polling) \
sf_service_set_smart_polling_ex(&g_sf_context, smart_polling)
int sf_setup_signal_handler(); int sf_setup_signal_handler();
int sf_startup_schedule(pthread_t *schedule_tid); int sf_startup_schedule(pthread_t *schedule_tid);

View File

@ -140,6 +140,7 @@ typedef struct sf_context {
int header_size; int header_size;
bool remove_from_ready_list; bool remove_from_ready_list;
bool realloc_task_buffer; bool realloc_task_buffer;
FCSmartPollingConfig smart_polling;
sf_deal_task_func deal_task; sf_deal_task_func deal_task;
sf_set_body_length_callback set_body_length; sf_set_body_length_callback set_body_length;
sf_alloc_recv_buffer_callback alloc_recv_buffer; sf_alloc_recv_buffer_callback alloc_recv_buffer;