add_to_deleted_list instead of cleanup directly

connection_manager
YuQing 2020-03-09 10:55:13 +08:00
parent c0df88aabf
commit 01d24d4838
2 changed files with 28 additions and 74 deletions

View File

@ -98,7 +98,7 @@ static inline int set_write_event(struct fast_task_info *task)
task->event.fd, IOEVENT_WRITE, task) != 0) task->event.fd, IOEVENT_WRITE, task) != 0)
{ {
result = errno != 0 ? errno : ENOENT; result = errno != 0 ? errno : ENOENT;
SF_CTX->task_cleanup_func(task); iovent_add_to_deleted_list(task);
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"ioevent_modify fail, " "ioevent_modify fail, "
@ -122,7 +122,7 @@ static inline int set_read_event(struct fast_task_info *task)
task->event.fd, IOEVENT_READ, task) != 0) task->event.fd, IOEVENT_READ, task) != 0)
{ {
result = errno != 0 ? errno : ENOENT; result = errno != 0 ? errno : ENOENT;
SF_CTX->task_cleanup_func(task); iovent_add_to_deleted_list(task);
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"ioevent_modify fail, " "ioevent_modify fail, "
@ -195,13 +195,12 @@ static int sf_nio_deal_task(struct fast_task_info *task)
} }
if (result < 0) { if (result < 0) {
SF_CTX->task_cleanup_func(task); iovent_add_to_deleted_list(task);
} }
return result; return result;
} }
#if defined(OS_LINUX)
int sf_nio_notify(struct fast_task_info *task, const int stage) int sf_nio_notify(struct fast_task_info *task, const int stage)
{ {
int64_t n; int64_t n;
@ -262,60 +261,12 @@ void sf_recv_notify_read(int sock, short event, void *arg)
task = current; task = current;
current = current->next; current = current->next;
if (!task->canceled) {
sf_nio_deal_task(task); sf_nio_deal_task(task);
} }
} }
#else
int sf_nio_notify(struct fast_task_info *task, const int stage)
{
long task_addr;
int result;
task_addr = (long)task;
task->nio_stage = stage;
if (write(NOTIFY_WRITE_FD(task->thread_data), &task_addr,
sizeof(task_addr)) != sizeof(task_addr))
{
result = errno != 0 ? errno : EIO;
logError("file: "__FILE__", line: %d, "
"write to pipe %d fail, errno: %d, error info: %s",
__LINE__, NOTIFY_WRITE_FD(task->thread_data),
result, STRERROR(result));
return result;
} }
return 0;
}
void sf_recv_notify_read(int sock, short event, void *arg)
{
int bytes;
long task_ptr;
struct fast_task_info *task;
while (1) {
if ((bytes=read(sock, &task_ptr, sizeof(task_ptr))) < 0) {
if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
logError("file: "__FILE__", line: %d, "
"call read failed, "
"errno: %d, error info: %s",
__LINE__, errno, strerror(errno));
}
break;
}
else if (bytes == 0) {
break;
}
task = (struct fast_task_info *)task_ptr;
sf_nio_deal_task(task);
}
}
#endif
int sf_send_add_event(struct fast_task_info *task) int sf_send_add_event(struct fast_task_info *task)
{ {
task->offset = 0; task->offset = 0;
@ -337,7 +288,7 @@ int sf_client_sock_read(int sock, short event, void *arg)
struct fast_task_info *task; struct fast_task_info *task;
task = (struct fast_task_info *)arg; task = (struct fast_task_info *)arg;
if (task->nio_stage != SF_NIO_STAGE_RECV) { if (task->canceled || (task->nio_stage != SF_NIO_STAGE_RECV)) {
return 0; return 0;
} }
@ -346,7 +297,7 @@ int sf_client_sock_read(int sock, short event, void *arg)
if (task->offset == 0 && task->req_count > 0) { if (task->offset == 0 && task->req_count > 0) {
if (SF_CTX->timeout_callback != NULL) { if (SF_CTX->timeout_callback != NULL) {
if (SF_CTX->timeout_callback(task) != 0) { if (SF_CTX->timeout_callback(task) != 0) {
SF_CTX->task_cleanup_func(task); iovent_add_to_deleted_list(task);
return -1; return -1;
} }
} }
@ -370,7 +321,7 @@ int sf_client_sock_read(int sock, short event, void *arg)
__LINE__, task->client_ip, task->req_count); __LINE__, task->client_ip, task->req_count);
} }
SF_CTX->task_cleanup_func(task); iovent_add_to_deleted_list(task);
return -1; return -1;
} }
@ -382,7 +333,7 @@ int sf_client_sock_read(int sock, short event, void *arg)
"client ip: %s, recv error event: %d, " "client ip: %s, recv error event: %d, "
"close connection", __LINE__, task->client_ip, event); "close connection", __LINE__, task->client_ip, event);
SF_CTX->task_cleanup_func(task); iovent_add_to_deleted_list(task);
return -1; return -1;
} }
@ -416,7 +367,7 @@ int sf_client_sock_read(int sock, short event, void *arg)
__LINE__, task->client_ip, __LINE__, task->client_ip,
errno, strerror(errno)); errno, strerror(errno));
SF_CTX->task_cleanup_func(task); iovent_add_to_deleted_list(task);
return -1; return -1;
} }
} }
@ -445,7 +396,7 @@ int sf_client_sock_read(int sock, short event, void *arg)
__LINE__, task->client_ip, sock); __LINE__, task->client_ip, sock);
} }
SF_CTX->task_cleanup_func(task); iovent_add_to_deleted_list(task);
return -1; return -1;
} }
@ -457,7 +408,7 @@ int sf_client_sock_read(int sock, short event, void *arg)
} }
if (SF_CTX->set_body_length(task) != 0) { if (SF_CTX->set_body_length(task) != 0) {
SF_CTX->task_cleanup_func(task); iovent_add_to_deleted_list(task);
return -1; return -1;
} }
if (task->length < 0) { if (task->length < 0) {
@ -466,7 +417,7 @@ int sf_client_sock_read(int sock, short event, void *arg)
__LINE__, task->client_ip, __LINE__, task->client_ip,
task->length); task->length);
SF_CTX->task_cleanup_func(task); iovent_add_to_deleted_list(task);
return -1; return -1;
} }
@ -478,7 +429,7 @@ int sf_client_sock_read(int sock, short event, void *arg)
task->client_ip, task->length, task->client_ip, task->length,
g_sf_global_vars.max_pkg_size); g_sf_global_vars.max_pkg_size);
SF_CTX->task_cleanup_func(task); iovent_add_to_deleted_list(task);
return -1; return -1;
} }
@ -491,7 +442,7 @@ int sf_client_sock_read(int sock, short event, void *arg)
"from %d to %d fail", __LINE__, "from %d to %d fail", __LINE__,
task->client_ip, task->size, task->length); task->client_ip, task->size, task->length);
SF_CTX->task_cleanup_func(task); iovent_add_to_deleted_list(task);
return -1; return -1;
} }
@ -506,7 +457,7 @@ int sf_client_sock_read(int sock, short event, void *arg)
task->req_count++; task->req_count++;
task->nio_stage = SF_NIO_STAGE_SEND; task->nio_stage = SF_NIO_STAGE_SEND;
if (SF_CTX->deal_task(task) < 0) { //fatal error if (SF_CTX->deal_task(task) < 0) { //fatal error
SF_CTX->task_cleanup_func(task); iovent_add_to_deleted_list(task);
return -1; return -1;
} }
break; break;
@ -524,13 +475,17 @@ int sf_client_sock_write(int sock, short event, void *arg)
assert(sock >= 0); assert(sock >= 0);
task = (struct fast_task_info *)arg; task = (struct fast_task_info *)arg;
if (task->canceled) {
return 0;
}
if (event & IOEVENT_TIMEOUT) { if (event & IOEVENT_TIMEOUT) {
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"client ip: %s, send timeout. total length: %d, offset: %d, " "client ip: %s, send timeout. total length: %d, offset: %d, "
"remain: %d", __LINE__, task->client_ip, task->length, "remain: %d", __LINE__, task->client_ip, task->length,
task->offset, task->length - task->offset); task->offset, task->length - task->offset);
SF_CTX->task_cleanup_func(task); iovent_add_to_deleted_list(task);
return -1; return -1;
} }
@ -539,7 +494,7 @@ int sf_client_sock_write(int sock, short event, void *arg)
"client ip: %s, recv error event: %d, " "client ip: %s, recv error event: %d, "
"close connection", __LINE__, task->client_ip, event); "close connection", __LINE__, task->client_ip, event);
SF_CTX->task_cleanup_func(task); iovent_add_to_deleted_list(task);
return -1; return -1;
} }
@ -572,7 +527,7 @@ int sf_client_sock_write(int sock, short event, void *arg)
__LINE__, task->client_ip, __LINE__, task->client_ip,
errno, strerror(errno)); errno, strerror(errno));
SF_CTX->task_cleanup_func(task); iovent_add_to_deleted_list(task);
return -1; return -1;
} }
} }
@ -582,7 +537,7 @@ int sf_client_sock_write(int sock, short event, void *arg)
"connection disconnected", "connection disconnected",
__LINE__, task->client_ip, sock); __LINE__, task->client_ip, sock);
SF_CTX->task_cleanup_func(task); iovent_add_to_deleted_list(task);
return -1; return -1;
} }
@ -601,4 +556,3 @@ int sf_client_sock_write(int sock, short event, void *arg)
return total_write; return total_write;
} }

View File

@ -176,6 +176,9 @@ int sf_service_init_ex(SFContext *sf_context,
return result; return result;
} }
if ((result=init_pthread_lock(&thread_data->waiting_queue.lock)) != 0) {
return result;
}
#if defined(OS_LINUX) #if defined(OS_LINUX)
NOTIFY_READ_FD(thread_data) = eventfd(0, EFD_NONBLOCK); NOTIFY_READ_FD(thread_data) = eventfd(0, EFD_NONBLOCK);
if (NOTIFY_READ_FD(thread_data) < 0) { if (NOTIFY_READ_FD(thread_data) < 0) {
@ -187,10 +190,6 @@ int sf_service_init_ex(SFContext *sf_context,
break; break;
} }
NOTIFY_WRITE_FD(thread_data) = NOTIFY_READ_FD(thread_data); NOTIFY_WRITE_FD(thread_data) = NOTIFY_READ_FD(thread_data);
if ((result=init_pthread_lock(&thread_data->waiting_queue.lock)) != 0) {
return result;
}
#else #else
if (pipe(thread_data->pipe_fds) != 0) { if (pipe(thread_data->pipe_fds) != 0) {
result = errno != 0 ? errno : EPERM; result = errno != 0 ? errno : EPERM;
@ -355,6 +354,7 @@ static void *accept_thread_entrance(void *arg)
} }
strcpy(task->client_ip, szClientIp); strcpy(task->client_ip, szClientIp);
task->canceled = false;
task->ctx = accept_context->sf_context; task->ctx = accept_context->sf_context;
task->event.fd = incomesock; task->event.fd = incomesock;
task->thread_data = accept_context->sf_context->thread_data + task->thread_data = accept_context->sf_context->thread_data +