diff --git a/src/sf_global.c b/src/sf_global.c index f31bc3c..d69c770 100644 --- a/src/sf_global.c +++ b/src/sf_global.c @@ -47,8 +47,9 @@ SFGlobalVariables g_sf_global_vars = { SFContext g_sf_context = {{'\0'}, NULL, 0, {{true, fc_comm_type_sock}, {false, fc_comm_type_rdma}}, - 1, DEFAULT_WORK_THREADS, {'\0'}, {'\0'}, 0, true, true, NULL, - NULL, NULL, NULL, NULL, sf_task_finish_clean_up, NULL + 1, DEFAULT_WORK_THREADS, {'\0'}, {'\0'}, 0, true, true, + {false, 0, 0}, NULL, NULL, NULL, NULL, NULL, + sf_task_finish_clean_up, NULL }; static inline void set_config_str_value(const char *value, diff --git a/src/sf_nio.c b/src/sf_nio.c index 6da8a9d..f9bf211 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -507,7 +507,8 @@ static inline int check_task(struct fast_task_info *task, return -1; } } else { - return EAGAIN; + //TODO: for streaming should return EAGAIN + return 0; } } @@ -713,6 +714,114 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action) return bytes; } +static int calc_iops_and_trigger_polling(struct fast_task_info *task) +{ + int time_distance; + int result = 0; + + time_distance = g_current_time - task->polling.last_calc_time; + if (time_distance > 0) { + if ((task->req_count - task->polling.last_req_count) / + time_distance >= SF_CTX->smart_polling.switch_on_iops) + { + task->polling.continuous_count++; + if (task->polling.continuous_count >= SF_CTX-> + smart_polling.switch_on_count) + { + task->polling.continuous_count = 0; + task->polling.in_queue = true; + result = ioevent_detach(&task->thread_data-> + ev_puller, task->event.fd); + fc_list_add_tail(&task->polling.dlink, + &task->thread_data->polling_queue); + } + } else { + if (task->polling.continuous_count > 0) { + task->polling.continuous_count = 0; + } + } + + logInfo("====== trigger_polling iops: %"PRId64, (task->req_count - + task->polling.last_req_count) / time_distance); + + task->polling.last_calc_time = g_current_time; + task->polling.last_req_count = task->req_count; + } + + return result; +} + +static int calc_iops_and_remove_polling(struct fast_task_info *task) +{ + int time_distance; + int result = 0; + + time_distance = g_current_time - task->polling.last_calc_time; + if (time_distance > 0) { + if ((task->req_count - task->polling.last_req_count) / + time_distance < SF_CTX->smart_polling.switch_on_iops) + { + task->polling.continuous_count++; + if (task->polling.continuous_count >= SF_CTX-> + smart_polling.switch_on_count) + { + task->polling.continuous_count = 0; + task->polling.in_queue = false; + fc_list_del_init(&task->polling.dlink); + result = sf_ioevent_add(task, (IOEventCallback) + sf_client_sock_read, task->network_timeout); + } + } else { + if (task->polling.continuous_count > 0) { + task->polling.continuous_count = 0; + } + } + + logInfo("@@@@@ remove_polling iops: %"PRId64, (task->req_count - + task->polling.last_req_count) / time_distance); + + task->polling.last_calc_time = g_current_time; + task->polling.last_req_count = task->req_count; + } + + return result; +} + +int sf_rdma_busy_polling_callback(struct nio_thread_data *thread_data) +{ + struct fast_task_info *task; + struct fast_task_info *tmp; + int bytes; + SFCommAction action; + + fc_list_for_each_entry_safe(task, tmp, &thread_data-> + polling_queue, polling.dlink) + { + if ((bytes=task->handler->recv_data(task, &action)) < 0) { + ioevent_add_to_deleted_list(task); + continue; + } + + if (action == sf_comm_action_finish) { + fast_timer_modify(&task->thread_data->timer, + &task->event.timer, g_current_time + + task->network_timeout); + + task->req_count++; + task->nio_stages.current = SF_NIO_STAGE_SEND; + if (SF_CTX->deal_task(task, SF_NIO_STAGE_SEND) < 0) { //fatal error + ioevent_add_to_deleted_list(task); + } + } else { + if (calc_iops_and_remove_polling(task) != 0) { + ioevent_add_to_deleted_list(task); + } + } + } + + return 0; +} + int sf_client_sock_read(int sock, short event, void *arg) { int result; @@ -779,6 +888,14 @@ int sf_client_sock_read(int sock, short event, void *arg) ioevent_add_to_deleted_list(task); return -1; } + + if (SF_CTX->smart_polling.enabled) { + if (calc_iops_and_trigger_polling(task) != 0) { + ioevent_add_to_deleted_list(task); + return -1; + } + } + break; } else if (action == sf_comm_action_break) { break; diff --git a/src/sf_nio.h b/src/sf_nio.h index 3ce0438..a64a406 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -125,6 +125,8 @@ int sf_socket_connect_server_done(struct fast_task_info *task); ssize_t sf_socket_send_data(struct fast_task_info *task, SFCommAction *action); ssize_t sf_socket_recv_data(struct fast_task_info *task, SFCommAction *action); +int sf_rdma_busy_polling_callback(struct nio_thread_data *thread_data); + static inline int sf_nio_forward_request(struct fast_task_info *task, const int new_thread_index) { diff --git a/src/sf_service.c b/src/sf_service.c index da91311..975c2c4 100644 --- a/src/sf_service.c +++ b/src/sf_service.c @@ -187,6 +187,14 @@ int sf_service_init_ex2(SFContext *sf_context, const char *name, for (thread_data=sf_context->thread_data,thread_ctx=thread_contexts; thread_datapolling_queue); + if (sf_context->smart_polling.enabled) { + thread_data->busy_polling_callback = + sf_rdma_busy_polling_callback; + } else { + thread_data->busy_polling_callback = NULL; + } + thread_data->thread_loop_callback = thread_loop_callback; if (alloc_thread_extra_data_callback != NULL) { thread_data->arg = alloc_thread_extra_data_callback( diff --git a/src/sf_service.h b/src/sf_service.h index 912a864..de3328b 100644 --- a/src/sf_service.h +++ b/src/sf_service.h @@ -76,6 +76,15 @@ void sf_service_set_thread_loop_callback_ex(SFContext *sf_context, #define sf_service_set_thread_loop_callback(thread_loop_callback) \ sf_service_set_thread_loop_callback_ex(&g_sf_context, thread_loop_callback) +static inline void sf_service_set_smart_polling_ex(SFContext *sf_context, + const FCSmartPollingConfig *smart_polling) +{ + sf_context->smart_polling = *smart_polling; +} + +#define sf_service_set_smart_polling(smart_polling) \ + sf_service_set_smart_polling_ex(&g_sf_context, smart_polling) + int sf_setup_signal_handler(); int sf_startup_schedule(pthread_t *schedule_tid); diff --git a/src/sf_types.h b/src/sf_types.h index 303941a..0253b3f 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -140,6 +140,7 @@ typedef struct sf_context { int header_size; bool remove_from_ready_list; bool realloc_task_buffer; + FCSmartPollingConfig smart_polling; sf_deal_task_func deal_task; sf_set_body_length_callback set_body_length; sf_alloc_recv_buffer_callback alloc_recv_buffer;