diff --git a/src/idempotency/client/client_channel.c b/src/idempotency/client/client_channel.c index 6170d50..f2a3109 100644 --- a/src/idempotency/client/client_channel.c +++ b/src/idempotency/client/client_channel.c @@ -213,6 +213,7 @@ int idempotency_client_channel_check_reconnect( IdempotencyClientChannel *channel) { int result; + char formatted_ip[FORMATTED_IP_SIZE]; if (!__sync_bool_compare_and_swap(&channel->in_ioevent, 0, 1)) { return 0; @@ -223,10 +224,12 @@ int idempotency_client_channel_check_reconnect( channel->last_connect_time = g_current_time; } - logDebug("file: "__FILE__", line: %d, " - "trigger connect to server %s:%u", - __LINE__, channel->task->server_ip, - channel->task->port); + if (FC_LOG_BY_LEVEL(LOG_DEBUG)) { + format_ip_address(channel->task->server_ip, formatted_ip); + logDebug("file: "__FILE__", line: %d, " + "trigger connect to server %s:%u", __LINE__, + formatted_ip, channel->task->port); + } __sync_bool_compare_and_swap(&channel->task->canceled, 1, 0); if ((result=sf_nio_notify(channel->task, SF_NIO_STAGE_CONNECT)) == 0) { diff --git a/src/idempotency/client/client_channel.h b/src/idempotency/client/client_channel.h index ba31309..8fa82be 100644 --- a/src/idempotency/client/client_channel.h +++ b/src/idempotency/client/client_channel.h @@ -86,10 +86,12 @@ static inline int idempotency_client_channel_check_wait_ex( return 0; } else { /* + char formatted_ip[FORMATTED_IP_SIZE]; + format_ip_address(channel->task->server_ip, formatted_ip); logInfo("file: "__FILE__", line: %d, " "channel_check fail, server %s:%u, in_ioevent: %d, " "canceled: %d, req count: %"PRId64, __LINE__, - channel->task->server_ip, channel->task->port, + formatted_ip, channel->task->port, __sync_add_and_fetch(&channel->in_ioevent, 0), __sync_add_and_fetch(&channel->task->canceled, 0), channel->task->req_count); diff --git a/src/idempotency/client/receipt_handler.c b/src/idempotency/client/receipt_handler.c index 2fb7c55..b013cd2 100644 --- a/src/idempotency/client/receipt_handler.c +++ b/src/idempotency/client/receipt_handler.c @@ -60,11 +60,13 @@ static int receipt_init_task(struct fast_task_info *task) static int receipt_recv_timeout_callback(struct fast_task_info *task) { IdempotencyClientChannel *channel; + char formatted_ip[FORMATTED_IP_SIZE]; + format_ip_address(task->server_ip, formatted_ip); if (SF_NIO_TASK_STAGE_FETCH(task) == SF_NIO_STAGE_CONNECT) { logError("file: "__FILE__", line: %d, " "connect to server %s:%u timeout", - __LINE__, task->server_ip, task->port); + __LINE__, formatted_ip, task->port); return ETIMEDOUT; } @@ -72,12 +74,12 @@ static int receipt_recv_timeout_callback(struct fast_task_info *task) if (channel->waiting_resp_qinfo.head != NULL) { logError("file: "__FILE__", line: %d, " "waiting receipt response from server %s:%u timeout", - __LINE__, task->server_ip, task->port); + __LINE__, formatted_ip, task->port); } else { logError("file: "__FILE__", line: %d, " "%s server %s:%u timeout, channel established: %d", __LINE__, task->nio_stages.current == SF_NIO_STAGE_SEND ? - "send to" : "recv from", task->server_ip, task->port, + "send to" : "recv from", formatted_ip, task->port, FC_ATOMIC_GET(channel->established)); } @@ -87,6 +89,7 @@ static int receipt_recv_timeout_callback(struct fast_task_info *task) static void receipt_task_finish_cleanup(struct fast_task_info *task) { IdempotencyClientChannel *channel; + char formatted_ip[FORMATTED_IP_SIZE]; if (task->event.fd >= 0) { sf_task_detach_thread(task); @@ -102,9 +105,12 @@ static void receipt_task_finish_cleanup(struct fast_task_info *task) __sync_bool_compare_and_swap(&channel->established, 1, 0); __sync_bool_compare_and_swap(&channel->in_ioevent, 1, 0); - logDebug("file: "__FILE__", line: %d, " - "receipt task for server %s:%u exit", - __LINE__, task->server_ip, task->port); + if (FC_LOG_BY_LEVEL(LOG_DEBUG)) { + format_ip_address(task->server_ip, formatted_ip); + logDebug("file: "__FILE__", line: %d, " + "receipt task for server %s:%u exit", + __LINE__, formatted_ip, task->port); + } } static void setup_channel_request(struct fast_task_info *task) @@ -245,11 +251,14 @@ static inline int receipt_expect_body_length(struct fast_task_info *task, const int expect_body_len) { int body_len; + char formatted_ip[FORMATTED_IP_SIZE]; + body_len = task->recv.ptr->length - sizeof(SFCommonProtoHeader); if (body_len != expect_body_len) { + format_ip_address(task->server_ip, formatted_ip); logError("file: "__FILE__", line: %d, " "server %s:%u, response body length: %d != %d", __LINE__, - task->server_ip, task->port, body_len, expect_body_len); + formatted_ip, task->port, body_len, expect_body_len); return EINVAL; } @@ -262,6 +271,7 @@ static int deal_setup_channel_response(struct fast_task_info *task) IdempotencyReceiptThreadContext *thread_ctx; SFProtoSetupChannelResp *resp; IdempotencyClientChannel *channel; + char formatted_ip[FORMATTED_IP_SIZE]; int channel_id; int channel_key; int buffer_size; @@ -274,10 +284,11 @@ static int deal_setup_channel_response(struct fast_task_info *task) channel = (IdempotencyClientChannel *)task->arg; if (__sync_add_and_fetch(&channel->established, 0)) { + format_ip_address(task->server_ip, formatted_ip); logWarning("file: "__FILE__", line: %d, " "response from server %s:%u, unexpected cmd: " "SETUP_CHANNEL_RESP, ignore it!", - __LINE__, task->server_ip, task->port); + __LINE__, formatted_ip, task->port); return 0; } @@ -314,6 +325,7 @@ static inline int deal_report_req_receipt_response(struct fast_task_info *task) IdempotencyClientChannel *channel; IdempotencyClientReceipt *current; IdempotencyClientReceipt *deleted; + char formatted_ip[FORMATTED_IP_SIZE]; if ((result=receipt_expect_body_length(task, 0)) != 0) { return result; @@ -321,10 +333,11 @@ static inline int deal_report_req_receipt_response(struct fast_task_info *task) channel = (IdempotencyClientChannel *)task->arg; if (channel->waiting_resp_qinfo.head == NULL) { + format_ip_address(task->server_ip, formatted_ip); logWarning("file: "__FILE__", line: %d, " "response from server %s:%u, unexpect cmd: " "REPORT_REQ_RECEIPT_RESP", __LINE__, - task->server_ip, task->port); + formatted_ip, task->port); return EINVAL; } @@ -346,6 +359,7 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage) { int result; SFCommonProtoHeader *header; + char formatted_ip[FORMATTED_IP_SIZE]; do { if (stage == SF_NIO_STAGE_HANDSHAKE) { @@ -373,10 +387,11 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage) msg_len = SF_RECV_BODY_LENGTH(task); message = SF_PROTO_RECV_BODY(task); + format_ip_address(task->server_ip, formatted_ip); logError("file: "__FILE__", line: %d, " "response from server %s:%u, cmd: %d (%s), " "status: %d, error info: %.*s", __LINE__, - task->server_ip, task->port, header->cmd, + formatted_ip, task->port, header->cmd, sf_get_cmd_caption(header->cmd), result, msg_len, message); break; @@ -395,14 +410,18 @@ static int receipt_deal_task(struct fast_task_info *task, const int stage) break; case SF_SERVICE_PROTO_CLOSE_CHANNEL_RESP: result = ECONNRESET; //force to close socket - logDebug("file: "__FILE__", line: %d, " - "close channel to server %s:%u !!!", - __LINE__, task->server_ip, task->port); + if (FC_LOG_BY_LEVEL(LOG_DEBUG)) { + format_ip_address(task->server_ip, formatted_ip); + logDebug("file: "__FILE__", line: %d, " + "close channel to server %s:%u !!!", + __LINE__, formatted_ip, task->port); + } break; default: + format_ip_address(task->server_ip, formatted_ip); logError("file: "__FILE__", line: %d, " "response from server %s:%u, unexpect cmd: %d (%s)", - __LINE__, task->server_ip, task->port, header->cmd, + __LINE__, formatted_ip, task->port, header->cmd, sf_get_cmd_caption(header->cmd)); result = EINVAL; break; @@ -447,6 +466,7 @@ static void receipt_thread_close_idle_channel( { IdempotencyClientChannel *channel; IdempotencyClientChannel *tmp; + char formatted_ip[FORMATTED_IP_SIZE]; fc_list_for_each_entry_safe(channel, tmp, &thread_ctx->head, dlink) { if (channel->task->pending_send_count > 0) { @@ -456,9 +476,12 @@ static void receipt_thread_close_idle_channel( if (g_current_time - channel->last_report_time > g_idempotency_client_cfg.channel_max_idle_time) { - logDebug("file: "__FILE__", line: %d, " - "close channel to server %s:%u because idle too long", - __LINE__, channel->task->server_ip, channel->task->port); + if (FC_LOG_BY_LEVEL(LOG_DEBUG)) { + format_ip_address(channel->task->server_ip, formatted_ip); + logDebug("file: "__FILE__", line: %d, " + "close channel to server %s:%u because idle too long", + __LINE__, formatted_ip, channel->task->port); + } close_channel_request(channel->task); } } diff --git a/src/sf_connection_manager.c b/src/sf_connection_manager.c index b275cfe..7981d8d 100644 --- a/src/sf_connection_manager.c +++ b/src/sf_connection_manager.c @@ -405,6 +405,7 @@ static ConnectionInfo *get_leader_connection(SFConnectionManager *cm, ConnectionInfo *conn; SFClientServerEntry leader; SFNetRetryIntervalContext net_retry_ctx; + char formatted_ip[FORMATTED_IP_SIZE]; int i; int connect_fails; @@ -467,10 +468,11 @@ static ConnectionInfo *get_leader_connection(SFConnectionManager *cm, connect.times, ++i, *err_no); } + format_ip_address(server->group_addrs[cm->server_group_index]. + address_array.addrs[0]->conn.ip_addr, formatted_ip); logWarning("file: "__FILE__", line: %d, " "%s get_leader_connection fail, server id: %d, %s:%u, errno: %d", - __LINE__, cm->module_name, server->id, server->group_addrs - [cm->server_group_index]. address_array.addrs[0]->conn.ip_addr, + __LINE__, cm->module_name, server->id, formatted_ip, server->group_addrs[cm->server_group_index].address_array. addrs[0]->conn.port, *err_no); return NULL; diff --git a/src/sf_global.h b/src/sf_global.h index 680aaed..08db74a 100644 --- a/src/sf_global.h +++ b/src/sf_global.h @@ -84,7 +84,7 @@ extern SFContext g_sf_context; #define SF_G_CONTINUE_FLAG g_sf_global_vars.continue_flag #define SF_G_CONNECT_TIMEOUT g_sf_global_vars.net_buffer_cfg.connect_timeout #define SF_G_NETWORK_TIMEOUT g_sf_global_vars.net_buffer_cfg.network_timeout -#define SF_G_MAX_CONNECTIONS g_sf_global_vars.max_connections +#define SF_G_MAX_CONNECTIONS g_sf_global_vars.net_buffer_cfg.max_connections #define SF_G_THREAD_STACK_SIZE g_sf_global_vars.thread_stack_size #define SF_G_UP_TIME g_sf_global_vars.up_time diff --git a/src/sf_nio.c b/src/sf_nio.c index 24f5103..1d4d60f 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -216,6 +216,7 @@ static int sf_client_connect_done(int sock, short event, void *arg) { int result; struct fast_task_info *task; + char formatted_ip[FORMATTED_IP_SIZE]; task = (struct fast_task_info *)arg; if (task->canceled) { @@ -237,9 +238,10 @@ static int sf_client_connect_done(int sock, short event, void *arg) if (result != 0) { if (SF_CTX->connect_need_log) { + format_ip_address(task->server_ip, formatted_ip); logError("file: "__FILE__", line: %d, " "connect to server %s:%u fail, errno: %d, " - "error info: %s", __LINE__, task->server_ip, + "error info: %s", __LINE__, formatted_ip, task->port, result, STRERROR(result)); } ioevent_add_to_deleted_list(task); @@ -247,9 +249,10 @@ static int sf_client_connect_done(int sock, short event, void *arg) } if (SF_CTX->connect_need_log) { + format_ip_address(task->server_ip, formatted_ip); logInfo("file: "__FILE__", line: %d, " "connect to server %s:%u successfully", - __LINE__, task->server_ip, task->port); + __LINE__, formatted_ip, task->port); } return SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_HANDSHAKE); } @@ -270,6 +273,7 @@ int sf_socket_async_connect_server(struct fast_task_info *task) static int sf_async_connect_server(struct fast_task_info *task) { int result; + char formatted_ip[FORMATTED_IP_SIZE]; if ((result=task->handler->async_connect_server(task)) == EINPROGRESS) { result = ioevent_set(task, task->thread_data, task->event.fd, @@ -291,17 +295,19 @@ static int sf_async_connect_server(struct fast_task_info *task) } if (SF_CTX->connect_need_log) { + format_ip_address(task->server_ip, formatted_ip); logInfo("file: "__FILE__", line: %d, " "connect to server %s:%u successfully", - __LINE__, task->server_ip, task->port); + __LINE__, formatted_ip, task->port); } return SF_CTX->callbacks.deal_task(task, SF_NIO_STAGE_HANDSHAKE); } else { task->handler->close_connection(task); if (SF_CTX->connect_need_log) { + format_ip_address(task->server_ip, formatted_ip); logError("file: "__FILE__", line: %d, " "connect to server %s:%u fail, errno: %d, " - "error info: %s", __LINE__, task->server_ip, + "error info: %s", __LINE__, formatted_ip, task->port, result, STRERROR(result)); } return result > 0 ? -1 * result : result; @@ -767,6 +773,7 @@ ssize_t sf_socket_recv_data(struct fast_task_info *task, static int calc_iops_and_trigger_polling(struct fast_task_info *task) { + char formatted_ip[FORMATTED_IP_SIZE]; int time_distance; int result = 0; @@ -793,9 +800,10 @@ static int calc_iops_and_trigger_polling(struct fast_task_info *task) fc_list_add_tail(&task->polling.dlink, &task->thread_data->polling_queue); + format_ip_address(task->client_ip, formatted_ip); logInfo("file: "__FILE__", line: %d, client: %s:%u, " "trigger polling iops: %"PRId64, __LINE__, - task->client_ip, task->port, (task->req_count - + formatted_ip, task->port, (task->req_count - task->polling.last_req_count) / time_distance); } } else { @@ -813,6 +821,7 @@ static int calc_iops_and_trigger_polling(struct fast_task_info *task) static int calc_iops_and_remove_polling(struct fast_task_info *task) { + char formatted_ip[FORMATTED_IP_SIZE]; int time_distance; int result = 0; @@ -835,9 +844,10 @@ static int calc_iops_and_remove_polling(struct fast_task_info *task) result = sf_ioevent_add(task, (IOEventCallback) sf_client_sock_read, SF_CTX->net_buffer_cfg.network_timeout); + format_ip_address(task->client_ip, formatted_ip); logInfo("file: "__FILE__", line: %d, client: %s:%u, " "remove polling iops: %"PRId64, __LINE__, - task->client_ip, task->port, (task->req_count - + formatted_ip, task->port, (task->req_count - task->polling.last_req_count) / time_distance); } } else { diff --git a/src/sf_proto.c b/src/sf_proto.c index 99a2a68..bcc03c2 100644 --- a/src/sf_proto.c +++ b/src/sf_proto.c @@ -14,8 +14,6 @@ */ -#include -#include "fastcommon/shared_func.h" #include "sf_util.h" #include "sf_nio.h" #include "sf_proto.h" @@ -29,14 +27,16 @@ static int64_t log_slower_than_us = 0; int sf_proto_set_body_length(struct fast_task_info *task) { SFCommonProtoHeader *header; + char formatted_ip[FORMATTED_IP_SIZE]; header = (SFCommonProtoHeader *)task->recv.ptr->data; if (!SF_PROTO_CHECK_MAGIC(header->magic)) { + format_ip_address(task->client_ip, formatted_ip); logError("file: "__FILE__", line: %d, " "%s peer %s:%u, magic "SF_PROTO_MAGIC_FORMAT" is invalid, " "expect: "SF_PROTO_MAGIC_FORMAT", cmd: %d, body length: %d", __LINE__, (task->handler != NULL ? task->handler->fh->ctx-> - name : ""), task->client_ip, task->port, + name : ""), formatted_ip, task->port, SF_PROTO_MAGIC_PARAMS(header->magic), SF_PROTO_MAGIC_EXPECT_PARAMS, header->cmd, buff2int(header->body_len)); @@ -456,14 +456,17 @@ const char *sf_get_cmd_caption(const int cmd) int sf_proto_deal_ack(struct fast_task_info *task, SFRequestInfo *request, SFResponseInfo *response) { + char formatted_ip[FORMATTED_IP_SIZE]; + if (request->header.status != 0) { if (request->header.body_len > 0) { int remain_size; int len; + format_ip_address(task->client_ip, formatted_ip); response->error.length = sprintf(response->error.message, "message from peer %s:%u => ", - task->client_ip, task->port); + formatted_ip, task->port); remain_size = sizeof(response->error.message) - response->error.length; if (request->header.body_len >= remain_size) { @@ -525,6 +528,7 @@ int sf_proto_get_group_servers(ConnectionInfo *conn, char out_buff[sizeof(SFCommonProtoHeader) + sizeof(SFProtoGetGroupServersReq)]; char in_buff[1024]; + char formatted_ip[FORMATTED_IP_SIZE]; SFCommonProtoHeader *header; SFProtoGetGroupServersReq *req; SFProtoGetGroupServersRespBodyHeader *body_header; @@ -552,9 +556,10 @@ int sf_proto_get_group_servers(ConnectionInfo *conn, } if (body_len < sizeof(SFProtoGetGroupServersRespBodyHeader)) { + format_ip_address(conn->ip_addr, formatted_ip); logError("file: "__FILE__", line: %d, " - "server %s:%d response body length: %d < %d", - __LINE__, conn->ip_addr, conn->port, body_len, + "server %s:%u response body length: %d < %d", + __LINE__, formatted_ip, conn->port, body_len, (int)sizeof(SFProtoGetGroupServersRespBodyHeader)); return EINVAL; } @@ -562,15 +567,17 @@ int sf_proto_get_group_servers(ConnectionInfo *conn, body_header = (SFProtoGetGroupServersRespBodyHeader *)in_buff; count = buff2int(body_header->count); if (count <= 0) { + format_ip_address(conn->ip_addr, formatted_ip); logError("file: "__FILE__", line: %d, " - "server %s:%d response server count: %d <= 0", - __LINE__, conn->ip_addr, conn->port, count); + "server %s:%u response server count: %d <= 0", + __LINE__, formatted_ip, conn->port, count); return EINVAL; } if (count > sarray->alloc) { + format_ip_address(conn->ip_addr, formatted_ip); logError("file: "__FILE__", line: %d, " - "server %s:%d response server count: %d is too large, " - "exceeds %d", __LINE__, conn->ip_addr, conn->port, + "server %s:%u response server count: %d is too large, " + "exceeds %d", __LINE__, formatted_ip, conn->port, count, sarray->alloc); return EOVERFLOW; } @@ -630,6 +637,7 @@ int sf_proto_deal_task_done(struct fast_task_info *task, int r; int64_t time_used; int log_level; + char formatted_ip[FORMATTED_IP_SIZE]; char time_buff[32]; if (ctx->log_level != LOG_NOTHING && ctx->response.error.length > 0) { @@ -637,7 +645,8 @@ int sf_proto_deal_task_done(struct fast_task_info *task, "file: "__FILE__", line: %d, %s " "peer %s:%u, cmd: %d (%s), req body length: %d, " "resp status: %d, %s", __LINE__, service_name, - task->client_ip, task->port, ctx->request.header.cmd, + format_ip_address(task->client_ip, formatted_ip), + task->port, ctx->request.header.cmd, GET_CMD_CAPTION(ctx->request.header.cmd), ctx->request.header.body_len, ctx->response.header.status, ctx->response.error.message); @@ -650,7 +659,8 @@ int sf_proto_deal_task_done(struct fast_task_info *task, log_it_ex(&g_log_context, log_level, "file: "__FILE__", line: %d, " "%s client %s:%u, req cmd: %d (%s), req body_len: %d, " "resp status: %d, time used: %s us", __LINE__, service_name, - task->client_ip, task->port, ctx->request.header.cmd, + format_ip_address(task->client_ip, formatted_ip), + task->port, ctx->request.header.cmd, GET_CMD_CAPTION(ctx->request.header.cmd), ctx->request.header.body_len, ctx->response.header.status, long_to_comma_str(time_used, time_buff)); @@ -691,8 +701,9 @@ int sf_proto_deal_task_done(struct fast_task_info *task, blen = sprintf(buff, "timed used: %s us, %s client %s:%u, " "req cmd: %d (%s), req body len: %d, resp cmd: %d (%s), " "status: %d, resp body len: %d", long_to_comma_str(time_used, - time_buff), service_name, task->client_ip, task->port, ctx-> - request.header.cmd, GET_CMD_CAPTION(ctx->request.header.cmd), + time_buff), service_name, format_ip_address(task-> + client_ip, formatted_ip), task->port, ctx->request. + header.cmd, GET_CMD_CAPTION(ctx->request.header.cmd), ctx->request.header.body_len, ctx->response.header.cmd, GET_CMD_CAPTION(ctx->response.header.cmd), ctx->response.header.status, ctx->response.header.body_len); @@ -705,7 +716,8 @@ int sf_proto_deal_task_done(struct fast_task_info *task, "%s client %s:%u, req cmd: %d (%s), req body_len: %d, " "resp cmd: %d (%s), status: %d, resp body_len: %d, " "time used: %s us", __LINE__, service_name, - task->client_ip, task->port, ctx->request.header.cmd, + format_ip_address(task->client_ip, formatted_ip), + task->port, ctx->request.header.cmd, GET_CMD_CAPTION(ctx->request.header.cmd), ctx->request.header.body_len, ctx->response.header.cmd, GET_CMD_CAPTION(ctx->response.header.cmd),