add function sf_release_task_shared_mbuffer
parent
c27cb2a9af
commit
8b22655352
22
src/sf_nio.c
22
src/sf_nio.c
|
|
@ -41,8 +41,6 @@
|
||||||
#include "sf_service.h"
|
#include "sf_service.h"
|
||||||
#include "sf_nio.h"
|
#include "sf_nio.h"
|
||||||
|
|
||||||
#define SF_CTX ((SFContext *)(task->ctx))
|
|
||||||
|
|
||||||
void sf_set_parameters_ex(SFContext *sf_context, const int header_size,
|
void sf_set_parameters_ex(SFContext *sf_context, const int header_size,
|
||||||
sf_set_body_length_callback set_body_length_func,
|
sf_set_body_length_callback set_body_length_func,
|
||||||
sf_alloc_recv_buffer_callback alloc_recv_buffer_func,
|
sf_alloc_recv_buffer_callback alloc_recv_buffer_func,
|
||||||
|
|
@ -480,6 +478,7 @@ int sf_client_sock_read(int sock, short event, void *arg)
|
||||||
int bytes;
|
int bytes;
|
||||||
int recv_bytes;
|
int recv_bytes;
|
||||||
int total_read;
|
int total_read;
|
||||||
|
bool new_alloc;
|
||||||
struct fast_task_info *task;
|
struct fast_task_info *task;
|
||||||
|
|
||||||
task = (struct fast_task_info *)arg;
|
task = (struct fast_task_info *)arg;
|
||||||
|
|
@ -530,8 +529,12 @@ int sf_client_sock_read(int sock, short event, void *arg)
|
||||||
bytes = read(sock, task->data + task->offset, recv_bytes);
|
bytes = read(sock, task->data + task->offset, recv_bytes);
|
||||||
} else {
|
} else {
|
||||||
recv_bytes = task->length - task->offset;
|
recv_bytes = task->length - task->offset;
|
||||||
bytes = read(sock, task->recv_body + (task->offset -
|
if (task->recv_body == NULL) {
|
||||||
SF_CTX->header_size), recv_bytes);
|
bytes = read(sock, task->data + task->offset, recv_bytes);
|
||||||
|
} else {
|
||||||
|
bytes = read(sock, task->recv_body + (task->offset -
|
||||||
|
SF_CTX->header_size), recv_bytes);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (bytes < 0) {
|
if (bytes < 0) {
|
||||||
|
|
@ -614,11 +617,18 @@ int sf_client_sock_read(int sock, short event, void *arg)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (SF_CTX->alloc_recv_buffer != NULL) {
|
if (SF_CTX->alloc_recv_buffer != NULL) {
|
||||||
if ((task->recv_body=SF_CTX->alloc_recv_buffer(task)) == NULL) {
|
task->recv_body = SF_CTX->alloc_recv_buffer(task,
|
||||||
|
task->length - SF_CTX->header_size, &new_alloc);
|
||||||
|
if (new_alloc && task->recv_body == NULL) {
|
||||||
|
logInfo("recv_body is NULL!!!!!!!!!!!!!!!!!");
|
||||||
ioevent_add_to_deleted_list(task);
|
ioevent_add_to_deleted_list(task);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
new_alloc = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!new_alloc) {
|
||||||
if (task->length > task->size) {
|
if (task->length > task->size) {
|
||||||
int old_size;
|
int old_size;
|
||||||
|
|
||||||
|
|
@ -649,8 +659,6 @@ int sf_client_sock_read(int sock, short event, void *arg)
|
||||||
"size from %d to %d", __LINE__, task->client_ip,
|
"size from %d to %d", __LINE__, task->client_ip,
|
||||||
task->length, old_size, task->size);
|
task->length, old_size, task->size);
|
||||||
}
|
}
|
||||||
|
|
||||||
task->recv_body = task->data + SF_CTX->header_size;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,8 @@
|
||||||
#include "sf_define.h"
|
#include "sf_define.h"
|
||||||
#include "sf_types.h"
|
#include "sf_types.h"
|
||||||
|
|
||||||
|
#define SF_CTX ((SFContext *)(task->ctx))
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
|
||||||
|
|
@ -293,7 +293,11 @@ static inline void sf_proto_init_task_context(struct fast_task_info *task,
|
||||||
ctx->request.header.body_len = task->length - sizeof(SFCommonProtoHeader);
|
ctx->request.header.body_len = task->length - sizeof(SFCommonProtoHeader);
|
||||||
ctx->request.header.status = buff2short(((SFCommonProtoHeader *)
|
ctx->request.header.status = buff2short(((SFCommonProtoHeader *)
|
||||||
task->data)->status);
|
task->data)->status);
|
||||||
ctx->request.body = task->data + sizeof(SFCommonProtoHeader);
|
if (task->recv_body != NULL) {
|
||||||
|
ctx->request.body = task->recv_body;
|
||||||
|
} else {
|
||||||
|
ctx->request.body = task->data + sizeof(SFCommonProtoHeader);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void sf_log_network_error_ex1(SFResponseInfo *response,
|
static inline void sf_log_network_error_ex1(SFResponseInfo *response,
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,8 @@ static int sf_shared_mbuffer_alloc_init(void *element, void *args)
|
||||||
{
|
{
|
||||||
SFSharedMBuffer *buffer;
|
SFSharedMBuffer *buffer;
|
||||||
|
|
||||||
buffer = (SFSharedMBuffer *)element;
|
buffer = (SFSharedMBuffer *)((char *)element +
|
||||||
|
sizeof(struct fast_allocator_wrapper));
|
||||||
buffer->buff = (char *)(buffer + 1);
|
buffer->buff = (char *)(buffer + 1);
|
||||||
buffer->ctx = (SFSharedMBufferContext *)args;
|
buffer->ctx = (SFSharedMBufferContext *)args;
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
||||||
|
|
@ -16,9 +16,11 @@
|
||||||
#ifndef _SF_SHARED_MBUFFER_H__
|
#ifndef _SF_SHARED_MBUFFER_H__
|
||||||
#define _SF_SHARED_MBUFFER_H__
|
#define _SF_SHARED_MBUFFER_H__
|
||||||
|
|
||||||
#include "fastcommon/fast_allocator.h"
|
#include "fastcommon/fc_list.h"
|
||||||
|
#include "fastcommon/fast_task_queue.h"
|
||||||
#include "fastcommon/shared_func.h"
|
#include "fastcommon/shared_func.h"
|
||||||
#include "fastcommon/logger.h"
|
#include "fastcommon/logger.h"
|
||||||
|
#include "fastcommon/fast_allocator.h"
|
||||||
|
|
||||||
typedef struct sf_shared_mbuffer_context {
|
typedef struct sf_shared_mbuffer_context {
|
||||||
struct fast_allocator_context allocator;
|
struct fast_allocator_context allocator;
|
||||||
|
|
@ -68,6 +70,10 @@ static inline SFSharedMBuffer *sf_shared_mbuffer_alloc_ex(
|
||||||
fc_sleep_ms(sleep_ms);
|
fc_sleep_ms(sleep_ms);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logInfo("file: "__FILE__", line: %d, "
|
||||||
|
"alloc shared buffer: %p, buff: %p",
|
||||||
|
__LINE__, buffer, buffer->buff);
|
||||||
|
|
||||||
if (init_reffer_count > 0) {
|
if (init_reffer_count > 0) {
|
||||||
__sync_add_and_fetch(&buffer->reffer_count, init_reffer_count);
|
__sync_add_and_fetch(&buffer->reffer_count, init_reffer_count);
|
||||||
}
|
}
|
||||||
|
|
@ -82,14 +88,20 @@ static inline void sf_shared_mbuffer_hold(SFSharedMBuffer *buffer)
|
||||||
static inline void sf_shared_mbuffer_release(SFSharedMBuffer *buffer)
|
static inline void sf_shared_mbuffer_release(SFSharedMBuffer *buffer)
|
||||||
{
|
{
|
||||||
if (__sync_sub_and_fetch(&buffer->reffer_count, 1) == 0) {
|
if (__sync_sub_and_fetch(&buffer->reffer_count, 1) == 0) {
|
||||||
/*
|
logInfo("file: "__FILE__", line: %d, "
|
||||||
logDebug("file: "__FILE__", line: %d, "
|
|
||||||
"free shared buffer: %p", __LINE__, buffer);
|
"free shared buffer: %p", __LINE__, buffer);
|
||||||
*/
|
|
||||||
fast_allocator_free(&buffer->ctx->allocator, buffer);
|
fast_allocator_free(&buffer->ctx->allocator, buffer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline void sf_release_task_shared_mbuffer(struct fast_task_info *task)
|
||||||
|
{
|
||||||
|
SFSharedMBuffer *mbuffer;
|
||||||
|
mbuffer = fc_list_entry(task->recv_body, SFSharedMBuffer, buff);
|
||||||
|
sf_shared_mbuffer_release(mbuffer);
|
||||||
|
task->recv_body = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
||||||
|
|
@ -37,7 +37,8 @@
|
||||||
typedef void (*sf_accept_done_callback)(struct fast_task_info *task,
|
typedef void (*sf_accept_done_callback)(struct fast_task_info *task,
|
||||||
const bool bInnerPort);
|
const bool bInnerPort);
|
||||||
typedef int (*sf_set_body_length_callback)(struct fast_task_info *task);
|
typedef int (*sf_set_body_length_callback)(struct fast_task_info *task);
|
||||||
typedef char *(*sf_alloc_recv_buffer_callback)(struct fast_task_info *task);
|
typedef char *(*sf_alloc_recv_buffer_callback)(struct fast_task_info *task,
|
||||||
|
const int buff_size, bool *new_alloc);
|
||||||
typedef int (*sf_deal_task_func)(struct fast_task_info *task, const int stage);
|
typedef int (*sf_deal_task_func)(struct fast_task_info *task, const int stage);
|
||||||
typedef int (*sf_recv_timeout_callback)(struct fast_task_info *task);
|
typedef int (*sf_recv_timeout_callback)(struct fast_task_info *task);
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue