From 1e5fac86069943d3c9a3c6c9fdd5bec4f7fd960e Mon Sep 17 00:00:00 2001 From: Yanjun Xiang Date: Thu, 19 Sep 2024 17:49:13 +0000 Subject: [PATCH 01/14] ext_proc: refactoring: moving stream_ from Filter into client object for better abstraction Signed-off-by: Yanjun Xiang --- .../extensions/filters/http/ext_proc/client.h | 2 + .../filters/http/ext_proc/client_impl.h | 5 +++ .../filters/http/ext_proc/ext_proc.cc | 43 ++++++++++--------- .../filters/http/ext_proc/ext_proc.h | 4 -- .../filters/http/ext_proc/filter_test.cc | 4 ++ .../filters/http/ext_proc/mock_server.h | 2 + 6 files changed, 36 insertions(+), 24 deletions(-) diff --git a/source/extensions/filters/http/ext_proc/client.h b/source/extensions/filters/http/ext_proc/client.h index 54493c094fe3..413bbcac7730 100644 --- a/source/extensions/filters/http/ext_proc/client.h +++ b/source/extensions/filters/http/ext_proc/client.h @@ -48,6 +48,8 @@ class ExternalProcessorClient { const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, const Http::AsyncClient::StreamOptions& options, Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) PURE; + virtual ExternalProcessorStream* stream() PURE; + virtual void setStream(ExternalProcessorStream* stream) PURE; }; using ExternalProcessorClientPtr = std::unique_ptr; diff --git a/source/extensions/filters/http/ext_proc/client_impl.h b/source/extensions/filters/http/ext_proc/client_impl.h index 745bd3f167c8..8ef177cda00c 100644 --- a/source/extensions/filters/http/ext_proc/client_impl.h +++ b/source/extensions/filters/http/ext_proc/client_impl.h @@ -32,10 +32,15 @@ class ExternalProcessorClientImpl : public ExternalProcessorClient { const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, const Http::AsyncClient::StreamOptions& options, Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) override; + ExternalProcessorStream* stream() override { return stream_; } + void setStream(ExternalProcessorStream* stream) override { stream_ = stream; } private: Grpc::AsyncClientManager& client_manager_; Stats::Scope& scope_; + // The gRPC stream to the external processor, which will be opened + // when it's time to send the first message. + ExternalProcessorStream* stream_ = nullptr; }; class ExternalProcessorStreamImpl : public ExternalProcessorStream, diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 3013ea1d82dc..ed9313fd8e6b 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -336,7 +336,7 @@ Filter::StreamOpenState Filter::openStream() { ENVOY_LOG(debug, "External processing is completed when trying to open the gRPC stream"); return StreamOpenState::IgnoreError; } - if (!stream_) { + if (!client_->stream()) { ENVOY_LOG(debug, "Opening gRPC stream to external processor"); Http::AsyncClient::ParentContext grpc_context; @@ -351,32 +351,33 @@ Filter::StreamOpenState Filter::openStream() { if (processing_complete_) { // Stream failed while starting and either onGrpcError or onGrpcClose was already called - // Asserts that `stream_` is nullptr since it is not valid to be used any further + // Asserts that `stream_object` is nullptr since it is not valid to be used any further // beyond this point. ASSERT(stream_object == nullptr); return sent_immediate_response_ ? StreamOpenState::Error : StreamOpenState::IgnoreError; } stats_.streams_started_.inc(); - stream_ = config_->threadLocalStreamManager().store(std::move(stream_object), config_->stats(), - config_->deferredCloseTimeout()); + auto* stream = config_->threadLocalStreamManager().store( + std::move(stream_object), config_->stats(), config_->deferredCloseTimeout()); + client_->setStream(stream); // For custom access logging purposes. Applicable only for Envoy gRPC as Google gRPC does not // have a proper implementation of streamInfo. if (grpc_service_.has_envoy_grpc() && logging_info_ != nullptr) { - logging_info_->setClusterInfo(stream_->streamInfo().upstreamClusterInfo()); + logging_info_->setClusterInfo(client_->stream()->streamInfo().upstreamClusterInfo()); } } return StreamOpenState::Ok; } void Filter::closeStream() { - if (stream_) { + if (client_->stream()) { ENVOY_LOG(debug, "Calling close on stream"); - if (stream_->close()) { + if (client_->stream()->close()) { stats_.streams_closed_.inc(); } - config_->threadLocalStreamManager().erase(stream_); - stream_ = nullptr; + config_->threadLocalStreamManager().erase(client_->stream()); + client_->setStream(nullptr); } else { ENVOY_LOG(debug, "Stream already closed"); } @@ -384,7 +385,8 @@ void Filter::closeStream() { void Filter::deferredCloseStream() { ENVOY_LOG(debug, "Calling deferred close on stream"); - config_->threadLocalStreamManager().deferredErase(stream_, filter_callbacks_->dispatcher()); + config_->threadLocalStreamManager().deferredErase(client_->stream(), + filter_callbacks_->dispatcher()); } void Filter::onDestroy() { @@ -402,8 +404,8 @@ void Filter::onDestroy() { // closure is deferred upon filter destruction with a timer. // First, release the referenced filter resource. - if (stream_ != nullptr) { - stream_->notifyFilterDestroy(); + if (client_->stream() != nullptr) { + client_->stream()->notifyFilterDestroy(); } // Second, perform stream deferred closure. @@ -433,7 +435,7 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state, state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(), ProcessorState::CallbackState::HeadersCallback); ENVOY_LOG(debug, "Sending headers message"); - stream_->send(std::move(req), false); + client_->stream()->send(std::move(req), false); stats_.stream_msgs_sent_.inc(); state.setPaused(true); return FilterHeadersStatus::StopIteration; @@ -658,7 +660,7 @@ Filter::sendHeadersInObservabilityMode(Http::RequestOrResponseHeaderMap& headers ProcessingRequest req = buildHeaderRequest(state, headers, end_stream, /*observability_mode=*/true); ENVOY_LOG(debug, "Sending headers message in observability mode"); - stream_->send(std::move(req), false); + client_->stream()->send(std::move(req), false); stats_.stream_msgs_sent_.inc(); return FilterHeadersStatus::Continue; @@ -683,7 +685,7 @@ Http::FilterDataStatus Filter::sendDataInObservabilityMode(Buffer::Instance& dat // Set up the the body chunk and send. auto req = setupBodyChunk(state, data, end_stream); req.set_observability_mode(true); - stream_->send(std::move(req), false); + client_->stream()->send(std::move(req), false); stats_.stream_msgs_sent_.inc(); ENVOY_LOG(debug, "Sending body message in ObservabilityMode"); } else if (state.bodyMode() != ProcessingMode::NONE) { @@ -875,7 +877,7 @@ void Filter::sendBodyChunk(ProcessorState& state, ProcessorState::CallbackState ProcessingRequest& req) { state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(), new_state); - stream_->send(std::move(req), false); + client_->stream()->send(std::move(req), false); stats_.stream_msgs_sent_.inc(); } @@ -891,20 +893,21 @@ void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(), ProcessorState::CallbackState::TrailersCallback); ENVOY_LOG(debug, "Sending trailers message"); - stream_->send(std::move(req), false); + client_->stream()->send(std::move(req), false); stats_.stream_msgs_sent_.inc(); } void Filter::logGrpcStreamInfo() { - if (stream_ != nullptr && logging_info_ != nullptr && grpc_service_.has_envoy_grpc()) { - const auto& upstream_meter = stream_->streamInfo().getUpstreamBytesMeter(); + if (client_->stream() != nullptr && logging_info_ != nullptr && grpc_service_.has_envoy_grpc()) { + const auto& upstream_meter = client_->stream()->streamInfo().getUpstreamBytesMeter(); if (upstream_meter != nullptr) { logging_info_->setBytesSent(upstream_meter->wireBytesSent()); logging_info_->setBytesReceived(upstream_meter->wireBytesReceived()); } // Only set upstream host in logging info once. if (logging_info_->upstreamHost() == nullptr) { - logging_info_->setUpstreamHost(stream_->streamInfo().upstreamInfo()->upstreamHost()); + logging_info_->setUpstreamHost( + client_->stream()->streamInfo().upstreamInfo()->upstreamHost()); } } } diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index 070b60b58d8e..ab31f0f8bd38 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -479,10 +479,6 @@ class Filter : public Logger::Loggable, DecodingProcessorState decoding_state_; EncodingProcessorState encoding_state_; - // The gRPC stream to the external processor, which will be opened - // when it's time to send the first message. - ExternalProcessorStream* stream_ = nullptr; - // Set to true when no more messages need to be sent to the processor. // This happens when the processor has closed the stream, or when it has // failed. diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index 008974567edc..c9ec03548db4 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -92,6 +92,9 @@ class HttpFilterTest : public testing::Test { client_ = std::make_unique(); route_ = std::make_shared>(); EXPECT_CALL(*client_, start(_, _, _, _)).WillOnce(Invoke(this, &HttpFilterTest::doStart)); + EXPECT_CALL(filter_->client_, stream()).WillRepeatedly(Invoke([this]() { return stream_; })); + EXPECT_CALL(*client_, setStream(_)) + .WillRepeatedly(Invoke([this](ExternalProcessorStream* stream) { stream_ = stream; })); EXPECT_CALL(encoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_)); EXPECT_CALL(decoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_)); EXPECT_CALL(decoder_callbacks_, route()).WillRepeatedly(Return(route_)); @@ -594,6 +597,7 @@ class HttpFilterTest : public testing::Test { absl::optional final_expected_grpc_service_; Grpc::GrpcServiceConfigWithHashKey config_with_hash_key_; std::unique_ptr client_; + ExternalProcessorStream* stream_ = nullptr; ExternalProcessorCallbacks* stream_callbacks_ = nullptr; ProcessingRequest last_request_; bool server_closed_stream_ = false; diff --git a/test/extensions/filters/http/ext_proc/mock_server.h b/test/extensions/filters/http/ext_proc/mock_server.h index d0b0389b0fd4..03b0850fff46 100644 --- a/test/extensions/filters/http/ext_proc/mock_server.h +++ b/test/extensions/filters/http/ext_proc/mock_server.h @@ -17,6 +17,8 @@ class MockClient : public ExternalProcessorClient { (ExternalProcessorCallbacks&, const Grpc::GrpcServiceConfigWithHashKey&, const Envoy::Http::AsyncClient::StreamOptions&, Envoy::Http::StreamFilterSidestreamWatermarkCallbacks&)); + MOCK_METHOD(ExternalProcessorStream*, stream, ()); + MOCK_METHOD(void, setStream, (ExternalProcessorStream * stream)); }; class MockStream : public ExternalProcessorStream { From ff1b6f6d334a79642d226fbe8b5006008539867e Mon Sep 17 00:00:00 2001 From: Yanjun Xiang Date: Thu, 19 Sep 2024 18:39:35 +0000 Subject: [PATCH 02/14] fix an issue Signed-off-by: Yanjun Xiang --- test/extensions/filters/http/ext_proc/filter_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index c9ec03548db4..99a8fe5db282 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -92,7 +92,7 @@ class HttpFilterTest : public testing::Test { client_ = std::make_unique(); route_ = std::make_shared>(); EXPECT_CALL(*client_, start(_, _, _, _)).WillOnce(Invoke(this, &HttpFilterTest::doStart)); - EXPECT_CALL(filter_->client_, stream()).WillRepeatedly(Invoke([this]() { return stream_; })); + EXPECT_CALL(*client_, stream()).WillRepeatedly(Invoke([this]() { return stream_; })); EXPECT_CALL(*client_, setStream(_)) .WillRepeatedly(Invoke([this](ExternalProcessorStream* stream) { stream_ = stream; })); EXPECT_CALL(encoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_)); From 10908d05e0282ccea887a8147b4b090b24294686 Mon Sep 17 00:00:00 2001 From: Yanjun Xiang Date: Thu, 19 Sep 2024 19:01:32 +0000 Subject: [PATCH 03/14] fixing format Signed-off-by: Yanjun Xiang --- test/extensions/filters/http/ext_proc/filter_test.cc | 2 +- test/extensions/filters/http/ext_proc/ordering_test.cc | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index 99a8fe5db282..698ce1b08ef7 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -94,7 +94,7 @@ class HttpFilterTest : public testing::Test { EXPECT_CALL(*client_, start(_, _, _, _)).WillOnce(Invoke(this, &HttpFilterTest::doStart)); EXPECT_CALL(*client_, stream()).WillRepeatedly(Invoke([this]() { return stream_; })); EXPECT_CALL(*client_, setStream(_)) - .WillRepeatedly(Invoke([this](ExternalProcessorStream* stream) { stream_ = stream; })); + .WillRepeatedly(Invoke([this](ExternalProcessorStream* stream) { stream_ = stream; })); EXPECT_CALL(encoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_)); EXPECT_CALL(decoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_)); EXPECT_CALL(decoder_callbacks_, route()).WillRepeatedly(Return(route_)); diff --git a/test/extensions/filters/http/ext_proc/ordering_test.cc b/test/extensions/filters/http/ext_proc/ordering_test.cc index 4b1e4cf1330f..d03814b1473a 100644 --- a/test/extensions/filters/http/ext_proc/ordering_test.cc +++ b/test/extensions/filters/http/ext_proc/ordering_test.cc @@ -61,6 +61,9 @@ class OrderingTest : public testing::Test { client_ = std::make_unique(); route_ = std::make_shared>(); EXPECT_CALL(*client_, start(_, _, _, _)).WillRepeatedly(Invoke(this, &OrderingTest::doStart)); + EXPECT_CALL(*client_, stream()).WillRepeatedly(Invoke([this]() { return stream_; })); + EXPECT_CALL(*client_, setStream(_)) + .WillRepeatedly(Invoke([this](ExternalProcessorStream* stream) { stream_ = stream; })); EXPECT_CALL(encoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_)); EXPECT_CALL(decoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_)); EXPECT_CALL(decoder_callbacks_, route()).WillRepeatedly(Return(route_)); @@ -202,6 +205,7 @@ class OrderingTest : public testing::Test { } std::unique_ptr client_; + ExternalProcessorStream* stream_ = nullptr; MockStream stream_delegate_; ExternalProcessorCallbacks* stream_callbacks_ = nullptr; NiceMock stats_store_; From 4ac853c1eaa8ed7d5f63b7b21d4ed3d981a36052 Mon Sep 17 00:00:00 2001 From: Yanjun Xiang Date: Thu, 19 Sep 2024 19:53:02 +0000 Subject: [PATCH 04/14] fixing unit test fuzzer Signed-off-by: Yanjun Xiang --- .../http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc | 6 ++++++ .../extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h | 2 ++ 2 files changed, 8 insertions(+) diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc index df321fe20487..f3e5712fe2ba 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc @@ -94,6 +94,12 @@ DEFINE_PROTO_FUZZER( filter->setDecoderFilterCallbacks(mocks.decoder_callbacks_); filter->setEncoderFilterCallbacks(mocks.encoder_callbacks_); + ExternalProcessing::ExternalProcessorStream* stream = nullptr; + EXPECT_CALL(*client, stream()).WillRepeatedly(Invoke([&stream]() { return stream; })); + EXPECT_CALL(*client, setStream(_)) + .WillRepeatedly(Invoke([&stream](ExternalProcessing::ExternalProcessorStream* stream_ptr) { + stream = stream_ptr; + })); EXPECT_CALL(*client, start(_, _, _, _)) .WillRepeatedly(Invoke([&](ExternalProcessing::ExternalProcessorCallbacks&, const Grpc::GrpcServiceConfigWithHashKey&, diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h b/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h index 49ff067dd353..d4e6eac19d70 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h @@ -35,6 +35,8 @@ class MockClient : public ExternalProcessing::ExternalProcessorClient { const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, const Envoy::Http::AsyncClient::StreamOptions&, Envoy::Http::StreamFilterSidestreamWatermarkCallbacks&)); + MOCK_METHOD(ExternalProcessing::ExternalProcessorStream*, stream, ()); + MOCK_METHOD(void, setStream, (ExternalProcessing::ExternalProcessorStream * stream)); }; } // namespace UnitTestFuzz From e06a6e00458c172dfd5901b8540d8018b0c7721c Mon Sep 17 00:00:00 2001 From: Yanjun Xiang Date: Fri, 20 Sep 2024 04:19:50 +0000 Subject: [PATCH 05/14] unit test fuzzer issue Signed-off-by: Yanjun Xiang --- .../unit_test_fuzz/ext_proc_unit_test_fuzz.cc | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc index f3e5712fe2ba..60be50ff59f6 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc @@ -94,12 +94,6 @@ DEFINE_PROTO_FUZZER( filter->setDecoderFilterCallbacks(mocks.decoder_callbacks_); filter->setEncoderFilterCallbacks(mocks.encoder_callbacks_); - ExternalProcessing::ExternalProcessorStream* stream = nullptr; - EXPECT_CALL(*client, stream()).WillRepeatedly(Invoke([&stream]() { return stream; })); - EXPECT_CALL(*client, setStream(_)) - .WillRepeatedly(Invoke([&stream](ExternalProcessing::ExternalProcessorStream* stream_ptr) { - stream = stream_ptr; - })); EXPECT_CALL(*client, start(_, _, _, _)) .WillRepeatedly(Invoke([&](ExternalProcessing::ExternalProcessorCallbacks&, const Grpc::GrpcServiceConfigWithHashKey&, @@ -120,8 +114,16 @@ DEFINE_PROTO_FUZZER( return stream; })); + ExternalProcessing::ExternalProcessorStream* stream = nullptr; + EXPECT_CALL(*client, stream()).WillRepeatedly(Invoke([&stream]() { return stream; })); + EXPECT_CALL(*client, setStream(_)) + .WillRepeatedly(Invoke([&stream](ExternalProcessing::ExternalProcessorStream* stream_ptr) { + stream = stream_ptr; + })); + Envoy::Extensions::HttpFilters::HttpFilterFuzzer fuzzer; fuzzer.runData(static_cast(filter.get()), input.request()); + delete client; } } // namespace UnitTestFuzz From a08941886f62e098cbbcd85156ab3269e6c78053 Mon Sep 17 00:00:00 2001 From: Yanjun Xiang Date: Fri, 20 Sep 2024 17:51:20 +0000 Subject: [PATCH 06/14] CI Signed-off-by: Yanjun Xiang --- .../http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc index 60be50ff59f6..ce471dcfbda1 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc @@ -123,7 +123,6 @@ DEFINE_PROTO_FUZZER( Envoy::Extensions::HttpFilters::HttpFilterFuzzer fuzzer; fuzzer.runData(static_cast(filter.get()), input.request()); - delete client; } } // namespace UnitTestFuzz From 037c7d1899f2162b8fde2e8c0e05a97f2e01592c Mon Sep 17 00:00:00 2001 From: Yanjun Xiang Date: Sun, 22 Sep 2024 16:30:40 +0000 Subject: [PATCH 07/14] fixing CI issue Signed-off-by: Yanjun Xiang --- test/extensions/filters/http/ext_proc/filter_test.cc | 4 ---- test/extensions/filters/http/ext_proc/mock_server.cc | 8 +++++++- test/extensions/filters/http/ext_proc/mock_server.h | 2 ++ .../ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc | 7 ------- .../filters/http/ext_proc/unit_test_fuzz/mocks.h | 10 +++++++++- 5 files changed, 18 insertions(+), 13 deletions(-) diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index 698ce1b08ef7..008974567edc 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -92,9 +92,6 @@ class HttpFilterTest : public testing::Test { client_ = std::make_unique(); route_ = std::make_shared>(); EXPECT_CALL(*client_, start(_, _, _, _)).WillOnce(Invoke(this, &HttpFilterTest::doStart)); - EXPECT_CALL(*client_, stream()).WillRepeatedly(Invoke([this]() { return stream_; })); - EXPECT_CALL(*client_, setStream(_)) - .WillRepeatedly(Invoke([this](ExternalProcessorStream* stream) { stream_ = stream; })); EXPECT_CALL(encoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_)); EXPECT_CALL(decoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_)); EXPECT_CALL(decoder_callbacks_, route()).WillRepeatedly(Return(route_)); @@ -597,7 +594,6 @@ class HttpFilterTest : public testing::Test { absl::optional final_expected_grpc_service_; Grpc::GrpcServiceConfigWithHashKey config_with_hash_key_; std::unique_ptr client_; - ExternalProcessorStream* stream_ = nullptr; ExternalProcessorCallbacks* stream_callbacks_ = nullptr; ProcessingRequest last_request_; bool server_closed_stream_ = false; diff --git a/test/extensions/filters/http/ext_proc/mock_server.cc b/test/extensions/filters/http/ext_proc/mock_server.cc index 25286be792c3..29637f793f83 100644 --- a/test/extensions/filters/http/ext_proc/mock_server.cc +++ b/test/extensions/filters/http/ext_proc/mock_server.cc @@ -5,7 +5,13 @@ namespace Extensions { namespace HttpFilters { namespace ExternalProcessing { -MockClient::MockClient() = default; +MockClient::MockClient() { + EXPECT_CALL(*this, stream()).WillRepeatedly(testing::Invoke([this]() { return stream_; })); + + EXPECT_CALL(*this, setStream(testing::_)) + .WillRepeatedly( + testing::Invoke([this](ExternalProcessorStream* stream) -> void { stream_ = stream; })); +} MockClient::~MockClient() = default; MockStream::MockStream() = default; diff --git a/test/extensions/filters/http/ext_proc/mock_server.h b/test/extensions/filters/http/ext_proc/mock_server.h index 03b0850fff46..12c9d7308a93 100644 --- a/test/extensions/filters/http/ext_proc/mock_server.h +++ b/test/extensions/filters/http/ext_proc/mock_server.h @@ -19,6 +19,8 @@ class MockClient : public ExternalProcessorClient { Envoy::Http::StreamFilterSidestreamWatermarkCallbacks&)); MOCK_METHOD(ExternalProcessorStream*, stream, ()); MOCK_METHOD(void, setStream, (ExternalProcessorStream * stream)); + + ExternalProcessorStream* stream_ = nullptr; }; class MockStream : public ExternalProcessorStream { diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc index ce471dcfbda1..df321fe20487 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc @@ -114,13 +114,6 @@ DEFINE_PROTO_FUZZER( return stream; })); - ExternalProcessing::ExternalProcessorStream* stream = nullptr; - EXPECT_CALL(*client, stream()).WillRepeatedly(Invoke([&stream]() { return stream; })); - EXPECT_CALL(*client, setStream(_)) - .WillRepeatedly(Invoke([&stream](ExternalProcessing::ExternalProcessorStream* stream_ptr) { - stream = stream_ptr; - })); - Envoy::Extensions::HttpFilters::HttpFilterFuzzer fuzzer; fuzzer.runData(static_cast(filter.get()), input.request()); } diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h b/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h index d4e6eac19d70..d5b018298a30 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h @@ -27,7 +27,14 @@ class MockStream : public ExternalProcessing::ExternalProcessorStream { class MockClient : public ExternalProcessing::ExternalProcessorClient { public: - MockClient() = default; + MockClient() { + EXPECT_CALL(*this, stream()).WillRepeatedly(testing::Invoke([this]() { return stream_; })); + EXPECT_CALL(*this, setStream(testing::_)) + .WillRepeatedly( + testing::Invoke([this](ExternalProcessing::ExternalProcessorStream* stream) -> void { + stream_ = stream; + })); + } ~MockClient() override = default; MOCK_METHOD(ExternalProcessing::ExternalProcessorStreamPtr, start, @@ -37,6 +44,7 @@ class MockClient : public ExternalProcessing::ExternalProcessorClient { Envoy::Http::StreamFilterSidestreamWatermarkCallbacks&)); MOCK_METHOD(ExternalProcessing::ExternalProcessorStream*, stream, ()); MOCK_METHOD(void, setStream, (ExternalProcessing::ExternalProcessorStream * stream)); + ExternalProcessing::ExternalProcessorStream* stream_ = nullptr; }; } // namespace UnitTestFuzz From 1e202d8c51f5851268a8315d919e5009c23b6e8a Mon Sep 17 00:00:00 2001 From: Yanjun Xiang Date: Mon, 23 Sep 2024 17:32:51 +0000 Subject: [PATCH 08/14] removing redundant mock file Signed-off-by: Yanjun Xiang --- .../http/ext_proc/unit_test_fuzz/BUILD | 13 +---- .../unit_test_fuzz/ext_proc_unit_test_fuzz.cc | 6 +-- .../http/ext_proc/unit_test_fuzz/mocks.h | 54 ------------------- 3 files changed, 4 insertions(+), 69 deletions(-) delete mode 100644 test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD b/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD index 13649e730933..e90445a16d3c 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD @@ -1,7 +1,6 @@ load( "//bazel:envoy_build_system.bzl", "envoy_cc_fuzz_test", - "envoy_cc_mock", "envoy_package", "envoy_proto_library", ) @@ -10,16 +9,6 @@ licenses(["notice"]) # Apache 2 envoy_package() -envoy_cc_mock( - name = "ext_proc_mocks", - hdrs = ["mocks.h"], - tags = ["skip_on_windows"], - deps = [ - "//source/extensions/filters/http/ext_proc:client_interface", - "@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto", - ], -) - envoy_proto_library( name = "ext_proc_unit_test_fuzz_proto", srcs = ["ext_proc_unit_test_fuzz.proto"], @@ -36,10 +25,10 @@ envoy_cc_fuzz_test( corpus = "ext_proc_corpus", tags = ["skip_on_windows"], deps = [ - ":ext_proc_mocks", ":ext_proc_unit_test_fuzz_proto_cc_proto", "//source/extensions/filters/http/ext_proc:config", "//test/extensions/filters/http/common/fuzz:http_filter_fuzzer_lib", + "//test/extensions/filters/http/ext_proc:mock_server_lib", "//test/mocks/http:http_mocks", "//test/mocks/network:network_mocks", "//test/mocks/server:server_factory_context_mocks", diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc index df321fe20487..dd52ef8ea816 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc @@ -1,8 +1,8 @@ #include "source/extensions/filters/http/ext_proc/ext_proc.h" #include "test/extensions/filters/http/common/fuzz/http_filter_fuzzer.h" +#include "test/extensions/filters/http/ext_proc/mock_server.h" #include "test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.pb.validate.h" -#include "test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h" #include "test/fuzz/fuzz_runner.h" #include "test/mocks/http/mocks.h" #include "test/mocks/network/mocks.h" @@ -88,7 +88,7 @@ DEFINE_PROTO_FUZZER( return; } - MockClient* client = new MockClient(); + ExternalProcessing::MockClient* client = new ExternalProcessing::MockClient(); std::unique_ptr filter = std::make_unique( config, ExternalProcessing::ExternalProcessorClientPtr{client}, proto_config.grpc_service()); filter->setDecoderFilterCallbacks(mocks.decoder_callbacks_); @@ -100,7 +100,7 @@ DEFINE_PROTO_FUZZER( const Envoy::Http::AsyncClient::StreamOptions&, Envoy::Http::StreamFilterSidestreamWatermarkCallbacks&) -> ExternalProcessing::ExternalProcessorStreamPtr { - auto stream = std::make_unique(); + auto stream = std::make_unique(); EXPECT_CALL(*stream, send(_, _)) .WillRepeatedly(Invoke([&](envoy::service::ext_proc::v3::ProcessingRequest&&, bool) -> void { diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h b/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h deleted file mode 100644 index d5b018298a30..000000000000 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h +++ /dev/null @@ -1,54 +0,0 @@ -#pragma once - -#include "envoy/service/ext_proc/v3/external_processor.pb.h" - -#include "source/extensions/filters/http/ext_proc/client.h" - -#include "gmock/gmock.h" - -namespace Envoy { -namespace Extensions { -namespace HttpFilters { -namespace ExtProc { -namespace UnitTestFuzz { - -class MockStream : public ExternalProcessing::ExternalProcessorStream { -public: - MockStream() = default; - ~MockStream() override = default; - - MOCK_METHOD(void, send, - (envoy::service::ext_proc::v3::ProcessingRequest && request, bool end_stream)); - MOCK_METHOD(bool, close, ()); - MOCK_METHOD(const StreamInfo::StreamInfo&, streamInfo, (), (const override)); - MOCK_METHOD(StreamInfo::StreamInfo&, streamInfo, ()); - MOCK_METHOD(void, notifyFilterDestroy, ()); -}; - -class MockClient : public ExternalProcessing::ExternalProcessorClient { -public: - MockClient() { - EXPECT_CALL(*this, stream()).WillRepeatedly(testing::Invoke([this]() { return stream_; })); - EXPECT_CALL(*this, setStream(testing::_)) - .WillRepeatedly( - testing::Invoke([this](ExternalProcessing::ExternalProcessorStream* stream) -> void { - stream_ = stream; - })); - } - ~MockClient() override = default; - - MOCK_METHOD(ExternalProcessing::ExternalProcessorStreamPtr, start, - (ExternalProcessing::ExternalProcessorCallbacks & callbacks, - const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, - const Envoy::Http::AsyncClient::StreamOptions&, - Envoy::Http::StreamFilterSidestreamWatermarkCallbacks&)); - MOCK_METHOD(ExternalProcessing::ExternalProcessorStream*, stream, ()); - MOCK_METHOD(void, setStream, (ExternalProcessing::ExternalProcessorStream * stream)); - ExternalProcessing::ExternalProcessorStream* stream_ = nullptr; -}; - -} // namespace UnitTestFuzz -} // namespace ExtProc -} // namespace HttpFilters -} // namespace Extensions -} // namespace Envoy From 7bc13dbcf723138ff0e2621ef67e4a377b6d3114 Mon Sep 17 00:00:00 2001 From: Yanjun Xiang Date: Mon, 23 Sep 2024 17:38:37 +0000 Subject: [PATCH 09/14] removing duplicate Signed-off-by: Yanjun Xiang --- test/extensions/filters/http/ext_proc/ordering_test.cc | 3 --- 1 file changed, 3 deletions(-) diff --git a/test/extensions/filters/http/ext_proc/ordering_test.cc b/test/extensions/filters/http/ext_proc/ordering_test.cc index d03814b1473a..fdc5af548ea8 100644 --- a/test/extensions/filters/http/ext_proc/ordering_test.cc +++ b/test/extensions/filters/http/ext_proc/ordering_test.cc @@ -61,9 +61,6 @@ class OrderingTest : public testing::Test { client_ = std::make_unique(); route_ = std::make_shared>(); EXPECT_CALL(*client_, start(_, _, _, _)).WillRepeatedly(Invoke(this, &OrderingTest::doStart)); - EXPECT_CALL(*client_, stream()).WillRepeatedly(Invoke([this]() { return stream_; })); - EXPECT_CALL(*client_, setStream(_)) - .WillRepeatedly(Invoke([this](ExternalProcessorStream* stream) { stream_ = stream; })); EXPECT_CALL(encoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_)); EXPECT_CALL(decoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_)); EXPECT_CALL(decoder_callbacks_, route()).WillRepeatedly(Return(route_)); From 33a8b3d5f9b86d0676880951e90b48930675ebea Mon Sep 17 00:00:00 2001 From: Yanjun Xiang Date: Mon, 23 Sep 2024 19:22:18 +0000 Subject: [PATCH 10/14] remove unused variable Signed-off-by: Yanjun Xiang --- source/extensions/filters/http/ext_proc/ext_proc.cc | 2 +- test/extensions/filters/http/ext_proc/ordering_test.cc | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index ed9313fd8e6b..3f2ebed7986a 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -358,7 +358,7 @@ Filter::StreamOpenState Filter::openStream() { } stats_.streams_started_.inc(); - auto* stream = config_->threadLocalStreamManager().store( + ExternalProcessorStream* stream = config_->threadLocalStreamManager().store( std::move(stream_object), config_->stats(), config_->deferredCloseTimeout()); client_->setStream(stream); // For custom access logging purposes. Applicable only for Envoy gRPC as Google gRPC does not diff --git a/test/extensions/filters/http/ext_proc/ordering_test.cc b/test/extensions/filters/http/ext_proc/ordering_test.cc index fdc5af548ea8..4b1e4cf1330f 100644 --- a/test/extensions/filters/http/ext_proc/ordering_test.cc +++ b/test/extensions/filters/http/ext_proc/ordering_test.cc @@ -202,7 +202,6 @@ class OrderingTest : public testing::Test { } std::unique_ptr client_; - ExternalProcessorStream* stream_ = nullptr; MockStream stream_delegate_; ExternalProcessorCallbacks* stream_callbacks_ = nullptr; NiceMock stats_store_; From 4885e8148c00e38de9e45d0614c8e31ee5ceae4a Mon Sep 17 00:00:00 2001 From: Yanjun Xiang Date: Tue, 24 Sep 2024 03:23:55 +0000 Subject: [PATCH 11/14] change BUILD size Signed-off-by: Yanjun Xiang --- test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD | 1 + 1 file changed, 1 insertion(+) diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD b/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD index e90445a16d3c..acfd84c9871a 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD @@ -21,6 +21,7 @@ envoy_proto_library( envoy_cc_fuzz_test( name = "ext_proc_unit_test_fuzz", + size = "large", srcs = ["ext_proc_unit_test_fuzz.cc"], corpus = "ext_proc_corpus", tags = ["skip_on_windows"], From 7cfc91961ea0129e48525ca52f739315a61d610e Mon Sep 17 00:00:00 2001 From: Yanjun Xiang Date: Tue, 24 Sep 2024 14:26:24 +0000 Subject: [PATCH 12/14] limit the request body size Signed-off-by: Yanjun Xiang --- .../extensions/filters/http/ext_proc/unit_test_fuzz/BUILD | 1 - .../ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc | 8 ++++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD b/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD index acfd84c9871a..e90445a16d3c 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD @@ -21,7 +21,6 @@ envoy_proto_library( envoy_cc_fuzz_test( name = "ext_proc_unit_test_fuzz", - size = "large", srcs = ["ext_proc_unit_test_fuzz.cc"], corpus = "ext_proc_corpus", tags = ["skip_on_windows"], diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc index dd52ef8ea816..25c6b3ac2d76 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc @@ -69,6 +69,14 @@ DEFINE_PROTO_FUZZER( return; } + // Limiting the max supported request body size to 128k. + if (input.request().has_proto_body()) { + const uint32_t max_body_size = 128 * 1024; + if (input.request().proto_body().message().value().size() > max_body_size) { + return; + } + } + static FuzzerMocks mocks; NiceMock stats_store; From 60119c47c5881bc0d9b64ad46113ac15bbfed752 Mon Sep 17 00:00:00 2001 From: Yanjun Xiang Date: Tue, 24 Sep 2024 16:01:42 +0000 Subject: [PATCH 13/14] Empty-Commit Signed-off-by: Yanjun Xiang From c020e49bda8c31c1ada234cb5dc631e34fae9920 Mon Sep 17 00:00:00 2001 From: Yanjun Xiang Date: Tue, 24 Sep 2024 17:12:45 +0000 Subject: [PATCH 14/14] Empty-Commit Signed-off-by: Yanjun Xiang