Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ext_proc refactoring: Move stream object from Filter class to client #36228

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions source/extensions/filters/http/ext_proc/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class ExternalProcessorClient {
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const Http::AsyncClient::StreamOptions& options,
Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) PURE;
virtual ExternalProcessorStream* stream() PURE;
virtual void setStream(ExternalProcessorStream* stream) PURE;
};

using ExternalProcessorClientPtr = std::unique_ptr<ExternalProcessorClient>;
Expand Down
5 changes: 5 additions & 0 deletions source/extensions/filters/http/ext_proc/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,15 @@ class ExternalProcessorClientImpl : public ExternalProcessorClient {
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const Http::AsyncClient::StreamOptions& options,
Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) override;
ExternalProcessorStream* stream() override { return stream_; }
void setStream(ExternalProcessorStream* stream) override { stream_ = stream; }

private:
Grpc::AsyncClientManager& client_manager_;
Stats::Scope& scope_;
// The gRPC stream to the external processor, which will be opened
// when it's time to send the first message.
ExternalProcessorStream* stream_ = nullptr;
};

class ExternalProcessorStreamImpl : public ExternalProcessorStream,
Expand Down
43 changes: 23 additions & 20 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ Filter::StreamOpenState Filter::openStream() {
ENVOY_LOG(debug, "External processing is completed when trying to open the gRPC stream");
return StreamOpenState::IgnoreError;
}
if (!stream_) {
if (!client_->stream()) {
ENVOY_LOG(debug, "Opening gRPC stream to external processor");

Http::AsyncClient::ParentContext grpc_context;
Expand All @@ -351,40 +351,42 @@ Filter::StreamOpenState Filter::openStream() {

if (processing_complete_) {
// Stream failed while starting and either onGrpcError or onGrpcClose was already called
// Asserts that `stream_` is nullptr since it is not valid to be used any further
// Asserts that `stream_object` is nullptr since it is not valid to be used any further
// beyond this point.
ASSERT(stream_object == nullptr);
return sent_immediate_response_ ? StreamOpenState::Error : StreamOpenState::IgnoreError;
}
stats_.streams_started_.inc();

stream_ = config_->threadLocalStreamManager().store(std::move(stream_object), config_->stats(),
config_->deferredCloseTimeout());
auto* stream = config_->threadLocalStreamManager().store(
std::move(stream_object), config_->stats(), config_->deferredCloseTimeout());
client_->setStream(stream);
// For custom access logging purposes. Applicable only for Envoy gRPC as Google gRPC does not
// have a proper implementation of streamInfo.
if (grpc_service_.has_envoy_grpc() && logging_info_ != nullptr) {
logging_info_->setClusterInfo(stream_->streamInfo().upstreamClusterInfo());
logging_info_->setClusterInfo(client_->stream()->streamInfo().upstreamClusterInfo());
}
}
return StreamOpenState::Ok;
}

void Filter::closeStream() {
if (stream_) {
if (client_->stream()) {
ENVOY_LOG(debug, "Calling close on stream");
if (stream_->close()) {
if (client_->stream()->close()) {
stats_.streams_closed_.inc();
}
config_->threadLocalStreamManager().erase(stream_);
stream_ = nullptr;
config_->threadLocalStreamManager().erase(client_->stream());
client_->setStream(nullptr);
} else {
ENVOY_LOG(debug, "Stream already closed");
}
}

void Filter::deferredCloseStream() {
ENVOY_LOG(debug, "Calling deferred close on stream");
config_->threadLocalStreamManager().deferredErase(stream_, filter_callbacks_->dispatcher());
config_->threadLocalStreamManager().deferredErase(client_->stream(),
filter_callbacks_->dispatcher());
}

void Filter::onDestroy() {
Expand All @@ -402,8 +404,8 @@ void Filter::onDestroy() {
// closure is deferred upon filter destruction with a timer.

// First, release the referenced filter resource.
if (stream_ != nullptr) {
stream_->notifyFilterDestroy();
if (client_->stream() != nullptr) {
client_->stream()->notifyFilterDestroy();
}

// Second, perform stream deferred closure.
Expand Down Expand Up @@ -433,7 +435,7 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(),
ProcessorState::CallbackState::HeadersCallback);
ENVOY_LOG(debug, "Sending headers message");
stream_->send(std::move(req), false);
client_->stream()->send(std::move(req), false);
stats_.stream_msgs_sent_.inc();
state.setPaused(true);
return FilterHeadersStatus::StopIteration;
Expand Down Expand Up @@ -658,7 +660,7 @@ Filter::sendHeadersInObservabilityMode(Http::RequestOrResponseHeaderMap& headers
ProcessingRequest req =
buildHeaderRequest(state, headers, end_stream, /*observability_mode=*/true);
ENVOY_LOG(debug, "Sending headers message in observability mode");
stream_->send(std::move(req), false);
client_->stream()->send(std::move(req), false);
stats_.stream_msgs_sent_.inc();

return FilterHeadersStatus::Continue;
Expand All @@ -683,7 +685,7 @@ Http::FilterDataStatus Filter::sendDataInObservabilityMode(Buffer::Instance& dat
// Set up the the body chunk and send.
auto req = setupBodyChunk(state, data, end_stream);
req.set_observability_mode(true);
stream_->send(std::move(req), false);
client_->stream()->send(std::move(req), false);
stats_.stream_msgs_sent_.inc();
ENVOY_LOG(debug, "Sending body message in ObservabilityMode");
} else if (state.bodyMode() != ProcessingMode::NONE) {
Expand Down Expand Up @@ -875,7 +877,7 @@ void Filter::sendBodyChunk(ProcessorState& state, ProcessorState::CallbackState
ProcessingRequest& req) {
state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(),
new_state);
stream_->send(std::move(req), false);
client_->stream()->send(std::move(req), false);
stats_.stream_msgs_sent_.inc();
}

Expand All @@ -891,20 +893,21 @@ void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers
state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(),
ProcessorState::CallbackState::TrailersCallback);
ENVOY_LOG(debug, "Sending trailers message");
stream_->send(std::move(req), false);
client_->stream()->send(std::move(req), false);
stats_.stream_msgs_sent_.inc();
}

void Filter::logGrpcStreamInfo() {
if (stream_ != nullptr && logging_info_ != nullptr && grpc_service_.has_envoy_grpc()) {
const auto& upstream_meter = stream_->streamInfo().getUpstreamBytesMeter();
if (client_->stream() != nullptr && logging_info_ != nullptr && grpc_service_.has_envoy_grpc()) {
const auto& upstream_meter = client_->stream()->streamInfo().getUpstreamBytesMeter();
if (upstream_meter != nullptr) {
logging_info_->setBytesSent(upstream_meter->wireBytesSent());
logging_info_->setBytesReceived(upstream_meter->wireBytesReceived());
}
// Only set upstream host in logging info once.
if (logging_info_->upstreamHost() == nullptr) {
logging_info_->setUpstreamHost(stream_->streamInfo().upstreamInfo()->upstreamHost());
logging_info_->setUpstreamHost(
client_->stream()->streamInfo().upstreamInfo()->upstreamHost());
}
}
}
Expand Down
4 changes: 0 additions & 4 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -479,10 +479,6 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
DecodingProcessorState decoding_state_;
EncodingProcessorState encoding_state_;

// The gRPC stream to the external processor, which will be opened
// when it's time to send the first message.
ExternalProcessorStream* stream_ = nullptr;

// Set to true when no more messages need to be sent to the processor.
// This happens when the processor has closed the stream, or when it has
// failed.
Expand Down
8 changes: 7 additions & 1 deletion test/extensions/filters/http/ext_proc/mock_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@ namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

MockClient::MockClient() = default;
MockClient::MockClient() {
EXPECT_CALL(*this, stream()).WillRepeatedly(testing::Invoke([this]() { return stream_; }));

EXPECT_CALL(*this, setStream(testing::_))
.WillRepeatedly(
testing::Invoke([this](ExternalProcessorStream* stream) -> void { stream_ = stream; }));
}
MockClient::~MockClient() = default;

MockStream::MockStream() = default;
Expand Down
4 changes: 4 additions & 0 deletions test/extensions/filters/http/ext_proc/mock_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ class MockClient : public ExternalProcessorClient {
(ExternalProcessorCallbacks&, const Grpc::GrpcServiceConfigWithHashKey&,
const Envoy::Http::AsyncClient::StreamOptions&,
Envoy::Http::StreamFilterSidestreamWatermarkCallbacks&));
MOCK_METHOD(ExternalProcessorStream*, stream, ());
MOCK_METHOD(void, setStream, (ExternalProcessorStream * stream));

ExternalProcessorStream* stream_ = nullptr;
};

class MockStream : public ExternalProcessorStream {
Expand Down
4 changes: 4 additions & 0 deletions test/extensions/filters/http/ext_proc/ordering_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ class OrderingTest : public testing::Test {
client_ = std::make_unique<MockClient>();
route_ = std::make_shared<NiceMock<Router::MockRoute>>();
EXPECT_CALL(*client_, start(_, _, _, _)).WillRepeatedly(Invoke(this, &OrderingTest::doStart));
EXPECT_CALL(*client_, stream()).WillRepeatedly(Invoke([this]() { return stream_; }));
EXPECT_CALL(*client_, setStream(_))
.WillRepeatedly(Invoke([this](ExternalProcessorStream* stream) { stream_ = stream; }));
EXPECT_CALL(encoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_));
EXPECT_CALL(decoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_));
EXPECT_CALL(decoder_callbacks_, route()).WillRepeatedly(Return(route_));
Expand Down Expand Up @@ -202,6 +205,7 @@ class OrderingTest : public testing::Test {
}

std::unique_ptr<MockClient> client_;
ExternalProcessorStream* stream_ = nullptr;
MockStream stream_delegate_;
ExternalProcessorCallbacks* stream_callbacks_ = nullptr;
NiceMock<Stats::MockIsolatedStatsStore> stats_store_;
Expand Down

This file was deleted.

12 changes: 11 additions & 1 deletion test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,24 @@ class MockStream : public ExternalProcessing::ExternalProcessorStream {

class MockClient : public ExternalProcessing::ExternalProcessorClient {
public:
MockClient() = default;
MockClient() {
EXPECT_CALL(*this, stream()).WillRepeatedly(testing::Invoke([this]() { return stream_; }));
EXPECT_CALL(*this, setStream(testing::_))
.WillRepeatedly(
testing::Invoke([this](ExternalProcessing::ExternalProcessorStream* stream) -> void {
stream_ = stream;
}));
}
~MockClient() override = default;

MOCK_METHOD(ExternalProcessing::ExternalProcessorStreamPtr, start,
(ExternalProcessing::ExternalProcessorCallbacks & callbacks,
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const Envoy::Http::AsyncClient::StreamOptions&,
Envoy::Http::StreamFilterSidestreamWatermarkCallbacks&));
MOCK_METHOD(ExternalProcessing::ExternalProcessorStream*, stream, ());
MOCK_METHOD(void, setStream, (ExternalProcessing::ExternalProcessorStream * stream));
ExternalProcessing::ExternalProcessorStream* stream_ = nullptr;
};

} // namespace UnitTestFuzz
Expand Down