Skip to content

Commit

Permalink
ext_proc: convert ext_proc filter into dual filter (envoyproxy#33273)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Yanjun Xiang <yanjunxiang@google.com>
  • Loading branch information
yanjunxiang-google authored Apr 12, 2024
1 parent f5bca68 commit 34bb97c
Show file tree
Hide file tree
Showing 13 changed files with 204 additions and 30 deletions.
3 changes: 2 additions & 1 deletion api/envoy/service/ext_proc/v3/external_processor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,8 @@ message CommonResponse {

// Clear the route cache for the current client request. This is necessary
// if the remote server modified headers that are used to calculate the route.
// This field is ignored in the response direction.
// This field is ignored in the response direction. This field is also ignored
// if the Envoy ext_proc filter is in the upstream filter chain.
bool clear_route_cache = 5;
}

Expand Down
3 changes: 3 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,9 @@ new_features:
metadata may be sent to the server. If
:ref:`receiving_namespaces <envoy_v3_api_field_extensions.filters.http.ext_proc.v3.MetadataOptions.receiving_namespaces>`
is defined, returned metadata may be written to the specified allowed namespaces.
- area: ext_proc
change: |
made the :ref:`ExternalProcessor <envoy_v3_api_msg_extensions.filters.http.ext_proc.v3.ExternalProcessor>` work as an upstream filter.
- area: wasm
change: |
added ``verify_signature`` foreign function to verify cryptographic signatures.
Expand Down
2 changes: 2 additions & 0 deletions source/extensions/extensions_metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,10 @@ envoy.filters.http.ext_authz:
envoy.filters.http.ext_proc:
categories:
- envoy.filters.http
- envoy.filters.http.upstream
security_posture: robust_to_untrusted_downstream_and_upstream
status: stable
status_upstream: alpha
type_urls:
- envoy.extensions.filters.http.ext_proc.v3.ExtProcPerRoute
- envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor
Expand Down
22 changes: 13 additions & 9 deletions source/extensions/filters/http/ext_proc/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,24 @@ namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

Http::FilterFactoryCb ExternalProcessingFilterConfig::createFilterFactoryFromProtoTyped(
absl::StatusOr<Http::FilterFactoryCb>
ExternalProcessingFilterConfig::createFilterFactoryFromProtoTyped(
const envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor& proto_config,
const std::string& stats_prefix, Server::Configuration::FactoryContext& context) {
const std::string& stats_prefix, DualInfo dual_info,
Server::Configuration::ServerFactoryContext& context) {
const uint32_t message_timeout_ms =
PROTOBUF_GET_MS_OR_DEFAULT(proto_config, message_timeout, DefaultMessageTimeoutMs);
const uint32_t max_message_timeout_ms =
PROTOBUF_GET_MS_OR_DEFAULT(proto_config, max_message_timeout, DefaultMaxMessageTimeoutMs);
const auto filter_config = std::make_shared<FilterConfig>(
proto_config, std::chrono::milliseconds(message_timeout_ms), max_message_timeout_ms,
context.scope(), stats_prefix,
Envoy::Extensions::Filters::Common::Expr::getBuilder(context.serverFactoryContext()),
context.serverFactoryContext());
dual_info.scope, stats_prefix, dual_info.is_upstream,
Envoy::Extensions::Filters::Common::Expr::getBuilder(context), context);

return [filter_config, grpc_service = proto_config.grpc_service(),
&context](Http::FilterChainFactoryCallbacks& callbacks) {
return [filter_config, grpc_service = proto_config.grpc_service(), &context,
dual_info](Http::FilterChainFactoryCallbacks& callbacks) {
auto client = std::make_unique<ExternalProcessorClientImpl>(
context.serverFactoryContext().clusterManager().grpcAsyncClientManager(), context.scope());
context.clusterManager().grpcAsyncClientManager(), dual_info.scope);

callbacks.addStreamFilter(Http::StreamFilterSharedPtr{
std::make_shared<Filter>(filter_config, std::move(client), grpc_service)});
Expand All @@ -39,6 +40,7 @@ ExternalProcessingFilterConfig::createRouteSpecificFilterConfigTyped(
return std::make_shared<FilterConfigPerRoute>(proto_config);
}

// This method will only be called when the filter is in downstream.
Http::FilterFactoryCb
ExternalProcessingFilterConfig::createFilterFactoryFromProtoWithServerContextTyped(
const envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor& proto_config,
Expand All @@ -49,7 +51,7 @@ ExternalProcessingFilterConfig::createFilterFactoryFromProtoWithServerContextTyp
PROTOBUF_GET_MS_OR_DEFAULT(proto_config, max_message_timeout, DefaultMaxMessageTimeoutMs);
const auto filter_config = std::make_shared<FilterConfig>(
proto_config, std::chrono::milliseconds(message_timeout_ms), max_message_timeout_ms,
server_context.scope(), stats_prefix,
server_context.scope(), stats_prefix, false,
Envoy::Extensions::Filters::Common::Expr::getBuilder(server_context), server_context);

return [filter_config, grpc_service = proto_config.grpc_service(),
Expand All @@ -64,6 +66,8 @@ ExternalProcessingFilterConfig::createFilterFactoryFromProtoWithServerContextTyp

LEGACY_REGISTER_FACTORY(ExternalProcessingFilterConfig,
Server::Configuration::NamedHttpFilterConfigFactory, "envoy.ext_proc");
LEGACY_REGISTER_FACTORY(UpstreamExternalProcessingFilterConfig,
Server::Configuration::UpstreamHttpFilterConfigFactory, "envoy.ext_proc");

} // namespace ExternalProcessing
} // namespace HttpFilters
Expand Down
14 changes: 9 additions & 5 deletions source/extensions/filters/http/ext_proc/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,21 @@ namespace HttpFilters {
namespace ExternalProcessing {

class ExternalProcessingFilterConfig
: public Common::FactoryBase<envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor,
envoy::extensions::filters::http::ext_proc::v3::ExtProcPerRoute> {
: public Common::DualFactoryBase<
envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor,
envoy::extensions::filters::http::ext_proc::v3::ExtProcPerRoute> {

public:
ExternalProcessingFilterConfig() : FactoryBase("envoy.filters.http.ext_proc") {}
ExternalProcessingFilterConfig() : DualFactoryBase("envoy.filters.http.ext_proc") {}

private:
static constexpr uint64_t DefaultMessageTimeoutMs = 200;
static constexpr uint64_t DefaultMaxMessageTimeoutMs = 0;

Http::FilterFactoryCb createFilterFactoryFromProtoTyped(
virtual absl::StatusOr<Http::FilterFactoryCb> createFilterFactoryFromProtoTyped(
const envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor& proto_config,
const std::string& stats_prefix, Server::Configuration::FactoryContext& context) override;
const std::string& stats_prefix, DualInfo dual_info,
Server::Configuration::ServerFactoryContext& context) override;

Router::RouteSpecificFilterConfigConstSharedPtr createRouteSpecificFilterConfigTyped(
const envoy::extensions::filters::http::ext_proc::v3::ExtProcPerRoute& proto_config,
Expand All @@ -38,6 +40,8 @@ class ExternalProcessingFilterConfig
Server::Configuration::ServerFactoryContext& server_context) override;
};

using UpstreamExternalProcessingFilterConfig = ExternalProcessingFilterConfig;

} // namespace ExternalProcessing
} // namespace HttpFilters
} // namespace Extensions
Expand Down
5 changes: 5 additions & 0 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,11 @@ void Filter::onFinishProcessorCalls(Grpc::Status::GrpcStatus call_status) {
}

void Filter::sendImmediateResponse(const ImmediateResponse& response) {
if (config_->isUpstream()) {
stats_.send_immediate_resp_upstream_ignored_.inc();
ENVOY_LOG(debug, "Ignoring send immediate response when ext_proc filter is in upstream");
return;
}
auto status_code = response.has_status() ? response.status().code() : DefaultImmediateStatus;
if (!MutationUtils::isValidHttpStatus(status_code)) {
ENVOY_LOG(debug, "Ignoring attempt to set invalid HTTP status {}", status_code);
Expand Down
13 changes: 9 additions & 4 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ namespace ExternalProcessing {
COUNTER(override_message_timeout_received) \
COUNTER(override_message_timeout_ignored) \
COUNTER(clear_route_cache_ignored) \
COUNTER(clear_route_cache_disabled)
COUNTER(clear_route_cache_disabled) \
COUNTER(clear_route_cache_upstream_ignored) \
COUNTER(send_immediate_resp_upstream_ignored)

struct ExtProcFilterStats {
ALL_EXT_PROC_FILTER_STATS(GENERATE_COUNTER_STRUCT)
Expand Down Expand Up @@ -148,7 +150,7 @@ class FilterConfig {
FilterConfig(const envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor& config,
const std::chrono::milliseconds message_timeout,
const uint32_t max_message_timeout_ms, Stats::Scope& scope,
const std::string& stats_prefix,
const std::string& stats_prefix, bool is_upstream,
Extensions::Filters::Common::Expr::BuilderInstanceSharedPtr builder,
Server::Configuration::CommonFactoryContext& context)
: failure_mode_allow_(config.failure_mode_allow()),
Expand All @@ -163,6 +165,7 @@ class FilterConfig {
allowed_headers_(initHeaderMatchers(config.forward_rules().allowed_headers(), context)),
disallowed_headers_(
initHeaderMatchers(config.forward_rules().disallowed_headers(), context)),
is_upstream_(is_upstream),
untyped_forwarding_namespaces_(
config.metadata_options().forwarding_namespaces().untyped().begin(),
config.metadata_options().forwarding_namespaces().untyped().end()),
Expand Down Expand Up @@ -206,6 +209,8 @@ class FilterConfig {

const ExpressionManager& expressionManager() const { return expression_manager_; }

bool isUpstream() const { return is_upstream_; }

const std::vector<std::string>& untypedForwardingMetadataNamespaces() const {
return untyped_forwarding_namespaces_;
}
Expand Down Expand Up @@ -246,11 +251,11 @@ class FilterConfig {
const std::vector<Matchers::StringMatcherPtr> allowed_headers_;
// Empty disallowed_header_ means disallow nothing, i.e, allow all.
const std::vector<Matchers::StringMatcherPtr> disallowed_headers_;

// is_upstream_ is true if ext_proc filter is in the upstream filter chain.
const bool is_upstream_;
const std::vector<std::string> untyped_forwarding_namespaces_;
const std::vector<std::string> typed_forwarding_namespaces_;
const std::vector<std::string> untyped_receiving_namespaces_;

const ExpressionManager expression_manager_;

const ImmediateMutationChecker immediate_mutation_checker_;
Expand Down
5 changes: 5 additions & 0 deletions source/extensions/filters/http/ext_proc/processor_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,11 @@ void DecodingProcessorState::clearRouteCache(const CommonResponse& common_respon
if (!common_response.clear_route_cache()) {
return;
}
if (filter_.config().isUpstream()) {
filter_.stats().clear_route_cache_upstream_ignored_.inc();
ENVOY_LOG(debug, "NOT clearing route cache. The filter is in upstream filter chain.");
return;
}
// Only clear the route cache if there is a mutation to the header and clearing is allowed.
if (filter_.config().disableClearRouteCache()) {
filter_.stats().clear_route_cache_disabled_.inc();
Expand Down
83 changes: 77 additions & 6 deletions test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,17 @@ class ExtProcIntegrationTest : public HttpIntegrationTest,
setGrpcService(*proto_config_.mutable_grpc_service(), "ext_proc_wrong_server",
std::make_shared<Network::Address::Ipv4Instance>("127.0.0.1", 1234));
}
// Construct a configuration proto for our filter and then re-write it
// to JSON so that we can add it to the overall config
envoy::extensions::filters::network::http_connection_manager::v3::HttpFilter ext_proc_filter;

std::string ext_proc_filter_name = "envoy.filters.http.ext_proc";
ext_proc_filter.set_name(ext_proc_filter_name);
ext_proc_filter.mutable_typed_config()->PackFrom(proto_config_);
config_helper_.prependFilter(MessageUtil::getJsonStringFromMessageOrError(ext_proc_filter));
if (downstream_filter_) {
// Construct a configuration proto for our filter and then re-write it
// to JSON so that we can add it to the overall config
envoy::extensions::filters::network::http_connection_manager::v3::HttpFilter
ext_proc_filter;
ext_proc_filter.set_name(ext_proc_filter_name);
ext_proc_filter.mutable_typed_config()->PackFrom(proto_config_);
config_helper_.prependFilter(MessageUtil::getJsonStringFromMessageOrError(ext_proc_filter));
}

// Add set_metadata filter to inject dynamic metadata used for testing
if (config_option.add_metadata) {
Expand Down Expand Up @@ -594,6 +598,7 @@ class ExtProcIntegrationTest : public HttpIntegrationTest,
TestScopedRuntime scoped_runtime_;
std::string header_raw_value_{"false"};
std::string filter_mutation_rule_{"false"};
bool downstream_filter_{true};
// Number of grpc upstreams in the test.
int grpc_upstream_count_ = 2;
};
Expand Down Expand Up @@ -3758,6 +3763,72 @@ TEST_P(ExtProcIntegrationTest, RequestResponseAttributes) {
}
#endif

TEST_P(ExtProcIntegrationTest, GetAndSetHeadersUpstream) {
downstream_filter_ = false;
initializeConfig();
// Add ext_proc as upstream filter.
config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
auto* static_resources = bootstrap.mutable_static_resources();
// Retrieve cluster_0.
auto* cluster = static_resources->mutable_clusters(0);
ConfigHelper::HttpProtocolOptions old_protocol_options;
if (cluster->typed_extension_protocol_options().contains(
"envoy.extensions.upstreams.http.v3.HttpProtocolOptions")) {
old_protocol_options = MessageUtil::anyConvert<ConfigHelper::HttpProtocolOptions>(
(*cluster->mutable_typed_extension_protocol_options())
["envoy.extensions.upstreams.http.v3.HttpProtocolOptions"]);
}
if (old_protocol_options.http_filters().empty()) {
old_protocol_options.add_http_filters()->set_name("envoy.filters.http.upstream_codec");
}
auto* ext_proc_filter = old_protocol_options.add_http_filters();
ext_proc_filter->set_name("envoy.filters.http.ext_proc");
ext_proc_filter->mutable_typed_config()->PackFrom(proto_config_);
for (int i = old_protocol_options.http_filters_size() - 1; i > 0; --i) {
old_protocol_options.mutable_http_filters()->SwapElements(i, i - 1);
}
(*cluster->mutable_typed_extension_protocol_options())
["envoy.extensions.upstreams.http.v3.HttpProtocolOptions"]
.PackFrom(old_protocol_options);
});
HttpIntegrationTest::initialize();

auto response = sendDownstreamRequest(
[](Http::HeaderMap& headers) { headers.addCopy(LowerCaseString("x-remove-this"), "yes"); });

processRequestHeadersMessage(
*grpc_upstreams_[0], true, [](const HttpHeaders& headers, HeadersResponse& headers_resp) {
Http::TestRequestHeaderMapImpl expected_request_headers{
{":scheme", "http"}, {":method", "GET"}, {":authority", "host"},
{":path", "/"}, {"x-remove-this", "yes"}, {"x-forwarded-proto", "http"}};
EXPECT_THAT(headers.headers(), HeaderProtosEqual(expected_request_headers));
auto response_header_mutation = headers_resp.mutable_response()->mutable_header_mutation();
auto* mut1 = response_header_mutation->add_set_headers();
mut1->mutable_header()->set_key("x-new-header");
mut1->mutable_header()->set_value("new");
response_header_mutation->add_remove_headers("x-remove-this");
return true;
});

ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_));
ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_));
ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_));

EXPECT_THAT(upstream_request_->headers(), HasNoHeader("x-remove-this"));
EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("x-new-header", "new"));

upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false);
upstream_request_->encodeData(100, true);

processResponseHeadersMessage(
*grpc_upstreams_[0], false, [](const HttpHeaders& headers, HeadersResponse&) {
Http::TestRequestHeaderMapImpl expected_response_headers{{":status", "200"}};
EXPECT_THAT(headers.headers(), HeaderProtosEqual(expected_response_headers));
return true;
});
verifyDownstreamResponse(*response, 200);
}

// Test the filter when configured, upon a grpc error, can retry the request and
// get response back.
TEST_P(ExtProcIntegrationTest, RetryOnResponseError) {
Expand Down
Loading

0 comments on commit 34bb97c

Please sign in to comment.