Skip to content

Commit

Permalink
Separate state for tracking downstream end_stream in filter manager a…
Browse files Browse the repository at this point in the history
…nd HCM

Signed-off-by: Yan Avlasov <yavlasov@google.com>
  • Loading branch information
yanavlasov committed Aug 22, 2024
1 parent ec09f36 commit 1127fac
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 52 deletions.
6 changes: 6 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ bug_fixes:
change: |
RBAC will now allow stat prefixes configured in per-route config to override the base config's
stat prefix.
- area: http
change: |
Fixed a bug where an incomplete request (missing body or trailers) may be proxied to the upstream when the limit on
the number of requests per I/O cycle is configured and an HTTP decoder filter that pauses filter chain is present. This behavior
can be reverted by setting the runtime guard ``envoy.reloadable_features.use_filter_manager_state_for_downstream_end_stream``
to false.
removed_config_or_runtime:
# *Normally occurs at the end of the* :ref:`deprecation period <deprecated>`
Expand Down
74 changes: 42 additions & 32 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,9 @@ void ConnectionManagerImpl::doEndStream(ActiveStream& stream, bool check_for_def
// here is when Envoy "ends" the stream by calling recreateStream at which point recreateStream
// explicitly nulls out response_encoder to avoid the downstream being notified of the
// Envoy-internal stream instance being ended.
if (stream.response_encoder_ != nullptr && (!stream.filter_manager_.decoderObservedEndStream() ||
!stream.state_.codec_saw_local_complete_)) {
if (stream.response_encoder_ != nullptr &&
(!stream.filter_manager_.hasLastDownstreamByteReceived() ||
!stream.state_.codec_saw_local_complete_)) {
// Indicate local is complete at this point so that if we reset during a continuation, we don't
// raise further data or trailers.
ENVOY_STREAM_LOG(debug, "doEndStream() resetting stream", stream);
Expand Down Expand Up @@ -294,7 +295,7 @@ void ConnectionManagerImpl::doEndStream(ActiveStream& stream, bool check_for_def
// fully read, as there's no race condition to avoid.
const bool connection_close =
stream.filter_manager_.streamInfo().shouldDrainConnectionUponCompletion();
const bool request_complete = stream.filter_manager_.decoderObservedEndStream();
const bool request_complete = stream.filter_manager_.hasLastDownstreamByteReceived();

// Don't do delay close for HTTP/1.0 or if the request is complete.
checkForDeferredClose(connection_close && (request_complete || http_10_sans_cl));
Expand Down Expand Up @@ -943,32 +944,35 @@ void ConnectionManagerImpl::ActiveStream::onIdleTimeout() {
connection_manager_.stats_.named_.downstream_rq_idle_timeout_.inc();

filter_manager_.streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::StreamIdleTimeout);
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.decoderObservedEndStream()),
"stream timeout", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().StreamIdleTimeout);
sendLocalReply(
Http::Utility::maybeRequestTimeoutCode(filter_manager_.hasLastDownstreamByteReceived()),
"stream timeout", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().StreamIdleTimeout);
}

void ConnectionManagerImpl::ActiveStream::onRequestTimeout() {
connection_manager_.stats_.named_.downstream_rq_timeout_.inc();
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.decoderObservedEndStream()),
"request timeout", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().RequestOverallTimeout);
sendLocalReply(
Http::Utility::maybeRequestTimeoutCode(filter_manager_.hasLastDownstreamByteReceived()),
"request timeout", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().RequestOverallTimeout);
}

void ConnectionManagerImpl::ActiveStream::onRequestHeaderTimeout() {
connection_manager_.stats_.named_.downstream_rq_header_timeout_.inc();
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.decoderObservedEndStream()),
"request header timeout", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().RequestHeaderTimeout);
sendLocalReply(
Http::Utility::maybeRequestTimeoutCode(filter_manager_.hasLastDownstreamByteReceived()),
"request header timeout", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().RequestHeaderTimeout);
}

void ConnectionManagerImpl::ActiveStream::onStreamMaxDurationReached() {
ENVOY_STREAM_LOG(debug, "Stream max duration time reached", *this);
connection_manager_.stats_.named_.downstream_rq_max_duration_reached_.inc();
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.decoderObservedEndStream()),
"downstream duration timeout", nullptr,
Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded,
StreamInfo::ResponseCodeDetails::get().MaxDurationTimeout);
sendLocalReply(
Http::Utility::maybeRequestTimeoutCode(filter_manager_.hasLastDownstreamByteReceived()),
"downstream duration timeout", nullptr, Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded,
StreamInfo::ResponseCodeDetails::get().MaxDurationTimeout);
}

void ConnectionManagerImpl::ActiveStream::chargeStats(const ResponseHeaderMap& headers) {
Expand Down Expand Up @@ -1086,15 +1090,15 @@ bool ConnectionManagerImpl::ActiveStream::validateHeaders() {
return true;
}

bool ConnectionManagerImpl::ActiveStream::validateTrailers() {
bool ConnectionManagerImpl::ActiveStream::validateTrailers(RequestTrailerMap& trailers) {
if (!header_validator_) {
return true;
}

auto validation_result = header_validator_->validateRequestTrailers(*request_trailers_);
auto validation_result = header_validator_->validateRequestTrailers(trailers);
std::string failure_details(validation_result.details());
if (validation_result.ok()) {
auto transformation_result = header_validator_->transformRequestTrailers(*request_trailers_);
auto transformation_result = header_validator_->transformRequestTrailers(trailers);
if (transformation_result.ok()) {
return true;
}
Expand Down Expand Up @@ -1129,12 +1133,12 @@ bool ConnectionManagerImpl::ActiveStream::validateTrailers() {
return false;
}

void ConnectionManagerImpl::ActiveStream::maybeEndDecode(bool end_stream) {
void ConnectionManagerImpl::ActiveStream::maybeRecordLastByteReceived(bool end_stream) {
// If recreateStream is called, the HCM rewinds state and may send more encodeData calls.
if (end_stream && !filter_manager_.decoderObservedEndStream()) {
if (end_stream && !filter_manager_.hasLastDownstreamByteReceived()) {
filter_manager_.streamInfo().downstreamTiming().onLastDownstreamRxByteReceived(
connection_manager_.dispatcher_->timeSource());
ENVOY_STREAM_LOG(debug, "request end stream", *this);
ENVOY_STREAM_LOG(debug, "request end stream timestamp recorded", *this);
}
}

Expand All @@ -1154,7 +1158,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPt
*headers);
// We only want to record this when reading the headers the first time, not when recreating
// a stream.
if (!filter_manager_.decoderObservedEndStream()) {
if (!filter_manager_.hasLastDownstreamByteReceived()) {
filter_manager_.streamInfo().downstreamTiming().onLastDownstreamHeaderRxByteReceived(
connection_manager_.dispatcher_->timeSource());
}
Expand All @@ -1180,7 +1184,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPt
filter_manager_.streamInfo().protocol(protocol);

// We end the decode here to mark that the downstream stream is complete.
maybeEndDecode(end_stream);
maybeRecordLastByteReceived(end_stream);

if (!validateHeaders()) {
ENVOY_STREAM_LOG(debug, "request headers validation failed\n{}", *this, *request_headers_);
Expand Down Expand Up @@ -1458,7 +1462,7 @@ void ConnectionManagerImpl::ActiveStream::traceRequest() {
void ConnectionManagerImpl::ActiveStream::decodeData(Buffer::Instance& data, bool end_stream) {
ScopeTrackerScopeState scope(this,
connection_manager_.read_callbacks_->connection().dispatcher());
maybeEndDecode(end_stream);
maybeRecordLastByteReceived(end_stream);
filter_manager_.streamInfo().addBytesReceived(data.length());
if (!state_.deferred_to_next_io_iteration_) {
filter_manager_.decodeData(data, end_stream);
Expand All @@ -1478,14 +1482,19 @@ void ConnectionManagerImpl::ActiveStream::decodeTrailers(RequestTrailerMapPtr&&
resetIdleTimer();

ASSERT(!request_trailers_);
request_trailers_ = std::move(trailers);
if (!validateTrailers()) {
if (!validateTrailers(*trailers)) {
ENVOY_STREAM_LOG(debug, "request trailers validation failed:\n{}", *this, *request_trailers_);
return;
}
maybeEndDecode(true);
maybeRecordLastByteReceived(true);
if (!state_.deferred_to_next_io_iteration_) {
request_trailers_ = std::move(trailers);
filter_manager_.decodeTrailers(*request_trailers_);
} else {
// Save trailers in a different variable since `request_trailers_` is available to the filter
// manager via `requestTrailers()` callback and makes filter manager see trailers prematurely
// when deferred request is processed.
deferred_request_trailers_ = std::move(trailers);
}
}

Expand Down Expand Up @@ -1776,7 +1785,7 @@ void ConnectionManagerImpl::ActiveStream::encodeHeaders(ResponseHeaderMap& heade
// If we are destroying a stream before remote is complete and the connection does not support
// multiplexing, we should disconnect since we don't want to wait around for the request to
// finish.
if (!filter_manager_.decoderObservedEndStream()) {
if (!filter_manager_.hasLastDownstreamByteReceived()) {
if (connection_manager_.codec_->protocol() < Protocol::Http2) {
connection_manager_.drain_state_ = DrainState::Closing;
}
Expand Down Expand Up @@ -2221,7 +2230,7 @@ bool ConnectionManagerImpl::ActiveStream::onDeferredRequestProcessing() {
}
state_.deferred_to_next_io_iteration_ = false;
bool end_stream = state_.deferred_end_stream_ && deferred_data_ == nullptr &&
request_trailers_ == nullptr && deferred_metadata_.empty();
deferred_request_trailers_ == nullptr && deferred_metadata_.empty();
filter_manager_.decodeHeaders(*request_headers_, end_stream);
if (end_stream) {
return true;
Expand All @@ -2235,10 +2244,11 @@ bool ConnectionManagerImpl::ActiveStream::onDeferredRequestProcessing() {
// Filter manager will return early from decodeData and decodeTrailers if
// request has completed.
if (deferred_data_ != nullptr) {
end_stream = state_.deferred_end_stream_ && request_trailers_ == nullptr;
end_stream = state_.deferred_end_stream_ && deferred_request_trailers_ == nullptr;
filter_manager_.decodeData(*deferred_data_, end_stream);
}
if (request_trailers_ != nullptr) {
if (deferred_request_trailers_ != nullptr) {
request_trailers_ = std::move(deferred_request_trailers_);
filter_manager_.decodeTrailers(*request_trailers_);
}
return true;
Expand Down
7 changes: 4 additions & 3 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
void decodeData(Buffer::Instance& data, bool end_stream) override;
void decodeMetadata(MetadataMapPtr&&) override;

// Mark that the last downstream byte is received, and the downstream stream is complete.
void maybeEndDecode(bool end_stream);
// Record the timestamp of last downstream byte is received.
void maybeRecordLastByteReceived(bool end_stream);

// Http::RequestDecoder
void decodeHeaders(RequestHeaderMapSharedPtr&& headers, bool end_stream) override;
Expand Down Expand Up @@ -410,7 +410,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
// harmonize this behavior with H/1.
// 3. If the `stream_error_on_invalid_http_message` is set to `false` (it is by default) in the
// HTTP connection manager configuration, then the entire connection is closed.
bool validateTrailers();
bool validateTrailers(RequestTrailerMap& trailers);

std::weak_ptr<bool> stillAlive() { return {still_alive_}; }

Expand Down Expand Up @@ -508,6 +508,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
std::shared_ptr<bool> still_alive_ = std::make_shared<bool>(true);
std::unique_ptr<Buffer::OwnedImpl> deferred_data_;
std::queue<MetadataMapPtr> deferred_metadata_;
RequestTrailerMapPtr deferred_request_trailers_;
};

using ActiveStreamPtr = std::unique_ptr<ActiveStream>;
Expand Down
26 changes: 22 additions & 4 deletions source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -1121,7 +1121,9 @@ class DownstreamFilterManager : public FilterManager {
std::move(parent_filter_state)),
local_reply_(local_reply),
downstream_filter_load_shed_point_(overload_manager.getLoadShedPoint(
Server::LoadShedPointName::get().HttpDownstreamFilterCheck)) {
Server::LoadShedPointName::get().HttpDownstreamFilterCheck)),
use_filter_manager_state_for_downstream_end_stream_(Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.use_filter_manager_state_for_downstream_end_stream")) {
ENVOY_LOG_ONCE_IF(
trace, downstream_filter_load_shed_point_ == nullptr,
"LoadShedPoint envoy.load_shed_points.http_downstream_filter_check is not found. "
Expand Down Expand Up @@ -1153,11 +1155,24 @@ class DownstreamFilterManager : public FilterManager {
absl::string_view details) override;

/**
* Whether remote processing has been marked as complete.
* For the DownstreamFilterManager rely on external state, to handle the case
* of internal redirects.
* Whether downstream has observed end_stream.
*/
bool decoderObservedEndStream() const override {
// Set by the envoy.reloadable_features.use_filter_manager_state_for_downstream_end_stream
// runtime flag.
if (use_filter_manager_state_for_downstream_end_stream_) {
return state_.observed_decode_end_stream_;
}

return hasLastDownstreamByteReceived();
}

/**
* Return true if the timestamp of the downstream end_stream was recorded.
* For the HCM to handle the case of internal redirects, timeout error replies
* and stream resets on premature upstream response.
*/
bool hasLastDownstreamByteReceived() const {
return streamInfo().downstreamTiming() &&
streamInfo().downstreamTiming()->lastDownstreamRxByteReceived().has_value();
}
Expand Down Expand Up @@ -1206,6 +1221,9 @@ class DownstreamFilterManager : public FilterManager {
const LocalReply::LocalReply& local_reply_;
Utility::PreparedLocalReplyPtr prepared_local_reply_{nullptr};
Server::LoadShedPoint* downstream_filter_load_shed_point_{nullptr};
// Set by the envoy.reloadable_features.use_filter_manager_state_for_downstream_end_stream runtime
// flag.
const bool use_filter_manager_state_for_downstream_end_stream_{};
};

} // namespace Http
Expand Down
1 change: 1 addition & 0 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ RUNTIME_GUARD(envoy_reloadable_features_udp_socket_apply_aggregated_read_limit);
RUNTIME_GUARD(envoy_reloadable_features_uhv_allow_malformed_url_encoding);
RUNTIME_GUARD(envoy_reloadable_features_upstream_remote_address_use_connection);
RUNTIME_GUARD(envoy_reloadable_features_use_config_in_happy_eyeballs);
RUNTIME_GUARD(envoy_reloadable_features_use_filter_manager_state_for_downstream_end_stream);
RUNTIME_GUARD(envoy_reloadable_features_use_http3_header_normalisation);
RUNTIME_GUARD(envoy_reloadable_features_use_route_host_mutation_for_auto_sni_san);
RUNTIME_GUARD(envoy_reloadable_features_use_typed_metadata_in_proxy_protocol_listener);
Expand Down
1 change: 1 addition & 0 deletions test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ envoy_cc_test(
"//test/integration/filters:response_metadata_filter_config_lib",
"//test/integration/filters:set_response_code_filter_config_proto_cc_proto",
"//test/integration/filters:set_response_code_filter_lib",
"//test/integration/filters:stop_in_headers_continue_in_data_filter_lib",
"//test/integration/filters:stop_iteration_and_continue",
"//test/mocks/http:http_mocks",
"//test/mocks/upstream:retry_priority_factory_mocks",
Expand Down
14 changes: 14 additions & 0 deletions test/integration/filters/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -962,3 +962,17 @@ envoy_cc_test_library(
"//source/extensions/filters/network/common:factory_base_lib",
],
)

envoy_cc_test_library(
name = "stop_in_headers_continue_in_data_filter_lib",
srcs = ["stop_in_headers_continue_in_data_filter.cc"],
deps = [
":common_lib",
"//envoy/http:filter_interface",
"//envoy/registry",
"//envoy/server:filter_config_interface",
"//source/common/protobuf",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"//test/extensions/filters/http/common:empty_http_filter_config_lib",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#include <string>

#include "envoy/http/filter.h"
#include "envoy/registry/registry.h"
#include "envoy/server/filter_config.h"

#include "source/extensions/filters/http/common/pass_through_filter.h"

#include "test/extensions/filters/http/common/empty_http_filter_config.h"
#include "test/integration/filters/common.h"

namespace Envoy {

// A test filter that stops iteration of headers request/response
// then injects a body later.
class StopInHeadersContinueInBodyFilter : public Http::PassThroughFilter {
public:
constexpr static char name[] = "stop-in-headers-continue-in-body-filter";

Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap&, bool /* end_stream */) override {
return Http::FilterHeadersStatus::StopIteration;
}

Http::FilterDataStatus decodeData(Buffer::Instance&, bool) override {
return Http::FilterDataStatus::Continue;
}
};

static Registry::RegisterFactory<SimpleFilterConfig<StopInHeadersContinueInBodyFilter>,
Server::Configuration::NamedHttpFilterConfigFactory>
register_;
static Registry::RegisterFactory<SimpleFilterConfig<StopInHeadersContinueInBodyFilter>,
Server::Configuration::UpstreamHttpFilterConfigFactory>
register_upstream_;
} // namespace Envoy
Loading

0 comments on commit 1127fac

Please sign in to comment.