add functions sf_proto_init_task_context, sf_proto_deal_task_done etc.

storage_pool
YuQing 2021-03-10 11:15:09 +08:00
parent ae600238bc
commit 86017c46ac
3 changed files with 153 additions and 0 deletions

View File

@ -17,8 +17,15 @@
#include <errno.h> #include <errno.h>
#include "fastcommon/shared_func.h" #include "fastcommon/shared_func.h"
#include "sf_util.h" #include "sf_util.h"
#include "sf_nio.h"
#include "sf_proto.h" #include "sf_proto.h"
static SFHandlerContext sf_handler_ctx = {NULL, {NULL, NULL}};
static int64_t log_slower_than_us = 0;
#define GET_CMD_CAPTION(cmd) sf_handler_ctx.callbacks.get_cmd_caption(cmd)
#define GET_CMD_LOG_LEVEL(cmd) sf_handler_ctx.callbacks.get_cmd_log_level(cmd)
int sf_proto_set_body_length(struct fast_task_info *task) int sf_proto_set_body_length(struct fast_task_info *task)
{ {
SFCommonProtoHeader *header; SFCommonProtoHeader *header;
@ -470,3 +477,102 @@ int sf_proto_get_leader(ConnectionInfo *conn,
return result; return result;
} }
void sf_proto_set_handler_context(const SFHandlerContext *ctx)
{
sf_handler_ctx = *ctx;
log_slower_than_us = ctx->slow_log->cfg.log_slower_than_ms * 1000;
}
int sf_proto_deal_task_done(struct fast_task_info *task,
SFCommonTaskContext *ctx)
{
SFCommonProtoHeader *proto_header;
int status;
int r;
int64_t time_used;
int log_level;
char time_buff[32];
if (ctx->log_level != LOG_NOTHING && ctx->response.error.length > 0) {
log_it_ex(&g_log_context, ctx->log_level,
"file: "__FILE__", line: %d, "
"peer %s:%u, cmd: %d (%s), req body length: %d, "
"resp status: %d, %s", __LINE__, task->client_ip,
task->port, ctx->request.header.cmd,
GET_CMD_CAPTION(ctx->request.header.cmd),
ctx->request.header.body_len, ctx->response.header.status,
ctx->response.error.message);
}
if (!ctx->need_response) {
if (sf_handler_ctx.callbacks.get_cmd_log_level != NULL) {
time_used = get_current_time_us() - ctx->req_start_time;
log_level = GET_CMD_LOG_LEVEL(ctx->request.header.cmd);
log_it_ex(&g_log_context, log_level, "file: "__FILE__", line: %d, "
"client %s:%u, req cmd: %d (%s), req body_len: %d, "
"resp status: %d, time used: %s us", __LINE__,
task->client_ip, task->port, ctx->request.header.cmd,
GET_CMD_CAPTION(ctx->request.header.cmd),
ctx->request.header.body_len, ctx->response.header.status,
long_to_comma_str(time_used, time_buff));
}
if (ctx->response.header.status == 0) {
task->offset = task->length = 0;
return sf_set_read_event(task);
} else {
return FC_NEGATIVE(ctx->response.header.status);
}
}
proto_header = (SFCommonProtoHeader *)task->data;
if (!ctx->response_done) {
ctx->response.header.body_len = ctx->response.error.length;
if (ctx->response.error.length > 0) {
memcpy(task->data + sizeof(SFCommonProtoHeader),
ctx->response.error.message, ctx->response.error.length);
}
}
status = sf_unify_errno(FC_ABS(ctx->response.header.status));
short2buff(status, proto_header->status);
proto_header->cmd = ctx->response.header.cmd;
int2buff(ctx->response.header.body_len, proto_header->body_len);
task->length = sizeof(SFCommonProtoHeader) + ctx->response.header.body_len;
r = sf_send_add_event(task);
time_used = get_current_time_us() - ctx->req_start_time;
if ((sf_handler_ctx.slow_log != NULL) && (sf_handler_ctx.slow_log->
cfg.enabled && time_used > log_slower_than_us))
{
char buff[256];
int blen;
blen = sprintf(buff, "timed used: %s us, client %s:%u, "
"req cmd: %d (%s), req body len: %d, resp cmd: %d (%s), "
"status: %d, resp body len: %d", long_to_comma_str(time_used,
time_buff), task->client_ip, task->port, ctx->request.
header.cmd, GET_CMD_CAPTION(ctx->request.header.cmd),
ctx->request.header.body_len, ctx->response.header.cmd,
GET_CMD_CAPTION(ctx->response.header.cmd),
ctx->response.header.status, ctx->response.header.body_len);
log_it_ex2(&sf_handler_ctx.slow_log->ctx, NULL, buff, blen, false, true);
}
if (sf_handler_ctx.callbacks.get_cmd_log_level != NULL) {
log_level = GET_CMD_LOG_LEVEL(ctx->request.header.cmd);
log_it_ex(&g_log_context, log_level, "file: "__FILE__", line: %d, "
"client %s:%u, req cmd: %d (%s), req body_len: %d, "
"resp cmd: %d (%s), status: %d, resp body_len: %d, "
"time used: %s us", __LINE__,
task->client_ip, task->port, ctx->request.header.cmd,
GET_CMD_CAPTION(ctx->request.header.cmd),
ctx->request.header.body_len, ctx->response.header.cmd,
GET_CMD_CAPTION(ctx->response.header.cmd),
ctx->response.header.status, ctx->response.header.body_len,
long_to_comma_str(time_used, time_buff));
}
return r == 0 ? ctx->response.header.status : r;
}

View File

@ -168,14 +168,52 @@ typedef struct sf_client_server_entry {
ConnectionInfo conn; ConnectionInfo conn;
} SFClientServerEntry; } SFClientServerEntry;
typedef const char *(*sf_get_cmd_caption_func)(const int cmd);
typedef int (*sf_get_cmd_log_level_func)(const int cmd);
typedef struct {
sf_get_cmd_caption_func get_cmd_caption;
sf_get_cmd_log_level_func get_cmd_log_level;
} SFCommandCallbacks;
typedef struct {
SFSlowLogContext *slow_log;
SFCommandCallbacks callbacks;
} SFHandlerContext;
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
void sf_proto_set_handler_context(const SFHandlerContext *ctx);
int sf_proto_set_body_length(struct fast_task_info *task); int sf_proto_set_body_length(struct fast_task_info *task);
const char *sf_get_cmd_caption(const int cmd); const char *sf_get_cmd_caption(const int cmd);
int sf_proto_deal_task_done(struct fast_task_info *task,
SFCommonTaskContext *ctx);
static inline void sf_proto_init_task_context(struct fast_task_info *task,
SFCommonTaskContext *ctx)
{
ctx->req_start_time = get_current_time_us();
ctx->response.header.cmd = SF_PROTO_ACK;
ctx->response.header.body_len = 0;
ctx->response.header.status = 0;
ctx->response.error.length = 0;
ctx->response.error.message[0] = '\0';
ctx->log_level = LOG_ERR;
ctx->response_done = false;
ctx->need_response = true;
ctx->request.header.cmd = ((SFCommonProtoHeader *)task->data)->cmd;
ctx->request.header.body_len = task->length - sizeof(SFCommonProtoHeader);
ctx->request.header.status = buff2short(((SFCommonProtoHeader *)
task->data)->status);
ctx->request.body = task->data + sizeof(SFCommonProtoHeader);
}
static inline void sf_log_network_error_ex1(SFResponseInfo *response, static inline void sf_log_network_error_ex1(SFResponseInfo *response,
const ConnectionInfo *conn, const int result, const ConnectionInfo *conn, const int result,
const int log_level, const char *file, const int line) const int log_level, const char *file, const int line)

View File

@ -84,6 +84,15 @@ typedef struct {
SFErrorInfo error; SFErrorInfo error;
} SFResponseInfo; } SFResponseInfo;
typedef struct {
int64_t req_start_time; //unit: microsecond (us)
SFRequestInfo request;
SFResponseInfo response;
bool response_done;
char log_level; //level for error log
bool need_response;
} SFCommonTaskContext;
typedef struct sf_binlog_file_position { typedef struct sf_binlog_file_position {
int index; //current binlog file int index; //current binlog file
int64_t offset; //current file offset int64_t offset; //current file offset