support connect for client mode

connection_manager
YuQing 2020-09-06 18:54:46 +08:00
parent 638c5b16a3
commit aaeb421e6d
4 changed files with 91 additions and 14 deletions

View File

@ -11,10 +11,11 @@
#define SF_DEF_MAX_BUFF_SIZE (64 * 1024)
#define SF_NIO_STAGE_INIT 0 //set ioevent
#define SF_NIO_STAGE_RECV 1 //recv
#define SF_NIO_STAGE_SEND 2 //send
#define SF_NIO_STAGE_FORWARDED 3 //deal the forwarded request
#define SF_NIO_STAGE_CONTINUE 4 //notify the thread continue deal
#define SF_NIO_STAGE_CONNECT 1 //do connect
#define SF_NIO_STAGE_RECV 2 //do recv
#define SF_NIO_STAGE_SEND 3 //do send
#define SF_NIO_STAGE_FORWARDED 4 //deal the forwarded request
#define SF_NIO_STAGE_CONTINUE 5 //notify the thread continue deal
#define SF_NIO_STAGE_CLOSE 9 //cleanup the task
#ifdef __cplusplus

View File

@ -135,17 +135,17 @@ int sf_set_read_event(struct fast_task_info *task)
return 0;
}
static int sf_ioevent_add(struct fast_task_info *task)
static inline int sf_ioevent_add(struct fast_task_info *task,
IOEventCallback callback, const int timeout)
{
int result;
result = ioevent_set(task, task->thread_data, task->event.fd,
IOEVENT_READ, (IOEventCallback)sf_client_sock_read,
g_sf_global_vars.network_timeout);
IOEVENT_READ, callback, timeout);
return result > 0 ? -1 * result : result;
}
static int sf_nio_init(struct fast_task_info *task)
static inline int sf_nio_init(struct fast_task_info *task)
{
int current_connections;
@ -155,7 +155,66 @@ static int sf_nio_init(struct fast_task_info *task)
g_sf_global_vars.connection_stat.max_count = current_connections;
}
return sf_ioevent_add(task);
return sf_ioevent_add(task, (IOEventCallback)sf_client_sock_read,
task->network_timeout);
}
static int sf_client_sock_connect(int sock, short event, void *arg)
{
int result;
socklen_t len;
struct fast_task_info *task;
task = (struct fast_task_info *)arg;
if (event & IOEVENT_TIMEOUT) {
result = ETIMEDOUT;
} else {
len = sizeof(result);
if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &result, &len) < 0) {
result = errno != 0 ? errno : EACCES;
}
}
if (result != 0) {
logError("file: "__FILE__", line: %d, "
"connect to server %s:%d fail, errno: %d, "
"error info: %s", __LINE__, task->server_ip,
task->port, result, STRERROR(result));
iovent_add_to_deleted_list(task);
return -1;
}
task->nio_stage = SF_NIO_STAGE_RECV;
task->event.callback = (IOEventCallback)sf_client_sock_read;
return 0;
}
static int sf_connect_server(struct fast_task_info *task)
{
int result;
if ((task->event.fd=socketCreateEx2(AF_UNSPEC, task->server_ip,
O_NONBLOCK, NULL, &result)) < 0)
{
return result > 0 ? -1 * result : result;
}
result = asyncconnectserverbyip(task->event.fd,
task->server_ip, task->port);
if (result == 0) {
task->nio_stage = SF_NIO_STAGE_RECV;
return sf_ioevent_add(task, (IOEventCallback)
sf_client_sock_read, task->network_timeout);
} else if (result == EINPROGRESS) {
return sf_ioevent_add(task, (IOEventCallback)
sf_client_sock_connect, task->connect_timeout);
} else {
logError("file: "__FILE__", line: %d, "
"connect to server %s:%d fail, errno: %d, "
"error info: %s", __LINE__, task->server_ip,
task->port, result, STRERROR(result));
return result > 0 ? -1 * result : result;
}
}
static int sf_nio_deal_task(struct fast_task_info *task)
@ -166,6 +225,9 @@ static int sf_nio_deal_task(struct fast_task_info *task)
task->nio_stage = SF_NIO_STAGE_RECV;
result = sf_nio_init(task);
break;
case SF_NIO_STAGE_CONNECT:
result = sf_connect_server(task);
break;
case SF_NIO_STAGE_RECV:
if ((result=sf_set_read_event(task)) == 0)
{
@ -180,7 +242,10 @@ static int sf_nio_deal_task(struct fast_task_info *task)
result = SF_CTX->deal_task(task);
break;
case SF_NIO_STAGE_FORWARDED: //forward by other thread
if ((result=sf_ioevent_add(task)) == 0) {
if ((result=sf_ioevent_add(task, (IOEventCallback)
sf_client_sock_read,
task->network_timeout)) == 0)
{
result = SF_CTX->deal_task(task);
}
break;
@ -304,7 +369,7 @@ int sf_client_sock_read(int sock, short event, void *arg)
}
task->event.timer.expires = g_current_time +
g_sf_global_vars.network_timeout;
task->network_timeout;
fast_timer_add(&task->thread_data->timer,
&task->event.timer);
}
@ -342,7 +407,7 @@ int sf_client_sock_read(int sock, short event, void *arg)
while (1) {
fast_timer_modify(&task->thread_data->timer,
&task->event.timer, g_current_time +
g_sf_global_vars.network_timeout);
task->network_timeout);
if (task->length == 0) { //recv header
recv_bytes = SF_CTX->header_size - task->offset;
}
@ -515,7 +580,7 @@ int sf_client_sock_write(int sock, short event, void *arg)
while (1) {
fast_timer_modify(&task->thread_data->timer,
&task->event.timer, g_current_time +
g_sf_global_vars.network_timeout);
task->network_timeout);
bytes = write(sock, task->data + task->offset,
task->length - task->offset);

View File

@ -46,6 +46,14 @@ struct accept_thread_context {
int server_sock;
};
int sf_init_task(struct fast_task_info *task)
{
task->connect_timeout = SF_G_CONNECT_TIMEOUT; //for client side
task->network_timeout = SF_G_NETWORK_TIMEOUT;
return 0;
}
static void *worker_thread_entrance(void *arg);
static int sf_init_free_queues(const int task_arg_size,
@ -82,7 +90,8 @@ static int sf_init_free_queues(const int task_arg_size,
if ((result=free_queue_init_ex2(g_sf_global_vars.max_connections,
init_connections, alloc_conn_once, g_sf_global_vars.
min_buff_size, g_sf_global_vars.max_buff_size,
task_arg_size, init_callback)) != 0)
task_arg_size, init_callback != NULL ?
init_callback : sf_init_task)) != 0)
{
return result;
}

View File

@ -83,6 +83,8 @@ struct nio_thread_data *sf_get_random_thread_data_ex(SFContext *sf_context);
void sf_set_sig_quit_handler(sf_sig_quit_handler quit_handler);
int sf_init_task(struct fast_task_info *task);
#ifdef __cplusplus
}
#endif