Skip to content

Commit

Permalink
[Filestore] Implement a tool which replays filestore-vhost's profile …
Browse files Browse the repository at this point in the history
…log (#2124)

* [Filestore] Implement a tool which replays filestore-vhost's profile log - part0: fs player
  • Loading branch information
proller authored Nov 18, 2024
1 parent 5b6de05 commit 08e3324
Show file tree
Hide file tree
Showing 10 changed files with 1,394 additions and 46 deletions.
2 changes: 1 addition & 1 deletion cloud/filestore/libs/client/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ FILESTORE_SESSION_FORWARD(FILESTORE_IMPLEMENT_METHOD)
with_lock (SessionLock) {
if (!HasError(response)) {
STORAGE_INFO(LogTag(GetSessionId(response), GetSessionSeqNo(response))
<< " session established" << GetSessionState(response).size());
<< " session established " << GetSessionState(response).size());

SessionState = SessionEstablished;

Expand Down
22 changes: 20 additions & 2 deletions cloud/filestore/tools/testing/loadtest/lib/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace NCloud::NFileStore::NLoadTest {

struct TCompletedRequest
{
NProto::EAction Action;
NProto::EAction Action{};
TDuration Elapsed;
NProto::TError Error;

Expand All @@ -41,8 +41,19 @@ struct IRequestGenerator
virtual ~IRequestGenerator() = default;

virtual bool HasNextRequest() = 0;
virtual TInstant NextRequestAt() = 0;
virtual NThreading::TFuture<TCompletedRequest> ExecuteNextRequest() = 0;

// With false collect request futures and process them in bulk
// With true process every request future immediately after ExecuteNextRequest
virtual bool ShouldImmediatelyProcessQueue()
{
return false;
}

virtual bool ShouldFailOnError()
{
return true;
}
};

////////////////////////////////////////////////////////////////////////////////
Expand All @@ -61,4 +72,11 @@ IRequestGeneratorPtr CreateDataRequestGenerator(
TString filesystemId,
NProto::THeaders headers);

IRequestGeneratorPtr CreateReplayRequestGeneratorFs(
NProto::TReplaySpec spec,
ILoggingServicePtr logging,
NClient::ISessionPtr session,
TString filesystemId,
NProto::THeaders headers);

} // namespace NCloud::NFileStore::NLoadTest
5 changes: 0 additions & 5 deletions cloud/filestore/tools/testing/loadtest/lib/request_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,6 @@ class TDataRequestGenerator final
return true;
}

TInstant NextRequestAt() override
{
return TInstant::Max();
}

NThreading::TFuture<TCompletedRequest> ExecuteNextRequest() override
{
const auto& action = PeekNextAction();
Expand Down
5 changes: 0 additions & 5 deletions cloud/filestore/tools/testing/loadtest/lib/request_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,6 @@ class TIndexRequestGenerator final
return true;
}

TInstant NextRequestAt() override
{
return TInstant::Max();
}

NThreading::TFuture<TCompletedRequest> ExecuteNextRequest() override
{
const auto& action = PeekNextAction();
Expand Down
203 changes: 203 additions & 0 deletions cloud/filestore/tools/testing/loadtest/lib/request_replay.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
#include "request_replay.h"

#include <cloud/filestore/libs/diagnostics/events/profile_events.ev.pb.h>
#include <cloud/filestore/libs/diagnostics/profile_log_events.h>
#include <cloud/filestore/libs/service/request.h>
#include <cloud/filestore/tools/analytics/libs/event-log/dump.h>
#include <cloud/storage/core/libs/diagnostics/logging.h>

#include <library/cpp/eventlog/eventlog.h>
#include <library/cpp/eventlog/iterator.h>
#include <library/cpp/logger/log.h>

namespace NCloud::NFileStore::NLoadTest {

////////////////////////////////////////////////////////////////////////////////

IReplayRequestGenerator::IReplayRequestGenerator(
NProto::TReplaySpec spec,
ILoggingServicePtr logging,
NClient::ISessionPtr session,
TString /*filesystemId*/,
NProto::THeaders headers)
: Spec(std::move(spec))
, Headers(std::move(headers))
, Session(std::move(session))
{
Log = logging->CreateLog(Headers.GetClientId());

NEventLog::TOptions options;
options.FileName = Spec.GetFileName();

// Sort eventlog items by timestamp
options.SetForceStrongOrdering(true);
CurrentEvent = CreateIterator(options);
}

bool IReplayRequestGenerator::HasNextRequest()
{
if (!EventPtr) {
Advance();
}
return !!EventPtr;
}

bool IReplayRequestGenerator::ShouldImmediatelyProcessQueue()
{
return true;
}

bool IReplayRequestGenerator::ShouldFailOnError()
{
return false;
}

void IReplayRequestGenerator::Advance()
{
for (EventPtr = CurrentEvent->Next(); EventPtr;
EventPtr = CurrentEvent->Next())
{
MessagePtr = dynamic_cast<const NProto::TProfileLogRecord*>(
EventPtr->GetProto());

if (!MessagePtr) {
return;
}

const TString fileSystemId{MessagePtr->GetFileSystemId()};
if (!Spec.GetFileSystemIdFilter().empty() &&
fileSystemId != Spec.GetFileSystemIdFilter())
{
STORAGE_DEBUG(
"Skipped event with FileSystemId=%s",
fileSystemId.c_str());
continue;
}

EventMessageNumber = MessagePtr->GetRequests().size();
return;
}
}

TFuture<TCompletedRequest> IReplayRequestGenerator::ProcessRequest(
const NProto::TProfileLogRequestInfo& request)
{
const auto& action = request.GetRequestType();
switch (static_cast<EFileStoreRequest>(action)) {
case EFileStoreRequest::ReadData:
return DoReadData(request);
case EFileStoreRequest::WriteData:
return DoWrite(request);
case EFileStoreRequest::CreateNode:
return DoCreateNode(request);
case EFileStoreRequest::RenameNode:
return DoRenameNode(request);
case EFileStoreRequest::UnlinkNode:
return DoUnlinkNode(request);
case EFileStoreRequest::CreateHandle:
return DoCreateHandle(request);
case EFileStoreRequest::DestroyHandle:
return DoDestroyHandle(request);
case EFileStoreRequest::GetNodeAttr:
return DoGetNodeAttr(request);
case EFileStoreRequest::AccessNode:
return DoAccessNode(request);
case EFileStoreRequest::ListNodes:
return DoListNodes(request);
case EFileStoreRequest::AcquireLock:
return DoAcquireLock(request);
case EFileStoreRequest::ReleaseLock:
return DoReleaseLock(request);

case EFileStoreRequest::ReadBlob:
case EFileStoreRequest::WriteBlob:
case EFileStoreRequest::GenerateBlobIds:
case EFileStoreRequest::PingSession:
case EFileStoreRequest::Ping:
return {};

default:
break;
}

switch (static_cast<NFuse::EFileStoreFuseRequest>(action)) {
case NFuse::EFileStoreFuseRequest::Flush:
return DoFlush(request);

default:
break;
}

STORAGE_INFO(
"Uninmplemented action=%u %s",
action,
RequestName(request.GetRequestType()).c_str());

return {};
}

NThreading::TFuture<TCompletedRequest>
IReplayRequestGenerator::ExecuteNextRequest()
{
if (!HasNextRequest()) {
return {};
}

for (; EventPtr; Advance()) {
if (!MessagePtr) {
continue;
}

for (; EventMessageNumber > 0;) {
NProto::TProfileLogRequestInfo request =
MessagePtr->GetRequests()[--EventMessageNumber];
{
auto timediff = (request.GetTimestampMcs() - TimestampMcs) *
Spec.GetTimeScale();
TimestampMcs = request.GetTimestampMcs();
if (timediff > MaxSleepMcs) {
timediff = 0;
}

const auto current = TInstant::Now();
auto diff = current - Started;

if (timediff > diff.MicroSeconds()) {
auto sleep =
TDuration::MicroSeconds(timediff - diff.MicroSeconds());
STORAGE_DEBUG(
"Sleep=%lu timediff=%f diff=%lu",
sleep.MicroSeconds(),
timediff,
diff.MicroSeconds());

Sleep(sleep);
}

Started = current;
}

STORAGE_DEBUG(
"Processing message n=%d typename=%s type=%d name=%s data=%s",
EventMessageNumber,
request.GetTypeName().c_str(),
request.GetRequestType(),
RequestName(request.GetRequestType()).c_str(),
request.ShortDebugString().Quote().c_str());

const auto future = ProcessRequest(request);
if (future.Initialized()) {
return future;
}
}
}

STORAGE_INFO(
"Profile log finished n=%d hasPtr=%d",
EventMessageNumber,
!!EventPtr);

return {};
}

} // namespace NCloud::NFileStore::NLoadTest
99 changes: 99 additions & 0 deletions cloud/filestore/tools/testing/loadtest/lib/request_replay.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#pragma once

#include "request.h"

#include <cloud/filestore/libs/diagnostics/events/profile_events.ev.pb.h>

#include <library/cpp/eventlog/eventlog.h>
#include <library/cpp/eventlog/iterator.h>

namespace NCloud::NFileStore::NLoadTest {

using namespace NThreading;
using namespace NCloud::NFileStore::NClient;

////////////////////////////////////////////////////////////////////////////////

class IReplayRequestGenerator: public IRequestGenerator
{
protected:
const ::NCloud::NFileStore::NProto::TReplaySpec Spec;
TLog Log;
TString FileSystemIdFilter;
const ::NCloud::NFileStore::NProto::THeaders Headers;
NClient::ISessionPtr Session;

ui64 TimestampMcs = 0;
TInstant Started;

// Do not sleep too much if timestamps in log are broken
constexpr static auto MaxSleepMcs = 1000000;

private:
THolder<NEventLog::IIterator> CurrentEvent;
TConstEventPtr EventPtr;
int EventMessageNumber = 0;
const NProto::TProfileLogRecord* MessagePtr{};
TFuture<TCompletedRequest> ProcessRequest(
const NProto::TProfileLogRequestInfo& request);

public:
IReplayRequestGenerator(
NProto::TReplaySpec spec,
ILoggingServicePtr logging,
NClient::ISessionPtr session,
TString filesystemId,
NProto::THeaders headers);

bool ShouldImmediatelyProcessQueue() override;

bool ShouldFailOnError() override;

void Advance();

bool HasNextRequest() override;

TFuture<TCompletedRequest> ExecuteNextRequest() override;

virtual TFuture<TCompletedRequest> DoReadData(
const NCloud::NFileStore::NProto::
TProfileLogRequestInfo& /*unused*/) = 0;
virtual TFuture<TCompletedRequest> DoWrite(
const NCloud::NFileStore::NProto::
TProfileLogRequestInfo& /*unused*/) = 0;
virtual TFuture<TCompletedRequest> DoCreateNode(
const NCloud::NFileStore::NProto::
TProfileLogRequestInfo& /*unused*/) = 0;
virtual TFuture<TCompletedRequest> DoRenameNode(
const NCloud::NFileStore::NProto::
TProfileLogRequestInfo& /*unused*/) = 0;
virtual TFuture<TCompletedRequest> DoUnlinkNode(
const NCloud::NFileStore::NProto::
TProfileLogRequestInfo& /*unused*/) = 0;
virtual TFuture<TCompletedRequest> DoCreateHandle(
const NCloud::NFileStore::NProto::
TProfileLogRequestInfo& /*unused*/) = 0;
virtual TFuture<TCompletedRequest> DoDestroyHandle(
const NCloud::NFileStore::NProto::
TProfileLogRequestInfo& /*unused*/) = 0;
virtual TFuture<TCompletedRequest> DoGetNodeAttr(
const NCloud::NFileStore::NProto::
TProfileLogRequestInfo& /*unused*/) = 0;
virtual TFuture<TCompletedRequest> DoAccessNode(
const NCloud::NFileStore::NProto::
TProfileLogRequestInfo& /*unused*/) = 0;
virtual TFuture<TCompletedRequest> DoListNodes(
const NCloud::NFileStore::NProto::
TProfileLogRequestInfo& /*unused*/) = 0;
virtual TFuture<TCompletedRequest> DoFlush(
const NCloud::NFileStore::NProto::
TProfileLogRequestInfo& /*unused*/) = 0;
virtual TFuture<TCompletedRequest> DoAcquireLock(
const NCloud::NFileStore::NProto::
TProfileLogRequestInfo& /*unused*/) = 0;
virtual TFuture<TCompletedRequest> DoReleaseLock(
const NCloud::NFileStore::NProto::
TProfileLogRequestInfo& /*unused*/) = 0;
};

} // namespace NCloud::NFileStore::NLoadTest
Loading

0 comments on commit 08e3324

Please sign in to comment.