Skip to content

Commit

Permalink
issue-1164: endpoint-proxy client retry policy (#1198)
Browse files Browse the repository at this point in the history
* issue-1164: endpoint-proxy client retry policy

* issue-1164: endpoint-proxy client retry policy - deleted accidentally added server code
  • Loading branch information
qkrorlqr authored May 14, 2024
1 parent 21446f8 commit 1885db7
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 14 deletions.
6 changes: 5 additions & 1 deletion cloud/blockstore/apps/client/lib/command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,11 @@ void TCommand::Init()
static_cast<ui16>(EndpointProxyInsecurePort),
static_cast<ui16>(EndpointProxySecurePort),
{}, // rootCertsFile
}, Logging);
{
TDuration::Seconds(1),
TDuration::Minutes(5),
},
}, Scheduler, Timer, Logging);
}

Start();
Expand Down
100 changes: 87 additions & 13 deletions cloud/blockstore/libs/endpoint_proxy/client/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include <cloud/blockstore/public/api/protos/endpoints.pb.h>

#include <cloud/storage/core/libs/common/error.h>
#include <cloud/storage/core/libs/common/scheduler.h>
#include <cloud/storage/core/libs/common/timer.h>
#include <cloud/storage/core/libs/diagnostics/logging.h>

#include <util/stream/file.h>
Expand Down Expand Up @@ -55,18 +57,22 @@ struct TRequestContextImpl: TRequestContextBase

NThreading::TPromise<TResponse> Promise;

TInstant ProcessingStartTs;

TRequestContextImpl(
grpc::CompletionQueue& cq,
const TString& host,
ui16 port,
TRequest request,
TInstant now,
const std::shared_ptr<grpc::ChannelCredentials>& channelCredentials)
: Request(std::move(request))
, CQ(cq)
, Service(grpc::CreateChannel(
Sprintf("%s:%u", host.c_str(), port),
channelCredentials))
, Promise(NThreading::NewPromise<TResponse>())
, ProcessingStartTs(now)
{}
};

Expand All @@ -83,12 +89,14 @@ struct TStartRequestContext: TRequestContextImpl<
const TString& host,
ui16 port,
TRequest request,
TInstant now,
const std::shared_ptr<grpc::ChannelCredentials>& channelCredentials)
: TRequestContextImpl(
cq,
host,
port,
std::move(request),
now,
channelCredentials)
{
Reader = Service.AsyncStartProxyEndpoint(&ClientContext, Request, &CQ);
Expand All @@ -108,12 +116,14 @@ struct TStopRequestContext: TRequestContextImpl<
const TString& host,
ui16 port,
TRequest request,
TInstant now,
const std::shared_ptr<grpc::ChannelCredentials>& channelCredentials)
: TRequestContextImpl(
cq,
host,
port,
std::move(request),
now,
channelCredentials)
{
Reader = Service.AsyncStopProxyEndpoint(&ClientContext, Request, &CQ);
Expand All @@ -122,9 +132,13 @@ struct TStopRequestContext: TRequestContextImpl<

////////////////////////////////////////////////////////////////////////////////

struct TEndpointProxyClient: IEndpointProxyClient
struct TEndpointProxyClient
: IEndpointProxyClient
, std::enable_shared_from_this<TEndpointProxyClient>
{
const TEndpointProxyClientConfig Config;
const ISchedulerPtr Scheduler;
const ITimerPtr Timer;
TLog Log;

grpc::CompletionQueue CQ;
Expand All @@ -133,8 +147,12 @@ struct TEndpointProxyClient: IEndpointProxyClient

TEndpointProxyClient(
TEndpointProxyClientConfig config,
ISchedulerPtr scheduler,
ITimerPtr timer,
ILoggingServicePtr logging)
: Config(std::move(config))
, Scheduler(std::move(scheduler))
, Timer(std::move(timer))
, Log(logging->CreateLog("ENDPOINT_PROXY_CLIENT"))
{
if (config.SecurePort) {
Expand All @@ -153,13 +171,15 @@ struct TEndpointProxyClient: IEndpointProxyClient

template <typename TContext>
TFuture<typename TContext::TResponse> RequestImpl(
std::shared_ptr<typename TContext::TRequest> request)
typename TContext::TRequest request,
TInstant now)
{
auto requestContext = std::make_unique<TContext>(
CQ,
Config.Host,
Port(),
*request,
request,
now,
ChannelCredentials
);

Expand All @@ -179,15 +199,19 @@ struct TEndpointProxyClient: IEndpointProxyClient
{
STORAGE_INFO("StartRequest: " << request->DebugString().Quote());

return RequestImpl<TStartRequestContext>(std::move(request));
return RequestImpl<TStartRequestContext>(
std::move(*request),
Timer->Now());
}

TFuture<NProto::TStopProxyEndpointResponse> StopProxyEndpoint(
std::shared_ptr<NProto::TStopProxyEndpointRequest> request) override
{
STORAGE_INFO("StopRequest: " << request->DebugString().Quote());

return RequestImpl<TStopRequestContext>(std::move(request));
return RequestImpl<TStopRequestContext>(
std::move(*request),
Timer->Now());
}

void Loop()
Expand All @@ -203,14 +227,14 @@ struct TEndpointProxyClient: IEndpointProxyClient
auto* startRequestContext =
dynamic_cast<TStartRequestContext*>(&*requestContext);
if (startRequestContext) {
FinishRequest(*startRequestContext);
FinishRequest(*startRequestContext, ok);
continue;
}

auto* stopRequestContext =
dynamic_cast<TStopRequestContext*>(&*requestContext);
if (stopRequestContext) {
FinishRequest(*stopRequestContext);
FinishRequest(*stopRequestContext, ok);
continue;
}
}
Expand All @@ -219,14 +243,60 @@ struct TEndpointProxyClient: IEndpointProxyClient
}

template <typename TRequestContext>
void FinishRequest(TRequestContext& requestContext)
void FinishRequest(TRequestContext& requestContext, bool ok)
{
if (!requestContext.Status.ok()) {
auto* e = requestContext.Response.MutableError();
e->SetCode(MAKE_GRPC_ERROR(requestContext.Status.error_code()));
e->SetMessage(requestContext.Status.error_message());
if (!ok) {
*requestContext.Response.MutableError() =
MakeError(E_CANCELLED, "cq shutting down");
} else if (!requestContext.Status.ok()) {
*requestContext.Response.MutableError() = MakeError(
MAKE_GRPC_ERROR(requestContext.Status.error_code()),
requestContext.Status.error_message());
}

const auto errorKind =
GetErrorKind(requestContext.Response.GetError());
const auto deadline = requestContext.ProcessingStartTs
+ Config.RetryPolicy.TotalTimeout;

if (errorKind == EErrorKind::ErrorRetriable && Timer->Now() < deadline) {
Retry(requestContext);
} else {
requestContext.Promise.SetValue(std::move(requestContext.Response));
}
requestContext.Promise.SetValue(std::move(requestContext.Response));
}

template <typename TRequestContext>
void Retry(TRequestContext& requestContext)
{
auto request = std::move(requestContext.Request);
auto p = std::move(requestContext.Promise);
auto weakPtr = this->weak_from_this();
Scheduler->Schedule(
Timer->Now() + Config.RetryPolicy.Backoff,
[
request = std::move(request),
now = requestContext.ProcessingStartTs,
p = std::move(p),
weakPtr = std::move(weakPtr)
] () mutable {
auto pThis = weakPtr.lock();
if (!pThis) {
typename TRequestContext::TResponse response;
*response.MutableError() =
MakeError(E_CANCELLED, "client destroyed");
p.SetValue(std::move(response));
return;
}

auto& Log = pThis->Log;
STORAGE_INFO("Retry: " << request.DebugString().Quote());

pThis->RequestImpl<TRequestContext>(std::move(request), now)
.Subscribe([p = std::move(p)] (auto f) mutable {
p.SetValue(f.ExtractValue());
});
});
}

void Start() override
Expand All @@ -252,10 +322,14 @@ struct TEndpointProxyClient: IEndpointProxyClient

IEndpointProxyClientPtr CreateClient(
TEndpointProxyClientConfig config,
ISchedulerPtr scheduler,
ITimerPtr timer,
ILoggingServicePtr logging)
{
return std::make_shared<TEndpointProxyClient>(
std::move(config),
std::move(scheduler),
std::move(timer),
std::move(logging));
}

Expand Down
10 changes: 10 additions & 0 deletions cloud/blockstore/libs/endpoint_proxy/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "public.h"

#include <cloud/blockstore/libs/common/public.h>
#include <cloud/blockstore/libs/diagnostics/public.h>
#include <cloud/blockstore/public/api/protos/endpoints.pb.h>

Expand Down Expand Up @@ -29,14 +30,23 @@ struct IEndpointProxyClient: IStartable

struct TEndpointProxyClientConfig
{
struct TRetryPolicy
{
TDuration Backoff;
TDuration TotalTimeout;
};

TString Host;
ui16 Port;
ui16 SecurePort;
TString RootCertsFile;
TRetryPolicy RetryPolicy;
};

IEndpointProxyClientPtr CreateClient(
TEndpointProxyClientConfig config,
ISchedulerPtr scheduler,
ITimerPtr timer,
ILoggingServicePtr logging);

} // namespace NCloud::NBlockStore::NClient

0 comments on commit 1885db7

Please sign in to comment.