diff --git a/src/idempotency/server/request_htable.c b/src/idempotency/server/request_htable.c index 92b0f0a..9005a6a 100644 --- a/src/idempotency/server/request_htable.c +++ b/src/idempotency/server/request_htable.c @@ -35,7 +35,9 @@ int idempotency_request_htable_add(IdempotencyRequestHTable *htable, current = *bucket; while (current != NULL) { if (current->req_id == request->req_id) { - request->output = current->output; + request->output.result = current->output.result; + memcpy(request->output.response, current->output.response, + request->output.rsize); request->finished = current->finished; result = EEXIST; break; diff --git a/src/idempotency/server/server_channel.c b/src/idempotency/server/server_channel.c index d9d3dfc..9cdd462 100644 --- a/src/idempotency/server/server_channel.c +++ b/src/idempotency/server/server_channel.c @@ -287,3 +287,23 @@ void idempotency_channel_free(IdempotencyChannel *channel) channel_context.timeout_ctx.reserve_interval; } } + +int idempotency_request_alloc_init(void *element, void *args) +{ + static int i = 0; + + IdempotencyRequest *request; + request = (IdempotencyRequest *)element; + request->allocator = (struct fast_mblock_man *)args; + request->output.rsize = request->allocator->info.element_size - + sizeof(IdempotencyRequest); + request->output.response = request + 1; + + if (++i % 100 == 0) { + logInfo("i: %d, element_size: %d, rsize: %d", i, + request->allocator->info.element_size, + request->output.rsize); + } + + return 0; +} diff --git a/src/idempotency/server/server_channel.h b/src/idempotency/server/server_channel.h index 9909b37..c00f2bc 100644 --- a/src/idempotency/server/server_channel.h +++ b/src/idempotency/server/server_channel.h @@ -39,6 +39,8 @@ extern "C" { &channel->request_htable, req_id); } + int idempotency_request_alloc_init(void *element, void *args); + #ifdef __cplusplus } #endif diff --git a/src/idempotency/server/server_handler.c b/src/idempotency/server/server_handler.c index 027c76a..9264564 100644 --- a/src/idempotency/server/server_handler.c +++ b/src/idempotency/server/server_handler.c @@ -157,3 +157,38 @@ int sf_server_deal_report_req_receipt(struct fast_task_info *task, response->header.cmd = SF_SERVICE_PROTO_REPORT_REQ_RECEIPT_RESP; return 0; } + +IdempotencyRequest *sf_server_update_prepare_and_check( + struct fast_task_info *task, struct fast_mblock_man * + request_allocator, IdempotencyChannel *channel, + SFResponseInfo *response, int *result) +{ + SFProtoIdempotencyAdditionalHeader *adheader; + IdempotencyRequest *request; + + if (!__sync_add_and_fetch(&channel->is_valid, 0)) { + response->error.length = sprintf(response->error.message, + "channel: %d is invalid", channel->id); + *result = SF_RETRIABLE_ERROR_CHANNEL_INVALID; + return NULL; + } + + adheader = (SFProtoIdempotencyAdditionalHeader *) + (task->data + sizeof(SFCommonProtoHeader)); + request = (IdempotencyRequest *)fast_mblock_alloc_object(request_allocator); + if (request == NULL) { + *result = ENOMEM; + return NULL; + } + + request->finished = false; + request->req_id = buff2long(adheader->req_id); + *result = idempotency_channel_add_request(channel, request); + if (*result == EEXIST) { + if (!request->finished) { + *result = EAGAIN; + } + } + + return request; +} diff --git a/src/idempotency/server/server_handler.h b/src/idempotency/server/server_handler.h index eec28ca..7956fff 100644 --- a/src/idempotency/server/server_handler.h +++ b/src/idempotency/server/server_handler.h @@ -21,6 +21,11 @@ int sf_server_deal_report_req_receipt(struct fast_task_info *task, int *task_type, IdempotencyChannel **channel, SFResponseInfo *response); +IdempotencyRequest *sf_server_update_prepare_and_check( + struct fast_task_info *task, struct fast_mblock_man * + request_allocator, IdempotencyChannel *channel, + SFResponseInfo *response, int *result); + #ifdef __cplusplus } #endif diff --git a/src/idempotency/server/server_types.h b/src/idempotency/server/server_types.h index efe400f..33f4f01 100644 --- a/src/idempotency/server/server_types.h +++ b/src/idempotency/server/server_types.h @@ -5,14 +5,17 @@ #include "fastcommon/fast_mblock.h" #include "fastcommon/fast_timer.h" +typedef struct idempotency_request_result { + int rsize; //response size defined by application + int result; + void *response; +} IdempotencyRequestResult; + typedef struct idempotency_request { uint64_t req_id; volatile int ref_count; bool finished; - struct { - int result; - int inc_alloc; - } output; + IdempotencyRequestResult output; struct fast_mblock_man *allocator; //for free struct idempotency_request *next; } IdempotencyRequest; diff --git a/src/sf_proto.h b/src/sf_proto.h index 612ce77..320db81 100644 --- a/src/sf_proto.h +++ b/src/sf_proto.h @@ -64,6 +64,10 @@ typedef struct sf_common_proto_header { char padding[3]; } SFCommonProtoHeader; +typedef struct sf_proto_idempotency_additional_header { + char req_id[8]; +} SFProtoIdempotencyAdditionalHeader; + typedef struct sf_proto_setup_channel_req { char channel_id[4]; //for hint char key[4]; //for validate when channel_id > 0