log square quoted IPv6 address

use_iouring
YuQing 2024-03-05 18:07:34 +08:00
parent 1c796ab819
commit 231e2610e5
7 changed files with 98 additions and 46 deletions

View File

@ -213,6 +213,7 @@ int idempotency_client_channel_check_reconnect(
IdempotencyClientChannel *channel)
{
int result;
char formatted_ip[FORMATTED_IP_SIZE];
if (!__sync_bool_compare_and_swap(&channel->in_ioevent, 0, 1)) {
return 0;
@ -223,10 +224,12 @@ int idempotency_client_channel_check_reconnect(
channel->last_connect_time = g_current_time;
}
logDebug("file: "__FILE__", line: %d, "
"trigger connect to server %s:%u",
__LINE__, channel->task->server_ip,
channel->task->port);
if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
format_ip_address(channel->task->server_ip, formatted_ip);
logDebug("file: "__FILE__", line: %d, "
"trigger connect to server %s:%u", __LINE__,
formatted_ip, channel->task->port);
}
__sync_bool_compare_and_swap(&channel->task->canceled, 1, 0);
if ((result=sf_nio_notify(channel->task, SF_NIO_STAGE_CONNECT)) == 0) {

View File

@ -86,10 +86,12 @@ static inline int idempotency_client_channel_check_wait_ex(
return 0;
} else {
/*
char formatted_ip[FORMATTED_IP_SIZE];
format_ip_address(channel->task->server_ip, formatted_ip);
logInfo("file: "__FILE__", line: %d, "
"channel_check fail, server %s:%u, in_ioevent: %d, "
"canceled: %d, req count: %"PRId64, __LINE__,
channel->task->server_ip, channel->task->port,
formatted_ip, channel->task->port,
__sync_add_and_fetch(&channel->in_ioevent, 0),
__sync_add_and_fetch(&channel->task->canceled, 0),
channel->task->req_count);

View File

@ -60,11 +60,13 @@ static int receipt_init_task(struct fast_task_info *task)
static int receipt_recv_timeout_callback(struct fast_task_info *task)
{
IdempotencyClientChannel *channel;
char formatted_ip[FORMATTED_IP_SIZE];
format_ip_address(task->server_ip, formatted_ip);
if (SF_NIO_TASK_STAGE_FETCH(task) == SF_NIO_STAGE_CONNECT) {
logError("file: "__FILE__", line: %d, "
"connect to server %s:%u timeout",
__LINE__, task->server_ip, task->port);
__LINE__, formatted_ip, task->port);
return ETIMEDOUT;
}
@ -72,12 +74,12 @@ static int receipt_recv_timeout_callback(struct fast_task_info *task)
if (channel->waiting_resp_qinfo.head != NULL) {
logError("file: "__FILE__", line: %d, "
"waiting receipt response from server %s:%u timeout",
__LINE__, task->server_ip, task->port);
__LINE__, formatted_ip, task->port);
} else {
logError("file: "__FILE__", line: %d, "
"%s server %s:%u timeout, channel established: %d",
__LINE__, task->nio_stages.current == SF_NIO_STAGE_SEND ?
"send to" : "recv from", task->server_ip, task->port,
"send to" : "recv from", formatted_ip, task->port,
FC_ATOMIC_GET(channel->established));
}
@ -87,6 +89,7 @@ static int receipt_recv_timeout_callback(struct fast_task_info *task)
static void receipt_task_finish_cleanup(struct fast_task_info *task)
{
IdempotencyClientChannel *channel;
char formatted_ip[FORMATTED_IP_SIZE];
if (task->event.fd >= 0) {
sf_task_detach_thread(task);
@ -102,9 +105,12 @@ static void receipt_task_finish_cleanup(struct fast_task_info *task)
__sync_bool_compare_and_swap(&channel->established, 1, 0);
__sync_bool_compare_and_swap(&channel->in_ioevent, 1, 0);
logDebug("file: "__FILE__", line: %d, "
"receipt task for server %s:%u exit",
__LINE__, task->server_ip, task->port);
if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
format_ip_address(task->server_ip, formatted_ip);
logDebug("file: "__FILE__", line: %d, "
"receipt task for server %s:%u exit",
__LINE__, formatted_ip, task->port);
}
}
static void setup_channel_request(struct fast_task_info *task)
@ -245,11 +251,14 @@ static inline int receipt_expect_body_length(struct fast_task_info *task,
const int expect_body_len)
{
int body_len;
char formatted_ip[FORMATTED_IP_SIZE];
body_len = task->recv.ptr->length - sizeof(SFCommonProtoHeader);
if (body_len != expect_body_len) {
format_ip_address(task->server_ip, formatted_ip);
logError("file: "__FILE__", line: %d, "
"server %s:%u, response body length: %d != %d", __LINE__,
task->server_ip, task->port, body_len, expect_body_len);
formatted_ip, task->port, body_len, expect_body_len);
return EINVAL;
}
@ -262,6 +271,7 @@ static int deal_setup_channel_response(struct fast_task_info *task)
IdempotencyReceiptThreadContext *thread_ctx;
SFProtoSetupChannelResp *resp;
IdempotencyClientChannel *channel;
char formatted_ip[FORMATTED_IP_SIZE];
int channel_id;
int channel_key;
int buffer_size;
@ -274,10 +284,11 @@ static int deal_setup_channel_response(struct fast_task_info *task)
channel = (IdempotencyClientChannel *)task->arg;
if (__sync_add_and_fetch(&channel->established, 0)) {
format_ip_address(task->server_ip, formatted_ip);
logWarning("file: "__FILE__", line: %d, "
"response from server %s:%u, unexpected cmd: "
"SETUP_CHANNEL_RESP, ignore it!",
__LINE__, task->server_ip, task->port);
__LINE__, formatted_ip, task->port);
return 0;
}
@ -314,6 +325,7 @@ static inline int deal_report_req_receipt_response(struct fast_task_info *task)
IdempotencyClientChannel *channel;
IdempotencyClientReceipt *current;
IdempotencyClientReceipt *deleted;
char formatted_ip[FORMATTED_IP_SIZE];
if ((result=receipt_expect_body_length(task, 0)) != 0) {
return result;
@ -321,10 +333,11 @@ static inline int deal_report_req_receipt_response(struct fast_task_info *task)
channel = (IdempotencyClientChannel *)task->arg;
if (channel->waiting_resp_qinfo.head == NULL) {
format_ip_address(task->server_ip, formatted_ip);
logWarning("file: "__FILE__", line: %d, "
"response from server %s:%u, unexpect cmd: "
"REPORT_REQ_RECEIPT_RESP", __LINE__,
task->server_ip, task->port);
formatted_ip, task->port);
return EINVAL;
}
@ -346,6 +359,7 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage)
{
int result;
SFCommonProtoHeader *header;
char formatted_ip[FORMATTED_IP_SIZE];
do {
if (stage == SF_NIO_STAGE_HANDSHAKE) {
@ -373,10 +387,11 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage)
msg_len = SF_RECV_BODY_LENGTH(task);
message = SF_PROTO_RECV_BODY(task);
format_ip_address(task->server_ip, formatted_ip);
logError("file: "__FILE__", line: %d, "
"response from server %s:%u, cmd: %d (%s), "
"status: %d, error info: %.*s", __LINE__,
task->server_ip, task->port, header->cmd,
formatted_ip, task->port, header->cmd,
sf_get_cmd_caption(header->cmd),
result, msg_len, message);
break;
@ -395,14 +410,18 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage)
break;
case SF_SERVICE_PROTO_CLOSE_CHANNEL_RESP:
result = ECONNRESET; //force to close socket
logDebug("file: "__FILE__", line: %d, "
"close channel to server %s:%u !!!",
__LINE__, task->server_ip, task->port);
if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
format_ip_address(task->server_ip, formatted_ip);
logDebug("file: "__FILE__", line: %d, "
"close channel to server %s:%u !!!",
__LINE__, formatted_ip, task->port);
}
break;
default:
format_ip_address(task->server_ip, formatted_ip);
logError("file: "__FILE__", line: %d, "
"response from server %s:%u, unexpect cmd: %d (%s)",
__LINE__, task->server_ip, task->port, header->cmd,
__LINE__, formatted_ip, task->port, header->cmd,
sf_get_cmd_caption(header->cmd));
result = EINVAL;
break;
@ -447,6 +466,7 @@ static void receipt_thread_close_idle_channel(
{
IdempotencyClientChannel *channel;
IdempotencyClientChannel *tmp;
char formatted_ip[FORMATTED_IP_SIZE];
fc_list_for_each_entry_safe(channel, tmp, &thread_ctx->head, dlink) {
if (channel->task->pending_send_count > 0) {
@ -456,9 +476,12 @@ static void receipt_thread_close_idle_channel(
if (g_current_time - channel->last_report_time >
g_idempotency_client_cfg.channel_max_idle_time)
{
logDebug("file: "__FILE__", line: %d, "
"close channel to server %s:%u because idle too long",
__LINE__, channel->task->server_ip, channel->task->port);
if (FC_LOG_BY_LEVEL(LOG_DEBUG)) {
format_ip_address(channel->task->server_ip, formatted_ip);
logDebug("file: "__FILE__", line: %d, "
"close channel to server %s:%u because idle too long",
__LINE__, formatted_ip, channel->task->port);
}
close_channel_request(channel->task);
}
}

View File

@ -405,6 +405,7 @@ static ConnectionInfo *get_leader_connection(SFConnectionManager *cm,
ConnectionInfo *conn;
SFClientServerEntry leader;
SFNetRetryIntervalContext net_retry_ctx;
char formatted_ip[FORMATTED_IP_SIZE];
int i;
int connect_fails;
@ -467,10 +468,11 @@ static ConnectionInfo *get_leader_connection(SFConnectionManager *cm,
connect.times, ++i, *err_no);
}
format_ip_address(server->group_addrs[cm->server_group_index].
address_array.addrs[0]->conn.ip_addr, formatted_ip);
logWarning("file: "__FILE__", line: %d, "
"%s get_leader_connection fail, server id: %d, %s:%u, errno: %d",
__LINE__, cm->module_name, server->id, server->group_addrs
[cm->server_group_index]. address_array.addrs[0]->conn.ip_addr,
__LINE__, cm->module_name, server->id, formatted_ip,
server->group_addrs[cm->server_group_index].address_array.
addrs[0]->conn.port, *err_no);
return NULL;

View File

@ -84,7 +84,7 @@ extern SFContext g_sf_context;
#define SF_G_CONTINUE_FLAG g_sf_global_vars.continue_flag
#define SF_G_CONNECT_TIMEOUT g_sf_global_vars.net_buffer_cfg.connect_timeout
#define SF_G_NETWORK_TIMEOUT g_sf_global_vars.net_buffer_cfg.network_timeout
#define SF_G_MAX_CONNECTIONS g_sf_global_vars.max_connections
#define SF_G_MAX_CONNECTIONS g_sf_global_vars.net_buffer_cfg.max_connections
#define SF_G_THREAD_STACK_SIZE g_sf_global_vars.thread_stack_size
#define SF_G_UP_TIME g_sf_global_vars.up_time

View File

@ -216,6 +216,7 @@ static int sf_client_connect_done(int sock, short event, void *arg)
{
int result;
struct fast_task_info *task;
char formatted_ip[FORMATTED_IP_SIZE];
task = (struct fast_task_info *)arg;
if (task->canceled) {
@ -237,9 +238,10 @@ static int sf_client_connect_done(int sock, short event, void *arg)
if (result != 0) {
if (SF_CTX->connect_need_log) {
format_ip_address(task->server_ip, formatted_ip);
logError("file: "__FILE__", line: %d, "
"connect to server %s:%u fail, errno: %d, "
"error info: %s", __LINE__, task->server_ip,
"error info: %s", __LINE__, formatted_ip,
task->port, result, STRERROR(result));
}
ioevent_add_to_deleted_list(task);
@ -247,9 +249,10 @@ static int sf_client_connect_done(int sock, short event, void *arg)
}
if (SF_CTX->connect_need_log) {
format_ip_address(task->server_ip, formatted_ip);
logInfo("file: "__FILE__", line: %d, "
"connect to server %s:%u successfully",
__LINE__, task->server_ip, task->port);
__LINE__, formatted_ip, task->port);
}
return SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_HANDSHAKE);
}
@ -270,6 +273,7 @@ int sf_socket_async_connect_server(struct fast_task_info *task)
static int sf_async_connect_server(struct fast_task_info *task)
{
int result;
char formatted_ip[FORMATTED_IP_SIZE];
if ((result=task->handler->async_connect_server(task)) == EINPROGRESS) {
result = ioevent_set(task, task->thread_data, task->event.fd,
@ -291,17 +295,19 @@ static int sf_async_connect_server(struct fast_task_info *task)
}
if (SF_CTX->connect_need_log) {
format_ip_address(task->server_ip, formatted_ip);
logInfo("file: "__FILE__", line: %d, "
"connect to server %s:%u successfully",
__LINE__, task->server_ip, task->port);
__LINE__, formatted_ip, task->port);
}
return SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_HANDSHAKE);
} else {
task->handler->close_connection(task);
if (SF_CTX->connect_need_log) {
format_ip_address(task->server_ip, formatted_ip);
logError("file: "__FILE__", line: %d, "
"connect to server %s:%u fail, errno: %d, "
"error info: %s", __LINE__, task->server_ip,
"error info: %s", __LINE__, formatted_ip,
task->port, result, STRERROR(result));
}
return result > 0 ? -1 * result : result;
@ -767,6 +773,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task,
static int calc_iops_and_trigger_polling(struct fast_task_info *task)
{
char formatted_ip[FORMATTED_IP_SIZE];
int time_distance;
int result = 0;
@ -793,9 +800,10 @@ static int calc_iops_and_trigger_polling(struct fast_task_info *task)
fc_list_add_tail(&task->polling.dlink,
&task->thread_data->polling_queue);
format_ip_address(task->client_ip, formatted_ip);
logInfo("file: "__FILE__", line: %d, client: %s:%u, "
"trigger polling iops: %"PRId64, __LINE__,
task->client_ip, task->port, (task->req_count -
formatted_ip, task->port, (task->req_count -
task->polling.last_req_count) / time_distance);
}
} else {
@ -813,6 +821,7 @@ static int calc_iops_and_trigger_polling(struct fast_task_info *task)
static int calc_iops_and_remove_polling(struct fast_task_info *task)
{
char formatted_ip[FORMATTED_IP_SIZE];
int time_distance;
int result = 0;
@ -835,9 +844,10 @@ static int calc_iops_and_remove_polling(struct fast_task_info *task)
result = sf_ioevent_add(task, (IOEventCallback)
sf_client_sock_read, SF_CTX->net_buffer_cfg.network_timeout);
format_ip_address(task->client_ip, formatted_ip);
logInfo("file: "__FILE__", line: %d, client: %s:%u, "
"remove polling iops: %"PRId64, __LINE__,
task->client_ip, task->port, (task->req_count -
formatted_ip, task->port, (task->req_count -
task->polling.last_req_count) / time_distance);
}
} else {

View File

@ -14,8 +14,6 @@
*/
#include <errno.h>
#include "fastcommon/shared_func.h"
#include "sf_util.h"
#include "sf_nio.h"
#include "sf_proto.h"
@ -29,14 +27,16 @@ static int64_t log_slower_than_us = 0;
int sf_proto_set_body_length(struct fast_task_info *task)
{
SFCommonProtoHeader *header;
char formatted_ip[FORMATTED_IP_SIZE];
header = (SFCommonProtoHeader *)task->recv.ptr->data;
if (!SF_PROTO_CHECK_MAGIC(header->magic)) {
format_ip_address(task->client_ip, formatted_ip);
logError("file: "__FILE__", line: %d, "
"%s peer %s:%u, magic "SF_PROTO_MAGIC_FORMAT" is invalid, "
"expect: "SF_PROTO_MAGIC_FORMAT", cmd: %d, body length: %d",
__LINE__, (task->handler != NULL ? task->handler->fh->ctx->
name : ""), task->client_ip, task->port,
name : ""), formatted_ip, task->port,
SF_PROTO_MAGIC_PARAMS(header->magic),
SF_PROTO_MAGIC_EXPECT_PARAMS, header->cmd,
buff2int(header->body_len));
@ -456,14 +456,17 @@ const char *sf_get_cmd_caption(const int cmd)
int sf_proto_deal_ack(struct fast_task_info *task,
SFRequestInfo *request, SFResponseInfo *response)
{
char formatted_ip[FORMATTED_IP_SIZE];
if (request->header.status != 0) {
if (request->header.body_len > 0) {
int remain_size;
int len;
format_ip_address(task->client_ip, formatted_ip);
response->error.length = sprintf(response->error.message,
"message from peer %s:%u => ",
task->client_ip, task->port);
formatted_ip, task->port);
remain_size = sizeof(response->error.message) -
response->error.length;
if (request->header.body_len >= remain_size) {
@ -525,6 +528,7 @@ int sf_proto_get_group_servers(ConnectionInfo *conn,
char out_buff[sizeof(SFCommonProtoHeader) +
sizeof(SFProtoGetGroupServersReq)];
char in_buff[1024];
char formatted_ip[FORMATTED_IP_SIZE];
SFCommonProtoHeader *header;
SFProtoGetGroupServersReq *req;
SFProtoGetGroupServersRespBodyHeader *body_header;
@ -552,9 +556,10 @@ int sf_proto_get_group_servers(ConnectionInfo *conn,
}
if (body_len < sizeof(SFProtoGetGroupServersRespBodyHeader)) {
format_ip_address(conn->ip_addr, formatted_ip);
logError("file: "__FILE__", line: %d, "
"server %s:%d response body length: %d < %d",
__LINE__, conn->ip_addr, conn->port, body_len,
"server %s:%u response body length: %d < %d",
__LINE__, formatted_ip, conn->port, body_len,
(int)sizeof(SFProtoGetGroupServersRespBodyHeader));
return EINVAL;
}
@ -562,15 +567,17 @@ int sf_proto_get_group_servers(ConnectionInfo *conn,
body_header = (SFProtoGetGroupServersRespBodyHeader *)in_buff;
count = buff2int(body_header->count);
if (count <= 0) {
format_ip_address(conn->ip_addr, formatted_ip);
logError("file: "__FILE__", line: %d, "
"server %s:%d response server count: %d <= 0",
__LINE__, conn->ip_addr, conn->port, count);
"server %s:%u response server count: %d <= 0",
__LINE__, formatted_ip, conn->port, count);
return EINVAL;
}
if (count > sarray->alloc) {
format_ip_address(conn->ip_addr, formatted_ip);
logError("file: "__FILE__", line: %d, "
"server %s:%d response server count: %d is too large, "
"exceeds %d", __LINE__, conn->ip_addr, conn->port,
"server %s:%u response server count: %d is too large, "
"exceeds %d", __LINE__, formatted_ip, conn->port,
count, sarray->alloc);
return EOVERFLOW;
}
@ -630,6 +637,7 @@ int sf_proto_deal_task_done(struct fast_task_info *task,
int r;
int64_t time_used;
int log_level;
char formatted_ip[FORMATTED_IP_SIZE];
char time_buff[32];
if (ctx->log_level != LOG_NOTHING && ctx->response.error.length > 0) {
@ -637,7 +645,8 @@ int sf_proto_deal_task_done(struct fast_task_info *task,
"file: "__FILE__", line: %d, %s "
"peer %s:%u, cmd: %d (%s), req body length: %d, "
"resp status: %d, %s", __LINE__, service_name,
task->client_ip, task->port, ctx->request.header.cmd,
format_ip_address(task->client_ip, formatted_ip),
task->port, ctx->request.header.cmd,
GET_CMD_CAPTION(ctx->request.header.cmd),
ctx->request.header.body_len, ctx->response.header.status,
ctx->response.error.message);
@ -650,7 +659,8 @@ int sf_proto_deal_task_done(struct fast_task_info *task,
log_it_ex(&g_log_context, log_level, "file: "__FILE__", line: %d, "
"%s client %s:%u, req cmd: %d (%s), req body_len: %d, "
"resp status: %d, time used: %s us", __LINE__, service_name,
task->client_ip, task->port, ctx->request.header.cmd,
format_ip_address(task->client_ip, formatted_ip),
task->port, ctx->request.header.cmd,
GET_CMD_CAPTION(ctx->request.header.cmd),
ctx->request.header.body_len, ctx->response.header.status,
long_to_comma_str(time_used, time_buff));
@ -691,8 +701,9 @@ int sf_proto_deal_task_done(struct fast_task_info *task,
blen = sprintf(buff, "timed used: %s us, %s client %s:%u, "
"req cmd: %d (%s), req body len: %d, resp cmd: %d (%s), "
"status: %d, resp body len: %d", long_to_comma_str(time_used,
time_buff), service_name, task->client_ip, task->port, ctx->
request.header.cmd, GET_CMD_CAPTION(ctx->request.header.cmd),
time_buff), service_name, format_ip_address(task->
client_ip, formatted_ip), task->port, ctx->request.
header.cmd, GET_CMD_CAPTION(ctx->request.header.cmd),
ctx->request.header.body_len, ctx->response.header.cmd,
GET_CMD_CAPTION(ctx->response.header.cmd),
ctx->response.header.status, ctx->response.header.body_len);
@ -705,7 +716,8 @@ int sf_proto_deal_task_done(struct fast_task_info *task,
"%s client %s:%u, req cmd: %d (%s), req body_len: %d, "
"resp cmd: %d (%s), status: %d, resp body_len: %d, "
"time used: %s us", __LINE__, service_name,
task->client_ip, task->port, ctx->request.header.cmd,
format_ip_address(task->client_ip, formatted_ip),
task->port, ctx->request.header.cmd,
GET_CMD_CAPTION(ctx->request.header.cmd),
ctx->request.header.body_len, ctx->response.header.cmd,
GET_CMD_CAPTION(ctx->response.header.cmd),