From 6433e3e8d9e9cc6f0cbde22da8354f7d9bf88bdf Mon Sep 17 00:00:00 2001 From: YuQing <384681@qq.com> Date: Tue, 15 Sep 2020 20:58:37 +0800 Subject: [PATCH] server_handler.h OK --- src/idempotency/client/receipt_handler.c | 34 ++--- src/idempotency/client/receipt_handler.h | 4 +- src/idempotency/server/server_handler.c | 74 ++++----- src/idempotency/server/server_handler.h | 16 +- src/sf_proto.c | 185 +++++++++++++++++++++-- src/sf_proto.h | 117 ++++++++++---- src/sf_types.h | 10 +- 7 files changed, 338 insertions(+), 102 deletions(-) diff --git a/src/idempotency/client/receipt_handler.c b/src/idempotency/client/receipt_handler.c index 0b04d82..0806294 100644 --- a/src/idempotency/client/receipt_handler.c +++ b/src/idempotency/client/receipt_handler.c @@ -82,17 +82,17 @@ static int setup_channel_request(struct fast_task_info *task) { IdempotencyClientChannel *channel; SFCommonProtoHeader *header; - FSProtoSetupChannelReq *req; + SFProtoSetupChannelReq *req; channel = (IdempotencyClientChannel *)task->arg; header = (SFCommonProtoHeader *)task->data; - req = (FSProtoSetupChannelReq *)(header + 1); + req = (SFProtoSetupChannelReq *)(header + 1); int2buff(__sync_add_and_fetch(&channel->id, 0), req->channel_id); int2buff(__sync_add_and_fetch(&channel->key, 0), req->key); - FS_PROTO_SET_HEADER(header, FS_SERVICE_PROTO_SETUP_CHANNEL_REQ, - sizeof(FSProtoSetupChannelReq)); - task->length = sizeof(SFCommonProtoHeader) + sizeof(FSProtoSetupChannelReq); + SF_PROTO_SET_HEADER(header, SF_SERVICE_PROTO_SETUP_CHANNEL_REQ, + sizeof(SFProtoSetupChannelReq)); + task->length = sizeof(SFCommonProtoHeader) + sizeof(SFProtoSetupChannelReq); return sf_send_add_event(task); } @@ -101,9 +101,9 @@ static int check_report_req_receipt(struct fast_task_info *task, { IdempotencyClientChannel *channel; SFCommonProtoHeader *header; - FSProtoReportReqReceiptHeader *rheader; - FSProtoReportReqReceiptBody *rbody; - FSProtoReportReqReceiptBody *rstart; + SFProtoReportReqReceiptHeader *rheader; + SFProtoReportReqReceiptBody *rbody; + SFProtoReportReqReceiptBody *rstart; IdempotencyClientReceipt *last; IdempotencyClientReceipt *receipt; char *buff_end; @@ -130,14 +130,14 @@ static int check_report_req_receipt(struct fast_task_info *task, } header = (SFCommonProtoHeader *)task->data; - rheader = (FSProtoReportReqReceiptHeader *)(header + 1); - rbody = rstart = (FSProtoReportReqReceiptBody *)(rheader + 1); + rheader = (SFProtoReportReqReceiptHeader *)(header + 1); + rbody = rstart = (SFProtoReportReqReceiptBody *)(rheader + 1); buff_end = task->data + task->size; last = NULL; receipt = channel->waiting_resp_qinfo.head; do { //check buffer remain space - if (buff_end - (char *)rbody < sizeof(FSProtoReportReqReceiptBody)) { + if (buff_end - (char *)rbody < sizeof(SFProtoReportReqReceiptBody)) { break; } @@ -164,7 +164,7 @@ static int check_report_req_receipt(struct fast_task_info *task, int2buff(*count, rheader->count); task->length = (char *)rbody - task->data; int2buff(task->length - sizeof(SFCommonProtoHeader), header->body_len); - header->cmd = FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ; + header->cmd = SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ; return sf_send_add_event(task); } @@ -216,13 +216,13 @@ static int deal_setup_channel_response(struct fast_task_info *task) { int result; IdempotencyReceiptThreadContext *thread_ctx; - FSProtoSetupChannelResp *resp; + SFProtoSetupChannelResp *resp; IdempotencyClientChannel *channel; int channel_id; int channel_key; if ((result=receipt_expect_body_length(task, - sizeof(FSProtoSetupChannelResp))) != 0) + sizeof(SFProtoSetupChannelResp))) != 0) { return result; } @@ -236,7 +236,7 @@ static int deal_setup_channel_response(struct fast_task_info *task) return 0; } - resp = (FSProtoSetupChannelResp *)(task->data + sizeof(SFCommonProtoHeader)); + resp = (SFProtoSetupChannelResp *)(task->data + sizeof(SFCommonProtoHeader)); channel_id = buff2int(resp->channel_id); channel_key = buff2int(resp->key); idempotency_client_channel_set_id_key(channel, channel_id, channel_key); @@ -330,10 +330,10 @@ static int receipt_deal_task(struct fast_task_info *task) } switch (((SFCommonProtoHeader *)task->data)->cmd) { - case FS_SERVICE_PROTO_SETUP_CHANNEL_RESP: + case SF_SERVICE_PROTO_SETUP_CHANNEL_RESP: result = deal_setup_channel_response(task); break; - case FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP: + case SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP: result = deal_report_req_receipt_response(task); break; default: diff --git a/src/idempotency/client/receipt_handler.h b/src/idempotency/client/receipt_handler.h index 1402cb5..bfceebf 100644 --- a/src/idempotency/client/receipt_handler.h +++ b/src/idempotency/client/receipt_handler.h @@ -1,7 +1,7 @@ //receipt_handler.h -#ifndef _FS_IDEMPOTENCY_RECEIPT_HANDLER_H -#define _FS_IDEMPOTENCY_RECEIPT_HANDLER_H +#ifndef _SF_IDEMPOTENCY_RECEIPT_HANDLER_H +#define _SF_IDEMPOTENCY_RECEIPT_HANDLER_H #include "client_types.h" diff --git a/src/idempotency/server/server_handler.c b/src/idempotency/server/server_handler.c index a8a41e0..616ea80 100644 --- a/src/idempotency/server/server_handler.c +++ b/src/idempotency/server/server_handler.c @@ -25,25 +25,25 @@ #define SF_TASK_BODY_LENGTH(task) \ (task->length - sizeof(SFCommonProtoHeader)) -int service_deal_setup_channel(struct fast_task_info *task, +int sf_server_deal_setup_channel(struct fast_task_info *task, int *task_type, IdempotencyChannel **channel, SFResponseInfo *response) { int result; - FSProtoSetupChannelReq *req; - FSProtoSetupChannelResp *resp; + SFProtoSetupChannelReq *req; + SFProtoSetupChannelResp *resp; uint32_t channel_id; int key; - response->header.cmd = FS_SERVICE_PROTO_SETUP_CHANNEL_RESP; + response->header.cmd = SF_SERVICE_PROTO_SETUP_CHANNEL_RESP; if ((result=sf_server_expect_body_length(response, SF_TASK_BODY_LENGTH(task), - sizeof(FSProtoSetupChannelReq))) != 0) + sizeof(SFProtoSetupChannelReq))) != 0) { return result; } - req = (FSProtoSetupChannelReq *)(task->data + sizeof(SFCommonProtoHeader)); + req = (SFProtoSetupChannelReq *)(task->data + sizeof(SFCommonProtoHeader)); channel_id = buff2int(req->channel_id); key = buff2int(req->key); if (*channel != NULL) { @@ -59,29 +59,29 @@ int service_deal_setup_channel(struct fast_task_info *task, return ENOMEM; } - *task_type = FS_SERVER_TASK_TYPE_CHANNEL_HOLDER; + *task_type = SF_SERVER_TASK_TYPE_CHANNEL_HOLDER; - resp = (FSProtoSetupChannelResp *)(task->data + + resp = (SFProtoSetupChannelResp *)(task->data + sizeof(SFCommonProtoHeader)); int2buff((*channel)->id, resp->channel_id); int2buff((*channel)->key, resp->key); - response->header.body_len = sizeof(FSProtoSetupChannelResp); + response->header.body_len = sizeof(SFProtoSetupChannelResp); //TASK_ARG->context.response_done = true; return 0; } -/* -static int check_holder_channel(struct fast_task_info *task) -{ - if (SERVER_TASK_TYPE != FS_SERVER_TASK_TYPE_CHANNEL_HOLDER) { - RESPONSE.error.length = sprintf(RESPONSE.error.message, - "unexpect task type: %d", SERVER_TASK_TYPE); +static int check_holder_channel(const int task_type, + IdempotencyChannel *channel, SFResponseInfo *response) +{ + if (task_type != SF_SERVER_TASK_TYPE_CHANNEL_HOLDER) { + response->error.length = sprintf(response->error.message, + "unexpect task type: %d", task_type); return EINVAL; } - if (*channel == NULL) { - RESPONSE.error.length = sprintf( - RESPONSE.error.message, + if (channel == NULL) { + response->error.length = sprintf( + response->error.message, "channel not exist"); return SF_RETRIABLE_ERROR_NO_CHANNEL; } @@ -89,21 +89,25 @@ static int check_holder_channel(struct fast_task_info *task) return 0; } -int service_deal_close_channel(struct fast_task_info *task) +int sf_server_deal_close_channel(struct fast_task_info *task, + int *task_type, IdempotencyChannel **channel, + SFResponseInfo *response) { int result; - if ((result=check_holder_channel(task)) != 0) { + if ((result=check_holder_channel(*task_type, *channel, response)) != 0) { return result; } - RESPONSE.header.cmd = FS_SERVICE_PROTO_CLOSE_CHANNEL_RESP; idempotency_channel_free(*channel); *channel = NULL; - *task_type = FS_SERVER_TASK_TYPE_NONE; + *task_type = SF_SERVER_TASK_TYPE_NONE; + response->header.cmd = SF_SERVICE_PROTO_CLOSE_CHANNEL_RESP; return 0; } -int service_deal_report_req_receipt(struct fast_task_info *task) +int sf_server_deal_report_req_receipt(struct fast_task_info *task, + int *task_type, IdempotencyChannel **channel, + SFResponseInfo *response) { int result; int count; @@ -111,35 +115,35 @@ int service_deal_report_req_receipt(struct fast_task_info *task) int body_len; int calc_body_len; int64_t req_id; - FSProtoReportReqReceiptHeader *body_header; - FSProtoReportReqReceiptBody *body_part; - FSProtoReportReqReceiptBody *body_end; + SFProtoReportReqReceiptHeader *body_header; + SFProtoReportReqReceiptBody *body_part; + SFProtoReportReqReceiptBody *body_end; - if ((result=check_holder_channel(task)) != 0) { + if ((result=check_holder_channel(*task_type, *channel, response)) != 0) { return result; } body_len = SF_TASK_BODY_LENGTH(task); if ((result=sf_server_check_min_body_length(response, body_len, - sizeof(FSProtoReportReqReceiptHeader))) != 0) + sizeof(SFProtoReportReqReceiptHeader))) != 0) { return result; } - body_header = (FSProtoReportReqReceiptHeader *) + body_header = (SFProtoReportReqReceiptHeader *) (task->data + sizeof(SFCommonProtoHeader)); count = buff2int(body_header->count); - calc_body_len = sizeof(FSProtoReportReqReceiptHeader) + - sizeof(FSProtoReportReqReceiptBody) * count; + calc_body_len = sizeof(SFProtoReportReqReceiptHeader) + + sizeof(SFProtoReportReqReceiptBody) * count; if (body_len != calc_body_len) { - RESPONSE.error.length = sprintf(RESPONSE.error.message, + response->error.length = sprintf(response->error.message, "body length: %d != calculated body length: %d", body_len, calc_body_len); return EINVAL; } success = 0; - body_part = (FSProtoReportReqReceiptBody *)(body_header + 1); + body_part = (SFProtoReportReqReceiptBody *)(body_header + 1); body_end = body_part + count; for (; body_part < body_end; body_part++) { req_id = buff2long(body_part->req_id); @@ -150,8 +154,6 @@ int service_deal_report_req_receipt(struct fast_task_info *task) logInfo("receipt count: %d, success: %d", count, success); - RESPONSE.header.cmd = FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP; + response->header.cmd = SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP; return 0; } - -*/ diff --git a/src/idempotency/server/server_handler.h b/src/idempotency/server/server_handler.h index 047a67c..eec28ca 100644 --- a/src/idempotency/server/server_handler.h +++ b/src/idempotency/server/server_handler.h @@ -1,7 +1,7 @@ //server_handler.h -#ifndef _FS_IDEMPOTENCY_SERVER_HANDLER_H -#define _FS_IDEMPOTENCY_SERVER_HANDLER_H +#ifndef _SF_IDEMPOTENCY_SERVER_HANDLER_H +#define _SF_IDEMPOTENCY_SERVER_HANDLER_H #include "server_types.h" @@ -9,6 +9,18 @@ extern "C" { #endif +int sf_server_deal_setup_channel(struct fast_task_info *task, + int *task_type, IdempotencyChannel **channel, + SFResponseInfo *response); + +int sf_server_deal_close_channel(struct fast_task_info *task, + int *task_type, IdempotencyChannel **channel, + SFResponseInfo *response); + +int sf_server_deal_report_req_receipt(struct fast_task_info *task, + int *task_type, IdempotencyChannel **channel, + SFResponseInfo *response); + #ifdef __cplusplus } #endif diff --git a/src/sf_proto.c b/src/sf_proto.c index a7698a1..b15d90a 100644 --- a/src/sf_proto.c +++ b/src/sf_proto.c @@ -8,33 +8,196 @@ int sf_proto_set_body_length(struct fast_task_info *task) SFCommonProtoHeader *header; header = (SFCommonProtoHeader *)task->data; - if (!FS_PROTO_CHECK_MAGIC(header->magic)) { + if (!SF_PROTO_CHECK_MAGIC(header->magic)) { logError("file: "__FILE__", line: %d, " - "client ip: %s, magic "FS_PROTO_MAGIC_FORMAT - " is invalid, expect: "FS_PROTO_MAGIC_FORMAT, + "client ip: %s, magic "SF_PROTO_MAGIC_FORMAT + " is invalid, expect: "SF_PROTO_MAGIC_FORMAT, __LINE__, task->client_ip, - FS_PROTO_MAGIC_PARAMS(header->magic), - FS_PROTO_MAGIC_EXPECT_PARAMS); + SF_PROTO_MAGIC_PARAMS(header->magic), + SF_PROTO_MAGIC_EXPECT_PARAMS); return EINVAL; } task->length = buff2int(header->body_len); //set body length return 0; } + +int sf_check_response(ConnectionInfo *conn, SFResponseInfo *response, + const int network_timeout, const unsigned char expect_cmd) +{ + int result; + + if (response->header.status == 0) { + if (response->header.cmd != expect_cmd) { + response->error.length = sprintf( + response->error.message, + "response cmd: %d != expect: %d", + response->header.cmd, expect_cmd); + return EINVAL; + } + + return 0; + } + + if (response->header.body_len > 0) { + int recv_bytes; + if (response->header.body_len >= sizeof(response->error.message)) { + response->error.length = sizeof(response->error.message) - 1; + } else { + response->error.length = response->header.body_len; + } + + if ((result=tcprecvdata_nb_ex(conn->sock, response->error.message, + response->error.length, network_timeout, &recv_bytes)) == 0) + { + response->error.message[response->error.length] = '\0'; + } else { + response->error.length = snprintf(response->error.message, + sizeof(response->error.message), + "recv error message fail, " + "recv bytes: %d, expect message length: %d, " + "errno: %d, error info: %s", recv_bytes, + response->error.length, result, STRERROR(result)); + } + } else { + response->error.length = 0; + response->error.message[0] = '\0'; + } + + return response->header.status; +} + +int sf_send_and_recv_response_header(ConnectionInfo *conn, char *data, + const int len, SFResponseInfo *response, const int network_timeout) +{ + int result; + SFCommonProtoHeader header_proto; + + if ((result=tcpsenddata_nb(conn->sock, data, len, network_timeout)) != 0) { + response->error.length = snprintf(response->error.message, + sizeof(response->error.message), + "send data fail, errno: %d, error info: %s", + result, STRERROR(result)); + return result; + } + + if ((result=tcprecvdata_nb(conn->sock, &header_proto, + sizeof(SFCommonProtoHeader), network_timeout)) != 0) + { + response->error.length = snprintf(response->error.message, + sizeof(response->error.message), + "recv data fail, errno: %d, error info: %s", + result, STRERROR(result)); + return result; + } + + sf_proto_extract_header(&header_proto, &response->header); + return 0; +} + +int sf_send_and_recv_response(ConnectionInfo *conn, char *send_data, + const int send_len, SFResponseInfo *response, + const int network_timeout, const unsigned char expect_cmd, + char *recv_data, const int expect_body_len) +{ + int result; + int recv_bytes; + + if ((result=sf_send_and_check_response_header(conn, + send_data, send_len, response, + network_timeout, expect_cmd)) != 0) + { + return result; + } + + if (response->header.body_len != expect_body_len) { + response->error.length = sprintf(response->error.message, + "response body length: %d != %d", + response->header.body_len, + expect_body_len); + return EINVAL; + } + if (expect_body_len == 0) { + return 0; + } + + if ((result=tcprecvdata_nb_ex(conn->sock, recv_data, + expect_body_len, network_timeout, &recv_bytes)) != 0) + { + response->error.length = snprintf(response->error.message, + sizeof(response->error.message), + "recv body fail, recv bytes: %d, expect body length: %d, " + "errno: %d, error info: %s", recv_bytes, + response->header.body_len, + result, STRERROR(result)); + } + return result; +} + +int sf_recv_response(ConnectionInfo *conn, SFResponseInfo *response, + const int network_timeout, const unsigned char expect_cmd, + char *recv_data, const int expect_body_len) +{ + int result; + int recv_bytes; + SFCommonProtoHeader header_proto; + + if ((result=tcprecvdata_nb(conn->sock, &header_proto, + sizeof(SFCommonProtoHeader), network_timeout)) != 0) + { + response->error.length = snprintf(response->error.message, + sizeof(response->error.message), + "recv data fail, errno: %d, error info: %s", + result, STRERROR(result)); + return result; + } + sf_proto_extract_header(&header_proto, &response->header); + + if ((result=sf_check_response(conn, response, network_timeout, + expect_cmd)) != 0) + { + return result; + } + + if (response->header.body_len != expect_body_len) { + response->error.length = sprintf(response->error.message, + "response body length: %d != %d", + response->header.body_len, + expect_body_len); + return EINVAL; + } + if (expect_body_len == 0) { + return 0; + } + + if ((result=tcprecvdata_nb_ex(conn->sock, recv_data, + expect_body_len, network_timeout, &recv_bytes)) != 0) + { + response->error.length = snprintf(response->error.message, + sizeof(response->error.message), + "recv body fail, recv bytes: %d, expect body length: %d, " + "errno: %d, error info: %s", recv_bytes, + response->header.body_len, + result, STRERROR(result)); + } + + return result; +} + const char *sf_get_cmd_caption(const int cmd) { switch (cmd) { - case FS_SERVICE_PROTO_SETUP_CHANNEL_REQ: + case SF_SERVICE_PROTO_SETUP_CHANNEL_REQ: return "SETUP_CHANNEL_REQ"; - case FS_SERVICE_PROTO_SETUP_CHANNEL_RESP: + case SF_SERVICE_PROTO_SETUP_CHANNEL_RESP: return "SETUP_CHANNEL_RESP"; - case FS_SERVICE_PROTO_CLOSE_CHANNEL_REQ: + case SF_SERVICE_PROTO_CLOSE_CHANNEL_REQ: return "CLOSE_CHANNEL_REQ"; - case FS_SERVICE_PROTO_CLOSE_CHANNEL_RESP: + case SF_SERVICE_PROTO_CLOSE_CHANNEL_RESP: return "CLOSE_CHANNEL_RESP"; - case FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ: + case SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ: return "REPORT_REQ_RECEIPT_REQ"; - case FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP: + case SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP: return "REPORT_REQ_RECEIPT_RESP"; default: return "UNKOWN"; diff --git a/src/sf_proto.h b/src/sf_proto.h index 280ce1f..2b486de 100644 --- a/src/sf_proto.h +++ b/src/sf_proto.h @@ -1,7 +1,7 @@ //sf_proto.h -#ifndef _FS_IDEMPOTENCY_PROTO_H -#define _FS_IDEMPOTENCY_PROTO_H +#ifndef _SF_IDEMPOTENCY_PROTO_H +#define _SF_IDEMPOTENCY_PROTO_H #include "fastcommon/fast_task_queue.h" #include "fastcommon/shared_func.h" @@ -11,38 +11,38 @@ #include "sf_types.h" //for request idempotency -#define FS_SERVICE_PROTO_SETUP_CHANNEL_REQ 111 -#define FS_SERVICE_PROTO_SETUP_CHANNEL_RESP 112 -#define FS_SERVICE_PROTO_CLOSE_CHANNEL_REQ 113 -#define FS_SERVICE_PROTO_CLOSE_CHANNEL_RESP 114 -#define FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ 115 -#define FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP 116 +#define SF_SERVICE_PROTO_SETUP_CHANNEL_REQ 111 +#define SF_SERVICE_PROTO_SETUP_CHANNEL_RESP 112 +#define SF_SERVICE_PROTO_CLOSE_CHANNEL_REQ 113 +#define SF_SERVICE_PROTO_CLOSE_CHANNEL_RESP 114 +#define SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ 115 +#define SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP 116 -#define FS_PROTO_MAGIC_CHAR '@' -#define FS_PROTO_SET_MAGIC(m) \ - m[0] = m[1] = m[2] = m[3] = FS_PROTO_MAGIC_CHAR +#define SF_PROTO_MAGIC_CHAR '@' +#define SF_PROTO_SET_MAGIC(m) \ + m[0] = m[1] = m[2] = m[3] = SF_PROTO_MAGIC_CHAR -#define FS_PROTO_CHECK_MAGIC(m) \ - (m[0] == FS_PROTO_MAGIC_CHAR && m[1] == FS_PROTO_MAGIC_CHAR && \ - m[2] == FS_PROTO_MAGIC_CHAR && m[3] == FS_PROTO_MAGIC_CHAR) +#define SF_PROTO_CHECK_MAGIC(m) \ + (m[0] == SF_PROTO_MAGIC_CHAR && m[1] == SF_PROTO_MAGIC_CHAR && \ + m[2] == SF_PROTO_MAGIC_CHAR && m[3] == SF_PROTO_MAGIC_CHAR) -#define FS_PROTO_MAGIC_FORMAT "0x%02X%02X%02X%02X" -#define FS_PROTO_MAGIC_EXPECT_PARAMS \ - FS_PROTO_MAGIC_CHAR, FS_PROTO_MAGIC_CHAR, \ - FS_PROTO_MAGIC_CHAR, FS_PROTO_MAGIC_CHAR +#define SF_PROTO_MAGIC_FORMAT "0x%02X%02X%02X%02X" +#define SF_PROTO_MAGIC_EXPECT_PARAMS \ + SF_PROTO_MAGIC_CHAR, SF_PROTO_MAGIC_CHAR, \ + SF_PROTO_MAGIC_CHAR, SF_PROTO_MAGIC_CHAR -#define FS_PROTO_MAGIC_PARAMS(m) \ +#define SF_PROTO_MAGIC_PARAMS(m) \ m[0], m[1], m[2], m[3] -#define FS_PROTO_SET_HEADER(header, _cmd, _body_len) \ +#define SF_PROTO_SET_HEADER(header, _cmd, _body_len) \ do { \ - FS_PROTO_SET_MAGIC((header)->magic); \ + SF_PROTO_SET_MAGIC((header)->magic); \ (header)->cmd = _cmd; \ (header)->status[0] = (header)->status[1] = 0; \ int2buff(_body_len, (header)->body_len); \ } while (0) -#define FS_PROTO_SET_RESPONSE_HEADER(proto_header, resp_header) \ +#define SF_PROTO_SET_RESPONSE_HEADER(proto_header, resp_header) \ do { \ (proto_header)->cmd = (resp_header).cmd; \ short2buff((resp_header).status, (proto_header)->status); \ @@ -59,24 +59,24 @@ typedef struct sf_common_proto_header { char padding[3]; } SFCommonProtoHeader; -typedef struct fs_proto_setup_channel_req { +typedef struct sf_proto_setup_channel_req { char channel_id[4]; //for hint char key[4]; //for validate when channel_id > 0 -} FSProtoSetupChannelReq; +} SFProtoSetupChannelReq; -typedef struct fs_proto_setup_channel_resp { +typedef struct sf_proto_setup_channel_resp { char channel_id[4]; char key[4]; -} FSProtoSetupChannelResp; +} SFProtoSetupChannelResp; -typedef struct fs_proto_report_req_receipt_header { +typedef struct sf_proto_report_req_receipt_header { char count[4]; char padding[4]; -} FSProtoReportReqReceiptHeader; +} SFProtoReportReqReceiptHeader; -typedef struct fs_proto_report_req_receipt_body { +typedef struct sf_proto_report_req_receipt_body { char req_id[8]; -} FSProtoReportReqReceiptBody; +} SFProtoReportReqReceiptBody; #ifdef __cplusplus extern "C" { @@ -163,6 +163,63 @@ static inline int sf_server_check_body_length( body_length, max_body_length); } +int sf_check_response(ConnectionInfo *conn, SFResponseInfo *response, + const int network_timeout, const unsigned char expect_cmd); + +int sf_recv_response(ConnectionInfo *conn, SFResponseInfo *response, + const int network_timeout, const unsigned char expect_cmd, + char *recv_data, const int expect_body_len); + +int sf_send_and_recv_response_header(ConnectionInfo *conn, char *data, + const int len, SFResponseInfo *response, const int network_timeout); + +static inline int sf_send_and_check_response_header(ConnectionInfo *conn, + char *data, const int len, SFResponseInfo *response, + const int network_timeout, const unsigned char expect_cmd) +{ + int result; + + if ((result=sf_send_and_recv_response_header(conn, data, len, + response, network_timeout)) != 0) + { + return result; + } + + + if ((result=sf_check_response(conn, response, network_timeout, + expect_cmd)) != 0) + { + return result; + } + + return 0; +} + +int sf_send_and_recv_response(ConnectionInfo *conn, char *send_data, + const int send_len, SFResponseInfo *response, + const int network_timeout, const unsigned char expect_cmd, + char *recv_data, const int expect_body_len); + +static inline int sf_send_and_recv_none_body_response(ConnectionInfo *conn, + char *send_data, const int send_len, SFResponseInfo *response, + const int network_timeout, const unsigned char expect_cmd) +{ + char *recv_data = NULL; + const int expect_body_len = 0; + + return sf_send_and_recv_response(conn, send_data, send_len, response, + network_timeout, expect_cmd, recv_data, expect_body_len); +} + +static inline void sf_proto_extract_header(SFCommonProtoHeader *header_proto, + SFHeaderInfo *header_info) +{ + header_info->cmd = header_proto->cmd; + header_info->body_len = buff2int(header_proto->body_len); + header_info->flags = buff2short(header_proto->flags); + header_info->status = buff2short(header_proto->status); +} + #ifdef __cplusplus } diff --git a/src/sf_types.h b/src/sf_types.h index 35c2e8b..8826201 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -11,9 +11,11 @@ #include "fastcommon/connection_pool.h" #include "fastcommon/fast_task_queue.h" -#define FS_ERROR_INFO_SIZE 256 -#define FS_SERVER_TASK_TYPE_CHANNEL_HOLDER 101 //for request idempotency -#define FS_SERVER_TASK_TYPE_CHANNEL_USER 102 //for request idempotency +#define SF_ERROR_INFO_SIZE 256 + +#define SF_SERVER_TASK_TYPE_NONE 0 +#define SF_SERVER_TASK_TYPE_CHANNEL_HOLDER 101 //for request idempotency +#define SF_SERVER_TASK_TYPE_CHANNEL_USER 102 //for request idempotency typedef void (*sf_accept_done_callback)(struct fast_task_info *pTask, const bool bInnerPort); @@ -61,7 +63,7 @@ typedef struct { SFHeaderInfo header; struct { int length; - char message[FS_ERROR_INFO_SIZE]; + char message[SF_ERROR_INFO_SIZE]; } error; } SFResponseInfo;