Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Yan Avlasov <yavlasov@google.com>
  • Loading branch information
yanavlasov committed Aug 1, 2024
1 parent c0881b5 commit b7dd3ce
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 36 deletions.
18 changes: 9 additions & 9 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ void ConnectionManagerImpl::doEndStream(ActiveStream& stream, bool check_for_def
// here is when Envoy "ends" the stream by calling recreateStream at which point recreateStream
// explicitly nulls out response_encoder to avoid the downstream being notified of the
// Envoy-internal stream instance being ended.
if (stream.response_encoder_ != nullptr && (!stream.filter_manager_.decoderObservedEndStream() ||
if (stream.response_encoder_ != nullptr && (!stream.filter_manager_.remoteDecodeComplete() ||
!stream.state_.codec_saw_local_complete_)) {
// Indicate local is complete at this point so that if we reset during a continuation, we don't
// raise further data or trailers.
Expand Down Expand Up @@ -288,7 +288,7 @@ void ConnectionManagerImpl::doEndStream(ActiveStream& stream, bool check_for_def
// fully read, as there's no race condition to avoid.
const bool connection_close =
stream.filter_manager_.streamInfo().shouldDrainConnectionUponCompletion();
const bool request_complete = stream.filter_manager_.decoderObservedEndStream();
const bool request_complete = stream.filter_manager_.remoteDecodeComplete();

// Don't do delay close for HTTP/1.0 or if the request is complete.
checkForDeferredClose(connection_close && (request_complete || http_10_sans_cl));
Expand Down Expand Up @@ -923,29 +923,29 @@ void ConnectionManagerImpl::ActiveStream::onIdleTimeout() {
connection_manager_.stats_.named_.downstream_rq_idle_timeout_.inc();

filter_manager_.streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::StreamIdleTimeout);
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.decoderObservedEndStream()),
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.remoteDecodeComplete()),
"stream timeout", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().StreamIdleTimeout);
}

void ConnectionManagerImpl::ActiveStream::onRequestTimeout() {
connection_manager_.stats_.named_.downstream_rq_timeout_.inc();
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.decoderObservedEndStream()),
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.remoteDecodeComplete()),
"request timeout", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().RequestOverallTimeout);
}

void ConnectionManagerImpl::ActiveStream::onRequestHeaderTimeout() {
connection_manager_.stats_.named_.downstream_rq_header_timeout_.inc();
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.decoderObservedEndStream()),
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.remoteDecodeComplete()),
"request header timeout", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().RequestHeaderTimeout);
}

void ConnectionManagerImpl::ActiveStream::onStreamMaxDurationReached() {
ENVOY_STREAM_LOG(debug, "Stream max duration time reached", *this);
connection_manager_.stats_.named_.downstream_rq_max_duration_reached_.inc();
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.decoderObservedEndStream()),
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.remoteDecodeComplete()),
"downstream duration timeout", nullptr,
Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded,
StreamInfo::ResponseCodeDetails::get().MaxDurationTimeout);
Expand Down Expand Up @@ -1111,7 +1111,7 @@ bool ConnectionManagerImpl::ActiveStream::validateTrailers() {

void ConnectionManagerImpl::ActiveStream::maybeEndDecode(bool end_stream) {
// If recreateStream is called, the HCM rewinds state and may send more encodeData calls.
if (end_stream && !filter_manager_.decoderObservedEndStream()) {
if (end_stream && !filter_manager_.remoteDecodeComplete()) {
filter_manager_.streamInfo().downstreamTiming().onLastDownstreamRxByteReceived(
connection_manager_.dispatcher_->timeSource());
ENVOY_STREAM_LOG(debug, "request end stream", *this);
Expand All @@ -1134,7 +1134,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPt
*headers);
// We only want to record this when reading the headers the first time, not when recreating
// a stream.
if (!filter_manager_.decoderObservedEndStream()) {
if (!filter_manager_.remoteDecodeComplete()) {
filter_manager_.streamInfo().downstreamTiming().onLastDownstreamHeaderRxByteReceived(
connection_manager_.dispatcher_->timeSource());
}
Expand Down Expand Up @@ -1762,7 +1762,7 @@ void ConnectionManagerImpl::ActiveStream::encodeHeaders(ResponseHeaderMap& heade
// If we are destroying a stream before remote is complete and the connection does not support
// multiplexing, we should disconnect since we don't want to wait around for the request to
// finish.
if (!filter_manager_.decoderObservedEndStream()) {
if (!filter_manager_.remoteDecodeComplete()) {
if (connection_manager_.codec_->protocol() < Protocol::Http2) {
connection_manager_.drain_state_ = DrainState::Closing;
}
Expand Down
14 changes: 7 additions & 7 deletions source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ bool ActiveStreamDecoderFilter::canContinue() {
bool ActiveStreamEncoderFilter::canContinue() {
// As with ActiveStreamDecoderFilter::canContinue() make sure we do not
// continue if a local reply has been sent.
return !parent_.state_.remote_encode_complete_;
return !parent_.state_.encoder_filter_chain_complete_;
}

Buffer::InstancePtr ActiveStreamDecoderFilter::createBuffer() {
Expand All @@ -369,7 +369,7 @@ Buffer::InstancePtr& ActiveStreamDecoderFilter::bufferedData() {
return parent_.buffered_request_data_;
}

bool ActiveStreamDecoderFilter::observedEndStream() { return parent_.decoderObservedEndStream(); }
bool ActiveStreamDecoderFilter::observedEndStream() { return parent_.remoteDecodeComplete(); }

void ActiveStreamDecoderFilter::doHeaders(bool end_stream) {
parent_.decodeHeaders(this, *parent_.filter_manager_callbacks_.requestHeaders(), end_stream);
Expand Down Expand Up @@ -860,8 +860,8 @@ FilterManager::commonEncodePrefix(ActiveStreamEncoderFilter* filter, bool end_st
end_stream, filter_manager_callbacks_.isHalfCloseEnabled());
if (filter == nullptr) {
if (end_stream) {
ASSERT(!state_.encoder_observed_end_stream_);
state_.encoder_observed_end_stream_ = true;
ASSERT(!state_.observed_encode_end_stream_);
state_.observed_encode_end_stream_ = true;

// When half close semantics are disabled, receiving end stream from the upstream causes
// decoder filter to stop, as neither filters nor upstream is interested in downstream data.
Expand Down Expand Up @@ -1446,8 +1446,8 @@ void FilterManager::encodeTrailers(ActiveStreamEncoderFilter* filter,

void FilterManager::maybeEndEncode(bool end_stream) {
if (end_stream) {
ASSERT(!state_.remote_encode_complete_);
state_.remote_encode_complete_ = true;
ASSERT(!state_.encoder_filter_chain_complete_);
state_.encoder_filter_chain_complete_ = true;
filter_manager_callbacks_.endStream();
}
}
Expand Down Expand Up @@ -1632,7 +1632,7 @@ Buffer::InstancePtr& ActiveStreamEncoderFilter::bufferedData() {
return parent_.buffered_response_data_;
}
bool ActiveStreamEncoderFilter::observedEndStream() {
return parent_.state_.encoder_observed_end_stream_;
return parent_.state_.observed_encode_end_stream_;
}
bool ActiveStreamEncoderFilter::has1xxHeaders() {
return parent_.state_.has_1xx_headers_ && !continued_1xx_headers_;
Expand Down
38 changes: 20 additions & 18 deletions source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ class FilterManager : public ScopeTrackedObject,
* @param end_stream whether the request is header only.
*/
void decodeHeaders(RequestHeaderMap& headers, bool end_stream) {
state_.decoder_observed_end_stream_ = end_stream;
state_.observed_decode_end_stream_ = end_stream;
decodeHeaders(nullptr, headers, end_stream);
}

Expand All @@ -755,7 +755,7 @@ class FilterManager : public ScopeTrackedObject,
* @param end_stream whether this data is the end of the request.
*/
void decodeData(Buffer::Instance& data, bool end_stream) {
state_.decoder_observed_end_stream_ = end_stream;
state_.observed_decode_end_stream_ = end_stream;
decodeData(nullptr, data, end_stream, FilterIterationStartState::CanStartFromCurrent);
}

Expand All @@ -764,7 +764,7 @@ class FilterManager : public ScopeTrackedObject,
* @param trailers the trailers to decode.
*/
void decodeTrailers(RequestTrailerMap& trailers) {
state_.decoder_observed_end_stream_ = true;
state_.observed_decode_end_stream_ = true;
decodeTrailers(nullptr, trailers);
}

Expand Down Expand Up @@ -825,7 +825,7 @@ class FilterManager : public ScopeTrackedObject,
/**
* Whether remote processing has been marked as complete.
*/
virtual bool decoderObservedEndStream() const { return state_.decoder_observed_end_stream_; }
virtual bool remoteDecodeComplete() const { return state_.observed_decode_end_stream_; }

/**
* Instructs the FilterManager to not create a filter chain. This makes it possible to issue
Expand Down Expand Up @@ -859,24 +859,26 @@ class FilterManager : public ScopeTrackedObject,
protected:
struct State {
State()
: remote_encode_complete_(false), decoder_observed_end_stream_(false),
encoder_observed_end_stream_(false), has_1xx_headers_(false),
created_filter_chain_(false), is_head_request_(false), is_grpc_request_(false),
: encoder_filter_chain_complete_(false), observed_decode_end_stream_(false),
observed_encode_end_stream_(false), has_1xx_headers_(false), created_filter_chain_(false),
is_head_request_(false), is_grpc_request_(false),
non_100_response_headers_encoded_(false), under_on_local_reply_(false),
decoder_filter_chain_aborted_(false), encoder_filter_chain_aborted_(false),
saw_downstream_reset_(false) {}
uint32_t filter_call_state_{0};

bool remote_encode_complete_ : 1; // Set after encoder filter chain has completed iteration.
// Prevents further calls to encoder filters.
bool decoder_observed_end_stream_ : 1; // Set when the end stream is observed on the decoder
// path before iteration of the filter chain begins.
// This flag is used for setting end_stream value
// when resuming decoder filter chain iteration.
bool encoder_observed_end_stream_ : 1; // Set when the end stream is observed on the encoder
// path before iteration of the filter chain begins.
// This flag is used for setting end_stream value
// when resuming encoder filter chain iteration.
// Set after encoder filter chain has completed iteration. Prevents further calls to encoder
// filters.
bool encoder_filter_chain_complete_ : 1;

// Set `true` when the filter manager observes end stream on the decoder path (from downstream
// client) before iteration of the decoder filter chain begins. This flag is used for setting
// end_stream value when resuming decoder filter chain iteration.
bool observed_decode_end_stream_ : 1;
// Set `true` when the filter manager observes end stream on the encoder path (from upstream
// server or Envoy's local reply) before iteration of the encoder filter chain begins. This flag
// is used for setting end_stream value when resuming encoder filter chain iteration.
bool observed_encode_end_stream_ : 1;

// By default, we will assume there are no 1xx. If encode1xxHeaders
// is ever called, this is set to true so commonContinue resumes processing the 1xx.
Expand Down Expand Up @@ -1146,7 +1148,7 @@ class DownstreamFilterManager : public FilterManager {
* For the DownstreamFilterManager rely on external state, to handle the case
* of internal redirects.
*/
bool decoderObservedEndStream() const override {
bool remoteDecodeComplete() const override {
return streamInfo().downstreamTiming() &&
streamInfo().downstreamTiming()->lastDownstreamRxByteReceived().has_value();
}
Expand Down
4 changes: 2 additions & 2 deletions source/common/router/upstream_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ class UpstreamFilterManager : public Http::FilterManager {
absl::string_view details) override {
state().decoder_filter_chain_aborted_ = true;
state().encoder_filter_chain_aborted_ = true;
state().remote_encode_complete_ = true;
state().encoder_observed_end_stream_ = true;
state().encoder_filter_chain_complete_ = true;
state().observed_encode_end_stream_ = true;
// TODO(alyssawilk) this should be done through the router to play well with hedging.
upstream_request_.parent_.callbacks()->sendLocalReply(code, body, modify_headers, grpc_status,
details);
Expand Down

0 comments on commit b7dd3ce

Please sign in to comment.