diff --git a/include/envoy/http/filter.h b/include/envoy/http/filter.h index 605177c2c8df..9b6f86d018c6 100644 --- a/include/envoy/http/filter.h +++ b/include/envoy/http/filter.h @@ -163,6 +163,18 @@ enum class FilterMetadataStatus { Continue, }; +/** + * Return codes for onLocalReply filter invocations. + */ +enum class LocalErrorStatus { + // Continue sending the local reply after onLocalError has been sent to all filters. + Continue, + + // Continue sending onLocalReply to all filters, but reset the stream once all filters have been + // informed rather than sending the local reply. + ContinueAndResetStream, +}; + /** * The stream filter callbacks are passed to all filters to use for writing response data and * interacting with the underlying stream in general. @@ -596,6 +608,36 @@ class StreamFilterBase { * @param action the resulting match action */ virtual void onMatchCallback(const Matcher::Action&) {} + + struct LocalReplyData { + // The error code which (barring reset) will be sent to the client. + Http::Code code_; + // The details of why a local reply is being sent. + absl::string_view details_; + // True if a reset will occur rather than the local reply (some prior filter + // has returned ContinueAndResetStream) + bool reset_imminent_; + }; + + /** + * Called after sendLocalReply is called, and before any local reply is + * serialized either to filters, or downstream. + * This will be called on both encoder and decoder filters starting at the + * terminal filter (generally the router filter) and working towards the first filter configured. + * + * Note that in some circumstances, onLocalReply may be called more than once + * for a given stream, because it is possible that a filter call + * sendLocalReply while processing the original local reply response. + * + * Filters implementing onLocalReply are responsible for never calling sendLocalReply + * from onLocalReply, as that has the potential for looping. + * + * @param data data associated with the sendLocalReply call. + * @param LocalErrorStatus the action to take after onLocalError completes. + */ + virtual LocalErrorStatus onLocalReply(const LocalReplyData&) { + return LocalErrorStatus::Continue; + } }; /** diff --git a/source/common/http/filter_manager.cc b/source/common/http/filter_manager.cc index 6caf33f7536f..3320e4e2b4b4 100644 --- a/source/common/http/filter_manager.cc +++ b/source/common/http/filter_manager.cc @@ -812,10 +812,23 @@ FilterManager::commonDecodePrefix(ActiveStreamDecoderFilter* filter, return std::next(filter->entry()); } +void FilterManager::onLocalReply(StreamFilterBase::LocalReplyData& data) { + state_.under_on_local_reply_ = true; + filter_manager_callbacks_.onLocalReply(data.code_); + + for (auto entry : filters_) { + if (entry->onLocalReply(data) == LocalErrorStatus::ContinueAndResetStream) { + data.reset_imminent_ = true; + } + } + state_.under_on_local_reply_ = false; +} + void FilterManager::sendLocalReply( bool old_was_grpc_request, Code code, absl::string_view body, const std::function& modify_headers, const absl::optional grpc_status, absl::string_view details) { + ASSERT(!state_.under_on_local_reply_); const bool is_head_request = state_.is_head_request_; bool is_grpc_request = old_was_grpc_request; if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unify_grpc_handling")) { @@ -824,7 +837,14 @@ void FilterManager::sendLocalReply( stream_info_.setResponseCodeDetails(details); - filter_manager_callbacks_.onLocalReply(code); + StreamFilterBase::LocalReplyData data{code, details, false}; + FilterManager::onLocalReply(data); + if (data.reset_imminent_) { + ENVOY_STREAM_LOG(debug, "Resetting stream due to {}. onLocalReply requested reset.", *this, + details); + filter_manager_callbacks_.resetStream(); + return; + } if (!filter_manager_callbacks_.responseHeaders().has_value()) { // If the response has not started at all, send the response through the filter chain. diff --git a/source/common/http/filter_manager.h b/source/common/http/filter_manager.h index 42c176366e35..057d1617a1dd 100644 --- a/source/common/http/filter_manager.h +++ b/source/common/http/filter_manager.h @@ -771,6 +771,7 @@ class FilterManager : public ScopeTrackedObject, // Http::FilterChainFactoryCallbacks void addStreamDecoderFilter(StreamDecoderFilterSharedPtr filter) override { addStreamDecoderFilterWorker(filter, nullptr, false); + filters_.push_back(filter.get()); } void addStreamDecoderFilter(StreamDecoderFilterSharedPtr filter, Matcher::MatchTreeSharedPtr match_tree) override { @@ -787,6 +788,7 @@ class FilterManager : public ScopeTrackedObject, } void addStreamEncoderFilter(StreamEncoderFilterSharedPtr filter) override { addStreamEncoderFilterWorker(filter, nullptr, false); + filters_.push_back(filter.get()); } void addStreamEncoderFilter(StreamEncoderFilterSharedPtr filter, Matcher::MatchTreeSharedPtr match_tree) override { @@ -804,6 +806,8 @@ class FilterManager : public ScopeTrackedObject, void addStreamFilter(StreamFilterSharedPtr filter) override { addStreamDecoderFilterWorker(filter, nullptr, true); addStreamEncoderFilterWorker(filter, nullptr, true); + StreamDecoderFilter* decoder_filter = filter.get(); + filters_.push_back(decoder_filter); } void addStreamFilter(StreamFilterSharedPtr filter, Matcher::MatchTreeSharedPtr match_tree) override { @@ -921,6 +925,12 @@ class FilterManager : public ScopeTrackedObject, */ void maybeEndEncode(bool end_stream); + /** + * Called before local reply is made by the filter manager. + * @param data the data associated with the local reply. + */ + void onLocalReply(StreamFilterBase::LocalReplyData& data); + void sendLocalReply(bool is_grpc_request, Code code, absl::string_view body, const std::function& modify_headers, const absl::optional grpc_status, @@ -1072,6 +1082,7 @@ class FilterManager : public ScopeTrackedObject, std::list decoder_filters_; std::list encoder_filters_; + std::list filters_; std::list access_log_handlers_; // Stores metadata added in the decoding filter that is being processed. Will be cleared before @@ -1121,7 +1132,7 @@ class FilterManager : public ScopeTrackedObject, State() : remote_complete_(false), local_complete_(false), has_continue_headers_(false), created_filter_chain_(false), is_head_request_(false), is_grpc_request_(false), - non_100_response_headers_encoded_(false) {} + non_100_response_headers_encoded_(false), under_on_local_reply_(false) {} uint32_t filter_call_state_{0}; @@ -1139,6 +1150,8 @@ class FilterManager : public ScopeTrackedObject, bool is_grpc_request_ : 1; // Tracks if headers other than 100-Continue have been encoded to the codec. bool non_100_response_headers_encoded_ : 1; + // True under the stack of onLocalReply, false otherwise. + bool under_on_local_reply_ : 1; // The following 3 members are booleans rather than part of the space-saving bitfield as they // are passed as arguments to functions expecting bools. Extend State using the bitfield diff --git a/test/common/http/conn_manager_impl_test_base.cc b/test/common/http/conn_manager_impl_test_base.cc index 3407d3c6faed..a951418e02e1 100644 --- a/test/common/http/conn_manager_impl_test_base.cc +++ b/test/common/http/conn_manager_impl_test_base.cc @@ -90,11 +90,11 @@ void HttpConnectionManagerImplTest::setupFilterChain(int num_decoder_filters, // NOTE: The length/repetition in this routine allows InSequence to work correctly in an outer // scope. for (int i = 0; i < num_decoder_filters * num_requests; i++) { - decoder_filters_.push_back(new MockStreamDecoderFilter()); + decoder_filters_.push_back(new NiceMock()); } for (int i = 0; i < num_encoder_filters * num_requests; i++) { - encoder_filters_.push_back(new MockStreamEncoderFilter()); + encoder_filters_.push_back(new NiceMock()); } InSequence s; diff --git a/test/common/http/filter_manager_test.cc b/test/common/http/filter_manager_test.cc index 2b18706e6733..2831ef14f97a 100644 --- a/test/common/http/filter_manager_test.cc +++ b/test/common/http/filter_manager_test.cc @@ -17,6 +17,7 @@ #include "gtest/gtest.h" +using testing::InSequence; using testing::Return; namespace Envoy { @@ -36,7 +37,7 @@ class FilterManagerTest : public testing::Test { Event::MockDispatcher dispatcher_; NiceMock connection_; Envoy::Http::MockFilterChainFactory filter_factory_; - LocalReply::MockLocalReply local_reply_; + NiceMock local_reply_; Protocol protocol_{Protocol::Http2}; NiceMock time_source_; StreamInfo::FilterStateSharedPtr filter_state_ = @@ -345,6 +346,108 @@ TEST_F(FilterManagerTest, MatchTreeFilterActionDualFilter) { filter_manager_->decodeHeaders(*grpc_headers, true); filter_manager_->destroyFilters(); } + +TEST_F(FilterManagerTest, OnLocalReply) { + initialize(); + + std::shared_ptr decoder_filter(new NiceMock()); + std::shared_ptr encoder_filter(new NiceMock()); + std::shared_ptr stream_filter(new NiceMock()); + + RequestHeaderMapPtr headers{ + new TestRequestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}}; + + ON_CALL(filter_manager_callbacks_, requestHeaders()).WillByDefault(Return(makeOptRef(*headers))); + + EXPECT_CALL(filter_factory_, createFilterChain(_)) + .WillRepeatedly(Invoke([&](FilterChainFactoryCallbacks& callbacks) -> void { + callbacks.addStreamDecoderFilter(decoder_filter); + callbacks.addStreamFilter(stream_filter); + callbacks.addStreamEncoderFilter(encoder_filter); + })); + + filter_manager_->createFilterChain(); + filter_manager_->requestHeadersInitialized(); + filter_manager_->decodeHeaders(*headers, true); + + // Make sure all 3 filters get onLocalReply, and that the reset is preserved + // even if not the last return. + EXPECT_CALL(*decoder_filter, onLocalReply(_)); + EXPECT_CALL(*stream_filter, onLocalReply(_)) + .WillOnce(Return(LocalErrorStatus::ContinueAndResetStream)); + EXPECT_CALL(*encoder_filter, onLocalReply(_)); + EXPECT_CALL(filter_manager_callbacks_, resetStream()); + decoder_filter->callbacks_->sendLocalReply(Code::InternalServerError, "body", nullptr, + absl::nullopt, "details"); + + // The reason for the response (in this case the reset) will still be tracked + // but as no response is sent the response code will remain absent. + ASSERT_TRUE(filter_manager_->streamInfo().responseCodeDetails().has_value()); + EXPECT_EQ(filter_manager_->streamInfo().responseCodeDetails().value(), "details"); + EXPECT_FALSE(filter_manager_->streamInfo().responseCode().has_value()); + + filter_manager_->destroyFilters(); +} + +TEST_F(FilterManagerTest, MultipleOnLocalReply) { + initialize(); + + std::shared_ptr decoder_filter(new NiceMock()); + std::shared_ptr encoder_filter(new NiceMock()); + std::shared_ptr stream_filter(new NiceMock()); + + RequestHeaderMapPtr headers{ + new TestRequestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}}; + + ON_CALL(filter_manager_callbacks_, requestHeaders()).WillByDefault(Return(makeOptRef(*headers))); + + EXPECT_CALL(filter_factory_, createFilterChain(_)) + .WillRepeatedly(Invoke([&](FilterChainFactoryCallbacks& callbacks) -> void { + callbacks.addStreamDecoderFilter(decoder_filter); + callbacks.addStreamFilter(stream_filter); + callbacks.addStreamEncoderFilter(encoder_filter); + })); + + filter_manager_->createFilterChain(); + filter_manager_->requestHeadersInitialized(); + filter_manager_->decodeHeaders(*headers, true); + + { + // Set up expectations to be triggered by sendLocalReply at the bottom of + // this block. + InSequence s; + + // Make sure all 3 filters get onLocalReply + EXPECT_CALL(*decoder_filter, onLocalReply(_)); + EXPECT_CALL(*stream_filter, onLocalReply(_)); + EXPECT_CALL(*encoder_filter, onLocalReply(_)); + + // Now response encoding begins. Assume a filter co-opts the original reply + // with a new local reply. + EXPECT_CALL(*encoder_filter, encodeHeaders(_, _)) + .WillOnce(Invoke([&](ResponseHeaderMap&, bool) -> FilterHeadersStatus { + decoder_filter->callbacks_->sendLocalReply(Code::InternalServerError, "body2", nullptr, + absl::nullopt, "details2"); + return FilterHeadersStatus::StopIteration; + })); + + // All 3 filters should get the second onLocalReply. + EXPECT_CALL(*decoder_filter, onLocalReply(_)); + EXPECT_CALL(*stream_filter, onLocalReply(_)); + EXPECT_CALL(*encoder_filter, onLocalReply(_)); + + decoder_filter->callbacks_->sendLocalReply(Code::InternalServerError, "body", nullptr, + absl::nullopt, "details"); + } + + // The final details should be details2. + ASSERT_TRUE(filter_manager_->streamInfo().responseCodeDetails().has_value()); + EXPECT_EQ(filter_manager_->streamInfo().responseCodeDetails().value(), "details2"); + EXPECT_FALSE(filter_manager_->streamInfo().responseCode().has_value()); + + filter_manager_->destroyFilters(); +} + } // namespace } // namespace Http } // namespace Envoy diff --git a/test/integration/BUILD b/test/integration/BUILD index 6e0d327fef55..06770eb2fc9d 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -357,6 +357,7 @@ envoy_cc_test( "//source/extensions/filters/http/buffer:config", "//source/extensions/filters/http/health_check:config", "//test/integration/filters:metadata_stop_all_filter_config_lib", + "//test/integration/filters:on_local_reply_filter_config_lib", "//test/integration/filters:request_metadata_filter_config_lib", "//test/integration/filters:response_metadata_filter_config_lib", "//test/integration/filters:set_response_code_filter_config_proto_cc_proto", diff --git a/test/integration/filters/BUILD b/test/integration/filters/BUILD index 1d78984323f3..a1c89ea4996b 100644 --- a/test/integration/filters/BUILD +++ b/test/integration/filters/BUILD @@ -157,6 +157,20 @@ envoy_cc_test_library( ], ) +envoy_cc_test_library( + name = "on_local_reply_filter_config_lib", + srcs = [ + "on_local_reply_filter.cc", + ], + deps = [ + "//include/envoy/http:filter_interface", + "//include/envoy/registry", + "//include/envoy/server:filter_config_interface", + "//source/extensions/filters/http/common:pass_through_filter_lib", + "//test/extensions/filters/http/common:empty_http_filter_config_lib", + ], +) + envoy_cc_test_library( name = "passthrough_filter_config_lib", srcs = [ diff --git a/test/integration/filters/on_local_reply_filter.cc b/test/integration/filters/on_local_reply_filter.cc new file mode 100644 index 000000000000..2bb2923d3c73 --- /dev/null +++ b/test/integration/filters/on_local_reply_filter.cc @@ -0,0 +1,49 @@ +#include + +#include "envoy/http/filter.h" +#include "envoy/registry/registry.h" +#include "envoy/server/filter_config.h" + +#include "extensions/filters/http/common/pass_through_filter.h" + +#include "test/extensions/filters/http/common/empty_http_filter_config.h" + +namespace Envoy { + +class OnLocalReplyFilter : public Http::PassThroughFilter { +public: + Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& request_headers, bool) override { + if (!request_headers.get(Http::LowerCaseString("reset")).empty()) { + reset_ = true; + } + decoder_callbacks_->sendLocalReply(Http::Code::BadRequest, "body", nullptr, absl::nullopt, + "details"); + return Http::FilterHeadersStatus::StopIteration; + } + + Http::LocalErrorStatus onLocalReply(const LocalReplyData&) override { + if (reset_) { + return Http::LocalErrorStatus::ContinueAndResetStream; + } + return Http::LocalErrorStatus::Continue; + } + + bool reset_{}; +}; + +class OnLocalReplyFilterConfig : public Extensions::HttpFilters::Common::EmptyHttpFilterConfig { +public: + OnLocalReplyFilterConfig() : EmptyHttpFilterConfig("on-local-reply-filter") {} + Http::FilterFactoryCb createFilter(const std::string&, + Server::Configuration::FactoryContext&) override { + return [](Http::FilterChainFactoryCallbacks& callbacks) -> void { + callbacks.addStreamFilter(std::make_shared<::Envoy::OnLocalReplyFilter>()); + }; + } +}; + +// perform static registration +static Registry::RegisterFactory + register_; +} // namespace Envoy diff --git a/test/integration/http2_integration_test.cc b/test/integration/http2_integration_test.cc index 95ed212e7e2c..ebc6069eae10 100644 --- a/test/integration/http2_integration_test.cc +++ b/test/integration/http2_integration_test.cc @@ -1603,4 +1603,28 @@ TEST_P(Http2MetadataIntegrationTest, UpstreamMetadataAfterEndStream) { EXPECT_EQ("200", response->headers().getStatusValue()); } +static std::string on_local_reply_filter = R"EOF( +name: on-local-reply-filter +typed_config: + "@type": type.googleapis.com/google.protobuf.Empty +)EOF"; + +TEST_P(Http2IntegrationTest, OnLocalReply) { + config_helper_.addFilter(on_local_reply_filter); + initialize(); + + codec_client_ = makeHttpConnection(lookupPort("http")); + { + auto response = codec_client_->makeHeaderOnlyRequest(default_request_headers_); + response->waitForEndStream(); + ASSERT_TRUE(response->complete()); + } + { + default_request_headers_.addCopy("reset", "yes"); + auto response = codec_client_->makeHeaderOnlyRequest(default_request_headers_); + response->waitForReset(); + ASSERT_FALSE(response->complete()); + } +} + } // namespace Envoy diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index 378aa1c61a2b..841213daacd6 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -322,6 +322,7 @@ class MockStreamDecoderFilter : public StreamDecoderFilter { MOCK_METHOD(void, onStreamComplete, ()); MOCK_METHOD(void, onDestroy, ()); MOCK_METHOD(void, onMatchCallback, (const Matcher::Action&)); + MOCK_METHOD(LocalErrorStatus, onLocalReply, (const LocalReplyData&)); // Http::StreamDecoderFilter MOCK_METHOD(FilterHeadersStatus, decodeHeaders, (RequestHeaderMap & headers, bool end_stream)); @@ -348,6 +349,7 @@ class MockStreamEncoderFilter : public StreamEncoderFilter { MOCK_METHOD(void, onStreamComplete, ()); MOCK_METHOD(void, onDestroy, ()); MOCK_METHOD(void, onMatchCallback, (const Matcher::Action&)); + MOCK_METHOD(LocalErrorStatus, onLocalReply, (const LocalReplyData&)); // Http::MockStreamEncoderFilter MOCK_METHOD(FilterHeadersStatus, encode100ContinueHeaders, (ResponseHeaderMap & headers)); @@ -370,6 +372,7 @@ class MockStreamFilter : public StreamFilter { MOCK_METHOD(void, onStreamComplete, ()); MOCK_METHOD(void, onDestroy, ()); MOCK_METHOD(void, onMatchCallback, (const Matcher::Action&)); + MOCK_METHOD(LocalErrorStatus, onLocalReply, (const LocalReplyData&)); // Http::StreamDecoderFilter MOCK_METHOD(FilterHeadersStatus, decodeHeaders, (RequestHeaderMap & headers, bool end_stream));