support rebind idempotency channel

connection_manager
YuQing 2020-12-18 22:27:10 +08:00
parent 61bf945e11
commit f7e76c0a97
5 changed files with 109 additions and 7 deletions

View File

@ -24,6 +24,7 @@
ConnectionInfo *conn; \ ConnectionInfo *conn; \
IdempotencyClientChannel *old_channel; \ IdempotencyClientChannel *old_channel; \
int result; \ int result; \
int conn_result; \
int i; \ int i; \
bool idempotency_enabled; \ bool idempotency_enabled; \
uint64_t req_id; \ uint64_t req_id; \
@ -70,11 +71,21 @@
} \ } \
} \ } \
\ \
conn_result = result; \
if (result == SF_RETRIABLE_ERROR_CHANNEL_INVALID && \ if (result == SF_RETRIABLE_ERROR_CHANNEL_INVALID && \
idempotency_enabled) \ idempotency_enabled) \
{ \ { \
idempotency_client_channel_check_reconnect( \ if (idempotency_client_channel_check_wait( \
connection_params->channel); \ connection_params->channel) == 0) \
{ \
if ((conn_result=sf_proto_rebind_idempotency_channel( \
conn, connection_params->channel->id, \
connection_params->channel->key, \
client_ctx->network_timeout)) == 0) \
{ \
continue; \
} \
} \
} \ } \
\ \
SF_NET_RETRY_CHECK_AND_SLEEP(net_retry_ctx, client_ctx-> \ SF_NET_RETRY_CHECK_AND_SLEEP(net_retry_ctx, client_ctx-> \
@ -84,7 +95,7 @@
"net retry result: %d, retry count: %d", \ "net retry result: %d, retry count: %d", \
__LINE__, __FUNCTION__, result, i); \ __LINE__, __FUNCTION__, result, i); \
*/ \ */ \
SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, result); \ SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, conn_result); \
if ((conn=GET_MASTER_CONNECTION(client_ctx, \ if ((conn=GET_MASTER_CONNECTION(client_ctx, \
get_conn_arg1, &result)) == NULL) \ get_conn_arg1, &result)) == NULL) \
{ \ { \

View File

@ -207,3 +207,49 @@ IdempotencyRequest *sf_server_update_prepare_and_check(
return request; return request;
} }
int sf_server_deal_rebind_channel(struct fast_task_info *task,
int *server_task_type, IdempotencyChannel **channel,
SFResponseInfo *response)
{
int result;
uint32_t channel_id;
int key;
SFProtoRebindChannelReq *req;
if ((result=sf_server_expect_body_length(response,
SF_TASK_BODY_LENGTH(task),
sizeof(SFProtoRebindChannelReq))) != 0)
{
return result;
}
if (*server_task_type != SF_SERVER_TASK_TYPE_CHANNEL_USER) {
response->error.length = sprintf(response->error.message,
"invalid task type: %d != %d", *server_task_type,
SF_SERVER_TASK_TYPE_CHANNEL_USER);
return EINVAL;
}
if (*channel == NULL) {
response->error.length = sprintf(response->error.message,
"no channel binded");
return EINVAL;
}
idempotency_channel_release(*channel, false);
req = (SFProtoRebindChannelReq *)(task->data + sizeof(SFCommonProtoHeader));
channel_id = buff2int(req->channel_id);
key = buff2int(req->key);
*channel = idempotency_channel_find_and_hold(channel_id, key, &result);
if (*channel == NULL) {
response->error.length = sprintf(response->error.message,
"find channel fail, channel id: %d, result: %d",
channel_id, result);
*server_task_type = SF_SERVER_TASK_TYPE_NONE;
return SF_RETRIABLE_ERROR_NO_CHANNEL;
}
response->header.cmd = SF_SERVICE_PROTO_REBIND_CHANNEL_RESP;
return 0;
}

View File

@ -41,6 +41,10 @@ IdempotencyRequest *sf_server_update_prepare_and_check(
request_allocator, IdempotencyChannel *channel, request_allocator, IdempotencyChannel *channel,
SFResponseInfo *response, int *result); SFResponseInfo *response, int *result);
int sf_server_deal_rebind_channel(struct fast_task_info *task,
int *server_task_type, IdempotencyChannel **channel,
SFResponseInfo *response);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -288,6 +288,10 @@ const char *sf_get_cmd_caption(const int cmd)
return "CLOSE_CHANNEL_REQ"; return "CLOSE_CHANNEL_REQ";
case SF_SERVICE_PROTO_CLOSE_CHANNEL_RESP: case SF_SERVICE_PROTO_CLOSE_CHANNEL_RESP:
return "CLOSE_CHANNEL_RESP"; return "CLOSE_CHANNEL_RESP";
case SF_SERVICE_PROTO_REBIND_CHANNEL_REQ:
return "REBIND_CHANNEL_REQ";
case SF_SERVICE_PROTO_REBIND_CHANNEL_RESP:
return "REBIND_CHANNEL_RESP";
case SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ: case SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ:
return "REPORT_REQ_RECEIPT_REQ"; return "REPORT_REQ_RECEIPT_REQ";
case SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP: case SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP:
@ -333,3 +337,30 @@ int sf_proto_deal_ack(struct fast_task_info *task,
return 0; return 0;
} }
int sf_proto_rebind_idempotency_channel(ConnectionInfo *conn,
const uint32_t channel_id, const int key, const int network_timeout)
{
char out_buff[sizeof(SFCommonProtoHeader) +
sizeof(SFProtoRebindChannelReq)];
SFCommonProtoHeader *header;
SFProtoRebindChannelReq *req;
SFResponseInfo response;
int result;
header = (SFCommonProtoHeader *)out_buff;
req = (SFProtoRebindChannelReq *)(header + 1);
int2buff(channel_id, req->channel_id);
int2buff(key, req->key);
SF_PROTO_SET_HEADER(header, SF_SERVICE_PROTO_REBIND_CHANNEL_REQ,
sizeof(SFProtoRebindChannelReq));
response.error.length = 0;
if ((result=sf_send_and_recv_none_body_response(conn, out_buff,
sizeof(out_buff), &response, network_timeout,
SF_SERVICE_PROTO_REBIND_CHANNEL_RESP)) != 0)
{
sf_log_network_error(&response, conn, result);
}
return result;
}

View File

@ -32,10 +32,12 @@
#define SF_PROTO_ACTIVE_TEST_RESP 118 #define SF_PROTO_ACTIVE_TEST_RESP 118
//for request idempotency //for request idempotency
#define SF_SERVICE_PROTO_SETUP_CHANNEL_REQ 121 #define SF_SERVICE_PROTO_SETUP_CHANNEL_REQ 119
#define SF_SERVICE_PROTO_SETUP_CHANNEL_RESP 122 #define SF_SERVICE_PROTO_SETUP_CHANNEL_RESP 120
#define SF_SERVICE_PROTO_CLOSE_CHANNEL_REQ 123 #define SF_SERVICE_PROTO_CLOSE_CHANNEL_REQ 121
#define SF_SERVICE_PROTO_CLOSE_CHANNEL_RESP 124 #define SF_SERVICE_PROTO_CLOSE_CHANNEL_RESP 122
#define SF_SERVICE_PROTO_REBIND_CHANNEL_REQ 123
#define SF_SERVICE_PROTO_REBIND_CHANNEL_RESP 124
#define SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ 125 #define SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ 125
#define SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP 126 #define SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP 126
@ -102,6 +104,11 @@ typedef struct sf_proto_setup_channel_resp {
char padding[4]; char padding[4];
} SFProtoSetupChannelResp; } SFProtoSetupChannelResp;
typedef struct sf_proto_rebind_channel_req {
char channel_id[4];
char key[4];
} SFProtoRebindChannelReq;
typedef struct sf_proto_report_req_receipt_header { typedef struct sf_proto_report_req_receipt_header {
char count[4]; char count[4];
char padding[4]; char padding[4];
@ -297,6 +304,9 @@ static inline int sf_proto_deal_active_test(struct fast_task_info *task,
int sf_proto_deal_ack(struct fast_task_info *task, int sf_proto_deal_ack(struct fast_task_info *task,
SFRequestInfo *request, SFResponseInfo *response); SFRequestInfo *request, SFResponseInfo *response);
int sf_proto_rebind_idempotency_channel(ConnectionInfo *conn,
const uint32_t channel_id, const int key, const int network_timeout);
#define SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, result) \ #define SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, result) \
do { \ do { \
if (SF_FORCE_CLOSE_CONNECTION_ERROR(result)) { \ if (SF_FORCE_CLOSE_CONNECTION_ERROR(result)) { \