From 1885db70af6ef34510498953e1568058f40192d9 Mon Sep 17 00:00:00 2001 From: Andrei Strelkovskii Date: Tue, 14 May 2024 18:54:01 +0300 Subject: [PATCH] issue-1164: endpoint-proxy client retry policy (#1198) * issue-1164: endpoint-proxy client retry policy * issue-1164: endpoint-proxy client retry policy - deleted accidentally added server code --- cloud/blockstore/apps/client/lib/command.cpp | 6 +- .../libs/endpoint_proxy/client/client.cpp | 100 +++++++++++++++--- .../libs/endpoint_proxy/client/client.h | 10 ++ 3 files changed, 102 insertions(+), 14 deletions(-) diff --git a/cloud/blockstore/apps/client/lib/command.cpp b/cloud/blockstore/apps/client/lib/command.cpp index 54192f0397b..1858d6845c5 100644 --- a/cloud/blockstore/apps/client/lib/command.cpp +++ b/cloud/blockstore/apps/client/lib/command.cpp @@ -592,7 +592,11 @@ void TCommand::Init() static_cast(EndpointProxyInsecurePort), static_cast(EndpointProxySecurePort), {}, // rootCertsFile - }, Logging); + { + TDuration::Seconds(1), + TDuration::Minutes(5), + }, + }, Scheduler, Timer, Logging); } Start(); diff --git a/cloud/blockstore/libs/endpoint_proxy/client/client.cpp b/cloud/blockstore/libs/endpoint_proxy/client/client.cpp index 59a7fc68307..943a0da780c 100644 --- a/cloud/blockstore/libs/endpoint_proxy/client/client.cpp +++ b/cloud/blockstore/libs/endpoint_proxy/client/client.cpp @@ -4,6 +4,8 @@ #include #include +#include +#include #include #include @@ -55,11 +57,14 @@ struct TRequestContextImpl: TRequestContextBase NThreading::TPromise Promise; + TInstant ProcessingStartTs; + TRequestContextImpl( grpc::CompletionQueue& cq, const TString& host, ui16 port, TRequest request, + TInstant now, const std::shared_ptr& channelCredentials) : Request(std::move(request)) , CQ(cq) @@ -67,6 +72,7 @@ struct TRequestContextImpl: TRequestContextBase Sprintf("%s:%u", host.c_str(), port), channelCredentials)) , Promise(NThreading::NewPromise()) + , ProcessingStartTs(now) {} }; @@ -83,12 +89,14 @@ struct TStartRequestContext: TRequestContextImpl< const TString& host, ui16 port, TRequest request, + TInstant now, const std::shared_ptr& channelCredentials) : TRequestContextImpl( cq, host, port, std::move(request), + now, channelCredentials) { Reader = Service.AsyncStartProxyEndpoint(&ClientContext, Request, &CQ); @@ -108,12 +116,14 @@ struct TStopRequestContext: TRequestContextImpl< const TString& host, ui16 port, TRequest request, + TInstant now, const std::shared_ptr& channelCredentials) : TRequestContextImpl( cq, host, port, std::move(request), + now, channelCredentials) { Reader = Service.AsyncStopProxyEndpoint(&ClientContext, Request, &CQ); @@ -122,9 +132,13 @@ struct TStopRequestContext: TRequestContextImpl< //////////////////////////////////////////////////////////////////////////////// -struct TEndpointProxyClient: IEndpointProxyClient +struct TEndpointProxyClient + : IEndpointProxyClient + , std::enable_shared_from_this { const TEndpointProxyClientConfig Config; + const ISchedulerPtr Scheduler; + const ITimerPtr Timer; TLog Log; grpc::CompletionQueue CQ; @@ -133,8 +147,12 @@ struct TEndpointProxyClient: IEndpointProxyClient TEndpointProxyClient( TEndpointProxyClientConfig config, + ISchedulerPtr scheduler, + ITimerPtr timer, ILoggingServicePtr logging) : Config(std::move(config)) + , Scheduler(std::move(scheduler)) + , Timer(std::move(timer)) , Log(logging->CreateLog("ENDPOINT_PROXY_CLIENT")) { if (config.SecurePort) { @@ -153,13 +171,15 @@ struct TEndpointProxyClient: IEndpointProxyClient template TFuture RequestImpl( - std::shared_ptr request) + typename TContext::TRequest request, + TInstant now) { auto requestContext = std::make_unique( CQ, Config.Host, Port(), - *request, + request, + now, ChannelCredentials ); @@ -179,7 +199,9 @@ struct TEndpointProxyClient: IEndpointProxyClient { STORAGE_INFO("StartRequest: " << request->DebugString().Quote()); - return RequestImpl(std::move(request)); + return RequestImpl( + std::move(*request), + Timer->Now()); } TFuture StopProxyEndpoint( @@ -187,7 +209,9 @@ struct TEndpointProxyClient: IEndpointProxyClient { STORAGE_INFO("StopRequest: " << request->DebugString().Quote()); - return RequestImpl(std::move(request)); + return RequestImpl( + std::move(*request), + Timer->Now()); } void Loop() @@ -203,14 +227,14 @@ struct TEndpointProxyClient: IEndpointProxyClient auto* startRequestContext = dynamic_cast(&*requestContext); if (startRequestContext) { - FinishRequest(*startRequestContext); + FinishRequest(*startRequestContext, ok); continue; } auto* stopRequestContext = dynamic_cast(&*requestContext); if (stopRequestContext) { - FinishRequest(*stopRequestContext); + FinishRequest(*stopRequestContext, ok); continue; } } @@ -219,14 +243,60 @@ struct TEndpointProxyClient: IEndpointProxyClient } template - void FinishRequest(TRequestContext& requestContext) + void FinishRequest(TRequestContext& requestContext, bool ok) { - if (!requestContext.Status.ok()) { - auto* e = requestContext.Response.MutableError(); - e->SetCode(MAKE_GRPC_ERROR(requestContext.Status.error_code())); - e->SetMessage(requestContext.Status.error_message()); + if (!ok) { + *requestContext.Response.MutableError() = + MakeError(E_CANCELLED, "cq shutting down"); + } else if (!requestContext.Status.ok()) { + *requestContext.Response.MutableError() = MakeError( + MAKE_GRPC_ERROR(requestContext.Status.error_code()), + requestContext.Status.error_message()); + } + + const auto errorKind = + GetErrorKind(requestContext.Response.GetError()); + const auto deadline = requestContext.ProcessingStartTs + + Config.RetryPolicy.TotalTimeout; + + if (errorKind == EErrorKind::ErrorRetriable && Timer->Now() < deadline) { + Retry(requestContext); + } else { + requestContext.Promise.SetValue(std::move(requestContext.Response)); } - requestContext.Promise.SetValue(std::move(requestContext.Response)); + } + + template + void Retry(TRequestContext& requestContext) + { + auto request = std::move(requestContext.Request); + auto p = std::move(requestContext.Promise); + auto weakPtr = this->weak_from_this(); + Scheduler->Schedule( + Timer->Now() + Config.RetryPolicy.Backoff, + [ + request = std::move(request), + now = requestContext.ProcessingStartTs, + p = std::move(p), + weakPtr = std::move(weakPtr) + ] () mutable { + auto pThis = weakPtr.lock(); + if (!pThis) { + typename TRequestContext::TResponse response; + *response.MutableError() = + MakeError(E_CANCELLED, "client destroyed"); + p.SetValue(std::move(response)); + return; + } + + auto& Log = pThis->Log; + STORAGE_INFO("Retry: " << request.DebugString().Quote()); + + pThis->RequestImpl(std::move(request), now) + .Subscribe([p = std::move(p)] (auto f) mutable { + p.SetValue(f.ExtractValue()); + }); + }); } void Start() override @@ -252,10 +322,14 @@ struct TEndpointProxyClient: IEndpointProxyClient IEndpointProxyClientPtr CreateClient( TEndpointProxyClientConfig config, + ISchedulerPtr scheduler, + ITimerPtr timer, ILoggingServicePtr logging) { return std::make_shared( std::move(config), + std::move(scheduler), + std::move(timer), std::move(logging)); } diff --git a/cloud/blockstore/libs/endpoint_proxy/client/client.h b/cloud/blockstore/libs/endpoint_proxy/client/client.h index 6817e83d783..9d89ef9e786 100644 --- a/cloud/blockstore/libs/endpoint_proxy/client/client.h +++ b/cloud/blockstore/libs/endpoint_proxy/client/client.h @@ -2,6 +2,7 @@ #include "public.h" +#include #include #include @@ -29,14 +30,23 @@ struct IEndpointProxyClient: IStartable struct TEndpointProxyClientConfig { + struct TRetryPolicy + { + TDuration Backoff; + TDuration TotalTimeout; + }; + TString Host; ui16 Port; ui16 SecurePort; TString RootCertsFile; + TRetryPolicy RetryPolicy; }; IEndpointProxyClientPtr CreateClient( TEndpointProxyClientConfig config, + ISchedulerPtr scheduler, + ITimerPtr timer, ILoggingServicePtr logging); } // namespace NCloud::NBlockStore::NClient