diff --git a/src/idempotency/client/rpc_wrapper.h b/src/idempotency/client/rpc_wrapper.h index 726bb4c..d1a3cc1 100644 --- a/src/idempotency/client/rpc_wrapper.h +++ b/src/idempotency/client/rpc_wrapper.h @@ -24,6 +24,7 @@ ConnectionInfo *conn; \ IdempotencyClientChannel *old_channel; \ int result; \ + int conn_result; \ int i; \ bool idempotency_enabled; \ uint64_t req_id; \ @@ -70,11 +71,21 @@ } \ } \ \ + conn_result = result; \ if (result == SF_RETRIABLE_ERROR_CHANNEL_INVALID && \ idempotency_enabled) \ { \ - idempotency_client_channel_check_reconnect( \ - connection_params->channel); \ + if (idempotency_client_channel_check_wait( \ + connection_params->channel) == 0) \ + { \ + if ((conn_result=sf_proto_rebind_idempotency_channel( \ + conn, connection_params->channel->id, \ + connection_params->channel->key, \ + client_ctx->network_timeout)) == 0) \ + { \ + continue; \ + } \ + } \ } \ \ SF_NET_RETRY_CHECK_AND_SLEEP(net_retry_ctx, client_ctx-> \ @@ -84,7 +95,7 @@ "net retry result: %d, retry count: %d", \ __LINE__, __FUNCTION__, result, i); \ */ \ - SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, result); \ + SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, conn_result); \ if ((conn=GET_MASTER_CONNECTION(client_ctx, \ get_conn_arg1, &result)) == NULL) \ { \ diff --git a/src/idempotency/server/server_handler.c b/src/idempotency/server/server_handler.c index 3f32c38..68a338a 100644 --- a/src/idempotency/server/server_handler.c +++ b/src/idempotency/server/server_handler.c @@ -207,3 +207,49 @@ IdempotencyRequest *sf_server_update_prepare_and_check( return request; } + +int sf_server_deal_rebind_channel(struct fast_task_info *task, + int *server_task_type, IdempotencyChannel **channel, + SFResponseInfo *response) +{ + int result; + uint32_t channel_id; + int key; + SFProtoRebindChannelReq *req; + + if ((result=sf_server_expect_body_length(response, + SF_TASK_BODY_LENGTH(task), + sizeof(SFProtoRebindChannelReq))) != 0) + { + return result; + } + + if (*server_task_type != SF_SERVER_TASK_TYPE_CHANNEL_USER) { + response->error.length = sprintf(response->error.message, + "invalid task type: %d != %d", *server_task_type, + SF_SERVER_TASK_TYPE_CHANNEL_USER); + return EINVAL; + } + + if (*channel == NULL) { + response->error.length = sprintf(response->error.message, + "no channel binded"); + return EINVAL; + } + idempotency_channel_release(*channel, false); + + req = (SFProtoRebindChannelReq *)(task->data + sizeof(SFCommonProtoHeader)); + channel_id = buff2int(req->channel_id); + key = buff2int(req->key); + *channel = idempotency_channel_find_and_hold(channel_id, key, &result); + if (*channel == NULL) { + response->error.length = sprintf(response->error.message, + "find channel fail, channel id: %d, result: %d", + channel_id, result); + *server_task_type = SF_SERVER_TASK_TYPE_NONE; + return SF_RETRIABLE_ERROR_NO_CHANNEL; + } + + response->header.cmd = SF_SERVICE_PROTO_REBIND_CHANNEL_RESP; + return 0; +} diff --git a/src/idempotency/server/server_handler.h b/src/idempotency/server/server_handler.h index c63267c..73d8251 100644 --- a/src/idempotency/server/server_handler.h +++ b/src/idempotency/server/server_handler.h @@ -41,6 +41,10 @@ IdempotencyRequest *sf_server_update_prepare_and_check( request_allocator, IdempotencyChannel *channel, SFResponseInfo *response, int *result); +int sf_server_deal_rebind_channel(struct fast_task_info *task, + int *server_task_type, IdempotencyChannel **channel, + SFResponseInfo *response); + #ifdef __cplusplus } #endif diff --git a/src/sf_proto.c b/src/sf_proto.c index f6e4107..4298c7a 100644 --- a/src/sf_proto.c +++ b/src/sf_proto.c @@ -288,6 +288,10 @@ const char *sf_get_cmd_caption(const int cmd) return "CLOSE_CHANNEL_REQ"; case SF_SERVICE_PROTO_CLOSE_CHANNEL_RESP: return "CLOSE_CHANNEL_RESP"; + case SF_SERVICE_PROTO_REBIND_CHANNEL_REQ: + return "REBIND_CHANNEL_REQ"; + case SF_SERVICE_PROTO_REBIND_CHANNEL_RESP: + return "REBIND_CHANNEL_RESP"; case SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ: return "REPORT_REQ_RECEIPT_REQ"; case SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP: @@ -333,3 +337,30 @@ int sf_proto_deal_ack(struct fast_task_info *task, return 0; } + +int sf_proto_rebind_idempotency_channel(ConnectionInfo *conn, + const uint32_t channel_id, const int key, const int network_timeout) +{ + char out_buff[sizeof(SFCommonProtoHeader) + + sizeof(SFProtoRebindChannelReq)]; + SFCommonProtoHeader *header; + SFProtoRebindChannelReq *req; + SFResponseInfo response; + int result; + + header = (SFCommonProtoHeader *)out_buff; + req = (SFProtoRebindChannelReq *)(header + 1); + int2buff(channel_id, req->channel_id); + int2buff(key, req->key); + SF_PROTO_SET_HEADER(header, SF_SERVICE_PROTO_REBIND_CHANNEL_REQ, + sizeof(SFProtoRebindChannelReq)); + response.error.length = 0; + if ((result=sf_send_and_recv_none_body_response(conn, out_buff, + sizeof(out_buff), &response, network_timeout, + SF_SERVICE_PROTO_REBIND_CHANNEL_RESP)) != 0) + { + sf_log_network_error(&response, conn, result); + } + + return result; +} diff --git a/src/sf_proto.h b/src/sf_proto.h index 338d2be..e0590e9 100644 --- a/src/sf_proto.h +++ b/src/sf_proto.h @@ -32,10 +32,12 @@ #define SF_PROTO_ACTIVE_TEST_RESP 118 //for request idempotency -#define SF_SERVICE_PROTO_SETUP_CHANNEL_REQ 121 -#define SF_SERVICE_PROTO_SETUP_CHANNEL_RESP 122 -#define SF_SERVICE_PROTO_CLOSE_CHANNEL_REQ 123 -#define SF_SERVICE_PROTO_CLOSE_CHANNEL_RESP 124 +#define SF_SERVICE_PROTO_SETUP_CHANNEL_REQ 119 +#define SF_SERVICE_PROTO_SETUP_CHANNEL_RESP 120 +#define SF_SERVICE_PROTO_CLOSE_CHANNEL_REQ 121 +#define SF_SERVICE_PROTO_CLOSE_CHANNEL_RESP 122 +#define SF_SERVICE_PROTO_REBIND_CHANNEL_REQ 123 +#define SF_SERVICE_PROTO_REBIND_CHANNEL_RESP 124 #define SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_REQ 125 #define SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP 126 @@ -102,6 +104,11 @@ typedef struct sf_proto_setup_channel_resp { char padding[4]; } SFProtoSetupChannelResp; +typedef struct sf_proto_rebind_channel_req { + char channel_id[4]; + char key[4]; +} SFProtoRebindChannelReq; + typedef struct sf_proto_report_req_receipt_header { char count[4]; char padding[4]; @@ -297,6 +304,9 @@ static inline int sf_proto_deal_active_test(struct fast_task_info *task, int sf_proto_deal_ack(struct fast_task_info *task, SFRequestInfo *request, SFResponseInfo *response); +int sf_proto_rebind_idempotency_channel(ConnectionInfo *conn, + const uint32_t channel_id, const int key, const int network_timeout); + #define SF_CLIENT_RELEASE_CONNECTION(client_ctx, conn, result) \ do { \ if (SF_FORCE_CLOSE_CONNECTION_ERROR(result)) { \