Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http: adding an interface to inform filters of local replies #15172

Merged
merged 11 commits into from
Mar 8, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,18 @@ enum class FilterMetadataStatus {
Continue,
};

/**
* Return codes for onLocalReply filter invocations.
*/
enum class LocalErrorStatus {
// Continue sending the local reply after onLocalError has been sent to all filters.
Continue,

// Continue sending onLocalReply to all filters, but reset the stream once all filters have been
// informed rather than sending the local reply.
Comment on lines +173 to +174
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have some way of indicating to filters that the local reply is going to be reset? I can imagine filters wanting to do some mutation of the local reply in some cases, but it doesn't make sense for them to do this if a previous filter has triggered a reset

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they can't mutate the local reply here, only from the actual sendlocalReply, but can't hurt to pass on that info

ContinueAndResetStream,
};

/**
* The stream filter callbacks are passed to all filters to use for writing response data and
* interacting with the underlying stream in general.
Expand Down Expand Up @@ -596,6 +608,27 @@ class StreamFilterBase {
* @param action the resulting match action
*/
virtual void onMatchCallback(const Matcher::Action&) {}

struct LocalReplyData {
Http::Code code_;
absl::string_view details_;
};

/**
* Called after sendLocalReply is called, and before any local reply is
* serialized either to filters, or downstream.
*
* Note that in rare circumstances, onLocalReply may be called more than once
alyssawilk marked this conversation as resolved.
Show resolved Hide resolved
* for a given stream, because it is possible that a filter call
* sendLocalReply while processing the original local reply response.
*
* Filters implementing onLocalReply are responsible for never calling sendLocalReply
* from onLocalReply, as that has the potential for looping.
alyssawilk marked this conversation as resolved.
Show resolved Hide resolved
*
* @param data data associated with the sendLocalReply call.
* @param LocalErrorStatus the action to take after onLocalError completes.
*/
virtual LocalErrorStatus onLocalReply(LocalReplyData&) { return LocalErrorStatus::Continue; }
};

/**
Expand Down
20 changes: 19 additions & 1 deletion source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,18 @@ FilterManager::commonDecodePrefix(ActiveStreamDecoderFilter* filter,
return std::next(filter->entry());
}

LocalErrorStatus FilterManager::onLocalReply(StreamFilterBase::LocalReplyData& data) {
filter_manager_callbacks_.onLocalReply(data.code_);

LocalErrorStatus status = LocalErrorStatus::Continue;
for (auto entry : filters_) {
if (entry->onLocalReply(data) == LocalErrorStatus::ContinueAndResetStream) {
status = LocalErrorStatus::ContinueAndResetStream;
}
}
return status;
}

void FilterManager::sendLocalReply(
bool old_was_grpc_request, Code code, absl::string_view body,
const std::function<void(ResponseHeaderMap& headers)>& modify_headers,
Expand All @@ -824,7 +836,13 @@ void FilterManager::sendLocalReply(

stream_info_.setResponseCodeDetails(details);

filter_manager_callbacks_.onLocalReply(code);
StreamFilterBase::LocalReplyData data{code, details};
if (FilterManager::onLocalReply(data) == LocalErrorStatus::ContinueAndResetStream) {
ENVOY_STREAM_LOG(debug, "Resetting stream due to {}. onLocalReply requested reset.", *this,
details);
Comment on lines +843 to +844
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any way we can capture which filter is triggering the reset? Or let the filter that triggers a reset include information?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean we don't really for sendLocalReply logs either, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least with local replies we can infer it from the response code details, but yea I don't think this is necessary, just a nice to have

filter_manager_callbacks_.resetStream();
return;
}

if (!filter_manager_callbacks_.responseHeaders().has_value()) {
// If the response has not started at all, send the response through the filter chain.
Expand Down
12 changes: 12 additions & 0 deletions source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,7 @@ class FilterManager : public ScopeTrackedObject,
// Http::FilterChainFactoryCallbacks
void addStreamDecoderFilter(StreamDecoderFilterSharedPtr filter) override {
addStreamDecoderFilterWorker(filter, nullptr, false);
filters_.push_back(filter.get());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we document the order in which filters are given the callback? Seems like the order is first added to last added, regardless of whether it was a decoder or encoder filter. I think people might intuitively expect this to follow the decoder or encoder filter chains, but currently this can interleave between the two.

I think with the current impl the order doesn't matter at all, but I can imagine iterations of this feature (like my suggestion of capturing information about which filter is doing the reset) where it starts mattering.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, added to include.

}
void addStreamDecoderFilter(StreamDecoderFilterSharedPtr filter,
Matcher::MatchTreeSharedPtr<HttpMatchingData> match_tree) override {
Expand All @@ -776,6 +777,7 @@ class FilterManager : public ScopeTrackedObject,
}
void addStreamEncoderFilter(StreamEncoderFilterSharedPtr filter) override {
addStreamEncoderFilterWorker(filter, nullptr, false);
filters_.push_back(filter.get());
}
void addStreamEncoderFilter(StreamEncoderFilterSharedPtr filter,
Matcher::MatchTreeSharedPtr<HttpMatchingData> match_tree) override {
Expand All @@ -793,6 +795,8 @@ class FilterManager : public ScopeTrackedObject,
void addStreamFilter(StreamFilterSharedPtr filter) override {
addStreamDecoderFilterWorker(filter, nullptr, true);
addStreamEncoderFilterWorker(filter, nullptr, true);
StreamDecoderFilter* decoder_filter = filter.get();
filters_.push_back(decoder_filter);
}
void addStreamFilter(StreamFilterSharedPtr filter,
Matcher::MatchTreeSharedPtr<HttpMatchingData> match_tree) override {
Expand Down Expand Up @@ -910,6 +914,13 @@ class FilterManager : public ScopeTrackedObject,
*/
void maybeEndEncode(bool end_stream);

/**
* Called before local reply is made by the filter manager.
* @param data the data associated with the local reply.
* @param LocalErrorStatus the status from the filter chain.
*/
LocalErrorStatus onLocalReply(StreamFilterBase::LocalReplyData& data);

void sendLocalReply(bool is_grpc_request, Code code, absl::string_view body,
const std::function<void(ResponseHeaderMap& headers)>& modify_headers,
const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
Expand Down Expand Up @@ -1061,6 +1072,7 @@ class FilterManager : public ScopeTrackedObject,

std::list<ActiveStreamDecoderFilterPtr> decoder_filters_;
std::list<ActiveStreamEncoderFilterPtr> encoder_filters_;
std::list<StreamFilterBase*> filters_;
std::list<AccessLog::InstanceSharedPtr> access_log_handlers_;

// Stores metadata added in the decoding filter that is being processed. Will be cleared before
Expand Down
4 changes: 2 additions & 2 deletions test/common/http/conn_manager_impl_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ void HttpConnectionManagerImplTest::setupFilterChain(int num_decoder_filters,
// NOTE: The length/repetition in this routine allows InSequence to work correctly in an outer
// scope.
for (int i = 0; i < num_decoder_filters * num_requests; i++) {
decoder_filters_.push_back(new MockStreamDecoderFilter());
decoder_filters_.push_back(new NiceMock<MockStreamDecoderFilter>());
}

for (int i = 0; i < num_encoder_filters * num_requests; i++) {
encoder_filters_.push_back(new MockStreamEncoderFilter());
encoder_filters_.push_back(new NiceMock<MockStreamEncoderFilter>());
}

InSequence s;
Expand Down
105 changes: 104 additions & 1 deletion test/common/http/filter_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "gtest/gtest.h"

using testing::InSequence;
using testing::Return;

namespace Envoy {
Expand All @@ -36,7 +37,7 @@ class FilterManagerTest : public testing::Test {
Event::MockDispatcher dispatcher_;
NiceMock<Network::MockConnection> connection_;
Envoy::Http::MockFilterChainFactory filter_factory_;
LocalReply::MockLocalReply local_reply_;
NiceMock<LocalReply::MockLocalReply> local_reply_;
Protocol protocol_{Protocol::Http2};
NiceMock<MockTimeSystem> time_source_;
StreamInfo::FilterStateSharedPtr filter_state_ =
Expand Down Expand Up @@ -345,6 +346,108 @@ TEST_F(FilterManagerTest, MatchTreeFilterActionDualFilter) {
filter_manager_->decodeHeaders(*grpc_headers, true);
filter_manager_->destroyFilters();
}

TEST_F(FilterManagerTest, OnLocalReply) {
initialize();

std::shared_ptr<MockStreamDecoderFilter> decoder_filter(new NiceMock<MockStreamDecoderFilter>());
std::shared_ptr<MockStreamEncoderFilter> encoder_filter(new NiceMock<MockStreamEncoderFilter>());
std::shared_ptr<MockStreamFilter> stream_filter(new NiceMock<MockStreamFilter>());

RequestHeaderMapPtr headers{
new TestRequestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}};

ON_CALL(filter_manager_callbacks_, requestHeaders()).WillByDefault(Return(makeOptRef(*headers)));

EXPECT_CALL(filter_factory_, createFilterChain(_))
.WillRepeatedly(Invoke([&](FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addStreamDecoderFilter(decoder_filter);
callbacks.addStreamFilter(stream_filter);
callbacks.addStreamEncoderFilter(encoder_filter);
}));

filter_manager_->createFilterChain();
filter_manager_->requestHeadersInitialized();
filter_manager_->decodeHeaders(*headers, true);

// Make sure all 3 filters get onLocalReply, and that the reset is preserved
// even if not the last return.
EXPECT_CALL(*decoder_filter, onLocalReply(_));
EXPECT_CALL(*stream_filter, onLocalReply(_))
.WillOnce(Return(LocalErrorStatus::ContinueAndResetStream));
EXPECT_CALL(*encoder_filter, onLocalReply(_));
EXPECT_CALL(filter_manager_callbacks_, resetStream());
decoder_filter->callbacks_->sendLocalReply(Code::InternalServerError, "body", nullptr,
absl::nullopt, "details");

// The reason for the response (in this case the reset) will still be tracked
// but as no response is sent the response code will remain absent.
ASSERT_TRUE(filter_manager_->streamInfo().responseCodeDetails().has_value());
EXPECT_EQ(filter_manager_->streamInfo().responseCodeDetails().value(), "details");
EXPECT_FALSE(filter_manager_->streamInfo().responseCode().has_value());

filter_manager_->destroyFilters();
}

TEST_F(FilterManagerTest, MultipleOnLocalReply) {
initialize();

std::shared_ptr<MockStreamDecoderFilter> decoder_filter(new NiceMock<MockStreamDecoderFilter>());
std::shared_ptr<MockStreamEncoderFilter> encoder_filter(new NiceMock<MockStreamEncoderFilter>());
std::shared_ptr<MockStreamFilter> stream_filter(new NiceMock<MockStreamFilter>());

RequestHeaderMapPtr headers{
new TestRequestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}};

ON_CALL(filter_manager_callbacks_, requestHeaders()).WillByDefault(Return(makeOptRef(*headers)));

EXPECT_CALL(filter_factory_, createFilterChain(_))
.WillRepeatedly(Invoke([&](FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addStreamDecoderFilter(decoder_filter);
callbacks.addStreamFilter(stream_filter);
callbacks.addStreamEncoderFilter(encoder_filter);
}));

filter_manager_->createFilterChain();
filter_manager_->requestHeadersInitialized();
filter_manager_->decodeHeaders(*headers, true);

{
// Set up expectations to be triggered by sendLocalReply at the bottom of
// this block.
InSequence s;

// Make sure all 3 filters get onLocalReply
EXPECT_CALL(*decoder_filter, onLocalReply(_));
EXPECT_CALL(*stream_filter, onLocalReply(_));
EXPECT_CALL(*encoder_filter, onLocalReply(_));

// Now response encoding begins. Assume a filter co-opts the original reply
// with a new local reply.
EXPECT_CALL(*encoder_filter, encodeHeaders(_, _))
.WillOnce(Invoke([&](ResponseHeaderMap&, bool) -> FilterHeadersStatus {
decoder_filter->callbacks_->sendLocalReply(Code::InternalServerError, "body2", nullptr,
absl::nullopt, "details2");
return FilterHeadersStatus::StopIteration;
}));

// All 3 filters should get the second onLocalReply.
EXPECT_CALL(*decoder_filter, onLocalReply(_));
EXPECT_CALL(*stream_filter, onLocalReply(_));
EXPECT_CALL(*encoder_filter, onLocalReply(_));

decoder_filter->callbacks_->sendLocalReply(Code::InternalServerError, "body", nullptr,
absl::nullopt, "details");
}

// The final details should be details2.
ASSERT_TRUE(filter_manager_->streamInfo().responseCodeDetails().has_value());
EXPECT_EQ(filter_manager_->streamInfo().responseCodeDetails().value(), "details2");
EXPECT_FALSE(filter_manager_->streamInfo().responseCode().has_value());

filter_manager_->destroyFilters();
}

} // namespace
} // namespace Http
} // namespace Envoy
1 change: 1 addition & 0 deletions test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ envoy_cc_test(
"//source/extensions/filters/http/buffer:config",
"//source/extensions/filters/http/health_check:config",
"//test/integration/filters:metadata_stop_all_filter_config_lib",
"//test/integration/filters:on_local_reply_filter_config_lib",
"//test/integration/filters:request_metadata_filter_config_lib",
"//test/integration/filters:response_metadata_filter_config_lib",
"//test/integration/filters:set_response_code_filter_config_proto_cc_proto",
Expand Down
14 changes: 14 additions & 0 deletions test/integration/filters/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,20 @@ envoy_cc_test_library(
],
)

envoy_cc_test_library(
name = "on_local_reply_filter_config_lib",
srcs = [
"on_local_reply_filter.cc",
],
deps = [
"//include/envoy/http:filter_interface",
"//include/envoy/registry",
"//include/envoy/server:filter_config_interface",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"//test/extensions/filters/http/common:empty_http_filter_config_lib",
],
)

envoy_cc_test_library(
name = "passthrough_filter_config_lib",
srcs = [
Expand Down
49 changes: 49 additions & 0 deletions test/integration/filters/on_local_reply_filter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#include <string>

#include "envoy/http/filter.h"
#include "envoy/registry/registry.h"
#include "envoy/server/filter_config.h"

#include "extensions/filters/http/common/pass_through_filter.h"

#include "test/extensions/filters/http/common/empty_http_filter_config.h"

namespace Envoy {

class OnLocalReplyFilter : public Http::PassThroughFilter {
public:
Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& request_headers, bool) override {
if (!request_headers.get(Http::LowerCaseString("reset")).empty()) {
reset_ = true;
}
decoder_callbacks_->sendLocalReply(Http::Code::BadRequest, "body", nullptr, absl::nullopt,
"details");
return Http::FilterHeadersStatus::StopIteration;
}

Http::LocalErrorStatus onLocalReply(LocalReplyData&) override {
if (reset_) {
return Http::LocalErrorStatus::ContinueAndResetStream;
}
return Http::LocalErrorStatus::Continue;
}

bool reset_{};
};

class OnLocalReplyFilterConfig : public Extensions::HttpFilters::Common::EmptyHttpFilterConfig {
public:
OnLocalReplyFilterConfig() : EmptyHttpFilterConfig("on-local-reply-filter") {}
Http::FilterFactoryCb createFilter(const std::string&,
Server::Configuration::FactoryContext&) override {
return [](Http::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addStreamFilter(std::make_shared<::Envoy::OnLocalReplyFilter>());
};
}
};

// perform static registration
static Registry::RegisterFactory<OnLocalReplyFilterConfig,
Server::Configuration::NamedHttpFilterConfigFactory>
register_;
} // namespace Envoy
24 changes: 24 additions & 0 deletions test/integration/http2_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1603,4 +1603,28 @@ TEST_P(Http2MetadataIntegrationTest, UpstreamMetadataAfterEndStream) {
EXPECT_EQ("200", response->headers().getStatusValue());
}

static std::string on_local_reply_filter = R"EOF(
name: on-local-reply-filter
typed_config:
"@type": type.googleapis.com/google.protobuf.Empty
)EOF";

TEST_P(Http2IntegrationTest, OnLocalReply) {
config_helper_.addFilter(on_local_reply_filter);
initialize();

codec_client_ = makeHttpConnection(lookupPort("http"));
{
auto response = codec_client_->makeHeaderOnlyRequest(default_request_headers_);
response->waitForEndStream();
ASSERT_TRUE(response->complete());
}
{
default_request_headers_.addCopy("reset", "yes");
auto response = codec_client_->makeHeaderOnlyRequest(default_request_headers_);
response->waitForReset();
ASSERT_FALSE(response->complete());
}
}

} // namespace Envoy
3 changes: 3 additions & 0 deletions test/mocks/http/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ class MockStreamDecoderFilter : public StreamDecoderFilter {
MOCK_METHOD(void, onStreamComplete, ());
MOCK_METHOD(void, onDestroy, ());
MOCK_METHOD(void, onMatchCallback, (const Matcher::Action&));
MOCK_METHOD(LocalErrorStatus, onLocalReply, (LocalReplyData&));

// Http::StreamDecoderFilter
MOCK_METHOD(FilterHeadersStatus, decodeHeaders, (RequestHeaderMap & headers, bool end_stream));
Expand All @@ -348,6 +349,7 @@ class MockStreamEncoderFilter : public StreamEncoderFilter {
MOCK_METHOD(void, onStreamComplete, ());
MOCK_METHOD(void, onDestroy, ());
MOCK_METHOD(void, onMatchCallback, (const Matcher::Action&));
MOCK_METHOD(LocalErrorStatus, onLocalReply, (LocalReplyData&));

// Http::MockStreamEncoderFilter
MOCK_METHOD(FilterHeadersStatus, encode100ContinueHeaders, (ResponseHeaderMap & headers));
Expand All @@ -370,6 +372,7 @@ class MockStreamFilter : public StreamFilter {
MOCK_METHOD(void, onStreamComplete, ());
MOCK_METHOD(void, onDestroy, ());
MOCK_METHOD(void, onMatchCallback, (const Matcher::Action&));
MOCK_METHOD(LocalErrorStatus, onLocalReply, (LocalReplyData&));

// Http::StreamDecoderFilter
MOCK_METHOD(FilterHeadersStatus, decodeHeaders, (RequestHeaderMap & headers, bool end_stream));
Expand Down