task use reffer_count for share
parent
946bd8fe46
commit
719f8b2b32
|
|
@ -176,21 +176,13 @@ struct fast_task_info *alloc_channel_task(IdempotencyClientChannel *channel,
|
|||
{
|
||||
struct fast_task_info *task;
|
||||
|
||||
task = free_queue_pop();
|
||||
if (task == NULL) {
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
"malloc task buff failed, you should "
|
||||
"increase the parameter: max_connections",
|
||||
__LINE__);
|
||||
if ((task=sf_alloc_init_task(&g_sf_context, -1)) == NULL) {
|
||||
*err_no = ENOMEM;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
snprintf(task->server_ip, sizeof(task->server_ip), "%s", server_ip);
|
||||
task->port = port;
|
||||
task->canceled = false;
|
||||
task->ctx = &g_sf_context;
|
||||
task->event.fd = -1;
|
||||
task->arg = channel;
|
||||
task->thread_data = g_sf_context.thread_data +
|
||||
hash_code % g_sf_context.work_threads;
|
||||
|
|
@ -198,7 +190,7 @@ struct fast_task_info *alloc_channel_task(IdempotencyClientChannel *channel,
|
|||
channel->last_connect_time = g_current_time;
|
||||
if ((*err_no=sf_nio_notify(task, SF_NIO_STAGE_CONNECT)) != 0) {
|
||||
channel->in_ioevent = 0; //rollback
|
||||
free_queue_push(task);
|
||||
sf_release_task(task);
|
||||
return NULL;
|
||||
}
|
||||
return task;
|
||||
|
|
@ -223,7 +215,7 @@ int idempotency_client_channel_check_reconnect(
|
|||
__LINE__, channel->task->server_ip,
|
||||
channel->task->port);
|
||||
|
||||
channel->task->canceled = false;
|
||||
__sync_bool_compare_and_swap(&channel->task->canceled, 1, 0);
|
||||
if ((result=sf_nio_notify(channel->task, SF_NIO_STAGE_CONNECT)) == 0) {
|
||||
channel->last_connect_time = g_current_time;
|
||||
channel->last_report_time = g_current_time;
|
||||
|
|
|
|||
75
src/sf_nio.c
75
src/sf_nio.c
|
|
@ -37,6 +37,7 @@
|
|||
#include "fastcommon/fast_task_queue.h"
|
||||
#include "fastcommon/ioevent_loop.h"
|
||||
#include "sf_global.h"
|
||||
#include "sf_service.h"
|
||||
#include "sf_nio.h"
|
||||
|
||||
#define SF_CTX ((SFContext *)(task->ctx))
|
||||
|
|
@ -97,7 +98,7 @@ void sf_task_finish_clean_up(struct fast_task_info *task)
|
|||
task->event.fd = -1;
|
||||
|
||||
__sync_fetch_and_sub(&g_sf_global_vars.connection_stat.current_count, 1);
|
||||
free_queue_push(task);
|
||||
sf_release_task(task);
|
||||
}
|
||||
|
||||
static inline int set_write_event(struct fast_task_info *task)
|
||||
|
|
@ -305,10 +306,25 @@ int sf_nio_notify(struct fast_task_info *task, const int stage)
|
|||
int result;
|
||||
bool notify;
|
||||
|
||||
if (__sync_add_and_fetch(&task->canceled, 0)) {
|
||||
if (stage == SF_NIO_STAGE_CONTINUE) {
|
||||
if (task->continue_callback != NULL) {
|
||||
return task->continue_callback(task);
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
} else {
|
||||
logWarning("file: "__FILE__", line: %d, "
|
||||
"unexpected notify stage: %d, task %p "
|
||||
"already canceled", __LINE__, stage, task);
|
||||
return ECANCELED;
|
||||
}
|
||||
}
|
||||
|
||||
if (!__sync_bool_compare_and_swap(&task->nio_stages.notify,
|
||||
SF_NIO_STAGE_NONE, stage))
|
||||
{
|
||||
logDebug("file: "__FILE__", line: %d, "
|
||||
logWarning("file: "__FILE__", line: %d, "
|
||||
"current stage: %d != %d, skip set stage to %d",
|
||||
__LINE__, __sync_fetch_and_sub(&task->nio_stages.notify, 0),
|
||||
SF_NIO_STAGE_NONE, stage);
|
||||
|
|
@ -374,6 +390,11 @@ void sf_recv_notify_read(int sock, short event, void *arg)
|
|||
} else {
|
||||
stage = __sync_add_and_fetch(&task->nio_stages.notify, 0);
|
||||
if (stage != SF_NIO_STAGE_NONE) {
|
||||
if (stage == SF_NIO_STAGE_CONTINUE &&
|
||||
task->continue_callback != NULL)
|
||||
{
|
||||
task->continue_callback(task);
|
||||
}
|
||||
__sync_bool_compare_and_swap(&task->nio_stages.notify,
|
||||
stage, SF_NIO_STAGE_NONE);
|
||||
}
|
||||
|
|
@ -395,19 +416,39 @@ int sf_send_add_event(struct fast_task_info *task)
|
|||
return 0;
|
||||
}
|
||||
|
||||
static inline int check_task(struct fast_task_info *task,
|
||||
const short event, const int expect_stage)
|
||||
{
|
||||
if (task->canceled) {
|
||||
return ENOTCONN;
|
||||
}
|
||||
|
||||
if (event & IOEVENT_ERROR) {
|
||||
logDebug("file: "__FILE__", line: %d, "
|
||||
"client ip: %s, expect stage: %d, recv error event: %d, "
|
||||
"close connection", __LINE__, task->client_ip,
|
||||
expect_stage, event);
|
||||
|
||||
ioevent_add_to_deleted_list(task);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return task->nio_stages.current == expect_stage ? 0 : EAGAIN;
|
||||
}
|
||||
|
||||
int sf_client_sock_read(int sock, short event, void *arg)
|
||||
{
|
||||
int result;
|
||||
int bytes;
|
||||
int recv_bytes;
|
||||
int total_read;
|
||||
struct fast_task_info *task;
|
||||
|
||||
task = (struct fast_task_info *)arg;
|
||||
if (task->canceled || (task->nio_stages.current != SF_NIO_STAGE_RECV)) {
|
||||
return 0;
|
||||
if ((result=check_task(task, event, SF_NIO_STAGE_RECV)) != 0) {
|
||||
return result >= 0 ? 0 : -1;
|
||||
}
|
||||
|
||||
//assert(sock >= 0);
|
||||
if (event & IOEVENT_TIMEOUT) {
|
||||
if (task->offset == 0 && task->req_count > 0) {
|
||||
if (SF_CTX->timeout_callback != NULL) {
|
||||
|
|
@ -441,15 +482,6 @@ int sf_client_sock_read(int sock, short event, void *arg)
|
|||
return 0;
|
||||
}
|
||||
|
||||
if (event & IOEVENT_ERROR) {
|
||||
logDebug("file: "__FILE__", line: %d, "
|
||||
"client ip: %s, recv error event: %d, "
|
||||
"close connection", __LINE__, task->client_ip, event);
|
||||
|
||||
ioevent_add_to_deleted_list(task);
|
||||
return -1;
|
||||
}
|
||||
|
||||
total_read = 0;
|
||||
while (1) {
|
||||
fast_timer_modify(&task->thread_data->timer,
|
||||
|
|
@ -588,14 +620,14 @@ int sf_client_sock_read(int sock, short event, void *arg)
|
|||
|
||||
int sf_client_sock_write(int sock, short event, void *arg)
|
||||
{
|
||||
int result;
|
||||
int bytes;
|
||||
int total_write;
|
||||
struct fast_task_info *task;
|
||||
|
||||
//assert(sock >= 0);
|
||||
task = (struct fast_task_info *)arg;
|
||||
if (task->canceled || (task->nio_stages.current != SF_NIO_STAGE_SEND)) {
|
||||
return 0;
|
||||
if ((result=check_task(task, event, SF_NIO_STAGE_SEND)) != 0) {
|
||||
return result >= 0 ? 0 : -1;
|
||||
}
|
||||
|
||||
if (event & IOEVENT_TIMEOUT) {
|
||||
|
|
@ -608,15 +640,6 @@ int sf_client_sock_write(int sock, short event, void *arg)
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (event & IOEVENT_ERROR) {
|
||||
logDebug("file: "__FILE__", line: %d, "
|
||||
"client ip: %s, recv error event: %d, "
|
||||
"close connection", __LINE__, task->client_ip, event);
|
||||
|
||||
ioevent_add_to_deleted_list(task);
|
||||
return -1;
|
||||
}
|
||||
|
||||
total_write = 0;
|
||||
while (1) {
|
||||
fast_timer_modify(&task->thread_data->timer,
|
||||
|
|
|
|||
|
|
@ -380,22 +380,16 @@ static void *accept_thread_entrance(void *arg)
|
|||
continue;
|
||||
}
|
||||
|
||||
task = free_queue_pop();
|
||||
if (task == NULL) {
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
"malloc task buff failed, you should "
|
||||
"increase the parameter: max_connections",
|
||||
__LINE__);
|
||||
if ((task=sf_alloc_init_task(accept_context->
|
||||
sf_context, incomesock)) == NULL)
|
||||
{
|
||||
close(incomesock);
|
||||
continue;
|
||||
}
|
||||
|
||||
getPeerIpAddPort(incomesock, task->client_ip,
|
||||
sizeof(task->client_ip), &port);
|
||||
task->port = port;
|
||||
|
||||
task->canceled = false;
|
||||
task->ctx = accept_context->sf_context;
|
||||
task->event.fd = incomesock;
|
||||
task->thread_data = accept_context->sf_context->thread_data +
|
||||
incomesock % accept_context->sf_context->work_threads;
|
||||
if (accept_context->sf_context->accept_done_func != NULL) {
|
||||
|
|
@ -406,7 +400,7 @@ static void *accept_thread_entrance(void *arg)
|
|||
|
||||
if (sf_nio_notify(task, SF_NIO_STAGE_INIT) != 0) {
|
||||
close(incomesock);
|
||||
free_queue_push(task);
|
||||
sf_release_task(task);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -100,6 +100,63 @@ void sf_set_sig_quit_handler(sf_sig_quit_handler quit_handler);
|
|||
|
||||
int sf_init_task(struct fast_task_info *task);
|
||||
|
||||
#define sf_alloc_init_task(sf_context, sock) \
|
||||
sf_alloc_init_task_ex(sf_context, sock, 1)
|
||||
|
||||
static inline struct fast_task_info *sf_alloc_init_task_ex(
|
||||
SFContext *sf_context, const int sock, const int init_reffer)
|
||||
{
|
||||
struct fast_task_info *task;
|
||||
|
||||
task = free_queue_pop();
|
||||
if (task == NULL) {
|
||||
logError("file: "__FILE__", line: %d, "
|
||||
"malloc task buff failed, you should "
|
||||
"increase the parameter: max_connections",
|
||||
__LINE__);
|
||||
return NULL;
|
||||
}
|
||||
__sync_add_and_fetch(&task->reffer_count, init_reffer);
|
||||
__sync_bool_compare_and_swap(&task->canceled, 1, 0);
|
||||
task->ctx = sf_context;
|
||||
task->event.fd = sock;
|
||||
|
||||
return task;
|
||||
}
|
||||
|
||||
#define sf_hold_task(task) __sync_add_and_fetch(&task->reffer_count, 1)
|
||||
|
||||
/*
|
||||
#define sf_hold_task(task) \
|
||||
logInfo("file: "__FILE__", line: %d, " \
|
||||
"hold task %p, reffer: %d", \
|
||||
__LINE__, task, __sync_add_and_fetch(&task->reffer_count, 1))
|
||||
*/
|
||||
|
||||
#define sf_try_hold_task_to_twice(task) \
|
||||
__sync_bool_compare_and_swap(&task->reffer_count, 1, 2)
|
||||
|
||||
static inline void sf_release_task(struct fast_task_info *task)
|
||||
{
|
||||
int reffer_count;
|
||||
if ((reffer_count=__sync_sub_and_fetch(&task->reffer_count, 1)) == 0) {
|
||||
int free_count = free_queue_count();
|
||||
int alloc_count = free_queue_alloc_connections();
|
||||
logInfo("file: "__FILE__", line: %d, "
|
||||
"push task %p to queue, alloc: %d, "
|
||||
"used: %d, freed: %d", __LINE__, task,
|
||||
alloc_count, alloc_count - free_count, free_count);
|
||||
|
||||
free_queue_push(task);
|
||||
} else {
|
||||
/*
|
||||
logInfo("file: "__FILE__", line: %d, "
|
||||
"release task %p, current reffer: %d",
|
||||
__LINE__, task, reffer_count);
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
|||
Loading…
Reference in New Issue