Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: improve handling of streaming error state changes/logging #439

Merged
merged 1 commit into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 51 additions & 6 deletions libs/client-sdk/src/data_sources/streaming_data_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@

StreamingDataSource::StreamingDataSource(
config::shared::built::ServiceEndpoints const& endpoints,
config::shared::built::DataSourceConfig<config::shared::ClientSDK> const&

Check warning on line 24 in libs/client-sdk/src/data_sources/streaming_data_source.cpp

View workflow job for this annotation

GitHub Actions / cpp-linter

/libs/client-sdk/src/data_sources/streaming_data_source.cpp:24:5 [modernize-pass-by-value]

pass by value and use std::move
data_source_config,
config::shared::built::HttpProperties const& http_properties,

Check warning on line 26 in libs/client-sdk/src/data_sources/streaming_data_source.cpp

View workflow job for this annotation

GitHub Actions / cpp-linter

/libs/client-sdk/src/data_sources/streaming_data_source.cpp:26:5 [modernize-pass-by-value]

pass by value and use std::move
boost::asio::any_io_executor ioc,

Check warning on line 27 in libs/client-sdk/src/data_sources/streaming_data_source.cpp

View workflow job for this annotation

GitHub Actions / cpp-linter

/libs/client-sdk/src/data_sources/streaming_data_source.cpp:27:5 [modernize-pass-by-value]

pass by value and use std::move
Context context,
IDataSourceUpdateSink& handler,
DataSourceStatusManager& status_manager,
Logger const& logger)
: exec_(ioc),

Check warning on line 32 in libs/client-sdk/src/data_sources/streaming_data_source.cpp

View workflow job for this annotation

GitHub Actions / cpp-linter

/libs/client-sdk/src/data_sources/streaming_data_source.cpp:32:13 [performance-unnecessary-value-param]

parameter 'ioc' is passed by value and only copied once; consider moving it to avoid unnecessary copies
logger_(logger),
context_(std::move(context)),
status_manager_(status_manager),
Expand Down Expand Up @@ -99,7 +99,7 @@
// TODO: can the read timeout be shared with *all* http requests? Or should
// it have a default in defaults.hpp? This must be greater than the
// heartbeat interval of the streaming service.
client_builder.read_timeout(std::chrono::minutes(5));

Check warning on line 102 in libs/client-sdk/src/data_sources/streaming_data_source.cpp

View workflow job for this annotation

GitHub Actions / cpp-linter

/libs/client-sdk/src/data_sources/streaming_data_source.cpp:102:54 [cppcoreguidelines-avoid-magic-numbers

5 is a magic number; consider replacing it with a named constant

client_builder.write_timeout(http_config_.WriteTimeout());

Expand Down Expand Up @@ -140,12 +140,11 @@

client_builder.errors([weak_self](auto error) {
if (auto self = weak_self.lock()) {
auto error_string = launchdarkly::sse::ErrorToString(error);
LD_LOG(self->logger_, LogLevel::kError) << error_string;
self->status_manager_.SetState(
DataSourceStatus::DataSourceState::kShutdown,
DataSourceStatus::ErrorInfo::ErrorKind::kErrorResponse,
std::move(error_string));
std::string error_string = sse::ErrorToString(error);
LD_LOG(self->logger_, sse::IsRecoverable(error) ? LogLevel::kDebug
: LogLevel::kError);
self->HandleErrorStateChange(std::move(error),
std::move(error_string));
}
});

Expand All @@ -162,6 +161,52 @@
client_->async_connect();
}

template <class>
inline constexpr bool always_false_v = false;

void StreamingDataSource::HandleErrorStateChange(sse::Error error,
std::string error_string) {
auto const state = sse::IsRecoverable(error) ? DataSourceState::kInterrupted
: DataSourceState::kShutdown;
std::visit(
[this, state, error_string = std::move(error_string)](auto error) {
using T = std::decay_t<decltype(error)>;
if constexpr (std::is_same_v<T, sse::errors::ReadTimeout>) {

Check warning on line 174 in libs/client-sdk/src/data_sources/streaming_data_source.cpp

View workflow job for this annotation

GitHub Actions / cpp-linter

/libs/client-sdk/src/data_sources/streaming_data_source.cpp:174:72 [bugprone-branch-clone]

repeated branch in conditional chain
this->status_manager_.SetState(
state,
DataSourceStatus::ErrorInfo::ErrorKind::kNetworkError,
std::move(error_string));

Check warning on line 178 in libs/client-sdk/src/data_sources/streaming_data_source.cpp

View workflow job for this annotation

GitHub Actions / cpp-linter

/libs/client-sdk/src/data_sources/streaming_data_source.cpp:178:21 [performance-move-const-arg]

std::move of the const variable 'error_string' has no effect; remove std::move() or make the variable non-const

} else if constexpr (std::is_same_v<
T,
sse::errors::UnrecoverableClientError>) {
this->status_manager_.SetState(
state,
static_cast<DataSourceStatusManager::StatusCodeType>(
error.status),
std::move(error_string));

Check warning on line 187 in libs/client-sdk/src/data_sources/streaming_data_source.cpp

View workflow job for this annotation

GitHub Actions / cpp-linter

/libs/client-sdk/src/data_sources/streaming_data_source.cpp:187:21 [performance-move-const-arg]

std::move of the const variable 'error_string' has no effect; remove std::move() or make the variable non-const

} else if constexpr (std::is_same_v<
T, sse::errors::InvalidRedirectLocation>) {
this->status_manager_.SetState(
state,
DataSourceStatus::ErrorInfo::ErrorKind::kNetworkError,
std::move(error_string));

Check warning on line 194 in libs/client-sdk/src/data_sources/streaming_data_source.cpp

View workflow job for this annotation

GitHub Actions / cpp-linter

/libs/client-sdk/src/data_sources/streaming_data_source.cpp:194:21 [performance-move-const-arg]

std::move of the const variable 'error_string' has no effect; remove std::move() or make the variable non-const

} else if constexpr (std::is_same_v<T,
sse::errors::NotRedirectable>) {
this->status_manager_.SetState(
state,
DataSourceStatus::ErrorInfo::ErrorKind::kNetworkError,
std::move(error_string));

Check warning on line 201 in libs/client-sdk/src/data_sources/streaming_data_source.cpp

View workflow job for this annotation

GitHub Actions / cpp-linter

/libs/client-sdk/src/data_sources/streaming_data_source.cpp:201:21 [performance-move-const-arg]

std::move of the const variable 'error_string' has no effect; remove std::move() or make the variable non-const
} else {
static_assert(always_false_v<decltype(error)>,
"non-exhaustive visitor");
}
},
std::move(error));
}

void StreamingDataSource::ShutdownAsync(std::function<void()> completion) {
if (client_) {
status_manager_.SetState(
Expand Down
2 changes: 2 additions & 0 deletions libs/client-sdk/src/data_sources/streaming_data_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class StreamingDataSource final
void ShutdownAsync(std::function<void()>) override;

private:
void HandleErrorStateChange(sse::Error error, std::string error_string);

Context context_;
boost::asio::any_io_executor exec_;
DataSourceStatusManager& status_manager_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ namespace launchdarkly::internal::data_sources {
template <typename TDataSourceStatus, typename TInterface>
class DataSourceStatusManagerBase : public TInterface {
public:
using StatusCodeType =
typename TDataSourceStatus::ErrorInfo::StatusCodeType;

/**
* Set the state.
*
Expand All @@ -39,7 +42,7 @@ class DataSourceStatusManagerBase : public TInterface {
* @param message The message to associate with the error.
*/
void SetState(typename TDataSourceStatus::DataSourceState state,
typename TDataSourceStatus::ErrorInfo::StatusCodeType code,
StatusCodeType code,
std::string message) {
{
std::lock_guard lock(status_mutex_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,11 @@ void StreamingDataSource::StartAsync(

client_builder.errors([weak_self](auto error) {
if (auto self = weak_self.lock()) {
std::string error_string = launchdarkly::sse::ErrorToString(error);
LD_LOG(self->logger_, LogLevel::kError) << error_string;
self->status_manager_.SetState(
DataSourceStatus::DataSourceState::kOff,
DataSourceStatus::ErrorInfo::ErrorKind::kErrorResponse,
std::move(error_string));
std::string error_string = sse::ErrorToString(error);
LD_LOG(self->logger_, sse::IsRecoverable(error) ? LogLevel::kDebug
: LogLevel::kError);
self->HandleErrorStateChange(std::move(error),
std::move(error_string));
}
});

Expand All @@ -159,6 +158,52 @@ void StreamingDataSource::StartAsync(
client_->async_connect();
}

template <class>
inline constexpr bool always_false_v = false;

void StreamingDataSource::HandleErrorStateChange(sse::Error error,
std::string error_string) {
auto const state = sse::IsRecoverable(error) ? DataSourceState::kInterrupted
: DataSourceState::kOff;
std::visit(
[this, state, error_string = std::move(error_string)](auto error) {
using T = std::decay_t<decltype(error)>;
if constexpr (std::is_same_v<T, sse::errors::ReadTimeout>) {
this->status_manager_.SetState(
state,
DataSourceStatus::ErrorInfo::ErrorKind::kNetworkError,
std::move(error_string));

} else if constexpr (std::is_same_v<
T,
sse::errors::UnrecoverableClientError>) {
this->status_manager_.SetState(
state,
static_cast<data_components::DataSourceStatusManager::
StatusCodeType>(error.status),
std::move(error_string));

} else if constexpr (std::is_same_v<
T, sse::errors::InvalidRedirectLocation>) {
this->status_manager_.SetState(
state,
DataSourceStatus::ErrorInfo::ErrorKind::kNetworkError,
std::move(error_string));

} else if constexpr (std::is_same_v<T,
sse::errors::NotRedirectable>) {
this->status_manager_.SetState(
state,
DataSourceStatus::ErrorInfo::ErrorKind::kNetworkError,
std::move(error_string));
} else {
static_assert(always_false_v<decltype(error)>,
"non-exhaustive visitor");
}
},
std::move(error));
}

void StreamingDataSource::ShutdownAsync(std::function<void()> completion) {
if (client_) {
status_manager_.SetState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class StreamingDataSource final
[[nodiscard]] std::string const& Identity() const override;

private:
void HandleErrorStateChange(sse::Error error, std::string error_string);

boost::asio::any_io_executor io_;
Logger const& logger_;

Expand Down
8 changes: 3 additions & 5 deletions libs/server-sent-events/include/launchdarkly/sse/error.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@
namespace launchdarkly::sse {
namespace errors {

struct NoContent {};
std::ostream& operator<<(std::ostream& out, NoContent const&);

struct InvalidRedirectLocation {
std::string location;
};
Expand All @@ -32,12 +29,13 @@ std::ostream& operator<<(std::ostream& out, UnrecoverableClientError const&);

} // namespace errors

using Error = std::variant<errors::NoContent,
errors::InvalidRedirectLocation,
using Error = std::variant<errors::InvalidRedirectLocation,
errors::NotRedirectable,
errors::ReadTimeout,
errors::UnrecoverableClientError>;

bool IsRecoverable(Error const& error);

std::ostream& operator<<(std::ostream& out, Error const& error);

std::string ErrorToString(Error const& error);
Expand Down
4 changes: 2 additions & 2 deletions libs/server-sent-events/src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ class FoxyClient : public Client,

if (status_class == beast::http::status_class::successful) {
if (response.result() == beast::http::status::no_content) {
errors_(errors::NoContent{});
errors_(
errors::UnrecoverableClientError{http::status::no_content});
return;
}
if (!correct_content_type(response)) {
Expand Down Expand Up @@ -355,7 +356,6 @@ class FoxyClient : public Client,
logger_("exception closing stream: " + std::string(err.what()));
}


// Ideally we would call session_->async_shutdown() here to gracefully
// terminate the SSL session. For unknown reasons, this call appears to
// hang indefinitely and never complete until the SDK client is
Expand Down
10 changes: 4 additions & 6 deletions libs/server-sent-events/src/error.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@
namespace launchdarkly::sse {
namespace errors {

std::ostream& operator<<(std::ostream& out, NoContent const&) {
out << "received HTTP error 204 (no content) - giving up "
"permanently";
return out;
}

std::ostream& operator<<(std::ostream& out,
InvalidRedirectLocation const& invalid) {
out << "received invalid redirect from server, cannot follow ("
Expand Down Expand Up @@ -60,4 +54,8 @@ std::string ErrorToString(Error const& error) {
return ss.str();
}

bool IsRecoverable(Error const& error) {
return std::holds_alternative<errors::ReadTimeout>(error);
}

} // namespace launchdarkly::sse
Loading