idempotency request: user defined response size

connection_manager
YuQing 2020-09-17 21:33:43 +08:00
parent de20796666
commit 451f6da578
7 changed files with 76 additions and 5 deletions

View File

@ -35,7 +35,9 @@ int idempotency_request_htable_add(IdempotencyRequestHTable *htable,
current = *bucket;
while (current != NULL) {
if (current->req_id == request->req_id) {
request->output = current->output;
request->output.result = current->output.result;
memcpy(request->output.response, current->output.response,
request->output.rsize);
request->finished = current->finished;
result = EEXIST;
break;

View File

@ -287,3 +287,23 @@ void idempotency_channel_free(IdempotencyChannel *channel)
channel_context.timeout_ctx.reserve_interval;
}
}
int idempotency_request_alloc_init(void *element, void *args)
{
static int i = 0;
IdempotencyRequest *request;
request = (IdempotencyRequest *)element;
request->allocator = (struct fast_mblock_man *)args;
request->output.rsize = request->allocator->info.element_size -
sizeof(IdempotencyRequest);
request->output.response = request + 1;
if (++i % 100 == 0) {
logInfo("i: %d, element_size: %d, rsize: %d", i,
request->allocator->info.element_size,
request->output.rsize);
}
return 0;
}

View File

@ -39,6 +39,8 @@ extern "C" {
&channel->request_htable, req_id);
}
int idempotency_request_alloc_init(void *element, void *args);
#ifdef __cplusplus
}
#endif

View File

@ -157,3 +157,38 @@ int sf_server_deal_report_req_receipt(struct fast_task_info *task,
response->header.cmd = SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP;
return 0;
}
IdempotencyRequest *sf_server_update_prepare_and_check(
struct fast_task_info *task, struct fast_mblock_man *
request_allocator, IdempotencyChannel *channel,
SFResponseInfo *response, int *result)
{
SFProtoIdempotencyAdditionalHeader *adheader;
IdempotencyRequest *request;
if (!__sync_add_and_fetch(&channel->is_valid, 0)) {
response->error.length = sprintf(response->error.message,
"channel: %d is invalid", channel->id);
*result = SF_RETRIABLE_ERROR_CHANNEL_INVALID;
return NULL;
}
adheader = (SFProtoIdempotencyAdditionalHeader *)
(task->data + sizeof(SFCommonProtoHeader));
request = (IdempotencyRequest *)fast_mblock_alloc_object(request_allocator);
if (request == NULL) {
*result = ENOMEM;
return NULL;
}
request->finished = false;
request->req_id = buff2long(adheader->req_id);
*result = idempotency_channel_add_request(channel, request);
if (*result == EEXIST) {
if (!request->finished) {
*result = EAGAIN;
}
}
return request;
}

View File

@ -21,6 +21,11 @@ int sf_server_deal_report_req_receipt(struct fast_task_info *task,
int *task_type, IdempotencyChannel **channel,
SFResponseInfo *response);
IdempotencyRequest *sf_server_update_prepare_and_check(
struct fast_task_info *task, struct fast_mblock_man *
request_allocator, IdempotencyChannel *channel,
SFResponseInfo *response, int *result);
#ifdef __cplusplus
}
#endif

View File

@ -5,14 +5,17 @@
#include "fastcommon/fast_mblock.h"
#include "fastcommon/fast_timer.h"
typedef struct idempotency_request_result {
int rsize; //response size defined by application
int result;
void *response;
} IdempotencyRequestResult;
typedef struct idempotency_request {
uint64_t req_id;
volatile int ref_count;
bool finished;
struct {
int result;
int inc_alloc;
} output;
IdempotencyRequestResult output;
struct fast_mblock_man *allocator; //for free
struct idempotency_request *next;
} IdempotencyRequest;

View File

@ -64,6 +64,10 @@ typedef struct sf_common_proto_header {
char padding[3];
} SFCommonProtoHeader;
typedef struct sf_proto_idempotency_additional_header {
char req_id[8];
} SFProtoIdempotencyAdditionalHeader;
typedef struct sf_proto_setup_channel_req {
char channel_id[4]; //for hint
char key[4]; //for validate when channel_id > 0