diff --git a/src/sf_nio.c b/src/sf_nio.c index b307ed4..1ea93a8 100644 --- a/src/sf_nio.c +++ b/src/sf_nio.c @@ -41,8 +41,6 @@ #include "sf_service.h" #include "sf_nio.h" -#define SF_CTX ((SFContext *)(task->ctx)) - void sf_set_parameters_ex(SFContext *sf_context, const int header_size, sf_set_body_length_callback set_body_length_func, sf_alloc_recv_buffer_callback alloc_recv_buffer_func, @@ -480,6 +478,7 @@ int sf_client_sock_read(int sock, short event, void *arg) int bytes; int recv_bytes; int total_read; + bool new_alloc; struct fast_task_info *task; task = (struct fast_task_info *)arg; @@ -530,8 +529,12 @@ int sf_client_sock_read(int sock, short event, void *arg) bytes = read(sock, task->data + task->offset, recv_bytes); } else { recv_bytes = task->length - task->offset; - bytes = read(sock, task->recv_body + (task->offset - - SF_CTX->header_size), recv_bytes); + if (task->recv_body == NULL) { + bytes = read(sock, task->data + task->offset, recv_bytes); + } else { + bytes = read(sock, task->recv_body + (task->offset - + SF_CTX->header_size), recv_bytes); + } } if (bytes < 0) { @@ -614,11 +617,18 @@ int sf_client_sock_read(int sock, short event, void *arg) } if (SF_CTX->alloc_recv_buffer != NULL) { - if ((task->recv_body=SF_CTX->alloc_recv_buffer(task)) == NULL) { + task->recv_body = SF_CTX->alloc_recv_buffer(task, + task->length - SF_CTX->header_size, &new_alloc); + if (new_alloc && task->recv_body == NULL) { + logInfo("recv_body is NULL!!!!!!!!!!!!!!!!!"); ioevent_add_to_deleted_list(task); return -1; } } else { + new_alloc = false; + } + + if (!new_alloc) { if (task->length > task->size) { int old_size; @@ -649,8 +659,6 @@ int sf_client_sock_read(int sock, short event, void *arg) "size from %d to %d", __LINE__, task->client_ip, task->length, old_size, task->size); } - - task->recv_body = task->data + SF_CTX->header_size; } } diff --git a/src/sf_nio.h b/src/sf_nio.h index 9863e38..fb4b06e 100644 --- a/src/sf_nio.h +++ b/src/sf_nio.h @@ -26,6 +26,8 @@ #include "sf_define.h" #include "sf_types.h" +#define SF_CTX ((SFContext *)(task->ctx)) + #ifdef __cplusplus extern "C" { #endif diff --git a/src/sf_proto.h b/src/sf_proto.h index ac0dcb4..4bb93dc 100644 --- a/src/sf_proto.h +++ b/src/sf_proto.h @@ -293,7 +293,11 @@ static inline void sf_proto_init_task_context(struct fast_task_info *task, ctx->request.header.body_len = task->length - sizeof(SFCommonProtoHeader); ctx->request.header.status = buff2short(((SFCommonProtoHeader *) task->data)->status); - ctx->request.body = task->data + sizeof(SFCommonProtoHeader); + if (task->recv_body != NULL) { + ctx->request.body = task->recv_body; + } else { + ctx->request.body = task->data + sizeof(SFCommonProtoHeader); + } } static inline void sf_log_network_error_ex1(SFResponseInfo *response, diff --git a/src/sf_shared_mbuffer.c b/src/sf_shared_mbuffer.c index 2e6e691..e5b74ce 100644 --- a/src/sf_shared_mbuffer.c +++ b/src/sf_shared_mbuffer.c @@ -19,7 +19,8 @@ static int sf_shared_mbuffer_alloc_init(void *element, void *args) { SFSharedMBuffer *buffer; - buffer = (SFSharedMBuffer *)element; + buffer = (SFSharedMBuffer *)((char *)element + + sizeof(struct fast_allocator_wrapper)); buffer->buff = (char *)(buffer + 1); buffer->ctx = (SFSharedMBufferContext *)args; return 0; diff --git a/src/sf_shared_mbuffer.h b/src/sf_shared_mbuffer.h index 7ed06bb..abc3f34 100644 --- a/src/sf_shared_mbuffer.h +++ b/src/sf_shared_mbuffer.h @@ -16,9 +16,11 @@ #ifndef _SF_SHARED_MBUFFER_H__ #define _SF_SHARED_MBUFFER_H__ -#include "fastcommon/fast_allocator.h" +#include "fastcommon/fc_list.h" +#include "fastcommon/fast_task_queue.h" #include "fastcommon/shared_func.h" #include "fastcommon/logger.h" +#include "fastcommon/fast_allocator.h" typedef struct sf_shared_mbuffer_context { struct fast_allocator_context allocator; @@ -68,6 +70,10 @@ static inline SFSharedMBuffer *sf_shared_mbuffer_alloc_ex( fc_sleep_ms(sleep_ms); } + logInfo("file: "__FILE__", line: %d, " + "alloc shared buffer: %p, buff: %p", + __LINE__, buffer, buffer->buff); + if (init_reffer_count > 0) { __sync_add_and_fetch(&buffer->reffer_count, init_reffer_count); } @@ -82,14 +88,20 @@ static inline void sf_shared_mbuffer_hold(SFSharedMBuffer *buffer) static inline void sf_shared_mbuffer_release(SFSharedMBuffer *buffer) { if (__sync_sub_and_fetch(&buffer->reffer_count, 1) == 0) { - /* - logDebug("file: "__FILE__", line: %d, " + logInfo("file: "__FILE__", line: %d, " "free shared buffer: %p", __LINE__, buffer); - */ fast_allocator_free(&buffer->ctx->allocator, buffer); } } +static inline void sf_release_task_shared_mbuffer(struct fast_task_info *task) +{ + SFSharedMBuffer *mbuffer; + mbuffer = fc_list_entry(task->recv_body, SFSharedMBuffer, buff); + sf_shared_mbuffer_release(mbuffer); + task->recv_body = NULL; +} + #ifdef __cplusplus } #endif diff --git a/src/sf_types.h b/src/sf_types.h index a7ddfdf..cbaa06b 100644 --- a/src/sf_types.h +++ b/src/sf_types.h @@ -37,7 +37,8 @@ typedef void (*sf_accept_done_callback)(struct fast_task_info *task, const bool bInnerPort); typedef int (*sf_set_body_length_callback)(struct fast_task_info *task); -typedef char *(*sf_alloc_recv_buffer_callback)(struct fast_task_info *task); +typedef char *(*sf_alloc_recv_buffer_callback)(struct fast_task_info *task, + const int buff_size, bool *new_alloc); typedef int (*sf_deal_task_func)(struct fast_task_info *task, const int stage); typedef int (*sf_recv_timeout_callback)(struct fast_task_info *task);