Skip to content

Commit

Permalink
[Filestore] issue-2806: support multiple aio engines in local filestore
Browse files Browse the repository at this point in the history
Additionally dispatch async read/write into thread pool and increase default
number of worker threads to 8
  • Loading branch information
budevg committed Jan 7, 2025
1 parent b5c6c77 commit e4413e1
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 4 deletions.
2 changes: 2 additions & 0 deletions cloud/filestore/libs/daemon/vhost/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <cloud/filestore/libs/vfs/probes.h>
#include <cloud/filestore/libs/vhost/server.h>

#include <cloud/storage/core/libs/aio/service.h>
#include <cloud/storage/core/libs/common/scheduler.h>
#include <cloud/storage/core/libs/common/task_queue.h>
#include <cloud/storage/core/libs/common/thread_pool.h>
Expand Down Expand Up @@ -285,6 +286,7 @@ void TBootstrapVhost::InitEndpoints()
auto serviceConfig = std::make_shared<TLocalFileStoreConfig>(
*localServiceConfig);
ThreadPool = CreateThreadPool("svc", serviceConfig->GetNumThreads());
FileIOService = CreateThreadedAIOService(serviceConfig->GetNumThreads());
LocalService = CreateLocalFileStore(
std::move(serviceConfig),
Timer,
Expand Down
2 changes: 1 addition & 1 deletion cloud/filestore/libs/service_local/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ constexpr TDuration AsyncHandleOpsPeriod = TDuration::MilliSeconds(50);
xxx(PathPrefix, TString, "nfs_" )\
xxx(DefaultPermissions, ui32, 0775 )\
xxx(IdleSessionTimeout, TDuration, TDuration::Seconds(30) )\
xxx(NumThreads, ui32, 4 )\
xxx(NumThreads, ui32, 8 )\
xxx(StatePath, TString, "./" )\
xxx(MaxNodeCount, ui32, 1000000 )\
xxx(MaxHandlePerSessionCount, ui32, 10000 )\
Expand Down
4 changes: 3 additions & 1 deletion cloud/filestore/libs/service_local/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ class TLocalFileStore final
std::shared_ptr<NProto::T##name##Request> request) override \
{ \
Y_UNUSED(callContext); \
return ExecuteWithProfileLogAsync<T##name##Method>(*request); \
return TaskQueue->Execute([this, request = std::move(request)] { \
return ExecuteWithProfileLogAsync<T##name##Method>(*request); \
}); \
} \
// FILESTORE_IMPLEMENT_METHOD_ASYNC

Expand Down
3 changes: 2 additions & 1 deletion cloud/filestore/libs/vfs_fuse/loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,8 @@ class TSessionThread final
{
STORAGE_INFO("starting FUSE loop");

::NCloud::SetCurrentThreadName("FUSE");
static std::atomic<ui64> index = 0;
::NCloud::SetCurrentThreadName("FUSE" + ToString(index++));

AtomicSet(ThreadId, pthread_self());
fuse_session_loop(Session);
Expand Down
64 changes: 63 additions & 1 deletion cloud/storage/core/libs/aio/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <util/string/builder.h>
#include <util/system/file.h>
#include <util/system/thread.h>
#include <util/string/cast.h>

#include <atomic>

Expand Down Expand Up @@ -223,7 +224,9 @@ class TAIOService final
void Run()
{
SetHighestThreadPriority();
NCloud::SetCurrentThreadName("AIO");

static std::atomic<ui64> index = 0;
NCloud::SetCurrentThreadName("AIO" + ToString(index++));

timespec timeout = WAIT_TIMEOUT;

Expand All @@ -248,6 +251,60 @@ class TAIOService final
}
};

////////////////////////////////////////////////////////////////////////////////

class TThreadedAIOService final
: public IFileIOService
{
private:
TVector<IFileIOServicePtr> IoServices;
std::atomic<i64> NextService = 0;

public:
TThreadedAIOService(ui32 threadCount, size_t maxEvents)
{
for (ui32 i = 0; i < threadCount; i++) {
IoServices.push_back(CreateAIOService(maxEvents));
}
}

void AsyncRead(
TFileHandle& file,
i64 offset,
TArrayRef<char> buffer,
TFileIOCompletion* completion) override
{
auto index = NextService++;
IoServices[index % IoServices.size()]
->AsyncRead(file, offset, buffer, completion);
}

void AsyncWrite(
TFileHandle& file,
i64 offset,
TArrayRef<const char> buffer,
TFileIOCompletion* completion) override
{
auto index = NextService++;
IoServices[index % IoServices.size()]
->AsyncWrite(file, offset, buffer, completion);
}

void Start() override
{
for (auto& ioService : IoServices) {
ioService->Start();
}
}

void Stop() override
{
for (auto& ioService : IoServices) {
ioService->Stop();
}
}
};

} // namespace

////////////////////////////////////////////////////////////////////////////////
Expand All @@ -257,4 +314,9 @@ IFileIOServicePtr CreateAIOService(size_t maxEvents)
return std::make_shared<TAIOService>(maxEvents);
}

IFileIOServicePtr CreateThreadedAIOService(ui32 threadCount, size_t maxEvents)
{
return std::make_shared<TThreadedAIOService>(threadCount, maxEvents);
}

} // namespace NCloud
2 changes: 2 additions & 0 deletions cloud/storage/core/libs/aio/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ namespace NCloud {

IFileIOServicePtr CreateAIOService(size_t maxEvents = 1024);

IFileIOServicePtr CreateThreadedAIOService(ui32 threadCount, size_t maxEvents = 1024);

} // namespace NCloud

0 comments on commit e4413e1

Please sign in to comment.