Skip to content

Commit

Permalink
Update comments
Browse files Browse the repository at this point in the history
Signed-off-by: Yan Avlasov <yavlasov@google.com>
  • Loading branch information
yanavlasov committed Aug 12, 2024
1 parent 71ce1f0 commit 68c807e
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 48 deletions.
92 changes: 47 additions & 45 deletions source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -466,31 +466,29 @@ void ActiveStreamDecoderFilter::encode1xxHeaders(ResponseHeaderMapPtr&& headers)
}
}

void ActiveStreamDecoderFilter::maybeStopDecoderFilterChain(bool end_stream) {
filter_encoded_end_stream_ = end_stream;
if (end_stream && end_stream_ && !parent_.state_.decoder_filter_chain_complete_) {
parent_.state_.decoder_filter_chain_aborted_ = true;
}
}

void ActiveStreamDecoderFilter::encodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream,
absl::string_view details) {
encoded_end_stream_ = end_stream;
maybeStopDecoderFilterChain(end_stream);
parent_.streamInfo().setResponseCodeDetails(details);
parent_.filter_manager_callbacks_.setResponseHeaders(std::move(headers));
if (end_stream && end_stream_) {
parent_.state_.decoder_filter_chain_aborted_ = true;
}
parent_.encodeHeaders(nullptr, *parent_.filter_manager_callbacks_.responseHeaders(), end_stream);
}

void ActiveStreamDecoderFilter::encodeData(Buffer::Instance& data, bool end_stream) {
encoded_end_stream_ = end_stream;
if (end_stream && end_stream_) {
parent_.state_.decoder_filter_chain_aborted_ = true;
}
maybeStopDecoderFilterChain(end_stream);
parent_.encodeData(nullptr, data, end_stream,
FilterManager::FilterIterationStartState::CanStartFromCurrent);
}

void ActiveStreamDecoderFilter::encodeTrailers(ResponseTrailerMapPtr&& trailers) {
encoded_end_stream_ = true;
if (end_stream_) {
parent_.state_.decoder_filter_chain_aborted_ = true;
}
maybeStopDecoderFilterChain(true);
parent_.filter_manager_callbacks_.setResponseTrailers(std::move(trailers));
parent_.encodeTrailers(nullptr, *parent_.filter_manager_callbacks_.responseTrailers());
}
Expand Down Expand Up @@ -538,8 +536,8 @@ void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHead
std::list<ActiveStreamDecoderFilterPtr>::iterator entry =
commonDecodePrefix(filter, FilterIterationStartState::AlwaysStartFromNext);
std::list<ActiveStreamDecoderFilterPtr>::iterator continue_data_entry = decoder_filters_.end();

bool last_filter_saw_end_stream = false;
// Terminal filter is either the last one or filter that encoded end_stream.
bool terminal_filter_decoded_end_stream = false;

for (; entry != decoder_filters_.end(); entry++) {
ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeHeaders));
Expand Down Expand Up @@ -613,17 +611,17 @@ void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHead
if (end_stream && buffered_request_data_ && continue_data_entry == decoder_filters_.end()) {
continue_data_entry = entry;
}
last_filter_saw_end_stream =
terminal_filter_decoded_end_stream =
(end_stream && continue_data_entry == decoder_filters_.end()) &&
(std::next(entry) == decoder_filters_.end() || (*entry)->encoded_end_stream_);
(std::next(entry) == decoder_filters_.end() || (*entry)->filter_encoded_end_stream_);
}

maybeContinueDecoding(continue_data_entry);

if (end_stream) {
disarmRequestTimeout();
}
maybeEndDecode(last_filter_saw_end_stream);
maybeEndDecode(terminal_filter_decoded_end_stream);
}

void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instance& data,
Expand All @@ -643,7 +641,7 @@ void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instan
// Filter iteration may start at the current filter.
std::list<ActiveStreamDecoderFilterPtr>::iterator entry =
commonDecodePrefix(filter, filter_iteration_start_state);
bool last_filter_saw_end_stream = false;
bool terminal_filter_decoded_end_stream = false;

for (; entry != decoder_filters_.end(); entry++) {
// If the filter pointed by entry has stopped for all frame types, return now.
Expand Down Expand Up @@ -721,8 +719,9 @@ void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instan
trailers_added_entry = entry;
}

last_filter_saw_end_stream =
end_stream && (std::next(entry) == decoder_filters_.end() || (*entry)->encoded_end_stream_);
terminal_filter_decoded_end_stream =
end_stream &&
(std::next(entry) == decoder_filters_.end() || (*entry)->filter_encoded_end_stream_);

if (!(*entry)->commonHandleAfterDataCallback(status, data, state_.decoder_filters_streaming_) &&
std::next(entry) != decoder_filters_.end()) {
Expand All @@ -742,7 +741,7 @@ void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instan
if (end_stream) {
disarmRequestTimeout();
}
maybeEndDecode(last_filter_saw_end_stream);
maybeEndDecode(terminal_filter_decoded_end_stream);
}

RequestTrailerMap& FilterManager::addDecodedTrailers() {
Expand Down Expand Up @@ -787,7 +786,7 @@ void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTra
// Filter iteration may start at the current filter.
std::list<ActiveStreamDecoderFilterPtr>::iterator entry =
commonDecodePrefix(filter, FilterIterationStartState::CanStartFromCurrent);
bool last_filter_saw_end_stream = false;
bool terminal_filter_decoded_end_stream = false;

for (; entry != decoder_filters_.end(); entry++) {
// If the filter pointed by entry has stopped for all frame type, return now.
Expand All @@ -811,10 +810,10 @@ void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTra
}

processNewlyAddedMetadata();
last_filter_saw_end_stream =
std::next(entry) == decoder_filters_.end() || (*entry)->encoded_end_stream_;
terminal_filter_decoded_end_stream =
std::next(entry) == decoder_filters_.end() || (*entry)->filter_encoded_end_stream_;

if (last_filter_saw_end_stream) {
if (terminal_filter_decoded_end_stream) {
break;
}

Expand All @@ -824,7 +823,7 @@ void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTra
}
}
disarmRequestTimeout();
maybeEndDecode(last_filter_saw_end_stream);
maybeEndDecode(terminal_filter_decoded_end_stream);
}

void FilterManager::decodeMetadata(ActiveStreamDecoderFilter* filter, MetadataMap& metadata_map) {
Expand Down Expand Up @@ -954,9 +953,9 @@ void DownstreamFilterManager::sendLocalReply(

// Stop filter chain iteration if local reply was sent while filter decoding or encoding callbacks
// are running.
/*if (state_.filter_call_state_ & FilterCallState::IsDecodingMask) {*/
// Local reply always stops decoder filter chain.
state_.decoder_filter_chain_aborted_ = true;
/*} else*/ if (state_.filter_call_state_ & FilterCallState::IsEncodingMask) {
if (state_.filter_call_state_ & FilterCallState::IsEncodingMask) {
state_.encoder_filter_chain_aborted_ = true;
}

Expand Down Expand Up @@ -1481,17 +1480,23 @@ void FilterManager::maybeEndEncode(bool end_stream) {
ASSERT(!state_.encoder_filter_chain_complete_);
state_.encoder_filter_chain_complete_ = true;
if (filter_manager_callbacks_.isHalfCloseEnabled()) {
// If independent half close is enabled the stream is closed when both decoder and encoder
// filter chains has completed or were aborted.
checkAndCloseStreamIfFullyClosed();
} else {
// Otherwise encoding end_stream always closes the stream (and resets it if request was not
// complete yet).
filter_manager_callbacks_.endStream();
}
}
}

void FilterManager::maybeEndDecode(bool end_stream) {
if (end_stream) {
void FilterManager::maybeEndDecode(bool terminal_filter_decoded_end_stream) {
if (terminal_filter_decoded_end_stream) {
ASSERT(!state_.decoder_filter_chain_complete_);
state_.decoder_filter_chain_complete_ = true;
// If the decoder filter chain was aborted (i.e. due to local reply)
// we rely on the encoding of end_stream to close the stream.
if (filter_manager_callbacks_.isHalfCloseEnabled() && !stopDecoderFilterChain()) {
checkAndCloseStreamIfFullyClosed();
}
Expand All @@ -1503,30 +1508,27 @@ void FilterManager::checkAndCloseStreamIfFullyClosed() {
if (!filter_manager_callbacks_.isHalfCloseEnabled()) {
return;
}

std::cout << typeid(*this).name() << "::checkAndCloseStreamIfFullyClosed() "
<< state_.encoder_filter_chain_complete_ << " " << state_.decoder_filter_chain_complete_
<< " " << state_.decoder_filter_chain_aborted_ << "\n";

// When the upstream half close is enabled the stream decoding is stopped on error responses
// When the independent half close is enabled the stream is always closed on error responses
// from the server.
bool error_response = false;
if (filter_manager_callbacks_.responseHeaders().has_value()) {
const uint64_t response_status =
Http::Utility::getResponseStatus(filter_manager_callbacks_.responseHeaders().ref());
error_response =
bool error_response =
!(Http::CodeUtility::is2xx(response_status) || Http::CodeUtility::is1xx(response_status));
// Abort the decoder filter if it has not yet been completed.
if (error_response && !state_.decoder_filter_chain_complete_) {
state_.decoder_filter_chain_aborted_ = true;
}
}

// If upstream half close is enabled then close the stream either when force close
// is set (i.e local reply) or when both server and client half closed.
// If independent half close is enabled then close the stream when
// 1. Both encoder and decoder filter chains has completed.
// 2. Encoder filter chain has completed and decoder filter chain was aborted (i.e. local reply).
// There is no need to check for aborted encoder filter chain as the filter will either be
// completed or stream is reset.
if (state_.encoder_filter_chain_complete_ &&
(state_.decoder_filter_chain_complete_ || error_response ||
state_.decoder_filter_chain_aborted_)) {
(state_.decoder_filter_chain_complete_ || state_.decoder_filter_chain_aborted_)) {
ENVOY_STREAM_LOG(trace, "closing stream", *this);
if (error_response) {
state_.decoder_filter_chain_aborted_ = true;
}
filter_manager_callbacks_.endStream();
}
}
Expand Down
17 changes: 14 additions & 3 deletions source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,20 @@ struct ActiveStreamDecoderFilter : public ActiveStreamFilterBase,

void requestDataTooLarge();
void requestDataDrained();
// Check if the filter that encoded end_stream has also decoded end_stream and if true
// stop the decoder filter chain. This will end the request after encoder filter chain
// is completed.
// This allows non-terminal filters (i.e. cache filter) to encode responses when independent
// half-close is enabled. Encoding end_stream effectively makes the filter terminal - decoder
// filer chain will not go past this filter.
void maybeStopDecoderFilterChain(bool end_stream);

StreamDecoderFilterSharedPtr handle_;
bool is_grpc_request_{};
bool encoded_end_stream_{false};
// Indicates that this filter called an encodeXXX method with end_stream == true.
// When independent half close is enabled this filter becomes the terminal filter
// in the decoder filter chain.
bool filter_encoded_end_stream_{false};
};

using ActiveStreamDecoderFilterPtr = std::unique_ptr<ActiveStreamDecoderFilter>;
Expand Down Expand Up @@ -788,10 +798,11 @@ class FilterManager : public ScopeTrackedObject,
void maybeEndEncode(bool end_stream);

/**
* If end_stream is true, marks decoding as complete. This is a noop if end_stream is false.
* If terminal_filter_decoded_end_stream is true, marks decoding as complete. This is a noop if
* terminal_filter_decoded_end_stream is false.
* @param end_stream whether decoding is complete.
*/
void maybeEndDecode(bool end_stream);
void maybeEndDecode(bool terminal_filter_decoded_end_stream);

void checkAndCloseStreamIfFullyClosed();

Expand Down

0 comments on commit 68c807e

Please sign in to comment.