diff --git a/source/common/http/filter_manager.cc b/source/common/http/filter_manager.cc index a4d999f40a1f..f514ee075444 100644 --- a/source/common/http/filter_manager.cc +++ b/source/common/http/filter_manager.cc @@ -466,31 +466,29 @@ void ActiveStreamDecoderFilter::encode1xxHeaders(ResponseHeaderMapPtr&& headers) } } +void ActiveStreamDecoderFilter::maybeStopDecoderFilterChain(bool end_stream) { + filter_encoded_end_stream_ = end_stream; + if (end_stream && end_stream_ && !parent_.state_.decoder_filter_chain_complete_) { + parent_.state_.decoder_filter_chain_aborted_ = true; + } +} + void ActiveStreamDecoderFilter::encodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream, absl::string_view details) { - encoded_end_stream_ = end_stream; + maybeStopDecoderFilterChain(end_stream); parent_.streamInfo().setResponseCodeDetails(details); parent_.filter_manager_callbacks_.setResponseHeaders(std::move(headers)); - if (end_stream && end_stream_) { - parent_.state_.decoder_filter_chain_aborted_ = true; - } parent_.encodeHeaders(nullptr, *parent_.filter_manager_callbacks_.responseHeaders(), end_stream); } void ActiveStreamDecoderFilter::encodeData(Buffer::Instance& data, bool end_stream) { - encoded_end_stream_ = end_stream; - if (end_stream && end_stream_) { - parent_.state_.decoder_filter_chain_aborted_ = true; - } + maybeStopDecoderFilterChain(end_stream); parent_.encodeData(nullptr, data, end_stream, FilterManager::FilterIterationStartState::CanStartFromCurrent); } void ActiveStreamDecoderFilter::encodeTrailers(ResponseTrailerMapPtr&& trailers) { - encoded_end_stream_ = true; - if (end_stream_) { - parent_.state_.decoder_filter_chain_aborted_ = true; - } + maybeStopDecoderFilterChain(true); parent_.filter_manager_callbacks_.setResponseTrailers(std::move(trailers)); parent_.encodeTrailers(nullptr, *parent_.filter_manager_callbacks_.responseTrailers()); } @@ -538,8 +536,8 @@ void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHead std::list::iterator entry = commonDecodePrefix(filter, FilterIterationStartState::AlwaysStartFromNext); std::list::iterator continue_data_entry = decoder_filters_.end(); - - bool last_filter_saw_end_stream = false; + // Terminal filter is either the last one or filter that encoded end_stream. + bool terminal_filter_decoded_end_stream = false; for (; entry != decoder_filters_.end(); entry++) { ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeHeaders)); @@ -613,9 +611,9 @@ void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHead if (end_stream && buffered_request_data_ && continue_data_entry == decoder_filters_.end()) { continue_data_entry = entry; } - last_filter_saw_end_stream = + terminal_filter_decoded_end_stream = (end_stream && continue_data_entry == decoder_filters_.end()) && - (std::next(entry) == decoder_filters_.end() || (*entry)->encoded_end_stream_); + (std::next(entry) == decoder_filters_.end() || (*entry)->filter_encoded_end_stream_); } maybeContinueDecoding(continue_data_entry); @@ -623,7 +621,7 @@ void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHead if (end_stream) { disarmRequestTimeout(); } - maybeEndDecode(last_filter_saw_end_stream); + maybeEndDecode(terminal_filter_decoded_end_stream); } void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instance& data, @@ -643,7 +641,7 @@ void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instan // Filter iteration may start at the current filter. std::list::iterator entry = commonDecodePrefix(filter, filter_iteration_start_state); - bool last_filter_saw_end_stream = false; + bool terminal_filter_decoded_end_stream = false; for (; entry != decoder_filters_.end(); entry++) { // If the filter pointed by entry has stopped for all frame types, return now. @@ -721,8 +719,9 @@ void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instan trailers_added_entry = entry; } - last_filter_saw_end_stream = - end_stream && (std::next(entry) == decoder_filters_.end() || (*entry)->encoded_end_stream_); + terminal_filter_decoded_end_stream = + end_stream && + (std::next(entry) == decoder_filters_.end() || (*entry)->filter_encoded_end_stream_); if (!(*entry)->commonHandleAfterDataCallback(status, data, state_.decoder_filters_streaming_) && std::next(entry) != decoder_filters_.end()) { @@ -742,7 +741,7 @@ void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instan if (end_stream) { disarmRequestTimeout(); } - maybeEndDecode(last_filter_saw_end_stream); + maybeEndDecode(terminal_filter_decoded_end_stream); } RequestTrailerMap& FilterManager::addDecodedTrailers() { @@ -787,7 +786,7 @@ void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTra // Filter iteration may start at the current filter. std::list::iterator entry = commonDecodePrefix(filter, FilterIterationStartState::CanStartFromCurrent); - bool last_filter_saw_end_stream = false; + bool terminal_filter_decoded_end_stream = false; for (; entry != decoder_filters_.end(); entry++) { // If the filter pointed by entry has stopped for all frame type, return now. @@ -811,10 +810,10 @@ void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTra } processNewlyAddedMetadata(); - last_filter_saw_end_stream = - std::next(entry) == decoder_filters_.end() || (*entry)->encoded_end_stream_; + terminal_filter_decoded_end_stream = + std::next(entry) == decoder_filters_.end() || (*entry)->filter_encoded_end_stream_; - if (last_filter_saw_end_stream) { + if (terminal_filter_decoded_end_stream) { break; } @@ -824,7 +823,7 @@ void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTra } } disarmRequestTimeout(); - maybeEndDecode(last_filter_saw_end_stream); + maybeEndDecode(terminal_filter_decoded_end_stream); } void FilterManager::decodeMetadata(ActiveStreamDecoderFilter* filter, MetadataMap& metadata_map) { @@ -954,9 +953,9 @@ void DownstreamFilterManager::sendLocalReply( // Stop filter chain iteration if local reply was sent while filter decoding or encoding callbacks // are running. - /*if (state_.filter_call_state_ & FilterCallState::IsDecodingMask) {*/ + // Local reply always stops decoder filter chain. state_.decoder_filter_chain_aborted_ = true; - /*} else*/ if (state_.filter_call_state_ & FilterCallState::IsEncodingMask) { + if (state_.filter_call_state_ & FilterCallState::IsEncodingMask) { state_.encoder_filter_chain_aborted_ = true; } @@ -1481,17 +1480,23 @@ void FilterManager::maybeEndEncode(bool end_stream) { ASSERT(!state_.encoder_filter_chain_complete_); state_.encoder_filter_chain_complete_ = true; if (filter_manager_callbacks_.isHalfCloseEnabled()) { + // If independent half close is enabled the stream is closed when both decoder and encoder + // filter chains has completed or were aborted. checkAndCloseStreamIfFullyClosed(); } else { + // Otherwise encoding end_stream always closes the stream (and resets it if request was not + // complete yet). filter_manager_callbacks_.endStream(); } } } -void FilterManager::maybeEndDecode(bool end_stream) { - if (end_stream) { +void FilterManager::maybeEndDecode(bool terminal_filter_decoded_end_stream) { + if (terminal_filter_decoded_end_stream) { ASSERT(!state_.decoder_filter_chain_complete_); state_.decoder_filter_chain_complete_ = true; + // If the decoder filter chain was aborted (i.e. due to local reply) + // we rely on the encoding of end_stream to close the stream. if (filter_manager_callbacks_.isHalfCloseEnabled() && !stopDecoderFilterChain()) { checkAndCloseStreamIfFullyClosed(); } @@ -1503,30 +1508,27 @@ void FilterManager::checkAndCloseStreamIfFullyClosed() { if (!filter_manager_callbacks_.isHalfCloseEnabled()) { return; } - - std::cout << typeid(*this).name() << "::checkAndCloseStreamIfFullyClosed() " - << state_.encoder_filter_chain_complete_ << " " << state_.decoder_filter_chain_complete_ - << " " << state_.decoder_filter_chain_aborted_ << "\n"; - - // When the upstream half close is enabled the stream decoding is stopped on error responses + // When the independent half close is enabled the stream is always closed on error responses // from the server. - bool error_response = false; if (filter_manager_callbacks_.responseHeaders().has_value()) { const uint64_t response_status = Http::Utility::getResponseStatus(filter_manager_callbacks_.responseHeaders().ref()); - error_response = + bool error_response = !(Http::CodeUtility::is2xx(response_status) || Http::CodeUtility::is1xx(response_status)); + // Abort the decoder filter if it has not yet been completed. + if (error_response && !state_.decoder_filter_chain_complete_) { + state_.decoder_filter_chain_aborted_ = true; + } } - // If upstream half close is enabled then close the stream either when force close - // is set (i.e local reply) or when both server and client half closed. + // If independent half close is enabled then close the stream when + // 1. Both encoder and decoder filter chains has completed. + // 2. Encoder filter chain has completed and decoder filter chain was aborted (i.e. local reply). + // There is no need to check for aborted encoder filter chain as the filter will either be + // completed or stream is reset. if (state_.encoder_filter_chain_complete_ && - (state_.decoder_filter_chain_complete_ || error_response || - state_.decoder_filter_chain_aborted_)) { + (state_.decoder_filter_chain_complete_ || state_.decoder_filter_chain_aborted_)) { ENVOY_STREAM_LOG(trace, "closing stream", *this); - if (error_response) { - state_.decoder_filter_chain_aborted_ = true; - } filter_manager_callbacks_.endStream(); } } diff --git a/source/common/http/filter_manager.h b/source/common/http/filter_manager.h index 44e23186c331..f4b5184d9bf1 100644 --- a/source/common/http/filter_manager.h +++ b/source/common/http/filter_manager.h @@ -280,10 +280,20 @@ struct ActiveStreamDecoderFilter : public ActiveStreamFilterBase, void requestDataTooLarge(); void requestDataDrained(); + // Check if the filter that encoded end_stream has also decoded end_stream and if true + // stop the decoder filter chain. This will end the request after encoder filter chain + // is completed. + // This allows non-terminal filters (i.e. cache filter) to encode responses when independent + // half-close is enabled. Encoding end_stream effectively makes the filter terminal - decoder + // filer chain will not go past this filter. + void maybeStopDecoderFilterChain(bool end_stream); StreamDecoderFilterSharedPtr handle_; bool is_grpc_request_{}; - bool encoded_end_stream_{false}; + // Indicates that this filter called an encodeXXX method with end_stream == true. + // When independent half close is enabled this filter becomes the terminal filter + // in the decoder filter chain. + bool filter_encoded_end_stream_{false}; }; using ActiveStreamDecoderFilterPtr = std::unique_ptr; @@ -788,10 +798,11 @@ class FilterManager : public ScopeTrackedObject, void maybeEndEncode(bool end_stream); /** - * If end_stream is true, marks decoding as complete. This is a noop if end_stream is false. + * If terminal_filter_decoded_end_stream is true, marks decoding as complete. This is a noop if + * terminal_filter_decoded_end_stream is false. * @param end_stream whether decoding is complete. */ - void maybeEndDecode(bool end_stream); + void maybeEndDecode(bool terminal_filter_decoded_end_stream); void checkAndCloseStreamIfFullyClosed();