From 86017c46ac88b00b0816e7170376a9ecbb9b635f Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Wed, 10 Mar 2021 11:15:09 +0800 Subject: [PATCH] add functions sf_proto_init_task_context, sf_proto_deal_task_done etc. --- src/sf_proto.c | 106 +++++++++++++++++++++++++++++++++++++++++++++++++ src/sf_proto.h | 38 ++++++++++++++++++ src/sf_types.h | 9 +++++ 3 files changed, 153 insertions(+) diff --git a/src/sf_proto.c b/src/sf_proto.c index 61bbf53..9c02c9b 100644 --- a/src/sf_proto.c +++ b/src/sf_proto.c @@ -17,8 +17,15 @@ #include #include "fastcommon/shared_func.h" #include "sf_util.h" +#include "sf_nio.h" #include "sf_proto.h" +static SFHandlerContext sf_handler_ctx = {NULL, {NULL, NULL}}; +static int64_t log_slower_than_us = 0; + +#define GET_CMD_CAPTION(cmd) sf_handler_ctx.callbacks.get_cmd_caption(cmd) +#define GET_CMD_LOG_LEVEL(cmd) sf_handler_ctx.callbacks.get_cmd_log_level(cmd) + int sf_proto_set_body_length(struct fast_task_info *task) { SFCommonProtoHeader *header; @@ -470,3 +477,102 @@ int sf_proto_get_leader(ConnectionInfo *conn, return result; } + +void sf_proto_set_handler_context(const SFHandlerContext *ctx) +{ + sf_handler_ctx = *ctx; + log_slower_than_us = ctx->slow_log->cfg.log_slower_than_ms * 1000; +} + +int sf_proto_deal_task_done(struct fast_task_info *task, + SFCommonTaskContext *ctx) +{ + SFCommonProtoHeader *proto_header; + int status; + int r; + int64_t time_used; + int log_level; + char time_buff[32]; + + if (ctx->log_level != LOG_NOTHING && ctx->response.error.length > 0) { + log_it_ex(&g_log_context, ctx->log_level, + "file: "__FILE__", line: %d, " + "peer %s:%u, cmd: %d (%s), req body length: %d, " + "resp status: %d, %s", __LINE__, task->client_ip, + task->port, ctx->request.header.cmd, + GET_CMD_CAPTION(ctx->request.header.cmd), + ctx->request.header.body_len, ctx->response.header.status, + ctx->response.error.message); + } + + if (!ctx->need_response) { + if (sf_handler_ctx.callbacks.get_cmd_log_level != NULL) { + time_used = get_current_time_us() - ctx->req_start_time; + log_level = GET_CMD_LOG_LEVEL(ctx->request.header.cmd); + log_it_ex(&g_log_context, log_level, "file: "__FILE__", line: %d, " + "client %s:%u, req cmd: %d (%s), req body_len: %d, " + "resp status: %d, time used: %s us", __LINE__, + task->client_ip, task->port, ctx->request.header.cmd, + GET_CMD_CAPTION(ctx->request.header.cmd), + ctx->request.header.body_len, ctx->response.header.status, + long_to_comma_str(time_used, time_buff)); + } + + if (ctx->response.header.status == 0) { + task->offset = task->length = 0; + return sf_set_read_event(task); + } else { + return FC_NEGATIVE(ctx->response.header.status); + } + } + + proto_header = (SFCommonProtoHeader *)task->data; + if (!ctx->response_done) { + ctx->response.header.body_len = ctx->response.error.length; + if (ctx->response.error.length > 0) { + memcpy(task->data + sizeof(SFCommonProtoHeader), + ctx->response.error.message, ctx->response.error.length); + } + } + + status = sf_unify_errno(FC_ABS(ctx->response.header.status)); + short2buff(status, proto_header->status); + proto_header->cmd = ctx->response.header.cmd; + int2buff(ctx->response.header.body_len, proto_header->body_len); + task->length = sizeof(SFCommonProtoHeader) + ctx->response.header.body_len; + + r = sf_send_add_event(task); + time_used = get_current_time_us() - ctx->req_start_time; + if ((sf_handler_ctx.slow_log != NULL) && (sf_handler_ctx.slow_log-> + cfg.enabled && time_used > log_slower_than_us)) + { + char buff[256]; + int blen; + + blen = sprintf(buff, "timed used: %s us, client %s:%u, " + "req cmd: %d (%s), req body len: %d, resp cmd: %d (%s), " + "status: %d, resp body len: %d", long_to_comma_str(time_used, + time_buff), task->client_ip, task->port, ctx->request. + header.cmd, GET_CMD_CAPTION(ctx->request.header.cmd), + ctx->request.header.body_len, ctx->response.header.cmd, + GET_CMD_CAPTION(ctx->response.header.cmd), + ctx->response.header.status, ctx->response.header.body_len); + log_it_ex2(&sf_handler_ctx.slow_log->ctx, NULL, buff, blen, false, true); + } + + if (sf_handler_ctx.callbacks.get_cmd_log_level != NULL) { + log_level = GET_CMD_LOG_LEVEL(ctx->request.header.cmd); + log_it_ex(&g_log_context, log_level, "file: "__FILE__", line: %d, " + "client %s:%u, req cmd: %d (%s), req body_len: %d, " + "resp cmd: %d (%s), status: %d, resp body_len: %d, " + "time used: %s us", __LINE__, + task->client_ip, task->port, ctx->request.header.cmd, + GET_CMD_CAPTION(ctx->request.header.cmd), + ctx->request.header.body_len, ctx->response.header.cmd, + GET_CMD_CAPTION(ctx->response.header.cmd), + ctx->response.header.status, ctx->response.header.body_len, + long_to_comma_str(time_used, time_buff)); + } + + return r == 0 ? ctx->response.header.status : r; +} diff --git a/src/sf_proto.h b/src/sf_proto.h index 84dcb9c..d447111 100644 --- a/src/sf_proto.h +++ b/src/sf_proto.h @@ -168,14 +168,52 @@ typedef struct sf_client_server_entry { ConnectionInfo conn; } SFClientServerEntry; +typedef const char *(*sf_get_cmd_caption_func)(const int cmd); +typedef int (*sf_get_cmd_log_level_func)(const int cmd); + +typedef struct { + sf_get_cmd_caption_func get_cmd_caption; + sf_get_cmd_log_level_func get_cmd_log_level; +} SFCommandCallbacks; + +typedef struct { + SFSlowLogContext *slow_log; + SFCommandCallbacks callbacks; +} SFHandlerContext; + #ifdef __cplusplus extern "C" { #endif +void sf_proto_set_handler_context(const SFHandlerContext *ctx); + int sf_proto_set_body_length(struct fast_task_info *task); const char *sf_get_cmd_caption(const int cmd); +int sf_proto_deal_task_done(struct fast_task_info *task, + SFCommonTaskContext *ctx); + +static inline void sf_proto_init_task_context(struct fast_task_info *task, + SFCommonTaskContext *ctx) +{ + ctx->req_start_time = get_current_time_us(); + ctx->response.header.cmd = SF_PROTO_ACK; + ctx->response.header.body_len = 0; + ctx->response.header.status = 0; + ctx->response.error.length = 0; + ctx->response.error.message[0] = '\0'; + ctx->log_level = LOG_ERR; + ctx->response_done = false; + ctx->need_response = true; + + ctx->request.header.cmd = ((SFCommonProtoHeader *)task->data)->cmd; + ctx->request.header.body_len = task->length - sizeof(SFCommonProtoHeader); + ctx->request.header.status = buff2short(((SFCommonProtoHeader *) + task->data)->status); + ctx->request.body = task->data + sizeof(SFCommonProtoHeader); +} + static inline void sf_log_network_error_ex1(SFResponseInfo *response, const ConnectionInfo *conn, const int result, const int log_level, const char *file, const int line) diff --git a/src/sf_types.h b/src/sf_types.h index 997adba..7d0323a 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -84,6 +84,15 @@ typedef struct { SFErrorInfo error; } SFResponseInfo; +typedef struct { + int64_t req_start_time; //unit: microsecond (us) + SFRequestInfo request; + SFResponseInfo response; + bool response_done; + char log_level; //level for error log + bool need_response; +} SFCommonTaskContext; + typedef struct sf_binlog_file_position { int index; //current binlog file int64_t offset; //current file offset