dio thread use blocked_queue

pull/56/head
yuqing 2016-01-26 21:16:04 +08:00
parent da9521f0c2
commit 521f346d3e
7 changed files with 11 additions and 49 deletions

View File

@ -1,7 +1,8 @@
Version 5.08 2015-12-08 Version 5.08 2016-01-26
* install library to $(TARGET_PREFIX)/lib anyway * install library to $(TARGET_PREFIX)/lib anyway
* php extension compiled in PHP 7 * php extension compiled in PHP 7
* dio thread use blocked_queue
Version 5.07 2015-09-13 Version 5.07 2015-09-13
* schedule task add the "second" field * schedule task add the "second" field

View File

@ -2,7 +2,7 @@
%define FDFSServer fastdfs-server %define FDFSServer fastdfs-server
%define FDFSClient libfdfsclient %define FDFSClient libfdfsclient
%define FDFSTool fastdfs-tool %define FDFSTool fastdfs-tool
%define FDFSVersion 5.0.7 %define FDFSVersion 5.0.8
Name: %{FastDFS} Name: %{FastDFS}
Version: %{FDFSVersion} Version: %{FDFSVersion}
@ -16,13 +16,13 @@ Source: http://perso.orange.fr/sebastien.godard/%{name}-%{version}.tar.gz
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n) BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n)
Requires: %__cp %__mv %__chmod %__grep %__mkdir %__install %__id Requires: %__cp %__mv %__chmod %__grep %__mkdir %__install %__id
BuildRequires: libfastcommon-devel >= 1.0.21 BuildRequires: libfastcommon-devel >= 1.0.24
%description %description
This package provides tracker & storage of fastdfs This package provides tracker & storage of fastdfs
%package -n %{FDFSServer} %package -n %{FDFSServer}
Requires: libfastcommon >= 1.0.21 Requires: libfastcommon >= 1.0.24
Summary: fastdfs tracker & storage Summary: fastdfs tracker & storage
%package -n %{FDFSTool} %package -n %{FDFSTool}

View File

@ -100,26 +100,11 @@ int storage_dio_init()
for (pContext=pThreadData->contexts; pContext<pContextEnd; \ for (pContext=pThreadData->contexts; pContext<pContextEnd; \
pContext++) pContext++)
{ {
if ((result=task_queue_init(&(pContext->queue))) != 0) if ((result=blocked_queue_init(&(pContext->queue))) != 0)
{ {
return result; return result;
} }
if ((result=init_pthread_lock(&(pContext->lock))) != 0)
{
return result;
}
result = pthread_cond_init(&(pContext->cond), NULL);
if (result != 0)
{
logError("file: "__FILE__", line: %d, " \
"pthread_cond_init fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
return result;
}
if ((result=pthread_create(&tid, &thread_attr, \ if ((result=pthread_create(&tid, &thread_attr, \
dio_thread_entrance, pContext)) != 0) dio_thread_entrance, pContext)) != 0)
{ {
@ -153,7 +138,7 @@ void storage_dio_terminate()
pContextEnd = g_dio_contexts + g_dio_thread_count; pContextEnd = g_dio_contexts + g_dio_thread_count;
for (pContext=g_dio_contexts; pContext<pContextEnd; pContext++) for (pContext=g_dio_contexts; pContext<pContextEnd; pContext++)
{ {
pthread_cond_signal(&(pContext->cond)); blocked_queue_terminate(&(pContext->queue));
} }
} }
@ -169,23 +154,12 @@ int storage_dio_queue_push(struct fast_task_info *pTask)
pContext = g_dio_contexts + pFileContext->dio_thread_index; pContext = g_dio_contexts + pFileContext->dio_thread_index;
pClientInfo->stage |= FDFS_STORAGE_STAGE_DIO_THREAD; pClientInfo->stage |= FDFS_STORAGE_STAGE_DIO_THREAD;
if ((result=task_queue_push(&(pContext->queue), pTask)) != 0) if ((result=blocked_queue_push(&(pContext->queue), pTask)) != 0)
{ {
add_to_deleted_list(pTask); add_to_deleted_list(pTask);
return result; return result;
} }
if ((result=pthread_cond_signal(&(pContext->cond))) != 0)
{
logError("file: "__FILE__", line: %d, " \
"pthread_cond_signal fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
add_to_deleted_list(pTask);
return result;
}
return 0; return 0;
} }
@ -757,25 +731,13 @@ static void *dio_thread_entrance(void* arg)
struct fast_task_info *pTask; struct fast_task_info *pTask;
pContext = (struct storage_dio_context *)arg; pContext = (struct storage_dio_context *)arg;
pthread_mutex_lock(&(pContext->lock));
while (g_continue_flag) while (g_continue_flag)
{ {
if ((result=pthread_cond_wait(&(pContext->cond), \ while ((pTask=blocked_queue_pop(&(pContext->queue))) != NULL)
&(pContext->lock))) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_cond_wait fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
}
while ((pTask=task_queue_pop(&(pContext->queue))) != NULL)
{ {
((StorageClientInfo *)pTask->arg)->deal_func(pTask); ((StorageClientInfo *)pTask->arg)->deal_func(pTask);
} }
} }
pthread_mutex_unlock(&(pContext->lock));
if ((result=pthread_mutex_lock(&g_dio_thread_lock)) != 0) if ((result=pthread_mutex_lock(&g_dio_thread_lock)) != 0)
{ {

View File

@ -17,12 +17,11 @@
#include <pthread.h> #include <pthread.h>
#include "tracker_types.h" #include "tracker_types.h"
#include "fast_task_queue.h" #include "fast_task_queue.h"
#include "fast_blocked_queue.h"
struct storage_dio_context struct storage_dio_context
{ {
struct fast_task_queue queue; struct fast_blocked_queue queue;
pthread_mutex_t lock;
pthread_cond_t cond;
}; };
struct storage_dio_thread_data struct storage_dio_thread_data

0
test/test_delete.sh Normal file → Executable file
View File

0
test/test_download.sh Normal file → Executable file
View File

0
test/test_upload.sh Normal file → Executable file
View File