server_handler.h OK

connection_manager
YuQing 2020-09-15 20:58:37 +08:00
parent 6d4af38931
commit 6433e3e8d9
7 changed files with 338 additions and 102 deletions

View File

@ -82,17 +82,17 @@ static int setup_channel_request(struct fast_task_info *task)
{ {
IdempotencyClientChannel *channel; IdempotencyClientChannel *channel;
SFCommonProtoHeader *header; SFCommonProtoHeader *header;
FSProtoSetupChannelReq *req; SFProtoSetupChannelReq *req;
channel = (IdempotencyClientChannel *)task->arg; channel = (IdempotencyClientChannel *)task->arg;
header = (SFCommonProtoHeader *)task->data; header = (SFCommonProtoHeader *)task->data;
req = (FSProtoSetupChannelReq *)(header + 1); req = (SFProtoSetupChannelReq *)(header + 1);
int2buff(__sync_add_and_fetch(&channel->id, 0), req->channel_id); int2buff(__sync_add_and_fetch(&channel->id, 0), req->channel_id);
int2buff(__sync_add_and_fetch(&channel->key, 0), req->key); int2buff(__sync_add_and_fetch(&channel->key, 0), req->key);
FS_PROTO_SET_HEADER(header, FS_SERVICE_PROTO_SETUP_CHANNEL_REQ, SF_PROTO_SET_HEADER(header, SF_SERVICE_PROTO_SETUP_CHANNEL_REQ,
sizeof(FSProtoSetupChannelReq)); sizeof(SFProtoSetupChannelReq));
task->length = sizeof(SFCommonProtoHeader) + sizeof(FSProtoSetupChannelReq); task->length = sizeof(SFCommonProtoHeader) + sizeof(SFProtoSetupChannelReq);
return sf_send_add_event(task); return sf_send_add_event(task);
} }
@ -101,9 +101,9 @@ static int check_report_req_receipt(struct fast_task_info *task,
{ {
IdempotencyClientChannel *channel; IdempotencyClientChannel *channel;
SFCommonProtoHeader *header; SFCommonProtoHeader *header;
FSProtoReportReqReceiptHeader *rheader; SFProtoReportReqReceiptHeader *rheader;
FSProtoReportReqReceiptBody *rbody; SFProtoReportReqReceiptBody *rbody;
FSProtoReportReqReceiptBody *rstart; SFProtoReportReqReceiptBody *rstart;
IdempotencyClientReceipt *last; IdempotencyClientReceipt *last;
IdempotencyClientReceipt *receipt; IdempotencyClientReceipt *receipt;
char *buff_end; char *buff_end;
@ -130,14 +130,14 @@ static int check_report_req_receipt(struct fast_task_info *task,
} }
header = (SFCommonProtoHeader *)task->data; header = (SFCommonProtoHeader *)task->data;
rheader = (FSProtoReportReqReceiptHeader *)(header + 1); rheader = (SFProtoReportReqReceiptHeader *)(header + 1);
rbody = rstart = (FSProtoReportReqReceiptBody *)(rheader + 1); rbody = rstart = (SFProtoReportReqReceiptBody *)(rheader + 1);
buff_end = task->data + task->size; buff_end = task->data + task->size;
last = NULL; last = NULL;
receipt = channel->waiting_resp_qinfo.head; receipt = channel->waiting_resp_qinfo.head;
do { do {
//check buffer remain space //check buffer remain space
if (buff_end - (char *)rbody < sizeof(FSProtoReportReqReceiptBody)) { if (buff_end - (char *)rbody < sizeof(SFProtoReportReqReceiptBody)) {
break; break;
} }
@ -164,7 +164,7 @@ static int check_report_req_receipt(struct fast_task_info *task,
int2buff(*count, rheader->count); int2buff(*count, rheader->count);
task->length = (char *)rbody - task->data; task->length = (char *)rbody - task->data;
int2buff(task->length - sizeof(SFCommonProtoHeader), header->body_len); int2buff(task->length - sizeof(SFCommonProtoHeader), header->body_len);
header->cmd = FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ; header->cmd = SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ;
return sf_send_add_event(task); return sf_send_add_event(task);
} }
@ -216,13 +216,13 @@ static int deal_setup_channel_response(struct fast_task_info *task)
{ {
int result; int result;
IdempotencyReceiptThreadContext *thread_ctx; IdempotencyReceiptThreadContext *thread_ctx;
FSProtoSetupChannelResp *resp; SFProtoSetupChannelResp *resp;
IdempotencyClientChannel *channel; IdempotencyClientChannel *channel;
int channel_id; int channel_id;
int channel_key; int channel_key;
if ((result=receipt_expect_body_length(task, if ((result=receipt_expect_body_length(task,
sizeof(FSProtoSetupChannelResp))) != 0) sizeof(SFProtoSetupChannelResp))) != 0)
{ {
return result; return result;
} }
@ -236,7 +236,7 @@ static int deal_setup_channel_response(struct fast_task_info *task)
return 0; return 0;
} }
resp = (FSProtoSetupChannelResp *)(task->data + sizeof(SFCommonProtoHeader)); resp = (SFProtoSetupChannelResp *)(task->data + sizeof(SFCommonProtoHeader));
channel_id = buff2int(resp->channel_id); channel_id = buff2int(resp->channel_id);
channel_key = buff2int(resp->key); channel_key = buff2int(resp->key);
idempotency_client_channel_set_id_key(channel, channel_id, channel_key); idempotency_client_channel_set_id_key(channel, channel_id, channel_key);
@ -330,10 +330,10 @@ static int receipt_deal_task(struct fast_task_info *task)
} }
switch (((SFCommonProtoHeader *)task->data)->cmd) { switch (((SFCommonProtoHeader *)task->data)->cmd) {
case FS_SERVICE_PROTO_SETUP_CHANNEL_RESP: case SF_SERVICE_PROTO_SETUP_CHANNEL_RESP:
result = deal_setup_channel_response(task); result = deal_setup_channel_response(task);
break; break;
case FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP: case SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP:
result = deal_report_req_receipt_response(task); result = deal_report_req_receipt_response(task);
break; break;
default: default:

View File

@ -1,7 +1,7 @@
//receipt_handler.h //receipt_handler.h
#ifndef _FS_IDEMPOTENCY_RECEIPT_HANDLER_H #ifndef _SF_IDEMPOTENCY_RECEIPT_HANDLER_H
#define _FS_IDEMPOTENCY_RECEIPT_HANDLER_H #define _SF_IDEMPOTENCY_RECEIPT_HANDLER_H
#include "client_types.h" #include "client_types.h"

View File

@ -25,25 +25,25 @@
#define SF_TASK_BODY_LENGTH(task) \ #define SF_TASK_BODY_LENGTH(task) \
(task->length - sizeof(SFCommonProtoHeader)) (task->length - sizeof(SFCommonProtoHeader))
int service_deal_setup_channel(struct fast_task_info *task, int sf_server_deal_setup_channel(struct fast_task_info *task,
int *task_type, IdempotencyChannel **channel, int *task_type, IdempotencyChannel **channel,
SFResponseInfo *response) SFResponseInfo *response)
{ {
int result; int result;
FSProtoSetupChannelReq *req; SFProtoSetupChannelReq *req;
FSProtoSetupChannelResp *resp; SFProtoSetupChannelResp *resp;
uint32_t channel_id; uint32_t channel_id;
int key; int key;
response->header.cmd = FS_SERVICE_PROTO_SETUP_CHANNEL_RESP; response->header.cmd = SF_SERVICE_PROTO_SETUP_CHANNEL_RESP;
if ((result=sf_server_expect_body_length(response, if ((result=sf_server_expect_body_length(response,
SF_TASK_BODY_LENGTH(task), SF_TASK_BODY_LENGTH(task),
sizeof(FSProtoSetupChannelReq))) != 0) sizeof(SFProtoSetupChannelReq))) != 0)
{ {
return result; return result;
} }
req = (FSProtoSetupChannelReq *)(task->data + sizeof(SFCommonProtoHeader)); req = (SFProtoSetupChannelReq *)(task->data + sizeof(SFCommonProtoHeader));
channel_id = buff2int(req->channel_id); channel_id = buff2int(req->channel_id);
key = buff2int(req->key); key = buff2int(req->key);
if (*channel != NULL) { if (*channel != NULL) {
@ -59,29 +59,29 @@ int service_deal_setup_channel(struct fast_task_info *task,
return ENOMEM; return ENOMEM;
} }
*task_type = FS_SERVER_TASK_TYPE_CHANNEL_HOLDER; *task_type = SF_SERVER_TASK_TYPE_CHANNEL_HOLDER;
resp = (FSProtoSetupChannelResp *)(task->data + resp = (SFProtoSetupChannelResp *)(task->data +
sizeof(SFCommonProtoHeader)); sizeof(SFCommonProtoHeader));
int2buff((*channel)->id, resp->channel_id); int2buff((*channel)->id, resp->channel_id);
int2buff((*channel)->key, resp->key); int2buff((*channel)->key, resp->key);
response->header.body_len = sizeof(FSProtoSetupChannelResp); response->header.body_len = sizeof(SFProtoSetupChannelResp);
//TASK_ARG->context.response_done = true; //TASK_ARG->context.response_done = true;
return 0; return 0;
} }
/* static int check_holder_channel(const int task_type,
static int check_holder_channel(struct fast_task_info *task) IdempotencyChannel *channel, SFResponseInfo *response)
{ {
if (SERVER_TASK_TYPE != FS_SERVER_TASK_TYPE_CHANNEL_HOLDER) { if (task_type != SF_SERVER_TASK_TYPE_CHANNEL_HOLDER) {
RESPONSE.error.length = sprintf(RESPONSE.error.message, response->error.length = sprintf(response->error.message,
"unexpect task type: %d", SERVER_TASK_TYPE); "unexpect task type: %d", task_type);
return EINVAL; return EINVAL;
} }
if (*channel == NULL) { if (channel == NULL) {
RESPONSE.error.length = sprintf( response->error.length = sprintf(
RESPONSE.error.message, response->error.message,
"channel not exist"); "channel not exist");
return SF_RETRIABLE_ERROR_NO_CHANNEL; return SF_RETRIABLE_ERROR_NO_CHANNEL;
} }
@ -89,21 +89,25 @@ static int check_holder_channel(struct fast_task_info *task)
return 0; return 0;
} }
int service_deal_close_channel(struct fast_task_info *task) int sf_server_deal_close_channel(struct fast_task_info *task,
int *task_type, IdempotencyChannel **channel,
SFResponseInfo *response)
{ {
int result; int result;
if ((result=check_holder_channel(task)) != 0) { if ((result=check_holder_channel(*task_type, *channel, response)) != 0) {
return result; return result;
} }
RESPONSE.header.cmd = FS_SERVICE_PROTO_CLOSE_CHANNEL_RESP;
idempotency_channel_free(*channel); idempotency_channel_free(*channel);
*channel = NULL; *channel = NULL;
*task_type = FS_SERVER_TASK_TYPE_NONE; *task_type = SF_SERVER_TASK_TYPE_NONE;
response->header.cmd = SF_SERVICE_PROTO_CLOSE_CHANNEL_RESP;
return 0; return 0;
} }
int service_deal_report_req_receipt(struct fast_task_info *task) int sf_server_deal_report_req_receipt(struct fast_task_info *task,
int *task_type, IdempotencyChannel **channel,
SFResponseInfo *response)
{ {
int result; int result;
int count; int count;
@ -111,35 +115,35 @@ int service_deal_report_req_receipt(struct fast_task_info *task)
int body_len; int body_len;
int calc_body_len; int calc_body_len;
int64_t req_id; int64_t req_id;
FSProtoReportReqReceiptHeader *body_header; SFProtoReportReqReceiptHeader *body_header;
FSProtoReportReqReceiptBody *body_part; SFProtoReportReqReceiptBody *body_part;
FSProtoReportReqReceiptBody *body_end; SFProtoReportReqReceiptBody *body_end;
if ((result=check_holder_channel(task)) != 0) { if ((result=check_holder_channel(*task_type, *channel, response)) != 0) {
return result; return result;
} }
body_len = SF_TASK_BODY_LENGTH(task); body_len = SF_TASK_BODY_LENGTH(task);
if ((result=sf_server_check_min_body_length(response, body_len, if ((result=sf_server_check_min_body_length(response, body_len,
sizeof(FSProtoReportReqReceiptHeader))) != 0) sizeof(SFProtoReportReqReceiptHeader))) != 0)
{ {
return result; return result;
} }
body_header = (FSProtoReportReqReceiptHeader *) body_header = (SFProtoReportReqReceiptHeader *)
(task->data + sizeof(SFCommonProtoHeader)); (task->data + sizeof(SFCommonProtoHeader));
count = buff2int(body_header->count); count = buff2int(body_header->count);
calc_body_len = sizeof(FSProtoReportReqReceiptHeader) + calc_body_len = sizeof(SFProtoReportReqReceiptHeader) +
sizeof(FSProtoReportReqReceiptBody) * count; sizeof(SFProtoReportReqReceiptBody) * count;
if (body_len != calc_body_len) { if (body_len != calc_body_len) {
RESPONSE.error.length = sprintf(RESPONSE.error.message, response->error.length = sprintf(response->error.message,
"body length: %d != calculated body length: %d", "body length: %d != calculated body length: %d",
body_len, calc_body_len); body_len, calc_body_len);
return EINVAL; return EINVAL;
} }
success = 0; success = 0;
body_part = (FSProtoReportReqReceiptBody *)(body_header + 1); body_part = (SFProtoReportReqReceiptBody *)(body_header + 1);
body_end = body_part + count; body_end = body_part + count;
for (; body_part < body_end; body_part++) { for (; body_part < body_end; body_part++) {
req_id = buff2long(body_part->req_id); req_id = buff2long(body_part->req_id);
@ -150,8 +154,6 @@ int service_deal_report_req_receipt(struct fast_task_info *task)
logInfo("receipt count: %d, success: %d", count, success); logInfo("receipt count: %d, success: %d", count, success);
RESPONSE.header.cmd = FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP; response->header.cmd = SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP;
return 0; return 0;
} }
*/

View File

@ -1,7 +1,7 @@
//server_handler.h //server_handler.h
#ifndef _FS_IDEMPOTENCY_SERVER_HANDLER_H #ifndef _SF_IDEMPOTENCY_SERVER_HANDLER_H
#define _FS_IDEMPOTENCY_SERVER_HANDLER_H #define _SF_IDEMPOTENCY_SERVER_HANDLER_H
#include "server_types.h" #include "server_types.h"
@ -9,6 +9,18 @@
extern "C" { extern "C" {
#endif #endif
int sf_server_deal_setup_channel(struct fast_task_info *task,
int *task_type, IdempotencyChannel **channel,
SFResponseInfo *response);
int sf_server_deal_close_channel(struct fast_task_info *task,
int *task_type, IdempotencyChannel **channel,
SFResponseInfo *response);
int sf_server_deal_report_req_receipt(struct fast_task_info *task,
int *task_type, IdempotencyChannel **channel,
SFResponseInfo *response);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -8,33 +8,196 @@ int sf_proto_set_body_length(struct fast_task_info *task)
SFCommonProtoHeader *header; SFCommonProtoHeader *header;
header = (SFCommonProtoHeader *)task->data; header = (SFCommonProtoHeader *)task->data;
if (!FS_PROTO_CHECK_MAGIC(header->magic)) { if (!SF_PROTO_CHECK_MAGIC(header->magic)) {
logError("file: "__FILE__", line: %d, " logError("file: "__FILE__", line: %d, "
"client ip: %s, magic "FS_PROTO_MAGIC_FORMAT "client ip: %s, magic "SF_PROTO_MAGIC_FORMAT
" is invalid, expect: "FS_PROTO_MAGIC_FORMAT, " is invalid, expect: "SF_PROTO_MAGIC_FORMAT,
__LINE__, task->client_ip, __LINE__, task->client_ip,
FS_PROTO_MAGIC_PARAMS(header->magic), SF_PROTO_MAGIC_PARAMS(header->magic),
FS_PROTO_MAGIC_EXPECT_PARAMS); SF_PROTO_MAGIC_EXPECT_PARAMS);
return EINVAL; return EINVAL;
} }
task->length = buff2int(header->body_len); //set body length task->length = buff2int(header->body_len); //set body length
return 0; return 0;
} }
int sf_check_response(ConnectionInfo *conn, SFResponseInfo *response,
const int network_timeout, const unsigned char expect_cmd)
{
int result;
if (response->header.status == 0) {
if (response->header.cmd != expect_cmd) {
response->error.length = sprintf(
response->error.message,
"response cmd: %d != expect: %d",
response->header.cmd, expect_cmd);
return EINVAL;
}
return 0;
}
if (response->header.body_len > 0) {
int recv_bytes;
if (response->header.body_len >= sizeof(response->error.message)) {
response->error.length = sizeof(response->error.message) - 1;
} else {
response->error.length = response->header.body_len;
}
if ((result=tcprecvdata_nb_ex(conn->sock, response->error.message,
response->error.length, network_timeout, &recv_bytes)) == 0)
{
response->error.message[response->error.length] = '\0';
} else {
response->error.length = snprintf(response->error.message,
sizeof(response->error.message),
"recv error message fail, "
"recv bytes: %d, expect message length: %d, "
"errno: %d, error info: %s", recv_bytes,
response->error.length, result, STRERROR(result));
}
} else {
response->error.length = 0;
response->error.message[0] = '\0';
}
return response->header.status;
}
int sf_send_and_recv_response_header(ConnectionInfo *conn, char *data,
const int len, SFResponseInfo *response, const int network_timeout)
{
int result;
SFCommonProtoHeader header_proto;
if ((result=tcpsenddata_nb(conn->sock, data, len, network_timeout)) != 0) {
response->error.length = snprintf(response->error.message,
sizeof(response->error.message),
"send data fail, errno: %d, error info: %s",
result, STRERROR(result));
return result;
}
if ((result=tcprecvdata_nb(conn->sock, &header_proto,
sizeof(SFCommonProtoHeader), network_timeout)) != 0)
{
response->error.length = snprintf(response->error.message,
sizeof(response->error.message),
"recv data fail, errno: %d, error info: %s",
result, STRERROR(result));
return result;
}
sf_proto_extract_header(&header_proto, &response->header);
return 0;
}
int sf_send_and_recv_response(ConnectionInfo *conn, char *send_data,
const int send_len, SFResponseInfo *response,
const int network_timeout, const unsigned char expect_cmd,
char *recv_data, const int expect_body_len)
{
int result;
int recv_bytes;
if ((result=sf_send_and_check_response_header(conn,
send_data, send_len, response,
network_timeout, expect_cmd)) != 0)
{
return result;
}
if (response->header.body_len != expect_body_len) {
response->error.length = sprintf(response->error.message,
"response body length: %d != %d",
response->header.body_len,
expect_body_len);
return EINVAL;
}
if (expect_body_len == 0) {
return 0;
}
if ((result=tcprecvdata_nb_ex(conn->sock, recv_data,
expect_body_len, network_timeout, &recv_bytes)) != 0)
{
response->error.length = snprintf(response->error.message,
sizeof(response->error.message),
"recv body fail, recv bytes: %d, expect body length: %d, "
"errno: %d, error info: %s", recv_bytes,
response->header.body_len,
result, STRERROR(result));
}
return result;
}
int sf_recv_response(ConnectionInfo *conn, SFResponseInfo *response,
const int network_timeout, const unsigned char expect_cmd,
char *recv_data, const int expect_body_len)
{
int result;
int recv_bytes;
SFCommonProtoHeader header_proto;
if ((result=tcprecvdata_nb(conn->sock, &header_proto,
sizeof(SFCommonProtoHeader), network_timeout)) != 0)
{
response->error.length = snprintf(response->error.message,
sizeof(response->error.message),
"recv data fail, errno: %d, error info: %s",
result, STRERROR(result));
return result;
}
sf_proto_extract_header(&header_proto, &response->header);
if ((result=sf_check_response(conn, response, network_timeout,
expect_cmd)) != 0)
{
return result;
}
if (response->header.body_len != expect_body_len) {
response->error.length = sprintf(response->error.message,
"response body length: %d != %d",
response->header.body_len,
expect_body_len);
return EINVAL;
}
if (expect_body_len == 0) {
return 0;
}
if ((result=tcprecvdata_nb_ex(conn->sock, recv_data,
expect_body_len, network_timeout, &recv_bytes)) != 0)
{
response->error.length = snprintf(response->error.message,
sizeof(response->error.message),
"recv body fail, recv bytes: %d, expect body length: %d, "
"errno: %d, error info: %s", recv_bytes,
response->header.body_len,
result, STRERROR(result));
}
return result;
}
const char *sf_get_cmd_caption(const int cmd) const char *sf_get_cmd_caption(const int cmd)
{ {
switch (cmd) { switch (cmd) {
case FS_SERVICE_PROTO_SETUP_CHANNEL_REQ: case SF_SERVICE_PROTO_SETUP_CHANNEL_REQ:
return "SETUP_CHANNEL_REQ"; return "SETUP_CHANNEL_REQ";
case FS_SERVICE_PROTO_SETUP_CHANNEL_RESP: case SF_SERVICE_PROTO_SETUP_CHANNEL_RESP:
return "SETUP_CHANNEL_RESP"; return "SETUP_CHANNEL_RESP";
case FS_SERVICE_PROTO_CLOSE_CHANNEL_REQ: case SF_SERVICE_PROTO_CLOSE_CHANNEL_REQ:
return "CLOSE_CHANNEL_REQ"; return "CLOSE_CHANNEL_REQ";
case FS_SERVICE_PROTO_CLOSE_CHANNEL_RESP: case SF_SERVICE_PROTO_CLOSE_CHANNEL_RESP:
return "CLOSE_CHANNEL_RESP"; return "CLOSE_CHANNEL_RESP";
case FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ: case SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ:
return "REPORT_REQ_RECEIPT_REQ"; return "REPORT_REQ_RECEIPT_REQ";
case FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP: case SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP:
return "REPORT_REQ_RECEIPT_RESP"; return "REPORT_REQ_RECEIPT_RESP";
default: default:
return "UNKOWN"; return "UNKOWN";

View File

@ -1,7 +1,7 @@
//sf_proto.h //sf_proto.h
#ifndef _FS_IDEMPOTENCY_PROTO_H #ifndef _SF_IDEMPOTENCY_PROTO_H
#define _FS_IDEMPOTENCY_PROTO_H #define _SF_IDEMPOTENCY_PROTO_H
#include "fastcommon/fast_task_queue.h" #include "fastcommon/fast_task_queue.h"
#include "fastcommon/shared_func.h" #include "fastcommon/shared_func.h"
@ -11,38 +11,38 @@
#include "sf_types.h" #include "sf_types.h"
//for request idempotency //for request idempotency
#define FS_SERVICE_PROTO_SETUP_CHANNEL_REQ 111 #define SF_SERVICE_PROTO_SETUP_CHANNEL_REQ 111
#define FS_SERVICE_PROTO_SETUP_CHANNEL_RESP 112 #define SF_SERVICE_PROTO_SETUP_CHANNEL_RESP 112
#define FS_SERVICE_PROTO_CLOSE_CHANNEL_REQ 113 #define SF_SERVICE_PROTO_CLOSE_CHANNEL_REQ 113
#define FS_SERVICE_PROTO_CLOSE_CHANNEL_RESP 114 #define SF_SERVICE_PROTO_CLOSE_CHANNEL_RESP 114
#define FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ 115 #define SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ 115
#define FS_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP 116 #define SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP 116
#define FS_PROTO_MAGIC_CHAR '@' #define SF_PROTO_MAGIC_CHAR '@'
#define FS_PROTO_SET_MAGIC(m) \ #define SF_PROTO_SET_MAGIC(m) \
m[0] = m[1] = m[2] = m[3] = FS_PROTO_MAGIC_CHAR m[0] = m[1] = m[2] = m[3] = SF_PROTO_MAGIC_CHAR
#define FS_PROTO_CHECK_MAGIC(m) \ #define SF_PROTO_CHECK_MAGIC(m) \
(m[0] == FS_PROTO_MAGIC_CHAR && m[1] == FS_PROTO_MAGIC_CHAR && \ (m[0] == SF_PROTO_MAGIC_CHAR && m[1] == SF_PROTO_MAGIC_CHAR && \
m[2] == FS_PROTO_MAGIC_CHAR && m[3] == FS_PROTO_MAGIC_CHAR) m[2] == SF_PROTO_MAGIC_CHAR && m[3] == SF_PROTO_MAGIC_CHAR)
#define FS_PROTO_MAGIC_FORMAT "0x%02X%02X%02X%02X" #define SF_PROTO_MAGIC_FORMAT "0x%02X%02X%02X%02X"
#define FS_PROTO_MAGIC_EXPECT_PARAMS \ #define SF_PROTO_MAGIC_EXPECT_PARAMS \
FS_PROTO_MAGIC_CHAR, FS_PROTO_MAGIC_CHAR, \ SF_PROTO_MAGIC_CHAR, SF_PROTO_MAGIC_CHAR, \
FS_PROTO_MAGIC_CHAR, FS_PROTO_MAGIC_CHAR SF_PROTO_MAGIC_CHAR, SF_PROTO_MAGIC_CHAR
#define FS_PROTO_MAGIC_PARAMS(m) \ #define SF_PROTO_MAGIC_PARAMS(m) \
m[0], m[1], m[2], m[3] m[0], m[1], m[2], m[3]
#define FS_PROTO_SET_HEADER(header, _cmd, _body_len) \ #define SF_PROTO_SET_HEADER(header, _cmd, _body_len) \
do { \ do { \
FS_PROTO_SET_MAGIC((header)->magic); \ SF_PROTO_SET_MAGIC((header)->magic); \
(header)->cmd = _cmd; \ (header)->cmd = _cmd; \
(header)->status[0] = (header)->status[1] = 0; \ (header)->status[0] = (header)->status[1] = 0; \
int2buff(_body_len, (header)->body_len); \ int2buff(_body_len, (header)->body_len); \
} while (0) } while (0)
#define FS_PROTO_SET_RESPONSE_HEADER(proto_header, resp_header) \ #define SF_PROTO_SET_RESPONSE_HEADER(proto_header, resp_header) \
do { \ do { \
(proto_header)->cmd = (resp_header).cmd; \ (proto_header)->cmd = (resp_header).cmd; \
short2buff((resp_header).status, (proto_header)->status); \ short2buff((resp_header).status, (proto_header)->status); \
@ -59,24 +59,24 @@ typedef struct sf_common_proto_header {
char padding[3]; char padding[3];
} SFCommonProtoHeader; } SFCommonProtoHeader;
typedef struct fs_proto_setup_channel_req { typedef struct sf_proto_setup_channel_req {
char channel_id[4]; //for hint char channel_id[4]; //for hint
char key[4]; //for validate when channel_id > 0 char key[4]; //for validate when channel_id > 0
} FSProtoSetupChannelReq; } SFProtoSetupChannelReq;
typedef struct fs_proto_setup_channel_resp { typedef struct sf_proto_setup_channel_resp {
char channel_id[4]; char channel_id[4];
char key[4]; char key[4];
} FSProtoSetupChannelResp; } SFProtoSetupChannelResp;
typedef struct fs_proto_report_req_receipt_header { typedef struct sf_proto_report_req_receipt_header {
char count[4]; char count[4];
char padding[4]; char padding[4];
} FSProtoReportReqReceiptHeader; } SFProtoReportReqReceiptHeader;
typedef struct fs_proto_report_req_receipt_body { typedef struct sf_proto_report_req_receipt_body {
char req_id[8]; char req_id[8];
} FSProtoReportReqReceiptBody; } SFProtoReportReqReceiptBody;
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -163,6 +163,63 @@ static inline int sf_server_check_body_length(
body_length, max_body_length); body_length, max_body_length);
} }
int sf_check_response(ConnectionInfo *conn, SFResponseInfo *response,
const int network_timeout, const unsigned char expect_cmd);
int sf_recv_response(ConnectionInfo *conn, SFResponseInfo *response,
const int network_timeout, const unsigned char expect_cmd,
char *recv_data, const int expect_body_len);
int sf_send_and_recv_response_header(ConnectionInfo *conn, char *data,
const int len, SFResponseInfo *response, const int network_timeout);
static inline int sf_send_and_check_response_header(ConnectionInfo *conn,
char *data, const int len, SFResponseInfo *response,
const int network_timeout, const unsigned char expect_cmd)
{
int result;
if ((result=sf_send_and_recv_response_header(conn, data, len,
response, network_timeout)) != 0)
{
return result;
}
if ((result=sf_check_response(conn, response, network_timeout,
expect_cmd)) != 0)
{
return result;
}
return 0;
}
int sf_send_and_recv_response(ConnectionInfo *conn, char *send_data,
const int send_len, SFResponseInfo *response,
const int network_timeout, const unsigned char expect_cmd,
char *recv_data, const int expect_body_len);
static inline int sf_send_and_recv_none_body_response(ConnectionInfo *conn,
char *send_data, const int send_len, SFResponseInfo *response,
const int network_timeout, const unsigned char expect_cmd)
{
char *recv_data = NULL;
const int expect_body_len = 0;
return sf_send_and_recv_response(conn, send_data, send_len, response,
network_timeout, expect_cmd, recv_data, expect_body_len);
}
static inline void sf_proto_extract_header(SFCommonProtoHeader *header_proto,
SFHeaderInfo *header_info)
{
header_info->cmd = header_proto->cmd;
header_info->body_len = buff2int(header_proto->body_len);
header_info->flags = buff2short(header_proto->flags);
header_info->status = buff2short(header_proto->status);
}
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -11,9 +11,11 @@
#include "fastcommon/connection_pool.h" #include "fastcommon/connection_pool.h"
#include "fastcommon/fast_task_queue.h" #include "fastcommon/fast_task_queue.h"
#define FS_ERROR_INFO_SIZE 256 #define SF_ERROR_INFO_SIZE 256
#define FS_SERVER_TASK_TYPE_CHANNEL_HOLDER 101 //for request idempotency
#define FS_SERVER_TASK_TYPE_CHANNEL_USER 102 //for request idempotency #define SF_SERVER_TASK_TYPE_NONE 0
#define SF_SERVER_TASK_TYPE_CHANNEL_HOLDER 101 //for request idempotency
#define SF_SERVER_TASK_TYPE_CHANNEL_USER 102 //for request idempotency
typedef void (*sf_accept_done_callback)(struct fast_task_info *pTask, typedef void (*sf_accept_done_callback)(struct fast_task_info *pTask,
const bool bInnerPort); const bool bInnerPort);
@ -61,7 +63,7 @@ typedef struct {
SFHeaderInfo header; SFHeaderInfo header;
struct { struct {
int length; int length;
char message[FS_ERROR_INFO_SIZE]; char message[SF_ERROR_INFO_SIZE];
} error; } error;
} SFResponseInfo; } SFResponseInfo;