From cec0e269637b6e05c30855ba1b42287ab509da59 Mon Sep 17 00:00:00 2001 From: phlax Date: Wed, 25 Sep 2024 20:45:57 +0100 Subject: [PATCH 1/6] ci: Boost arm build machines (#36339) this is an interim measure to reduce build times and mittigate VM overload, pending Engflow arm RBE pools coming online after discussion we felt this was justified as without it we are likely to get jobs being retested, and larger machines should reduce the build times some Signed-off-by: Ryan Northey --- .github/workflows/_precheck_publish.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/_precheck_publish.yml b/.github/workflows/_precheck_publish.yml index 5276446302ad..86181a7bc3d6 100644 --- a/.github/workflows/_precheck_publish.yml +++ b/.github/workflows/_precheck_publish.yml @@ -56,7 +56,7 @@ jobs: --config=cache-envoy-engflow --config=bes-envoy-engflow rbe: false - runs-on: envoy-arm64-medium + runs-on: envoy-arm64-large timeout-minutes: 180 - target: docs name: Docs From 07a8c4afe8ac83632535bd118f142df70d2335be Mon Sep 17 00:00:00 2001 From: Greg Greenway Date: Wed, 25 Sep 2024 14:30:33 -0700 Subject: [PATCH 2/6] http: add config for max response header size (#36231) Signed-off-by: Greg Greenway --- api/envoy/config/core/v3/protocol.proto | 25 +++++-- .../v3/http_connection_manager.proto | 4 ++ changelogs/current.yaml | 4 ++ envoy/upstream/upstream.h | 5 ++ source/common/http/codec_client.cc | 8 ++- source/common/http/http1/codec_impl.cc | 4 +- source/common/http/http1/codec_impl.h | 1 + source/common/protobuf/utility.h | 6 ++ source/common/upstream/upstream_impl.cc | 2 + source/common/upstream/upstream_impl.h | 4 ++ .../network/http_connection_manager/config.cc | 6 ++ test/common/http/codec_impl_fuzz_test.cc | 2 +- test/common/http/http1/codec_impl_test.cc | 37 ++++++++-- .../http/http1/http1_connection_fuzz_test.cc | 2 +- .../http_connection_manager/config_test.cc | 21 ++++++ test/integration/http_integration.cc | 72 ++++++++++++++++++- test/integration/http_integration.h | 16 +++-- test/integration/overload_integration_test.cc | 6 +- test/integration/protocol_integration_test.cc | 43 ++++++++++- .../integration/quic_http_integration_test.cc | 11 +-- test/mocks/upstream/cluster_info.cc | 7 ++ test/mocks/upstream/cluster_info.h | 1 + 22 files changed, 257 insertions(+), 30 deletions(-) diff --git a/api/envoy/config/core/v3/protocol.proto b/api/envoy/config/core/v3/protocol.proto index eda87d9408d5..e566278600ab 100644 --- a/api/envoy/config/core/v3/protocol.proto +++ b/api/envoy/config/core/v3/protocol.proto @@ -209,7 +209,7 @@ message AlternateProtocolsCacheOptions { repeated string canonical_suffixes = 5; } -// [#next-free-field: 7] +// [#next-free-field: 8] message HttpProtocolOptions { option (udpa.annotations.versioning).previous_message_type = "envoy.api.v2.core.HttpProtocolOptions"; @@ -259,11 +259,28 @@ message HttpProtocolOptions { // `. google.protobuf.Duration max_connection_duration = 3; - // The maximum number of headers. If unconfigured, the default - // maximum number of request headers allowed is 100. Requests that exceed this limit will receive - // a 431 response for HTTP/1.x and cause a stream reset for HTTP/2. + // The maximum number of headers (request headers if configured on HttpConnectionManager, + // response headers when configured on a cluster). + // If unconfigured, the default maximum number of headers allowed is 100. + // Downstream requests that exceed this limit will receive a 431 response for HTTP/1.x and cause a stream + // reset for HTTP/2. + // Upstream responses that exceed this limit will result in a 503 response. google.protobuf.UInt32Value max_headers_count = 2 [(validate.rules).uint32 = {gte: 1}]; + // The maximum size of response headers. + // If unconfigured, the default is 60 KiB, except for HTTP/1 response headers which have a default + // of 80KiB. + // Responses that exceed this limit will result in a 503 response. + // In Envoy, this setting is only valid when configured on an upstream cluster, not on the + // :ref:`HTTP Connection Manager + // `. + // + // Note: currently some protocol codecs impose limits on the maximum size of a single header: + // HTTP/2 (when using nghttp2) limits a single header to around 100kb. + // HTTP/3 limits a single header to around 1024kb. + google.protobuf.UInt32Value max_response_headers_kb = 7 + [(validate.rules).uint32 = {lte: 8192 gt: 0}]; + // Total duration to keep alive an HTTP request/response stream. If the time limit is reached the stream will be // reset independent of any other timeouts. If not specified, this value is not set. google.protobuf.Duration max_stream_duration = 4; diff --git a/api/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto b/api/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto index 4cbbbc20d3fb..3d438ae87881 100644 --- a/api/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto +++ b/api/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto @@ -494,6 +494,10 @@ message HttpConnectionManager { // The maximum request headers size for incoming connections. // If unconfigured, the default max request headers allowed is 60 KiB. // Requests that exceed this limit will receive a 431 response. + // + // Note: currently some protocol codecs impose limits on the maximum size of a single header: + // HTTP/2 (when using nghttp2) limits a single header to around 100kb. + // HTTP/3 limits a single header to around 1024kb. google.protobuf.UInt32Value max_request_headers_kb = 29 [(validate.rules).uint32 = {lte: 8192 gt: 0}]; diff --git a/changelogs/current.yaml b/changelogs/current.yaml index dc5877b45824..cb7fb236024a 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -292,6 +292,10 @@ new_features: change: | Added full feature absl::FormatTime() support to the DateFormatter. This allows the timepoint formatters (like ``%START_TIME%``) to use ``%E#S``, ``%E*S``, ``%E#f`` and ``%E*f`` to format the subsecond part of the timepoint. +- area: http + change: | + Added configuration setting for the :ref:`maximum size of response headers + ` in responses. - area: http_11_proxy change: | Added the option to configure the transport socket via locality or endpoint metadata. diff --git a/envoy/upstream/upstream.h b/envoy/upstream/upstream.h index e307de9471f7..ca704d02fdb6 100644 --- a/envoy/upstream/upstream.h +++ b/envoy/upstream/upstream.h @@ -1068,6 +1068,11 @@ class ClusterInfo : public Http::FilterChainFactory { */ virtual uint32_t maxResponseHeadersCount() const PURE; + /** + * @return uint32_t the maximum total size of response headers in KB. + */ + virtual absl::optional maxResponseHeadersKb() const PURE; + /** * @return the human readable name of the cluster. */ diff --git a/source/common/http/codec_client.cc b/source/common/http/codec_client.cc index 2f74974d983d..77fb57a814ea 100644 --- a/source/common/http/codec_client.cc +++ b/source/common/http/codec_client.cc @@ -282,13 +282,14 @@ CodecClientProd::CodecClientProd(CodecType type, Network::ClientConnectionPtr&& } codec_ = std::make_unique( *connection_, host->cluster().http1CodecStats(), *this, host->cluster().http1Settings(), - host->cluster().maxResponseHeadersCount(), proxied); + host->cluster().maxResponseHeadersKb(), host->cluster().maxResponseHeadersCount(), proxied); break; } case CodecType::HTTP2: codec_ = std::make_unique( *connection_, *this, host->cluster().http2CodecStats(), random_generator, - host->cluster().http2Options(), Http::DEFAULT_MAX_REQUEST_HEADERS_KB, + host->cluster().http2Options(), + host->cluster().maxResponseHeadersKb().value_or(Http::DEFAULT_MAX_REQUEST_HEADERS_KB), host->cluster().maxResponseHeadersCount(), Http2::ProdNghttp2SessionFactory::get()); break; case CodecType::HTTP3: { @@ -296,7 +297,8 @@ CodecClientProd::CodecClientProd(CodecType type, Network::ClientConnectionPtr&& auto& quic_session = dynamic_cast(*connection_); codec_ = std::make_unique( quic_session, *this, host->cluster().http3CodecStats(), host->cluster().http3Options(), - Http::DEFAULT_MAX_REQUEST_HEADERS_KB, host->cluster().maxResponseHeadersCount()); + host->cluster().maxResponseHeadersKb().value_or(Http::DEFAULT_MAX_REQUEST_HEADERS_KB), + host->cluster().maxResponseHeadersCount()); // Initialize the session after max request header size is changed in above http client // connection creation. quic_session.Initialize(); diff --git a/source/common/http/http1/codec_impl.cc b/source/common/http/http1/codec_impl.cc index 5e054058a996..92fb58bb1a40 100644 --- a/source/common/http/http1/codec_impl.cc +++ b/source/common/http/http1/codec_impl.cc @@ -1428,9 +1428,11 @@ void ServerConnectionImpl::ActiveRequest::dumpState(std::ostream& os, int indent ClientConnectionImpl::ClientConnectionImpl(Network::Connection& connection, CodecStats& stats, ConnectionCallbacks&, const Http1Settings& settings, + absl::optional max_response_headers_kb, const uint32_t max_response_headers_count, bool passing_through_proxy) - : ConnectionImpl(connection, stats, settings, MessageType::Response, MAX_RESPONSE_HEADERS_KB, + : ConnectionImpl(connection, stats, settings, MessageType::Response, + max_response_headers_kb.value_or(MAX_RESPONSE_HEADERS_KB), max_response_headers_count), owned_output_buffer_(connection.dispatcher().getWatermarkFactory().createBuffer( [&]() -> void { this->onBelowLowWatermark(); }, diff --git a/source/common/http/http1/codec_impl.h b/source/common/http/http1/codec_impl.h index 26e3bfe82d9f..875a9c6d99d6 100644 --- a/source/common/http/http1/codec_impl.h +++ b/source/common/http/http1/codec_impl.h @@ -590,6 +590,7 @@ class ClientConnectionImpl : public ClientConnection, public ConnectionImpl { public: ClientConnectionImpl(Network::Connection& connection, CodecStats& stats, ConnectionCallbacks& callbacks, const Http1Settings& settings, + absl::optional max_response_headers_kb, const uint32_t max_response_headers_count, bool passing_through_proxy = false); // Http::ClientConnection diff --git a/source/common/protobuf/utility.h b/source/common/protobuf/utility.h index 501cc723a872..2d427e7d9e7a 100644 --- a/source/common/protobuf/utility.h +++ b/source/common/protobuf/utility.h @@ -23,6 +23,12 @@ #define PROTOBUF_GET_WRAPPED_OR_DEFAULT(message, field_name, default_value) \ ((message).has_##field_name() ? (message).field_name().value() : (default_value)) +// Obtain the value of a wrapped field (e.g. google.protobuf.UInt32Value) if set. Otherwise, return +// absl::nullopt. +#define PROTOBUF_GET_OPTIONAL_WRAPPED(message, field_name) \ + ((message).has_##field_name() ? absl::make_optional((message).field_name().value()) \ + : absl::nullopt) + // Obtain the value of a wrapped field (e.g. google.protobuf.UInt32Value) if set. Otherwise, throw // a EnvoyException. diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index bfee8e6b2baa..195d2ba59acc 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -1229,6 +1229,8 @@ ClusterInfoImpl::ClusterInfoImpl( http_protocol_options_->common_http_protocol_options_, max_headers_count, runtime_.snapshot().getInteger(Http::MaxResponseHeadersCountOverrideKey, Http::DEFAULT_MAX_HEADERS_COUNT))), + max_response_headers_kb_(PROTOBUF_GET_OPTIONAL_WRAPPED( + http_protocol_options_->common_http_protocol_options_, max_response_headers_kb)), type_(config.type()), drain_connections_on_host_removal_(config.ignore_health_on_host_removal()), connection_pool_per_downstream_connection_( diff --git a/source/common/upstream/upstream_impl.h b/source/common/upstream/upstream_impl.h index 47d32468571d..a3d43f7b969e 100644 --- a/source/common/upstream/upstream_impl.h +++ b/source/common/upstream/upstream_impl.h @@ -913,6 +913,9 @@ class ClusterInfoImpl : public ClusterInfo, bool maintenanceMode() const override; uint64_t maxRequestsPerConnection() const override { return max_requests_per_connection_; } uint32_t maxResponseHeadersCount() const override { return max_response_headers_count_; } + absl::optional maxResponseHeadersKb() const override { + return max_response_headers_kb_; + } const std::string& name() const override { return name_; } const std::string& observabilityName() const override { if (observability_name_ != nullptr) { @@ -1126,6 +1129,7 @@ class ClusterInfoImpl : public ClusterInfo, // overhead via alignment const uint32_t per_connection_buffer_limit_bytes_; const uint32_t max_response_headers_count_; + const absl::optional max_response_headers_kb_; const envoy::config::cluster::v3::Cluster::DiscoveryType type_; const bool drain_connections_on_host_removal_ : 1; const bool connection_pool_per_downstream_connection_ : 1; diff --git a/source/extensions/filters/network/http_connection_manager/config.cc b/source/extensions/filters/network/http_connection_manager/config.cc index 563ece44c917..757c2439d422 100644 --- a/source/extensions/filters/network/http_connection_manager/config.cc +++ b/source/extensions/filters/network/http_connection_manager/config.cc @@ -443,6 +443,12 @@ HttpConnectionManagerConfig::HttpConnectionManagerConfig( idle_timeout_ = absl::nullopt; } + if (config.common_http_protocol_options().has_max_response_headers_kb()) { + creation_status = absl::InvalidArgumentError( + fmt::format("Error: max_response_headers_kb cannot be set on http_connection_manager.")); + return; + } + if (config.strip_any_host_port() && config.strip_matching_host_port()) { creation_status = absl::InvalidArgumentError(fmt::format( "Error: Only one of `strip_matching_host_port` or `strip_any_host_port` can be set.")); diff --git a/test/common/http/codec_impl_fuzz_test.cc b/test/common/http/codec_impl_fuzz_test.cc index 8554be417b01..bca32e84542e 100644 --- a/test/common/http/codec_impl_fuzz_test.cc +++ b/test/common/http/codec_impl_fuzz_test.cc @@ -616,7 +616,7 @@ void codecFuzz(const test::common::http::CodecImplFuzzTestCase& input, HttpVersi } else { client = std::make_unique( client_connection, Http1::CodecStats::atomicGet(http1_stats, scope), client_callbacks, - client_http1settings, max_response_headers_count); + client_http1settings, max_request_headers_kb, max_response_headers_count); } if (http2) { diff --git a/test/common/http/http1/codec_impl_test.cc b/test/common/http/http1/codec_impl_test.cc index 1fca99b17e7b..0dca1c8abc16 100644 --- a/test/common/http/http1/codec_impl_test.cc +++ b/test/common/http/http1/codec_impl_test.cc @@ -2544,7 +2544,8 @@ class Http1ClientConnectionImplTest : public Http1CodecTestBase { public: void initialize() { codec_ = std::make_unique( - connection_, http1CodecStats(), callbacks_, codec_settings_, max_response_headers_count_, + connection_, http1CodecStats(), callbacks_, codec_settings_, max_response_headers_kb_, + max_response_headers_count_, /* passing_through_proxy=*/false); } @@ -2564,6 +2565,7 @@ class Http1ClientConnectionImplTest : public Http1CodecTestBase { protected: Stats::TestUtil::TestStore store_; uint32_t max_response_headers_count_{Http::DEFAULT_MAX_HEADERS_COUNT}; + uint32_t max_response_headers_kb_{Http::Http1::MAX_RESPONSE_HEADERS_KB}; }; void Http1ClientConnectionImplTest::testClientAllowChunkedContentLength( @@ -2571,8 +2573,9 @@ void Http1ClientConnectionImplTest::testClientAllowChunkedContentLength( // Response validation is not implemented in UHV yet #ifndef ENVOY_ENABLE_UHV codec_settings_.allow_chunked_length_ = allow_chunked_length; - codec_ = std::make_unique( - connection_, http1CodecStats(), callbacks_, codec_settings_, max_response_headers_count_); + codec_ = std::make_unique(connection_, http1CodecStats(), callbacks_, + codec_settings_, max_response_headers_kb_, + max_response_headers_count_); NiceMock response_decoder; Http::RequestEncoder& request_encoder = codec_->newStream(response_decoder); @@ -3706,6 +3709,28 @@ TEST_P(Http1ClientConnectionImplTest, LargeResponseHeadersAccepted) { std::string long_header = "big: " + std::string(79 * 1024, 'q') + "\r\n"; buffer = Buffer::OwnedImpl(long_header); status = codec_->dispatch(buffer); + EXPECT_TRUE(status.ok()); +} + +// Tests that the size of response headers for HTTP/1 can be configured higher than the default of +// 80kB. +TEST_P(Http1ClientConnectionImplTest, LargeResponseHeadersAcceptedConfigurable) { + constexpr uint32_t size_limit_kb = 85; + max_response_headers_kb_ = size_limit_kb; + initialize(); + + NiceMock response_decoder; + Http::RequestEncoder& request_encoder = codec_->newStream(response_decoder); + TestRequestHeaderMapImpl headers{{":method", "GET"}, {":path", "/"}, {":authority", "host"}}; + EXPECT_TRUE(request_encoder.encodeHeaders(headers, true).ok()); + + Buffer::OwnedImpl buffer("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n"); + auto status = codec_->dispatch(buffer); + EXPECT_TRUE(status.ok()); + std::string long_header = "big: " + std::string((size_limit_kb - 1) * 1024, 'q') + "\r\n"; + buffer = Buffer::OwnedImpl(long_header); + status = codec_->dispatch(buffer); + EXPECT_TRUE(status.ok()); } // Regression test for CVE-2019-18801. Large method headers should not trigger @@ -3792,6 +3817,7 @@ TEST_P(Http1ClientConnectionImplTest, ManyResponseHeadersAccepted) { // Response already contains one header. buffer = Buffer::OwnedImpl(createHeaderOrTrailerFragment(150) + "\r\n"); status = codec_->dispatch(buffer); + EXPECT_TRUE(status.ok()); } TEST_P(Http1ClientConnectionImplTest, TestResponseSplit0) { @@ -3821,8 +3847,9 @@ TEST_P(Http1ClientConnectionImplTest, TestResponseSplitAllowChunkedLength100) { TEST_P(Http1ClientConnectionImplTest, VerifyResponseHeaderTrailerMapMaxLimits) { codec_settings_.allow_chunked_length_ = true; codec_settings_.enable_trailers_ = true; - codec_ = std::make_unique( - connection_, http1CodecStats(), callbacks_, codec_settings_, max_response_headers_count_); + codec_ = std::make_unique(connection_, http1CodecStats(), callbacks_, + codec_settings_, max_response_headers_kb_, + max_response_headers_count_); NiceMock response_decoder; Http::RequestEncoder& request_encoder = codec_->newStream(response_decoder); diff --git a/test/common/http/http1/http1_connection_fuzz_test.cc b/test/common/http/http1/http1_connection_fuzz_test.cc index 9dae7af6c7b9..37eb6fc64e4a 100644 --- a/test/common/http/http1/http1_connection_fuzz_test.cc +++ b/test/common/http/http1/http1_connection_fuzz_test.cc @@ -45,7 +45,7 @@ class Http1Harness { client_ = std::make_unique( mock_client_connection_, Http1::CodecStats::atomicGet(http1_stats_, *stats_store_.rootScope()), - mock_client_callbacks_, client_settings_, Http::DEFAULT_MAX_HEADERS_COUNT); + mock_client_callbacks_, client_settings_, absl::nullopt, Http::DEFAULT_MAX_HEADERS_COUNT); Status status = client_->dispatch(payload); } diff --git a/test/extensions/filters/network/http_connection_manager/config_test.cc b/test/extensions/filters/network/http_connection_manager/config_test.cc index 5f39f9935a21..06fa2cf5c194 100644 --- a/test/extensions/filters/network/http_connection_manager/config_test.cc +++ b/test/extensions/filters/network/http_connection_manager/config_test.cc @@ -996,6 +996,27 @@ TEST_F(HttpConnectionManagerConfigTest, MaxRequestHeaderCountConfigurable) { EXPECT_EQ(200, config.maxRequestHeadersCount()); } +// Check that max response header size is invalid on HCM. +TEST_F(HttpConnectionManagerConfigTest, MaxResponseHeaderKbInvalid) { + const std::string yaml_string = R"EOF( + stat_prefix: ingress_http + common_http_protocol_options: + max_response_headers_kb: 200 + route_config: + name: local_route + http_filters: + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + )EOF"; + + HttpConnectionManagerConfig config(parseHttpConnectionManagerFromYaml(yaml_string), context_, + date_provider_, route_config_provider_manager_, + &scoped_routes_config_provider_manager_, tracer_manager_, + filter_config_provider_manager_, creation_status_); + EXPECT_FALSE(creation_status_.ok()); +} + // Checking that default max_requests_per_connection is 0. TEST_F(HttpConnectionManagerConfigTest, DefaultMaxRequestPerConnection) { const std::string yaml_string = R"EOF( diff --git a/test/integration/http_integration.cc b/test/integration/http_integration.cc index d8bc506c181e..c640f452141b 100644 --- a/test/integration/http_integration.cc +++ b/test/integration/http_integration.cc @@ -266,6 +266,7 @@ IntegrationCodecClientPtr HttpIntegrationTest::makeHttpConnection(uint32_t port) IntegrationCodecClientPtr HttpIntegrationTest::makeRawHttpConnection( Network::ClientConnectionPtr&& conn, absl::optional http2_options, + absl::optional common_http_options, bool wait_till_connected) { std::shared_ptr cluster{new NiceMock()}; cluster->max_response_headers_count_ = 200; @@ -286,6 +287,10 @@ IntegrationCodecClientPtr HttpIntegrationTest::makeRawHttpConnection( cluster->http2_options_ = http2_options.value(); cluster->http1_settings_.enable_trailers_ = true; + if (common_http_options.has_value()) { + cluster->common_http_protocol_options_ = common_http_options.value(); + } + if (!disable_client_header_validation_) { cluster->header_validator_factory_ = IntegrationUtil::makeHeaderValidationFactory( ::envoy::extensions::http::header_validators::envoy_default::v3::HeaderValidatorConfig()); @@ -1448,7 +1453,72 @@ void HttpIntegrationTest::testLargeRequestHeaders(uint32_t size, uint32_t count, } else { IntegrationStreamDecoderPtr response = codec_client_->makeHeaderOnlyRequest(big_headers); RELEASE_ASSERT(response->waitForEndStream(timeout), - fmt::format("unexpected timeout after ", timeout.count(), " ms")); + fmt::format("unexpected timeout after {}ms", timeout.count())); + EXPECT_TRUE(response->complete()); + EXPECT_EQ("200", response->headers().getStatusValue()); + } + if (count > max_count) { + EXPECT_THAT(waitForAccessLog(access_log_name_), HasSubstr("too_many_headers")); + } +} + +void HttpIntegrationTest::testLargeResponseHeaders(uint32_t size, uint32_t count, uint32_t max_size, + uint32_t max_count, + std::chrono::milliseconds timeout) { + autonomous_upstream_ = true; + useAccessLog("%RESPONSE_CODE_DETAILS%"); + // `size` parameter dictates the size of each header that will be added to the response and + // `count` parameter is the number of headers to be added. The actual request byte size will + // exceed `size` due to the keys and other headers. The actual request header count will exceed + // `count` by four due to default headers. + + config_helper_.addConfigModifier([&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + ConfigHelper::HttpProtocolOptions protocol_options; + auto* http_protocol_options = protocol_options.mutable_common_http_protocol_options(); + http_protocol_options->mutable_max_response_headers_kb()->set_value(max_size); + http_protocol_options->mutable_max_headers_count()->set_value(max_count); + + ConfigHelper::setProtocolOptions(*bootstrap.mutable_static_resources()->mutable_clusters(0), + protocol_options); + }); + + // This test is validating upstream response headers, but the test client will fail to receive the + // request from Envoy if its limits aren't increased. + envoy::config::core::v3::HttpProtocolOptions client_protocol_options; + client_protocol_options.mutable_max_response_headers_kb()->set_value(max_size); + client_protocol_options.mutable_max_headers_count()->set_value(max_count); + + Http::TestRequestHeaderMapImpl big_headers(default_response_headers_); + + // Already added four headers. + for (unsigned int i = 0; i < count; i++) { + big_headers.addCopy(std::to_string(i), std::string(size * 1024, 'a')); + } + + initialize(); + codec_client_ = makeRawHttpConnection(makeClientConnection(lookupPort("http")), absl::nullopt, + client_protocol_options); + reinterpret_cast(fake_upstreams_.front().get()) + ->setResponseHeaders(std::make_unique(big_headers)); + + if (size >= max_size || count > max_count) { + // header size includes keys too, so expect rejection when equal + auto encoder_decoder = codec_client_->startRequest(default_request_headers_); + auto response = std::move(encoder_decoder.second); + + if (downstream_protocol_ == Http::CodecType::HTTP1) { + ASSERT_TRUE(codec_client_->waitForDisconnect()); + ASSERT_TRUE(response->complete()); + EXPECT_EQ("431", response->headers().getStatusValue()); + } else { + ASSERT_TRUE(response->waitForReset()); + codec_client_->close(); + } + } else { + IntegrationStreamDecoderPtr response = + codec_client_->makeHeaderOnlyRequest(default_request_headers_); + RELEASE_ASSERT(response->waitForEndStream(timeout), + fmt::format("unexpected timeout after {}ms", timeout.count())); EXPECT_TRUE(response->complete()); EXPECT_EQ("200", response->headers().getStatusValue()); } diff --git a/test/integration/http_integration.h b/test/integration/http_integration.h index 06f2dd15a65c..55adabba2eb4 100644 --- a/test/integration/http_integration.h +++ b/test/integration/http_integration.h @@ -158,10 +158,12 @@ class HttpIntegrationTest : public BaseIntegrationTest { IntegrationCodecClientPtr makeHttpConnection(uint32_t port); // Makes a http connection object without checking its connected state. - virtual IntegrationCodecClientPtr - makeRawHttpConnection(Network::ClientConnectionPtr&& conn, - absl::optional http2_options, - bool wait_till_connected = true); + virtual IntegrationCodecClientPtr makeRawHttpConnection( + Network::ClientConnectionPtr&& conn, + absl::optional http2_options, + absl::optional common_http_options = + absl::nullopt, + bool wait_till_connected = true); // Makes a downstream network connection object based on client codec version. Network::ClientConnectionPtr makeClientConnectionWithOptions( uint32_t port, const Network::ConnectionSocket::OptionsSharedPtr& options) override; @@ -271,13 +273,13 @@ class HttpIntegrationTest : public BaseIntegrationTest { void testRouterUpstreamResponseBeforeRequestComplete(uint32_t status_code = 0); void testTwoRequests(bool force_network_backup = false); - void testLargeHeaders(Http::TestRequestHeaderMapImpl request_headers, - Http::TestRequestTrailerMapImpl request_trailers, uint32_t size, - uint32_t max_size); void testLargeRequestUrl(uint32_t url_size, uint32_t max_headers_size); void testLargeRequestHeaders(uint32_t size, uint32_t count, uint32_t max_size = 60, uint32_t max_count = 100, std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); + void testLargeResponseHeaders(uint32_t size, uint32_t count, uint32_t max_size = 60, + uint32_t max_count = 100, + std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); void testLargeRequestTrailers(uint32_t size, uint32_t max_size = 60); void testManyRequestHeaders(std::chrono::milliseconds time = TestUtility::DefaultTimeout); diff --git a/test/integration/overload_integration_test.cc b/test/integration/overload_integration_test.cc index d3a62cdb2ff9..d5e808a88832 100644 --- a/test/integration/overload_integration_test.cc +++ b/test/integration/overload_integration_test.cc @@ -403,6 +403,7 @@ TEST_P(OverloadScaledTimerIntegrationTest, HTTP3CloseIdleHttpConnectionsDuringHa test_server_->waitForGaugeGe("overload.envoy.overload_actions.reduce_timeouts.scale_percent", 50); // Create an HTTP connection without finishing the handshake. codec_client_ = makeRawHttpConnection(makeClientConnection((lookupPort("http"))), absl::nullopt, + absl::nullopt, /*wait_till_connected=*/false); EXPECT_FALSE(codec_client_->connected()); @@ -418,8 +419,9 @@ TEST_P(OverloadScaledTimerIntegrationTest, HTTP3CloseIdleHttpConnectionsDuringHa 100); // Create another HTTP connection without finishing handshake. - IntegrationCodecClientPtr codec_client2 = makeRawHttpConnection( - makeClientConnection((lookupPort("http"))), absl::nullopt, /*wait_till_connected=*/false); + IntegrationCodecClientPtr codec_client2 = + makeRawHttpConnection(makeClientConnection((lookupPort("http"))), absl::nullopt, + absl::nullopt, /*wait_till_connected=*/false); EXPECT_FALSE(codec_client2->connected()); // Advancing past the minimum time and wait for the proxy to notice and close both connections. timeSystem().advanceTimeWait(std::chrono::seconds(3)); diff --git a/test/integration/protocol_integration_test.cc b/test/integration/protocol_integration_test.cc index 3625b0b58d51..921e5ba80925 100644 --- a/test/integration/protocol_integration_test.cc +++ b/test/integration/protocol_integration_test.cc @@ -2380,11 +2380,52 @@ TEST_P(DownstreamProtocolIntegrationTest, LargeRequestHeadersAccepted) { testLargeRequestHeaders(100, 1, 8192, 100); } -TEST_P(DownstreamProtocolIntegrationTest, ManyLargeRequestHeadersAccepted) { +TEST_P(ProtocolIntegrationTest, ManyLargeRequestHeadersAccepted) { // Send 70 headers each of size 100 kB with limit 8192 kB (8 MB) and 100 headers. testLargeRequestHeaders(100, 70, 8192, 100, TestUtility::DefaultTimeout); } +namespace { +uint32_t adjustMaxSingleHeaderSizeForCodecLimits(uint32_t size, + const HttpProtocolTestParams& params) { + if (params.http2_implementation == Http2Impl::Nghttp2 && + (params.downstream_protocol == Http::CodecType::HTTP2 || + params.upstream_protocol == Http::CodecType::HTTP2)) { + // nghttp2 has a hard-coded, unconfigurable limit of 64k for a header in it's header + // decompressor, so this test will always fail when using that codec. + // Reduce the size so that it can pass and receive some test coverage. + return 100; + } else if (params.downstream_protocol == Http::CodecType::HTTP3 || + params.upstream_protocol == Http::CodecType::HTTP3) { + // QUICHE has a hard-coded limit of 1024KiB in it's QPACK decoder. + // Reduce the size so that it can pass and receive some test coverage. + return 1023; + } + + return size; +} +} // namespace + +// Test a single header of the maximum allowed size. +TEST_P(ProtocolIntegrationTest, VeryLargeRequestHeadersAccepted) { + uint32_t size = adjustMaxSingleHeaderSizeForCodecLimits(8191, GetParam()); + + testLargeRequestHeaders(size, 1, 8192, 100, TestUtility::DefaultTimeout); +} + +// Test a single header of the maximum allowed size. +TEST_P(ProtocolIntegrationTest, ManyLargeResponseHeadersAccepted) { + // Send 70 headers each of size 100 kB with limit 8192 kB (8 MB) and 100 headers. + testLargeResponseHeaders(100, 70, 8192, 100, TestUtility::DefaultTimeout); +} + +// Test a single header of the maximum allowed size. +TEST_P(ProtocolIntegrationTest, VeryLargeResponseHeadersAccepted) { + uint32_t size = adjustMaxSingleHeaderSizeForCodecLimits(8191, GetParam()); + + testLargeResponseHeaders(size, 1, 8192, 100, TestUtility::DefaultTimeout); +} + TEST_P(DownstreamProtocolIntegrationTest, ManyRequestHeadersRejected) { // Send 101 empty headers with limit 60 kB and 100 headers. testLargeRequestHeaders(0, 101, 60, 80); diff --git a/test/integration/quic_http_integration_test.cc b/test/integration/quic_http_integration_test.cc index efc3e6b9224e..8be734d4747b 100644 --- a/test/integration/quic_http_integration_test.cc +++ b/test/integration/quic_http_integration_test.cc @@ -251,12 +251,15 @@ class QuicHttpIntegrationTestBase : public HttpIntegrationTest { return session; } - IntegrationCodecClientPtr - makeRawHttpConnection(Network::ClientConnectionPtr&& conn, - absl::optional http2_options, - bool wait_till_connected = true) override { + IntegrationCodecClientPtr makeRawHttpConnection( + Network::ClientConnectionPtr&& conn, + absl::optional http2_options, + absl::optional common_http_options = + absl::nullopt, + bool wait_till_connected = true) override { ENVOY_LOG(debug, "Creating a new client {}", conn->connectionInfoProvider().localAddress()->asStringView()); + ASSERT(!common_http_options.has_value(), "Not implemented"); return makeRawHttp3Connection(std::move(conn), http2_options, wait_till_connected); } diff --git a/test/mocks/upstream/cluster_info.cc b/test/mocks/upstream/cluster_info.cc index 380c7a7c8bcd..8177186b0b4e 100644 --- a/test/mocks/upstream/cluster_info.cc +++ b/test/mocks/upstream/cluster_info.cc @@ -96,6 +96,13 @@ MockClusterInfo::MockClusterInfo() ON_CALL(*this, extensionProtocolOptions(_)).WillByDefault(Return(extension_protocol_options_)); ON_CALL(*this, maxResponseHeadersCount()) .WillByDefault(ReturnPointee(&max_response_headers_count_)); + ON_CALL(*this, maxResponseHeadersKb()).WillByDefault(Invoke([this]() -> absl::optional { + if (common_http_protocol_options_.has_max_response_headers_kb()) { + return common_http_protocol_options_.max_response_headers_kb().value(); + } else { + return absl::nullopt; + } + })); ON_CALL(*this, maxRequestsPerConnection()) .WillByDefault(ReturnPointee(&max_requests_per_connection_)); ON_CALL(*this, trafficStats()).WillByDefault(ReturnRef(traffic_stats_)); diff --git a/test/mocks/upstream/cluster_info.h b/test/mocks/upstream/cluster_info.h index 8f5a72973f49..978f9c5826aa 100644 --- a/test/mocks/upstream/cluster_info.h +++ b/test/mocks/upstream/cluster_info.h @@ -128,6 +128,7 @@ class MockClusterInfo : public ClusterInfo { (const)); MOCK_METHOD(bool, maintenanceMode, (), (const)); MOCK_METHOD(uint32_t, maxResponseHeadersCount, (), (const)); + MOCK_METHOD(absl::optional, maxResponseHeadersKb, (), (const)); MOCK_METHOD(uint64_t, maxRequestsPerConnection, (), (const)); MOCK_METHOD(const std::string&, name, (), (const)); MOCK_METHOD(const std::string&, observabilityName, (), (const)); From 029e8080bd9fe5f254f7783881f619c0cb95ed4c Mon Sep 17 00:00:00 2001 From: Xuyang Tao Date: Wed, 25 Sep 2024 21:46:56 +0000 Subject: [PATCH 3/6] cel: add response.backend_latency in CEL attributes (#36292) We will depend on CEL to get those information inside HTTP wasm - unit tests added - doc added Signed-off-by: Xuyang Tao --- docs/root/intro/arch_overview/advanced/attributes.rst | 1 + source/extensions/filters/common/expr/context.cc | 9 +++++++++ source/extensions/filters/common/expr/context.h | 2 ++ test/extensions/filters/common/expr/context_test.cc | 8 ++++++++ 4 files changed, 20 insertions(+) diff --git a/docs/root/intro/arch_overview/advanced/attributes.rst b/docs/root/intro/arch_overview/advanced/attributes.rst index eae665abf3b5..f3f8e6fb84c0 100644 --- a/docs/root/intro/arch_overview/advanced/attributes.rst +++ b/docs/root/intro/arch_overview/advanced/attributes.rst @@ -93,6 +93,7 @@ Response attributes are only available after the request completes. response.trailers, "map", All response trailers indexed by the lower-cased trailer name response.size, int, Size of the response body response.total_size, int, Total size of the response including the approximate uncompressed size of the headers and the trailers + response.backend_latency, duration, Duration between the first byte sent to and the last byte received from the upstream backend Connection attributes --------------------- diff --git a/source/extensions/filters/common/expr/context.cc b/source/extensions/filters/common/expr/context.cc index cc30fb794a95..10438dff4df2 100644 --- a/source/extensions/filters/common/expr/context.cc +++ b/source/extensions/filters/common/expr/context.cc @@ -184,6 +184,15 @@ absl::optional ResponseWrapper::operator[](CelValue key) const { return CelValue::CreateString(&details.value()); } return {}; + } else if (value == BackendLatency) { + Envoy::StreamInfo::TimingUtility timing(info_); + const auto last_upstream_rx_byte_received = timing.lastUpstreamRxByteReceived(); + const auto first_upstream_tx_byte_sent = timing.firstUpstreamTxByteSent(); + if (last_upstream_rx_byte_received.has_value() && first_upstream_tx_byte_sent.has_value()) { + return CelValue::CreateDuration(absl::FromChrono(last_upstream_rx_byte_received.value() - + first_upstream_tx_byte_sent.value())); + } + return {}; } return {}; } diff --git a/source/extensions/filters/common/expr/context.h b/source/extensions/filters/common/expr/context.h index 7e5f5d1defec..867776641ee2 100644 --- a/source/extensions/filters/common/expr/context.h +++ b/source/extensions/filters/common/expr/context.h @@ -8,6 +8,7 @@ #include "source/common/http/headers.h" #include "source/common/runtime/runtime_features.h" #include "source/common/singleton/const_singleton.h" +#include "source/common/stream_info/utility.h" #include "eval/public/cel_value.h" #include "eval/public/containers/container_backed_list_impl.h" @@ -47,6 +48,7 @@ constexpr absl::string_view CodeDetails = "code_details"; constexpr absl::string_view Trailers = "trailers"; constexpr absl::string_view Flags = "flags"; constexpr absl::string_view GrpcStatus = "grpc_status"; +constexpr absl::string_view BackendLatency = "backend_latency"; // Per-request or per-connection metadata constexpr absl::string_view Metadata = "metadata"; diff --git a/test/extensions/filters/common/expr/context_test.cc b/test/extensions/filters/common/expr/context_test.cc index 733c2413ce2c..c766d47e258d 100644 --- a/test/extensions/filters/common/expr/context_test.cc +++ b/test/extensions/filters/common/expr/context_test.cc @@ -411,6 +411,14 @@ TEST(Context, ResponseAttributes) { EXPECT_FALSE(value.has_value()); } + { + info.setUpstreamInfo(std::make_shared()); + StreamInfo::UpstreamTiming& upstream_timing = info.upstreamInfo()->upstreamTiming(); + upstream_timing.onFirstUpstreamTxByteSent(info.timeSource()); + upstream_timing.onLastUpstreamRxByteReceived(info.timeSource()); + EXPECT_TRUE(response[CelValue::CreateStringView(BackendLatency)].has_value()); + } + { Http::TestResponseHeaderMapImpl header_map{{header_name, "a"}, {grpc_status, "7"}}; Http::TestResponseTrailerMapImpl trailer_map{{trailer_name, "b"}}; From 41a378353dd465d9ed4963c2094d5db6b7c5b650 Mon Sep 17 00:00:00 2001 From: yanjunxiang-google <78807980+yanjunxiang-google@users.noreply.github.com> Date: Wed, 25 Sep 2024 19:28:36 -0400 Subject: [PATCH 4/6] Ext_proc refactoring: Move stream object from Filter class to client (#36228) This PR is part of the required refactoring needed to support HTTP client in ext_proc: https://github.com/envoyproxy/envoy/issues/35488 It is also to address a comment of https://github.com/envoyproxy/envoy/pull/35740#discussion_r1765336235 --------- Signed-off-by: Yanjun Xiang --- .../extensions/filters/http/ext_proc/client.h | 2 + .../filters/http/ext_proc/client_impl.h | 5 +++ .../filters/http/ext_proc/ext_proc.cc | 43 +++++++++--------- .../filters/http/ext_proc/ext_proc.h | 4 -- .../filters/http/ext_proc/mock_server.cc | 8 +++- .../filters/http/ext_proc/mock_server.h | 4 ++ .../http/ext_proc/unit_test_fuzz/BUILD | 13 +----- .../unit_test_fuzz/ext_proc_unit_test_fuzz.cc | 14 ++++-- .../http/ext_proc/unit_test_fuzz/mocks.h | 44 ------------------- 9 files changed, 53 insertions(+), 84 deletions(-) delete mode 100644 test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h diff --git a/source/extensions/filters/http/ext_proc/client.h b/source/extensions/filters/http/ext_proc/client.h index 54493c094fe3..413bbcac7730 100644 --- a/source/extensions/filters/http/ext_proc/client.h +++ b/source/extensions/filters/http/ext_proc/client.h @@ -48,6 +48,8 @@ class ExternalProcessorClient { const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, const Http::AsyncClient::StreamOptions& options, Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) PURE; + virtual ExternalProcessorStream* stream() PURE; + virtual void setStream(ExternalProcessorStream* stream) PURE; }; using ExternalProcessorClientPtr = std::unique_ptr; diff --git a/source/extensions/filters/http/ext_proc/client_impl.h b/source/extensions/filters/http/ext_proc/client_impl.h index 745bd3f167c8..8ef177cda00c 100644 --- a/source/extensions/filters/http/ext_proc/client_impl.h +++ b/source/extensions/filters/http/ext_proc/client_impl.h @@ -32,10 +32,15 @@ class ExternalProcessorClientImpl : public ExternalProcessorClient { const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, const Http::AsyncClient::StreamOptions& options, Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) override; + ExternalProcessorStream* stream() override { return stream_; } + void setStream(ExternalProcessorStream* stream) override { stream_ = stream; } private: Grpc::AsyncClientManager& client_manager_; Stats::Scope& scope_; + // The gRPC stream to the external processor, which will be opened + // when it's time to send the first message. + ExternalProcessorStream* stream_ = nullptr; }; class ExternalProcessorStreamImpl : public ExternalProcessorStream, diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 83dde06362e2..9175293ca18f 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -339,7 +339,7 @@ Filter::StreamOpenState Filter::openStream() { ENVOY_LOG(debug, "External processing is completed when trying to open the gRPC stream"); return StreamOpenState::IgnoreError; } - if (!stream_) { + if (!client_->stream()) { ENVOY_LOG(debug, "Opening gRPC stream to external processor"); Http::AsyncClient::ParentContext grpc_context; @@ -354,32 +354,33 @@ Filter::StreamOpenState Filter::openStream() { if (processing_complete_) { // Stream failed while starting and either onGrpcError or onGrpcClose was already called - // Asserts that `stream_` is nullptr since it is not valid to be used any further + // Asserts that `stream_object` is nullptr since it is not valid to be used any further // beyond this point. ASSERT(stream_object == nullptr); return sent_immediate_response_ ? StreamOpenState::Error : StreamOpenState::IgnoreError; } stats_.streams_started_.inc(); - stream_ = config_->threadLocalStreamManager().store(std::move(stream_object), config_->stats(), - config_->deferredCloseTimeout()); + ExternalProcessorStream* stream = config_->threadLocalStreamManager().store( + std::move(stream_object), config_->stats(), config_->deferredCloseTimeout()); + client_->setStream(stream); // For custom access logging purposes. Applicable only for Envoy gRPC as Google gRPC does not // have a proper implementation of streamInfo. if (grpc_service_.has_envoy_grpc() && logging_info_ != nullptr) { - logging_info_->setClusterInfo(stream_->streamInfo().upstreamClusterInfo()); + logging_info_->setClusterInfo(client_->stream()->streamInfo().upstreamClusterInfo()); } } return StreamOpenState::Ok; } void Filter::closeStream() { - if (stream_) { + if (client_->stream()) { ENVOY_LOG(debug, "Calling close on stream"); - if (stream_->close()) { + if (client_->stream()->close()) { stats_.streams_closed_.inc(); } - config_->threadLocalStreamManager().erase(stream_); - stream_ = nullptr; + config_->threadLocalStreamManager().erase(client_->stream()); + client_->setStream(nullptr); } else { ENVOY_LOG(debug, "Stream already closed"); } @@ -387,7 +388,8 @@ void Filter::closeStream() { void Filter::deferredCloseStream() { ENVOY_LOG(debug, "Calling deferred close on stream"); - config_->threadLocalStreamManager().deferredErase(stream_, filter_callbacks_->dispatcher()); + config_->threadLocalStreamManager().deferredErase(client_->stream(), + filter_callbacks_->dispatcher()); } void Filter::onDestroy() { @@ -405,8 +407,8 @@ void Filter::onDestroy() { // closure is deferred upon filter destruction with a timer. // First, release the referenced filter resource. - if (stream_ != nullptr) { - stream_->notifyFilterDestroy(); + if (client_->stream() != nullptr) { + client_->stream()->notifyFilterDestroy(); } // Second, perform stream deferred closure. @@ -436,7 +438,7 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state, state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(), ProcessorState::CallbackState::HeadersCallback); ENVOY_LOG(debug, "Sending headers message"); - stream_->send(std::move(req), false); + client_->stream()->send(std::move(req), false); stats_.stream_msgs_sent_.inc(); state.setPaused(true); return FilterHeadersStatus::StopIteration; @@ -661,7 +663,7 @@ Filter::sendHeadersInObservabilityMode(Http::RequestOrResponseHeaderMap& headers ProcessingRequest req = buildHeaderRequest(state, headers, end_stream, /*observability_mode=*/true); ENVOY_LOG(debug, "Sending headers message in observability mode"); - stream_->send(std::move(req), false); + client_->stream()->send(std::move(req), false); stats_.stream_msgs_sent_.inc(); return FilterHeadersStatus::Continue; @@ -686,7 +688,7 @@ Http::FilterDataStatus Filter::sendDataInObservabilityMode(Buffer::Instance& dat // Set up the the body chunk and send. auto req = setupBodyChunk(state, data, end_stream); req.set_observability_mode(true); - stream_->send(std::move(req), false); + client_->stream()->send(std::move(req), false); stats_.stream_msgs_sent_.inc(); ENVOY_LOG(debug, "Sending body message in ObservabilityMode"); } else if (state.bodyMode() != ProcessingMode::NONE) { @@ -878,7 +880,7 @@ void Filter::sendBodyChunk(ProcessorState& state, ProcessorState::CallbackState ProcessingRequest& req) { state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(), new_state); - stream_->send(std::move(req), false); + client_->stream()->send(std::move(req), false); stats_.stream_msgs_sent_.inc(); } @@ -894,20 +896,21 @@ void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(), ProcessorState::CallbackState::TrailersCallback); ENVOY_LOG(debug, "Sending trailers message"); - stream_->send(std::move(req), false); + client_->stream()->send(std::move(req), false); stats_.stream_msgs_sent_.inc(); } void Filter::logGrpcStreamInfo() { - if (stream_ != nullptr && logging_info_ != nullptr && grpc_service_.has_envoy_grpc()) { - const auto& upstream_meter = stream_->streamInfo().getUpstreamBytesMeter(); + if (client_->stream() != nullptr && logging_info_ != nullptr && grpc_service_.has_envoy_grpc()) { + const auto& upstream_meter = client_->stream()->streamInfo().getUpstreamBytesMeter(); if (upstream_meter != nullptr) { logging_info_->setBytesSent(upstream_meter->wireBytesSent()); logging_info_->setBytesReceived(upstream_meter->wireBytesReceived()); } // Only set upstream host in logging info once. if (logging_info_->upstreamHost() == nullptr) { - logging_info_->setUpstreamHost(stream_->streamInfo().upstreamInfo()->upstreamHost()); + logging_info_->setUpstreamHost( + client_->stream()->streamInfo().upstreamInfo()->upstreamHost()); } } } diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index a908a1f6f775..3f52bba5094b 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -486,10 +486,6 @@ class Filter : public Logger::Loggable, DecodingProcessorState decoding_state_; EncodingProcessorState encoding_state_; - // The gRPC stream to the external processor, which will be opened - // when it's time to send the first message. - ExternalProcessorStream* stream_ = nullptr; - // Set to true when no more messages need to be sent to the processor. // This happens when the processor has closed the stream, or when it has // failed. diff --git a/test/extensions/filters/http/ext_proc/mock_server.cc b/test/extensions/filters/http/ext_proc/mock_server.cc index 25286be792c3..29637f793f83 100644 --- a/test/extensions/filters/http/ext_proc/mock_server.cc +++ b/test/extensions/filters/http/ext_proc/mock_server.cc @@ -5,7 +5,13 @@ namespace Extensions { namespace HttpFilters { namespace ExternalProcessing { -MockClient::MockClient() = default; +MockClient::MockClient() { + EXPECT_CALL(*this, stream()).WillRepeatedly(testing::Invoke([this]() { return stream_; })); + + EXPECT_CALL(*this, setStream(testing::_)) + .WillRepeatedly( + testing::Invoke([this](ExternalProcessorStream* stream) -> void { stream_ = stream; })); +} MockClient::~MockClient() = default; MockStream::MockStream() = default; diff --git a/test/extensions/filters/http/ext_proc/mock_server.h b/test/extensions/filters/http/ext_proc/mock_server.h index d0b0389b0fd4..12c9d7308a93 100644 --- a/test/extensions/filters/http/ext_proc/mock_server.h +++ b/test/extensions/filters/http/ext_proc/mock_server.h @@ -17,6 +17,10 @@ class MockClient : public ExternalProcessorClient { (ExternalProcessorCallbacks&, const Grpc::GrpcServiceConfigWithHashKey&, const Envoy::Http::AsyncClient::StreamOptions&, Envoy::Http::StreamFilterSidestreamWatermarkCallbacks&)); + MOCK_METHOD(ExternalProcessorStream*, stream, ()); + MOCK_METHOD(void, setStream, (ExternalProcessorStream * stream)); + + ExternalProcessorStream* stream_ = nullptr; }; class MockStream : public ExternalProcessorStream { diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD b/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD index f42d0981a65a..66a015876a44 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD @@ -1,7 +1,6 @@ load( "//bazel:envoy_build_system.bzl", "envoy_cc_fuzz_test", - "envoy_cc_mock", "envoy_package", "envoy_proto_library", ) @@ -10,16 +9,6 @@ licenses(["notice"]) # Apache 2 envoy_package() -envoy_cc_mock( - name = "ext_proc_mocks", - hdrs = ["mocks.h"], - tags = ["skip_on_windows"], - deps = [ - "//source/extensions/filters/http/ext_proc:client_interface", - "@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto", - ], -) - envoy_proto_library( name = "ext_proc_unit_test_fuzz_proto", srcs = ["ext_proc_unit_test_fuzz.proto"], @@ -37,10 +26,10 @@ envoy_cc_fuzz_test( rbe_pool = "2core", tags = ["skip_on_windows"], deps = [ - ":ext_proc_mocks", ":ext_proc_unit_test_fuzz_proto_cc_proto", "//source/extensions/filters/http/ext_proc:config", "//test/extensions/filters/http/common/fuzz:http_filter_fuzzer_lib", + "//test/extensions/filters/http/ext_proc:mock_server_lib", "//test/mocks/http:http_mocks", "//test/mocks/network:network_mocks", "//test/mocks/server:server_factory_context_mocks", diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc index df321fe20487..25c6b3ac2d76 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc @@ -1,8 +1,8 @@ #include "source/extensions/filters/http/ext_proc/ext_proc.h" #include "test/extensions/filters/http/common/fuzz/http_filter_fuzzer.h" +#include "test/extensions/filters/http/ext_proc/mock_server.h" #include "test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.pb.validate.h" -#include "test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h" #include "test/fuzz/fuzz_runner.h" #include "test/mocks/http/mocks.h" #include "test/mocks/network/mocks.h" @@ -69,6 +69,14 @@ DEFINE_PROTO_FUZZER( return; } + // Limiting the max supported request body size to 128k. + if (input.request().has_proto_body()) { + const uint32_t max_body_size = 128 * 1024; + if (input.request().proto_body().message().value().size() > max_body_size) { + return; + } + } + static FuzzerMocks mocks; NiceMock stats_store; @@ -88,7 +96,7 @@ DEFINE_PROTO_FUZZER( return; } - MockClient* client = new MockClient(); + ExternalProcessing::MockClient* client = new ExternalProcessing::MockClient(); std::unique_ptr filter = std::make_unique( config, ExternalProcessing::ExternalProcessorClientPtr{client}, proto_config.grpc_service()); filter->setDecoderFilterCallbacks(mocks.decoder_callbacks_); @@ -100,7 +108,7 @@ DEFINE_PROTO_FUZZER( const Envoy::Http::AsyncClient::StreamOptions&, Envoy::Http::StreamFilterSidestreamWatermarkCallbacks&) -> ExternalProcessing::ExternalProcessorStreamPtr { - auto stream = std::make_unique(); + auto stream = std::make_unique(); EXPECT_CALL(*stream, send(_, _)) .WillRepeatedly(Invoke([&](envoy::service::ext_proc::v3::ProcessingRequest&&, bool) -> void { diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h b/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h deleted file mode 100644 index 49ff067dd353..000000000000 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h +++ /dev/null @@ -1,44 +0,0 @@ -#pragma once - -#include "envoy/service/ext_proc/v3/external_processor.pb.h" - -#include "source/extensions/filters/http/ext_proc/client.h" - -#include "gmock/gmock.h" - -namespace Envoy { -namespace Extensions { -namespace HttpFilters { -namespace ExtProc { -namespace UnitTestFuzz { - -class MockStream : public ExternalProcessing::ExternalProcessorStream { -public: - MockStream() = default; - ~MockStream() override = default; - - MOCK_METHOD(void, send, - (envoy::service::ext_proc::v3::ProcessingRequest && request, bool end_stream)); - MOCK_METHOD(bool, close, ()); - MOCK_METHOD(const StreamInfo::StreamInfo&, streamInfo, (), (const override)); - MOCK_METHOD(StreamInfo::StreamInfo&, streamInfo, ()); - MOCK_METHOD(void, notifyFilterDestroy, ()); -}; - -class MockClient : public ExternalProcessing::ExternalProcessorClient { -public: - MockClient() = default; - ~MockClient() override = default; - - MOCK_METHOD(ExternalProcessing::ExternalProcessorStreamPtr, start, - (ExternalProcessing::ExternalProcessorCallbacks & callbacks, - const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, - const Envoy::Http::AsyncClient::StreamOptions&, - Envoy::Http::StreamFilterSidestreamWatermarkCallbacks&)); -}; - -} // namespace UnitTestFuzz -} // namespace ExtProc -} // namespace HttpFilters -} // namespace Extensions -} // namespace Envoy From 1a153166a6d1e9336ee8982d1a00ba98655c9d39 Mon Sep 17 00:00:00 2001 From: yanjunxiang-google <78807980+yanjunxiang-google@users.noreply.github.com> Date: Wed, 25 Sep 2024 19:29:32 -0400 Subject: [PATCH 5/6] Ext_proc: Enable sending body without waiting for header response in STREAMED mode (#35850) --------- Signed-off-by: Yanjun Xiang --- .../filters/http/ext_proc/v3/ext_proc.proto | 18 +- .../ext_proc/v3/external_processor.proto | 8 +- changelogs/current.yaml | 4 + .../filters/http/ext_proc/ext_proc.cc | 46 ++- .../filters/http/ext_proc/ext_proc.h | 5 + .../filters/http/ext_proc/processor_state.cc | 117 ++++--- .../filters/http/ext_proc/processor_state.h | 7 + .../ext_proc/ext_proc_integration_test.cc | 168 +++++++++ .../filters/http/ext_proc/filter_test.cc | 322 ++++++++++++++++++ 9 files changed, 621 insertions(+), 74 deletions(-) diff --git a/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto b/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto index d1a27657f1b1..13a24ad9fcd7 100644 --- a/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto +++ b/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto @@ -271,18 +271,20 @@ message ExternalProcessor { // The default value is 5000 milliseconds (5 seconds) if not specified. google.protobuf.Duration deferred_close_timeout = 19; - // [#not-implemented-hide:] // Send body to the side stream server once it arrives without waiting for the header response from that server. // It only works for STREAMED body processing mode. For any other body processing modes, it is ignored. - // // The server has two options upon receiving a header request: - // 1. Instant Response: Send the header response as soon as the header request is received. - // 2. Delayed Response: Wait for the body before sending any response. - // If the server chooses the second option, it has two further choices: - // 2.1 Separate Responses: Send the header response first, followed by separate body responses. - // 2.2 Combined Response: Include both the header response and the first chunk of the body response - // in a single body response message, followed by the remaining body responses. + // + // 1. Instant Response: send the header response as soon as the header request is received. + // + // 2. Delayed Response: wait for the body before sending any response. + // // In all scenarios, the header-body ordering must always be maintained. + // + // If enabled Envoy will ignore the + // :ref:`mode_override ` + // value that the server sends in the header response. This is because Envoy may have already + // sent the body to the server, prior to processing the header response. bool send_body_without_waiting_for_header_response = 21; // When :ref:`allow_mode_override diff --git a/api/envoy/service/ext_proc/v3/external_processor.proto b/api/envoy/service/ext_proc/v3/external_processor.proto index 5f0d66e65735..6ae58c3c7248 100644 --- a/api/envoy/service/ext_proc/v3/external_processor.proto +++ b/api/envoy/service/ext_proc/v3/external_processor.proto @@ -180,7 +180,10 @@ message ProcessingResponse { // It is also ignored by Envoy when the ext_proc filter config // :ref:`allow_mode_override // ` - // is set to false. + // is set to false, or + // :ref:`send_body_without_waiting_for_header_response + // ` + // is set to true. envoy.extensions.filters.http.ext_proc.v3.ProcessingMode mode_override = 9; // When ext_proc server receives a request message, in case it needs more @@ -285,9 +288,6 @@ message CommonResponse { // Instructions on how to manipulate the headers. When responding to an // HttpBody request, header mutations will only take effect if // the current processing mode for the body is BUFFERED. - // [#comment:TODO(yanjunxiang-google) rephrase last sentence once send_body_without_waiting_for_header_response is not hidden: - // the current processing mode for the body is: 1) BUFFERED; 2) or STREAMED and - // the :ref:`send_body_without_waiting_for_header_response ` is enabled.] HeaderMutation header_mutation = 2; // Replace the body of the last message sent to the remote server on this diff --git a/changelogs/current.yaml b/changelogs/current.yaml index cb7fb236024a..3b161072fbb3 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -77,6 +77,10 @@ minor_behavior_changes: change: | When Lua script executes httpCall, backpressure is exercised when receiving body from downstream client. This behavior can be reverted by setting the runtime guard ``envoy.reloadable_features.lua_flow_control_while_http_call`` to false. +- area: ext_proc + change: | + Added support for :ref:`send_body_without_waiting_for_header_response + `. - area: http change: | Modified the authority header value validator to allow the same characters as oghttp2 diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 9175293ca18f..160f51910ff3 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -189,6 +189,8 @@ FilterConfig::FilterConfig( deferred_close_timeout_(PROTOBUF_GET_MS_OR_DEFAULT(config, deferred_close_timeout, DEFAULT_DEFERRED_CLOSE_TIMEOUT_MS)), message_timeout_(message_timeout), max_message_timeout_ms_(max_message_timeout_ms), + send_body_without_waiting_for_header_response_( + config.send_body_without_waiting_for_header_response()), stats_(generateStats(stats_prefix, config.stat_prefix(), scope)), processing_mode_(config.processing_mode()), mutation_checker_(config.mutation_rules(), context.regexEngine()), @@ -495,16 +497,22 @@ FilterDataStatus Filter::onData(ProcessorState& state, Buffer::Instance& data, b } if (state.callbackState() == ProcessorState::CallbackState::HeadersCallback) { - ENVOY_LOG(trace, "Header processing still in progress -- holding body data"); - // We don't know what to do with the body until the response comes back. - // We must buffer it in case we need it when that happens. - // Raise a watermark to prevent a buffer overflow until the response comes back. - // When end_stream is true, we need to StopIterationAndWatermark as well to stop the - // ActiveStream from returning error when the last chunk added to stream buffer exceeds the - // buffer limit. - state.setPaused(true); - state.requestWatermark(); - return FilterDataStatus::StopIterationAndWatermark; + if (state.bodyMode() == ProcessingMode::STREAMED && + config_->sendBodyWithoutWaitingForHeaderResponse()) { + ENVOY_LOG(trace, "Sending body data even header processing is still in progress as body mode " + "is STREAMED and send_body_without_waiting_for_header_response is enabled"); + } else { + ENVOY_LOG(trace, "Header processing still in progress -- holding body data"); + // We don't know what to do with the body until the response comes back. + // We must buffer it in case we need it when that happens. + // Raise a watermark to prevent a buffer overflow until the response comes back. + // When end_stream is true, we need to StopIterationAndWatermark as well to stop the + // ActiveStream from returning error when the last chunk added to stream buffer exceeds the + // buffer limit. + state.setPaused(true); + state.requestWatermark(); + return FilterDataStatus::StopIterationAndWatermark; + } } FilterDataStatus result; @@ -566,11 +574,13 @@ FilterDataStatus Filter::onData(ProcessorState& state, Buffer::Instance& data, b // Need to first enqueue the data into the chunk queue before sending. auto req = setupBodyChunk(state, data, end_stream); state.enqueueStreamingChunk(data, end_stream); - sendBodyChunk(state, ProcessorState::CallbackState::StreamedBodyCallback, req); - - // At this point we will continue, but with no data, because that will come later - if (end_stream) { - // But we need to stop iteration for the last chunk because it's our last chance to do stuff + // If the current state is HeadersCallback, stays in that state. + if (state.callbackState() == ProcessorState::CallbackState::HeadersCallback) { + sendBodyChunk(state, ProcessorState::CallbackState::HeadersCallback, req); + } else { + sendBodyChunk(state, ProcessorState::CallbackState::StreamedBodyCallback, req); + } + if (end_stream || state.callbackState() == ProcessorState::CallbackState::HeadersCallback) { state.setPaused(true); result = FilterDataStatus::StopIterationNoBuffer; } else { @@ -1071,9 +1081,11 @@ void Filter::onReceiveMessage(std::unique_ptr&& r) { // Update processing mode now because filter callbacks check it // and the various "handle" methods below may result in callbacks // being invoked in line. This only happens when filter has allow_mode_override - // set to true and filter is waiting for header processing response. + // set to true, send_body_without_waiting_for_header_response set to false, + // and filter is waiting for header processing response. // Otherwise, the response mode_override proto field is ignored. - if (config_->allowModeOverride() && inHeaderProcessState() && response->has_mode_override()) { + if (config_->allowModeOverride() && !config_->sendBodyWithoutWaitingForHeaderResponse() && + inHeaderProcessState() && response->has_mode_override()) { bool mode_override_allowed = true; const auto& mode_overide = response->mode_override(); // First, check if mode override allow-list is configured diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index 3f52bba5094b..ce5a9ed4b195 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -216,6 +216,10 @@ class FilterConfig { uint32_t maxMessageTimeout() const { return max_message_timeout_ms_; } + bool sendBodyWithoutWaitingForHeaderResponse() const { + return send_body_without_waiting_for_header_response_; + } + const ExtProcFilterStats& stats() const { return stats_; } const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& processingMode() const { @@ -283,6 +287,7 @@ class FilterConfig { const std::chrono::milliseconds deferred_close_timeout_; const std::chrono::milliseconds message_timeout_; const uint32_t max_message_timeout_ms_; + const bool send_body_without_waiting_for_header_response_; ExtProcFilterStats stats_; const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode processing_mode_; diff --git a/source/extensions/filters/http/ext_proc/processor_state.cc b/source/extensions/filters/http/ext_proc/processor_state.cc index 7b773d391524..b0c569c7d146 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.cc +++ b/source/extensions/filters/http/ext_proc/processor_state.cc @@ -81,24 +81,57 @@ bool ProcessorState::restartMessageTimer(const uint32_t message_timeout_ms) { } } +void ProcessorState::sendBufferedDataInStreamedMode(bool end_stream) { + // Process the data being buffered in streaming mode. + // Move the current buffer into the queue for remote processing and clear the buffered data. + if (hasBufferedData()) { + Buffer::OwnedImpl buffered_chunk; + modifyBufferedData([&buffered_chunk](Buffer::Instance& data) { buffered_chunk.move(data); }); + ENVOY_LOG(debug, "Sending a chunk of buffered data ({})", buffered_chunk.length()); + // Need to first enqueue the data into the chunk queue before sending. + auto req = filter_.setupBodyChunk(*this, buffered_chunk, end_stream); + enqueueStreamingChunk(buffered_chunk, end_stream); + filter_.sendBodyChunk(*this, ProcessorState::CallbackState::StreamedBodyCallback, req); + } + if (queueBelowLowLimit()) { + clearWatermark(); + } +} + +absl::Status ProcessorState::processHeaderMutation(const CommonResponse& common_response) { + ENVOY_LOG(debug, "Applying header mutations"); + const auto mut_status = MutationUtils::applyHeaderMutations( + common_response.header_mutation(), *headers_, + common_response.status() == CommonResponse::CONTINUE_AND_REPLACE, + filter_.config().mutationChecker(), filter_.stats().rejected_header_mutations_, + shouldRemoveContentLength()); + return mut_status; +} + +ProcessorState::CallbackState +ProcessorState::getCallbackStateAfterHeaderResp(const CommonResponse& common_response) const { + if (bodyMode() == ProcessingMode::STREAMED && + filter_.config().sendBodyWithoutWaitingForHeaderResponse() && !chunk_queue_.empty() && + (common_response.status() != CommonResponse::CONTINUE_AND_REPLACE)) { + return ProcessorState::CallbackState::StreamedBodyCallback; + } + return ProcessorState::CallbackState::Idle; +} + absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& response) { if (callback_state_ == CallbackState::HeadersCallback) { ENVOY_LOG(debug, "applying headers response. body mode = {}", ProcessingMode::BodySendMode_Name(body_mode_)); const auto& common_response = response.response(); if (common_response.has_header_mutation()) { - const auto mut_status = MutationUtils::applyHeaderMutations( - common_response.header_mutation(), *headers_, - common_response.status() == CommonResponse::CONTINUE_AND_REPLACE, - filter_.config().mutationChecker(), filter_.stats().rejected_header_mutations_, - shouldRemoveContentLength()); + const auto mut_status = processHeaderMutation(common_response); if (!mut_status.ok()) { return mut_status; } } clearRouteCache(common_response); - onFinishProcessorCall(Grpc::Status::Ok); + onFinishProcessorCall(Grpc::Status::Ok, getCallbackStateAfterHeaderResp(common_response)); if (common_response.status() == CommonResponse::CONTINUE_AND_REPLACE) { ENVOY_LOG(debug, "Replacing complete message"); @@ -119,6 +152,9 @@ absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& respon }); } } + + // In case any data left over in the chunk queue, clear them. + clearStreamingChunk(); // Once this message is received, we won't send anything more on this request // or response to the processor. Clear flags to make sure. body_mode_ = ProcessingMode::NONE; @@ -129,17 +165,26 @@ absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& respon // Fall through if there was never a body in the first place. ENVOY_LOG(debug, "The message had no body"); } else if (complete_body_available_ && body_mode_ != ProcessingMode::NONE) { - // If we get here, then all the body data came in before the header message - // was complete, and the server wants the body. It doesn't matter whether the - // processing mode is buffered, streamed, or partially buffered. - if (bufferedData()) { - // Get here, no_body_ = false, and complete_body_available_ = true, the end_stream - // flag of decodeData() can be determined by whether the trailers are received. - // Also, bufferedData() is not nullptr means decodeData() is called, even though - // the data can be an empty chunk. - auto req = filter_.setupBodyChunk(*this, *bufferedData(), !trailers_available_); - filter_.sendBodyChunk(*this, ProcessorState::CallbackState::BufferedBodyCallback, req); - clearWatermark(); + if (callback_state_ != CallbackState::StreamedBodyCallback) { + // If we get here, then all the body data came in before the header message + // was complete, and the server wants the body. It doesn't matter whether the + // processing mode is buffered, streamed, or partially buffered. + if (bufferedData()) { + // Get here, no_body_ = false, and complete_body_available_ = true, the end_stream + // flag of decodeData() can be determined by whether the trailers are received. + // Also, bufferedData() is not nullptr means decodeData() is called, even though + // the data can be an empty chunk. + auto req = filter_.setupBodyChunk(*this, *bufferedData(), !trailers_available_); + filter_.sendBodyChunk(*this, ProcessorState::CallbackState::BufferedBodyCallback, req); + clearWatermark(); + return absl::OkStatus(); + } + } else { + // StreamedBodyCallback state. There is pending body response. + // Check whether there is buffered data. If there is, send them. + // Do not continue filter chain here so the pending body response have chance to be + // served. + sendBufferedDataInStreamedMode(!trailers_available_); return absl::OkStatus(); } } else if (body_mode_ == ProcessingMode::BUFFERED) { @@ -149,22 +194,7 @@ absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& respon clearWatermark(); return absl::OkStatus(); } else if (body_mode_ == ProcessingMode::STREAMED) { - if (hasBufferedData()) { - // We now know that we need to process what we have buffered in streaming mode. - // Move the current buffer into the queue for remote processing and clear the - // buffered data. - Buffer::OwnedImpl buffered_chunk; - modifyBufferedData( - [&buffered_chunk](Buffer::Instance& data) { buffered_chunk.move(data); }); - ENVOY_LOG(debug, "Sending first chunk using buffered data ({})", buffered_chunk.length()); - // Need to first enqueue the data into the chunk queue before sending. - auto req = filter_.setupBodyChunk(*this, buffered_chunk, false); - enqueueStreamingChunk(buffered_chunk, false); - filter_.sendBodyChunk(*this, ProcessorState::CallbackState::StreamedBodyCallback, req); - } - if (queueBelowLowLimit()) { - clearWatermark(); - } + sendBufferedDataInStreamedMode(false); continueIfNecessary(); return absl::OkStatus(); } else if (body_mode_ == ProcessingMode::BUFFERED_PARTIAL) { @@ -226,12 +256,7 @@ absl::Status ProcessorState::handleBodyResponse(const BodyResponse& response) { if (callback_state_ == CallbackState::BufferedBodyCallback) { if (common_response.has_header_mutation()) { if (headers_ != nullptr) { - ENVOY_LOG(debug, "Applying header mutations to buffered body message"); - const auto mut_status = MutationUtils::applyHeaderMutations( - common_response.header_mutation(), *headers_, - common_response.status() == CommonResponse::CONTINUE_AND_REPLACE, - filter_.config().mutationChecker(), filter_.stats().rejected_header_mutations_, - shouldRemoveContentLength()); + const auto mut_status = processHeaderMutation(common_response); if (!mut_status.ok()) { return mut_status; } @@ -291,12 +316,7 @@ absl::Status ProcessorState::handleBodyResponse(const BodyResponse& response) { ENVOY_BUG(chunk != nullptr, "Bad partial body callback state"); if (common_response.has_header_mutation()) { if (headers_ != nullptr) { - ENVOY_LOG(debug, "Applying header mutations to buffered body message"); - const auto mut_status = MutationUtils::applyHeaderMutations( - common_response.header_mutation(), *headers_, - common_response.status() == CommonResponse::CONTINUE_AND_REPLACE, - filter_.config().mutationChecker(), filter_.stats().rejected_header_mutations_, - shouldRemoveContentLength()); + const auto mut_status = processHeaderMutation(common_response); if (!mut_status.ok()) { return mut_status; } @@ -523,6 +543,13 @@ const QueuedChunk& ChunkQueue::consolidate() { return chunk; } +void ChunkQueue::clear() { + if (queue_.size() > 1) { + received_data_.drain(received_data_.length()); + queue_.clear(); + } +} + } // namespace ExternalProcessing } // namespace HttpFilters } // namespace Extensions diff --git a/source/extensions/filters/http/ext_proc/processor_state.h b/source/extensions/filters/http/ext_proc/processor_state.h index 01e9bc57ae59..51d8aa791891 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.h +++ b/source/extensions/filters/http/ext_proc/processor_state.h @@ -40,6 +40,7 @@ class ChunkQueue { uint32_t bytesEnqueued() const { return bytes_enqueued_; } bool empty() const { return queue_.empty(); } void push(Buffer::Instance& data, bool end_stream); + void clear(); QueuedChunkPtr pop(Buffer::OwnedImpl& out_data); const QueuedChunk& consolidate(); Buffer::OwnedImpl& receivedData() { return received_data_; } @@ -272,6 +273,12 @@ class ProcessorState : public Logger::Loggable { private: virtual void clearRouteCache(const envoy::service::ext_proc::v3::CommonResponse&) {} + void sendBufferedDataInStreamedMode(bool end_stream); + absl::Status + processHeaderMutation(const envoy::service::ext_proc::v3::CommonResponse& common_response); + void clearStreamingChunk() { chunk_queue_.clear(); } + CallbackState getCallbackStateAfterHeaderResp( + const envoy::service::ext_proc::v3::CommonResponse& common_response) const; }; class DecodingProcessorState : public ProcessorState { diff --git a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc index 697a773a1ebb..e229fc88b850 100644 --- a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc @@ -4652,6 +4652,174 @@ TEST_P(ExtProcIntegrationTest, SidestreamPushbackUpstreamObservabilityMode) { verifyDownstreamResponse(*response, 200); } +TEST_P(ExtProcIntegrationTest, SendBodyBeforeHeaderRespStreamedBasicTest) { + proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::STREAMED); + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_response_body_mode(ProcessingMode::STREAMED); + proto_config_.set_send_body_without_waiting_for_header_response(true); + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequestWithBody("hello world", [](Http::HeaderMap& headers) { + headers.addCopy(LowerCaseString("x-remove-this"), "yes"); + }); + processRequestHeadersMessage( + *grpc_upstreams_[0], true, [](const HttpHeaders& headers, HeadersResponse& headers_resp) { + Http::TestRequestHeaderMapImpl expected_request_headers{ + {":scheme", "http"}, {":method", "POST"}, {"host", "host"}, + {":path", "/"}, {"x-remove-this", "yes"}, {"x-forwarded-proto", "http"}}; + EXPECT_THAT(headers.headers(), HeaderProtosEqual(expected_request_headers)); + + auto response_header_mutation = headers_resp.mutable_response()->mutable_header_mutation(); + auto* mut1 = response_header_mutation->add_set_headers(); + mut1->mutable_header()->set_key("x-new-header"); + mut1->mutable_header()->set_raw_value("new"); + response_header_mutation->add_remove_headers("x-remove-this"); + return true; + }); + processRequestBodyMessage( + *grpc_upstreams_[0], false, [](const HttpBody& body, BodyResponse& body_resp) { + EXPECT_TRUE(body.end_of_stream()); + EXPECT_EQ(body.body(), "hello world"); + auto* body_mut = body_resp.mutable_response()->mutable_body_mutation(); + body_mut->set_body("replaced body"); + return true; + }); + + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + EXPECT_THAT(upstream_request_->headers(), HasNoHeader("x-remove-this")); + EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("x-new-header", "new")); + EXPECT_EQ(upstream_request_->body().toString(), "replaced body"); + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request_->encodeData(100, true); + processResponseHeadersMessage( + *grpc_upstreams_[0], false, [](const HttpHeaders& headers, HeadersResponse&) { + Http::TestRequestHeaderMapImpl expected_response_headers{{":status", "200"}}; + EXPECT_THAT(headers.headers(), HeaderProtosEqual(expected_response_headers)); + return true; + }); + processResponseBodyMessage(*grpc_upstreams_[0], false, absl::nullopt); + verifyDownstreamResponse(*response, 200); +} + +TEST_P(ExtProcIntegrationTest, SendBodyAndTrailerBeforeHeaderRespStreamedMoreDataTest) { + proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::STREAMED); + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_response_body_mode(ProcessingMode::STREAMED); + proto_config_.mutable_processing_mode()->set_response_trailer_mode(ProcessingMode::SEND); + proto_config_.set_send_body_without_waiting_for_header_response(true); + + initializeConfig(); + HttpIntegrationTest::initialize(); + codec_client_ = makeHttpConnection(lookupPort("http")); + Http::TestRequestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + + auto encoder_decoder = codec_client_->startRequest(headers); + request_encoder_ = &encoder_decoder.first; + auto response = std::move(encoder_decoder.second); + codec_client_->sendData(*request_encoder_, "hello world", false); + processRequestHeadersMessage(*grpc_upstreams_[0], true, absl::nullopt); + processRequestBodyMessage(*grpc_upstreams_[0], false, absl::nullopt); + codec_client_->sendData(*request_encoder_, "foo-bar", true); + processRequestBodyMessage(*grpc_upstreams_[0], false, absl::nullopt); + + handleUpstreamRequestWithTrailer(); + processResponseHeadersMessage(*grpc_upstreams_[0], false, absl::nullopt); + processResponseBodyMessage(*grpc_upstreams_[0], false, absl::nullopt); + processResponseTrailersMessage(*grpc_upstreams_[0], false, absl::nullopt); + verifyDownstreamResponse(*response, 200); +} + +TEST_P(ExtProcIntegrationTest, ServerWaitForBodyBeforeSendsHeaderRespStreamedTest) { + config_helper_.setBufferLimits(1024, 1024); + proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::STREAMED); + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + proto_config_.set_send_body_without_waiting_for_header_response(true); + + initializeConfig(); + HttpIntegrationTest::initialize(); + codec_client_ = makeHttpConnection(lookupPort("http")); + Http::TestRequestHeaderMapImpl default_headers; + HttpTestUtility::addDefaultHeaders(default_headers); + + auto encoder_decoder = codec_client_->startRequest(default_headers); + request_encoder_ = &encoder_decoder.first; + auto response = std::move(encoder_decoder.second); + // Downstream client sending 16k data. + const std::string body_sent(16 * 1024, 's'); + codec_client_->sendData(*request_encoder_, body_sent, true); + + // The ext_proc server receives the headers. + ProcessingRequest header_request; + ASSERT_TRUE(grpc_upstreams_[0]->waitForHttpConnection(*dispatcher_, processor_connection_)); + ASSERT_TRUE(processor_connection_->waitForNewStream(*dispatcher_, processor_stream_)); + ASSERT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, header_request)); + ASSERT_TRUE(header_request.has_request_headers()); + + // The ext_proc server receives 16 chunks of body, each chunk size is 1k. + std::string body_received; + bool end_stream = false; + uint32_t total_body_msg_count = 0; + while (!end_stream) { + ProcessingRequest body_request; + ASSERT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, body_request)); + ASSERT_TRUE(body_request.has_request_body()); + body_received = absl::StrCat(body_received, body_request.request_body().body()); + end_stream = body_request.request_body().end_of_stream(); + total_body_msg_count++; + } + EXPECT_TRUE(end_stream); + EXPECT_EQ(body_received, body_sent); + + // The ext_proc server sends back the header response. + processor_stream_->startGrpcStream(); + ProcessingResponse response_header; + auto* header_resp = response_header.mutable_request_headers(); + auto header_mutation = header_resp->mutable_response()->mutable_header_mutation(); + auto* mut = header_mutation->add_set_headers(); + mut->mutable_header()->set_key("x-new-header"); + mut->mutable_header()->set_raw_value("new"); + processor_stream_->sendGrpcMessage(response_header); + + // The ext_proc server sends back the body response. + const std::string body_upstream(total_body_msg_count, 'r'); + while (total_body_msg_count) { + ProcessingResponse response_body; + auto* body_resp = response_body.mutable_request_body(); + auto* body_mut = body_resp->mutable_response()->mutable_body_mutation(); + body_mut->set_body("r"); + processor_stream_->sendGrpcMessage(response_body); + total_body_msg_count--; + } + + handleUpstreamRequest(); + EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("x-new-header", "new")); + EXPECT_EQ(upstream_request_->body().toString(), body_upstream); + verifyDownstreamResponse(*response, 200); +} + +TEST_P(ExtProcIntegrationTest, SendBodyBeforeHeaderRespStreamedNotSendTrailerTest) { + proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::STREAMED); + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_response_body_mode(ProcessingMode::STREAMED); + proto_config_.set_send_body_without_waiting_for_header_response(true); + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequestWithBodyAndTrailer("hello world"); + processRequestHeadersMessage(*grpc_upstreams_[0], true, absl::nullopt); + processRequestBodyMessage(*grpc_upstreams_[0], false, absl::nullopt); + handleUpstreamRequest(100); + processResponseHeadersMessage(*grpc_upstreams_[0], false, absl::nullopt); + processResponseBodyMessage(*grpc_upstreams_[0], false, absl::nullopt); + verifyDownstreamResponse(*response, 200); +} + TEST_P(ExtProcIntegrationTest, SendHeaderBodyNotSendTrailerTest) { proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND); proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::STREAMED); diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index 008974567edc..edb3c2b66492 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -2567,6 +2567,51 @@ TEST_F(HttpFilterTest, ProcessingModeOverrideResponseHeaders) { EXPECT_EQ(1, config_->stats().streams_closed_.value()); } +// Set allow_mode_override in filter config to be true. +// Set send_body_without_waiting_for_header_response to be true +// In such case, the mode_override in the response will be ignored. +TEST_F(HttpFilterTest, DisableResponseModeOverrideBySendBodyFlag) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SEND" + response_header_mode: "SEND" + allow_mode_override: true + send_body_without_waiting_for_header_response: true + )EOF"); + + EXPECT_EQ(filter_->config().allowModeOverride(), true); + EXPECT_EQ(filter_->config().sendBodyWithoutWaitingForHeaderResponse(), true); + EXPECT_EQ(filter_->config().processingMode().response_header_mode(), ProcessingMode::SEND); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, true)); + + // When ext_proc server sends back the request header response, it contains the + // mode_override for the response_header_mode to be SKIP. + processRequestHeaders( + false, [](const HttpHeaders&, ProcessingResponse& response, HeadersResponse&) { + response.mutable_mode_override()->set_response_header_mode(ProcessingMode::SKIP); + }); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, true)); + + // Verify such mode_override is ignored. The response header is still sent to the ext_proc server. + processResponseHeaders(false, [](const HttpHeaders& header_resp, ProcessingResponse&, + HeadersResponse&) { + EXPECT_TRUE(header_resp.end_of_stream()); + TestRequestHeaderMapImpl expected_response{{":status", "200"}, {"content-type", "text/plain"}}; + EXPECT_THAT(header_resp.headers(), HeaderProtosEqual(expected_response)); + }); + + TestRequestHeaderMapImpl final_expected_response{{":status", "200"}, + {"content-type", "text/plain"}}; + EXPECT_THAT(&response_headers_, HeaderMapEqualIgnoreOrder(&final_expected_response)); + filter_->onDestroy(); +} + // Leaving the allow_mode_override in filter config to be default, which is false. // In such case, the mode_override in the response will be ignored. TEST_F(HttpFilterTest, DisableResponseModeOverride) { @@ -3960,6 +4005,283 @@ TEST_F(HttpFilterTest, EmitDynamicMetadataUseLast) { filter_->onDestroy(); } +TEST_F(HttpFilterTest, HeaderRespReceivedBeforeBody) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SKIP" + response_header_mode: "SEND" + response_body_mode: "STREAMED" + send_body_without_waiting_for_header_response: true + )EOF"); + + EXPECT_EQ(config_->sendBodyWithoutWaitingForHeaderResponse(), true); + + // Create synthetic HTTP request + HttpTestUtility::addDefaultHeaders(request_headers_); + request_headers_.setMethod("POST"); + request_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, true)); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + bool encoding_watermarked = false; + setUpEncodingWatermarking(encoding_watermarked); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + // Header response arrives before any body data. + processResponseHeaders(false, absl::nullopt); + + Buffer::OwnedImpl want_response_body; + Buffer::OwnedImpl got_response_body; + EXPECT_CALL(encoder_callbacks_, injectEncodedDataToFilterChain(_, _)) + .WillRepeatedly(Invoke( + [&got_response_body](Buffer::Instance& data, Unused) { got_response_body.move(data); })); + + for (int i = 0; i < 5; i++) { + Buffer::OwnedImpl resp_chunk; + TestUtility::feedBufferWithRandomCharacters(resp_chunk, 100); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(resp_chunk, false)); + } + + // Send body responses + for (int i = 0; i < 5; i++) { + processResponseBody( + [&want_response_body, i](const HttpBody&, ProcessingResponse&, BodyResponse& resp) { + auto* body_mut = resp.mutable_response()->mutable_body_mutation(); + std::string new_body = absl::StrCat(" ", std::to_string(i), " "); + body_mut->set_body(new_body); + want_response_body.add(new_body); + }, + false); + } + + // Send the last empty request chunk. + Buffer::OwnedImpl last_resp_chunk; + EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(last_resp_chunk, true)); + processResponseBody(absl::nullopt, true); + + // The two buffers should match. + EXPECT_EQ(want_response_body.toString(), got_response_body.toString()); + EXPECT_FALSE(encoding_watermarked); + EXPECT_EQ(config_->stats().spurious_msgs_received_.value(), 0); + filter_->onDestroy(); +} + +TEST_F(HttpFilterTest, HeaderRespReceivedAfterBodySent) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SKIP" + response_header_mode: "SEND" + response_body_mode: "STREAMED" + send_body_without_waiting_for_header_response: true + )EOF"); + + EXPECT_EQ(config_->sendBodyWithoutWaitingForHeaderResponse(), true); + + // Create synthetic HTTP request + HttpTestUtility::addDefaultHeaders(request_headers_); + request_headers_.setMethod("POST"); + request_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, true)); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + bool encoding_watermarked = false; + setUpEncodingWatermarking(encoding_watermarked); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + + Buffer::OwnedImpl want_response_body; + Buffer::OwnedImpl got_response_body; + EXPECT_CALL(encoder_callbacks_, injectEncodedDataToFilterChain(_, _)) + .WillRepeatedly(Invoke( + [&got_response_body](Buffer::Instance& data, Unused) { got_response_body.move(data); })); + + for (int i = 0; i < 5; i++) { + Buffer::OwnedImpl resp_chunk; + TestUtility::feedBufferWithRandomCharacters(resp_chunk, 100); + EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(resp_chunk, false)); + } + + // Header response arrives after some amount of body data sent. + auto response = std::make_unique(); + (void)response->mutable_response_headers(); + stream_callbacks_->onReceiveMessage(std::move(response)); + + // Three body responses follows the header response. + for (int i = 0; i < 2; i++) { + processResponseBody( + [&want_response_body, i](const HttpBody&, ProcessingResponse&, BodyResponse& resp) { + auto* body_mut = resp.mutable_response()->mutable_body_mutation(); + std::string new_body = absl::StrCat(" ", std::to_string(i), " "); + body_mut->set_body(new_body); + want_response_body.add(new_body); + }, + false); + } + + // Now sends the rest of the body chunks to the server. + for (int i = 5; i < 10; i++) { + Buffer::OwnedImpl resp_chunk; + TestUtility::feedBufferWithRandomCharacters(resp_chunk, 100); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(resp_chunk, false)); + } + + // Send body responses + for (int i = 2; i < 10; i++) { + processResponseBody( + [&want_response_body, i](const HttpBody&, ProcessingResponse&, BodyResponse& resp) { + auto* body_mut = resp.mutable_response()->mutable_body_mutation(); + std::string new_body = absl::StrCat(" ", std::to_string(i), " "); + body_mut->set_body(new_body); + want_response_body.add(new_body); + }, + false); + } + + // Send the last empty request chunk. + Buffer::OwnedImpl last_resp_chunk; + EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(last_resp_chunk, true)); + processResponseBody(absl::nullopt, true); + + // The two buffers should match. + EXPECT_EQ(want_response_body.toString(), got_response_body.toString()); + EXPECT_FALSE(encoding_watermarked); + EXPECT_EQ(config_->stats().spurious_msgs_received_.value(), 0); + filter_->onDestroy(); +} + +TEST_F(HttpFilterTest, HeaderRespWithStatusContinueAndReplace) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SKIP" + response_header_mode: "SEND" + response_body_mode: "STREAMED" + send_body_without_waiting_for_header_response: true + )EOF"); + + EXPECT_EQ(config_->sendBodyWithoutWaitingForHeaderResponse(), true); + + // Create synthetic HTTP request + HttpTestUtility::addDefaultHeaders(request_headers_); + request_headers_.setMethod("POST"); + request_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, true)); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + for (int i = 0; i < 5; i++) { + Buffer::OwnedImpl resp_chunk; + TestUtility::feedBufferWithRandomCharacters(resp_chunk, 100); + EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(resp_chunk, false)); + } + + Buffer::OwnedImpl resp_buffer; + setUpEncodingBuffering(resp_buffer, true); + // Header response arrives with status CONTINUE_AND_REPLACE after some amount of body data sent. + auto response = std::make_unique(); + auto* hdrs_resp = response->mutable_response_headers(); + hdrs_resp->mutable_response()->set_status(CommonResponse::CONTINUE_AND_REPLACE); + hdrs_resp->mutable_response()->mutable_body_mutation()->set_body("Hello, World!"); + stream_callbacks_->onReceiveMessage(std::move(response)); + + // Ensure buffered data was updated + EXPECT_EQ(resp_buffer.toString(), "Hello, World!"); + + // Since we did CONTINUE_AND_REPLACE, later data is cleared + Buffer::OwnedImpl resp_data_1("test"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(resp_data_1, false)); + EXPECT_EQ(resp_data_1.length(), 0); + + EXPECT_EQ(config_->stats().spurious_msgs_received_.value(), 0); + filter_->onDestroy(); +} + +TEST_F(HttpFilterTest, StreamedTestInBothDirection) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SEND" + request_body_mode: "STREAMED" + response_header_mode: "SEND" + response_body_mode: "STREAMED" + send_body_without_waiting_for_header_response: true + )EOF"); + + EXPECT_EQ(config_->sendBodyWithoutWaitingForHeaderResponse(), true); + + // Create synthetic HTTP request + HttpTestUtility::addDefaultHeaders(request_headers_); + request_headers_.setMethod("POST"); + request_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + for (int i = 0; i < 5; i++) { + Buffer::OwnedImpl resp_chunk; + TestUtility::feedBufferWithRandomCharacters(resp_chunk, 100); + EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(resp_chunk, false)); + } + // Send the last empty request chunk. + Buffer::OwnedImpl last_req_chunk; + EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(last_req_chunk, true)); + // Header response arrives + auto req_response = std::make_unique(); + (void)req_response->mutable_request_headers(); + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + stream_callbacks_->onReceiveMessage(std::move(req_response)); + + // Data response arrives + for (int i = 0; i < 5; i++) { + processRequestBody(absl::nullopt, false); + } + processRequestBody(absl::nullopt, false); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + bool encoding_watermarked = false; + setUpEncodingWatermarking(encoding_watermarked); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + + for (int i = 0; i < 7; i++) { + Buffer::OwnedImpl resp_chunk; + TestUtility::feedBufferWithRandomCharacters(resp_chunk, 100); + EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(resp_chunk, false)); + } + + auto resp_response = std::make_unique(); + (void)resp_response->mutable_response_headers(); + stream_callbacks_->onReceiveMessage(std::move(resp_response)); + + // Send body responses + for (int i = 0; i < 7; i++) { + processResponseBody(absl::nullopt, false); + } + + // Send the last empty request chunk. + Buffer::OwnedImpl last_resp_chunk; + EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(last_resp_chunk, true)); + processResponseBody(absl::nullopt, true); + + EXPECT_EQ(config_->stats().spurious_msgs_received_.value(), 0); + filter_->onDestroy(); +} + // Verify if ext_proc filter is in the upstream filter chain, and if the ext_proc server // sends back response with clear_route_cache set to true, it is ignored. TEST_F(HttpFilterTest, ClearRouteCacheHeaderMutationUpstreamIgnored) { From 4dd017e89eace3b9aa6a54cfcb5d9d240d73cce6 Mon Sep 17 00:00:00 2001 From: code Date: Thu, 26 Sep 2024 08:41:24 +0800 Subject: [PATCH 6/6] execution context: refactored the impl to use marco to enable the execution context (#36277) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Commit Message: execution context: refactored the impl to use marco to enable the execution context Additional Description: The execution context is a feature that was contributed by the @wu-bin from google. But it only makes sense in very limited scenarios. After a discussion with @wu-bin , we decided to refactor it to make it more 'independent'. This PR refactored the implementation to make the custom abstraction `ExecutionContext` only be referenced in limited positions of Envoy core code base. More specifically, this PR: 1. move all ExecutionContext related code to single header file `envoy/common/execution_conetxt.h`. 2. No any directly dependency to the ExecutionContext at the Envoy core code base except the `source/common/common/scope_tracker.h`. 3. compiled the ExecutionContext out by default. 4. removed restart feature flag `envoy.restart_features.enable_execution_context`.   Risk Level: low. Testing: unit. Docs Changes: n/a. Release Notes: added. Platform Specific Features: n/a. --------- Signed-off-by: wangbaiping --- bazel/BUILD | 5 + bazel/envoy_internal.bzl | 7 ++ changelogs/current.yaml | 4 + envoy/common/BUILD | 8 +- envoy/common/execution_context.h | 27 +++- envoy/common/scope_tracker.h | 13 +- envoy/network/BUILD | 2 +- envoy/network/listen_socket.h | 13 +- source/common/common/scope_tracker.h | 15 +-- source/common/http/BUILD | 1 - source/common/http/conn_manager_impl.h | 6 +- source/common/http/filter_manager.h | 1 + source/common/http/http1/BUILD | 1 - source/common/http/http1/codec_impl.cc | 5 +- source/common/http/http1/codec_impl.h | 2 +- source/common/http/http2/BUILD | 1 - source/common/http/http2/codec_impl.cc | 5 +- source/common/http/http2/codec_impl.h | 2 +- source/common/network/BUILD | 12 -- .../common_connection_filter_states.cc | 14 --- .../network/common_connection_filter_states.h | 34 ----- source/common/network/connection_impl.h | 1 + source/common/network/connection_impl_base.cc | 10 +- source/common/network/connection_impl_base.h | 3 +- source/common/runtime/runtime_features.cc | 3 - test/common/common/BUILD | 6 + test/common/common/execution_context_test.cc | 119 ++++++++++++++---- test/common/common/scope_tracker_test.cc | 19 +-- test/common/network/BUILD | 1 - test/common/network/connection_impl_test.cc | 32 ----- test/mocks/common.h | 2 +- test/mocks/network/connection.h | 2 +- test/mocks/network/mocks.h | 1 - 33 files changed, 188 insertions(+), 189 deletions(-) delete mode 100644 source/common/network/common_connection_filter_states.cc delete mode 100644 source/common/network/common_connection_filter_states.h diff --git a/bazel/BUILD b/bazel/BUILD index 77da1e1fce10..011b1c88a334 100644 --- a/bazel/BUILD +++ b/bazel/BUILD @@ -517,6 +517,11 @@ config_setting( values = {"define": "perf_annotation=enabled"}, ) +config_setting( + name = "enable_execution_context", + values = {"define": "execution_context=enabled"}, +) + config_setting( name = "enable_perf_tracing", values = {"define": "perf_tracing=enabled"}, diff --git a/bazel/envoy_internal.bzl b/bazel/envoy_internal.bzl index b84d93ef7a5b..015659851c1b 100644 --- a/bazel/envoy_internal.bzl +++ b/bazel/envoy_internal.bzl @@ -125,6 +125,7 @@ def envoy_copts(repository, test = False): envoy_select_static_extension_registration(["-DENVOY_STATIC_EXTENSION_REGISTRATION"], repository) + \ envoy_select_disable_logging(["-DENVOY_DISABLE_LOGGING"], repository) + \ _envoy_select_perf_annotation(["-DENVOY_PERF_ANNOTATION"]) + \ + _envoy_select_execution_context() + \ _envoy_select_perfetto(["-DENVOY_PERFETTO"]) + \ envoy_select_google_grpc(["-DENVOY_GOOGLE_GRPC"], repository) + \ envoy_select_signal_trace(["-DENVOY_HANDLE_SIGNALS"], repository) + \ @@ -190,6 +191,12 @@ def _envoy_select_perf_annotation(xs): "//conditions:default": [], }) +def _envoy_select_execution_context(): + return select({ + "@envoy//bazel:enable_execution_context": ["-DENVOY_ENABLE_EXECUTION_CONTEXT"], + "//conditions:default": [], + }) + def _envoy_select_perfetto(xs): return select({ "@envoy//bazel:enable_perf_tracing": xs, diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 3b161072fbb3..064f4d8e8035 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -54,6 +54,10 @@ behavior_changes: `. This change can be disabled by setting the runtime guard flag ``envoy.reloadable_features.filter_access_loggers_first`` to ``false``. +- area: monitoring + change: | + Removed runtime feature flag ``envoy.restart_features.enable_execution_context``. The execution context feature + now could be enabled only by setting compile option ``--define=execution_context=enabled``. minor_behavior_changes: # *Changes that may cause incompatibilities for some users, but should not for most* diff --git a/envoy/common/BUILD b/envoy/common/BUILD index 3c8680576e4b..12c71b554bbf 100644 --- a/envoy/common/BUILD +++ b/envoy/common/BUILD @@ -124,15 +124,19 @@ envoy_cc_library( envoy_cc_library( name = "execution_context", hdrs = ["execution_context.h"], - deps = [":pure_lib"], + deps = [ + ":pure_lib", + ":scope_tracker_interface", + ], ) envoy_cc_library( name = "scope_tracker_interface", hdrs = ["scope_tracker.h"], deps = [ - ":execution_context", + ":optref_lib", ":pure_lib", + "//envoy/stream_info:stream_info_interface", ], ) diff --git a/envoy/common/execution_context.h b/envoy/common/execution_context.h index fb227a05af41..b723a221bcae 100644 --- a/envoy/common/execution_context.h +++ b/envoy/common/execution_context.h @@ -3,22 +3,25 @@ #include #include "envoy/common/pure.h" +#include "envoy/common/scope_tracker.h" +#include "envoy/stream_info/stream_info.h" #include "source/common/common/non_copyable.h" namespace Envoy { +#ifdef ENVOY_ENABLE_EXECUTION_CONTEXT + +static constexpr absl::string_view kConnectionExecutionContextFilterStateName = + "envoy.network.connection_execution_context"; + class ScopedExecutionContext; // ExecutionContext can be inherited by subclasses to represent arbitrary information associated // with the execution of a piece of code. activate/deactivate are called when the said execution // starts/ends. For an example usage, please see // https://github.com/envoyproxy/envoy/issues/32012. -class ExecutionContext : NonCopyable { -public: - ExecutionContext() = default; - virtual ~ExecutionContext() = default; - +class ExecutionContext : public StreamInfo::FilterState::Object, NonCopyable { protected: // Called when the current thread starts to run code on behalf of the owner of this object. // protected because it should only be called by ScopedExecutionContext. @@ -43,7 +46,8 @@ class ExecutionContext : NonCopyable { class ScopedExecutionContext : NonCopyable { public: ScopedExecutionContext() : ScopedExecutionContext(nullptr) {} - ScopedExecutionContext(ExecutionContext* context) : context_(context) { + ScopedExecutionContext(const ScopeTrackedObject* object) + : context_(object != nullptr ? getExecutionContext(object->trackedStream()) : nullptr) { if (context_ != nullptr) { context_->activate(); } @@ -62,7 +66,18 @@ class ScopedExecutionContext : NonCopyable { bool isNull() const { return context_ == nullptr; } private: + ExecutionContext* getExecutionContext(OptRef info) { + if (!info.has_value()) { + return nullptr; + } + const auto* const_context = info->filterState().getDataReadOnly( + kConnectionExecutionContextFilterStateName); + return const_cast(const_context); + } + ExecutionContext* context_; }; +#endif + } // namespace Envoy diff --git a/envoy/common/scope_tracker.h b/envoy/common/scope_tracker.h index f96425541513..e846f284e412 100644 --- a/envoy/common/scope_tracker.h +++ b/envoy/common/scope_tracker.h @@ -2,16 +2,17 @@ #include -#include "envoy/common/execution_context.h" +#include "envoy/common/optref.h" #include "envoy/common/pure.h" +#include "envoy/stream_info/stream_info.h" namespace Envoy { /* * An interface for tracking the scope of work. Implementors of this interface * can be registered to the dispatcher when they're active on the stack. If a - * fatal error occurs while they were active, the dumpState method will be - * called. + * fatal error occurs while they were active, the dumpState() method will be + * called to output the active state. * * Currently this is only used for the L4 network connection and L7 stream. */ @@ -20,9 +21,11 @@ class ScopeTrackedObject { virtual ~ScopeTrackedObject() = default; /** - * If the tracked object has a ExecutionContext, returns it. Returns nullptr otherwise. + * Return the tracked stream info that related to the scope tracked object (L4 + * network connection or L7 stream). + * @return optional reference to stream info of stream (L4 connection or L7 stream). */ - virtual ExecutionContext* executionContext() const { return nullptr; } + virtual OptRef trackedStream() const { return {}; } /** * Dump debug state of the object in question to the provided ostream. diff --git a/envoy/network/BUILD b/envoy/network/BUILD index f40b97d53073..95ab4e4677e5 100644 --- a/envoy/network/BUILD +++ b/envoy/network/BUILD @@ -25,6 +25,7 @@ envoy_cc_library( ":filter_interface", ":listen_socket_interface", "//envoy/buffer:buffer_interface", + "//envoy/common:scope_tracker_interface", "//envoy/event:deferred_deletable", "//envoy/ssl:connection_interface", "//envoy/stream_info:stream_info_interface", @@ -174,7 +175,6 @@ envoy_cc_library( deps = [ ":io_handle_interface", ":socket_interface", - "//envoy/common:scope_tracker_interface", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", ], ) diff --git a/envoy/network/listen_socket.h b/envoy/network/listen_socket.h index e600d2d0e42c..08f4a4d5a672 100644 --- a/envoy/network/listen_socket.h +++ b/envoy/network/listen_socket.h @@ -7,7 +7,6 @@ #include "envoy/common/exception.h" #include "envoy/common/pure.h" -#include "envoy/common/scope_tracker.h" #include "envoy/config/core/v3/base.pb.h" #include "envoy/network/address.h" #include "envoy/network/io_handle.h" @@ -26,7 +25,7 @@ namespace Network { * TODO(jrajahalme): Hide internals (e.g., fd) from listener filters by providing callbacks filters * may need (set/getsockopt(), peek(), recv(), etc.) */ -class ConnectionSocket : public virtual Socket, public virtual ScopeTrackedObject { +class ConnectionSocket : public virtual Socket { public: /** * Set detected transport protocol (e.g. RAW_BUFFER, TLS). @@ -83,6 +82,16 @@ class ConnectionSocket : public virtual Socket, public virtual ScopeTrackedObjec * return value is cwnd(in packets) times the connection's MSS. */ virtual absl::optional congestionWindowInBytes() const PURE; + + /** + * Dump debug state of the object in question to the provided ostream. + * + * This is called on Envoy fatal errors, so should do minimal memory allocation. + * + * @param os the ostream to output to. + * @param indent_level how far to indent, for pretty-printed classes and subclasses. + */ + virtual void dumpState(std::ostream& os, int indent_level = 0) const PURE; }; using ConnectionSocketPtr = std::unique_ptr; diff --git a/source/common/common/scope_tracker.h b/source/common/common/scope_tracker.h index dfe994704245..9deb53381d78 100644 --- a/source/common/common/scope_tracker.h +++ b/source/common/common/scope_tracker.h @@ -18,9 +18,7 @@ namespace Envoy { class ScopeTrackerScopeState { public: ScopeTrackerScopeState(const ScopeTrackedObject* object, Event::ScopeTracker& tracker) - : registered_object_(object), - scoped_execution_context_(executionContextEnabled() ? object->executionContext() : nullptr), - tracker_(tracker) { + : registered_object_(object), tracker_(tracker) { tracker_.pushTrackedObject(registered_object_); } @@ -36,14 +34,13 @@ class ScopeTrackerScopeState { private: friend class ScopeTrackerScopeStateTest; - static bool& executionContextEnabled() { - static bool enabled = - Runtime::runtimeFeatureEnabled("envoy.restart_features.enable_execution_context"); - return enabled; - } + const ScopeTrackedObject* registered_object_; - ScopedExecutionContext scoped_execution_context_; Event::ScopeTracker& tracker_; + +#ifdef ENVOY_ENABLE_EXECUTION_CONTEXT + ScopedExecutionContext scoped_execution_context_{registered_object_}; +#endif }; } // namespace Envoy diff --git a/source/common/http/BUILD b/source/common/http/BUILD index f0dc9190b85c..cf716abd8789 100644 --- a/source/common/http/BUILD +++ b/source/common/http/BUILD @@ -392,7 +392,6 @@ envoy_cc_library( "//source/common/config:utility_lib", "//source/common/http/http1:codec_lib", "//source/common/http/http2:codec_lib", - "//source/common/network:common_connection_filter_states_lib", "//source/common/network:proxy_protocol_filter_state_lib", "//source/common/network:utility_lib", "//source/common/quic:quic_server_factory_stub_lib", diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index aef542307ff1..d4f73ccef45d 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -45,7 +45,6 @@ #include "source/common/http/user_agent.h" #include "source/common/http/utility.h" #include "source/common/local_reply/local_reply.h" -#include "source/common/network/common_connection_filter_states.h" #include "source/common/network/proxy_protocol_filter_state.h" #include "source/common/stream_info/stream_info_impl.h" #include "source/common/tracing/http_tracer_impl.h" @@ -218,10 +217,9 @@ class ConnectionManagerImpl : Logger::Loggable, } // ScopeTrackedObject - ExecutionContext* executionContext() const override { - return getConnectionExecutionContext(connection_manager_.read_callbacks_->connection()); + OptRef trackedStream() const override { + return filter_manager_.trackedStream(); } - void dumpState(std::ostream& os, int indent_level = 0) const override { const char* spaces = spacesForLevel(indent_level); os << spaces << "ActiveStream " << this << DUMP_MEMBER(stream_id_); diff --git a/source/common/http/filter_manager.h b/source/common/http/filter_manager.h index f1698144b390..1798106f864a 100644 --- a/source/common/http/filter_manager.h +++ b/source/common/http/filter_manager.h @@ -651,6 +651,7 @@ class FilterManager : public ScopeTrackedObject, } // ScopeTrackedObject + OptRef trackedStream() const override { return streamInfo(); } void dumpState(std::ostream& os, int indent_level = 0) const override { const char* spaces = spacesForLevel(indent_level); os << spaces << "FilterManager " << this << DUMP_MEMBER(state_.has_1xx_headers_) diff --git a/source/common/http/http1/BUILD b/source/common/http/http1/BUILD index 0e31c65919c8..564dfd31f354 100644 --- a/source/common/http/http1/BUILD +++ b/source/common/http/http1/BUILD @@ -60,7 +60,6 @@ envoy_cc_library( "//source/common/http:headers_lib", "//source/common/http:status_lib", "//source/common/http:utility_lib", - "//source/common/network:common_connection_filter_states_lib", "//source/common/runtime:runtime_features_lib", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", ], diff --git a/source/common/http/http1/codec_impl.cc b/source/common/http/http1/codec_impl.cc index 92fb58bb1a40..aea743c5f67f 100644 --- a/source/common/http/http1/codec_impl.cc +++ b/source/common/http/http1/codec_impl.cc @@ -24,7 +24,6 @@ #include "source/common/http/http1/header_formatter.h" #include "source/common/http/http1/legacy_parser_impl.h" #include "source/common/http/utility.h" -#include "source/common/network/common_connection_filter_states.h" #include "source/common/runtime/runtime_features.h" #include "absl/container/fixed_array.h" @@ -976,8 +975,8 @@ void ConnectionImpl::onResetStreamBase(StreamResetReason reason) { onResetStream(reason); } -ExecutionContext* ConnectionImpl::executionContext() const { - return getConnectionExecutionContext(connection_); +OptRef ConnectionImpl::trackedStream() const { + return connection_.trackedStream(); } void ConnectionImpl::dumpState(std::ostream& os, int indent_level) const { diff --git a/source/common/http/http1/codec_impl.h b/source/common/http/http1/codec_impl.h index 875a9c6d99d6..0efd6cdc7f7d 100644 --- a/source/common/http/http1/codec_impl.h +++ b/source/common/http/http1/codec_impl.h @@ -272,7 +272,7 @@ class ConnectionImpl : public virtual Connection, Envoy::Http::Status codec_status_; // ScopeTrackedObject - ExecutionContext* executionContext() const override; + OptRef trackedStream() const override; void dumpState(std::ostream& os, int indent_level) const override; protected: diff --git a/source/common/http/http2/BUILD b/source/common/http/http2/BUILD index 281377db831e..34b9dadd3868 100644 --- a/source/common/http/http2/BUILD +++ b/source/common/http/http2/BUILD @@ -56,7 +56,6 @@ envoy_cc_library( "//source/common/http:headers_lib", "//source/common/http:status_lib", "//source/common/http:utility_lib", - "//source/common/network:common_connection_filter_states_lib", "//source/common/runtime:runtime_features_lib", "@com_github_google_quiche//:http2_adapter", "@com_google_absl//absl/algorithm", diff --git a/source/common/http/http2/codec_impl.cc b/source/common/http/http2/codec_impl.cc index 92ce3565409d..d707a6072590 100644 --- a/source/common/http/http2/codec_impl.cc +++ b/source/common/http/http2/codec_impl.cc @@ -25,7 +25,6 @@ #include "source/common/http/headers.h" #include "source/common/http/http2/codec_stats.h" #include "source/common/http/utility.h" -#include "source/common/network/common_connection_filter_states.h" #include "source/common/runtime/runtime_features.h" #include "absl/cleanup/cleanup.h" @@ -2066,8 +2065,8 @@ ConnectionImpl::ClientHttp2Options::ClientHttp2Options( #endif } -ExecutionContext* ConnectionImpl::executionContext() const { - return getConnectionExecutionContext(connection_); +OptRef ConnectionImpl::trackedStream() const { + return connection_.trackedStream(); } void ConnectionImpl::dumpState(std::ostream& os, int indent_level) const { diff --git a/source/common/http/http2/codec_impl.h b/source/common/http/http2/codec_impl.h index e7603e24d690..f93f33477348 100644 --- a/source/common/http/http2/codec_impl.h +++ b/source/common/http/http2/codec_impl.h @@ -179,7 +179,7 @@ class ConnectionImpl : public virtual Connection, } // ScopeTrackedObject - ExecutionContext* executionContext() const override; + OptRef trackedStream() const override; void dumpState(std::ostream& os, int indent_level) const override; protected: diff --git a/source/common/network/BUILD b/source/common/network/BUILD index 7eefab3fa379..bb9b174fff67 100644 --- a/source/common/network/BUILD +++ b/source/common/network/BUILD @@ -80,7 +80,6 @@ envoy_cc_library( srcs = ["connection_impl_base.cc"], hdrs = ["connection_impl_base.h"], deps = [ - ":common_connection_filter_states_lib", ":connection_socket_lib", ":filter_manager_lib", "//envoy/common:scope_tracker_interface", @@ -624,14 +623,3 @@ envoy_cc_library( "//envoy/network:filter_interface", ], ) - -envoy_cc_library( - name = "common_connection_filter_states_lib", - srcs = ["common_connection_filter_states.cc"], - hdrs = ["common_connection_filter_states.h"], - deps = [ - "//envoy/common:execution_context", - "//envoy/network:connection_interface", - "//envoy/stream_info:filter_state_interface", - ], -) diff --git a/source/common/network/common_connection_filter_states.cc b/source/common/network/common_connection_filter_states.cc deleted file mode 100644 index 6aef205ac184..000000000000 --- a/source/common/network/common_connection_filter_states.cc +++ /dev/null @@ -1,14 +0,0 @@ -#include "source/common/network/common_connection_filter_states.h" - -namespace Envoy { -namespace Network { - -ExecutionContext* getConnectionExecutionContext(const Network::Connection& connection) { - const ConnectionExecutionContextFilterState* filter_state = - connection.streamInfo().filterState().getDataReadOnly( - kConnectionExecutionContextFilterStateName); - return filter_state == nullptr ? nullptr : filter_state->executionContext(); -} - -} // namespace Network -} // namespace Envoy diff --git a/source/common/network/common_connection_filter_states.h b/source/common/network/common_connection_filter_states.h deleted file mode 100644 index 4de4e97647d1..000000000000 --- a/source/common/network/common_connection_filter_states.h +++ /dev/null @@ -1,34 +0,0 @@ -#pragma once - -#include "envoy/common/execution_context.h" -#include "envoy/network/connection.h" -#include "envoy/stream_info/filter_state.h" - -namespace Envoy { -namespace Network { - -static constexpr absl::string_view kConnectionExecutionContextFilterStateName = - "envoy.network.connection_execution_context"; - -// ConnectionExecutionContextFilterState is an optional connection-level filter state that goes by -// the name kConnectionExecutionContextFilterStateName. It owns a ExecutionContext, whose -// activate/deactivate methods will be called when a thread starts/finishes running code on behalf -// of the corresponding connection. -class ConnectionExecutionContextFilterState : public Envoy::StreamInfo::FilterState::Object { -public: - // It is safe, although useless, to set execution_context to nullptr. - explicit ConnectionExecutionContextFilterState( - std::unique_ptr execution_context) - : execution_context_(std::move(execution_context)) {} - - ExecutionContext* executionContext() const { return execution_context_.get(); } - -private: - std::unique_ptr execution_context_; -}; - -// Returns the ExecutionContext of a connection, if any. Or nullptr if not found. -ExecutionContext* getConnectionExecutionContext(const Network::Connection& connection); - -} // namespace Network -} // namespace Envoy diff --git a/source/common/network/connection_impl.h b/source/common/network/connection_impl.h index a66ffeb1866c..237f027e9311 100644 --- a/source/common/network/connection_impl.h +++ b/source/common/network/connection_impl.h @@ -148,6 +148,7 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback // ScopeTrackedObject void dumpState(std::ostream& os, int indent_level) const override; + DetectedCloseType detectedCloseType() const override { return detected_close_type_; } protected: diff --git a/source/common/network/connection_impl_base.cc b/source/common/network/connection_impl_base.cc index e0586db6a832..e047afd0382e 100644 --- a/source/common/network/connection_impl_base.cc +++ b/source/common/network/connection_impl_base.cc @@ -1,7 +1,5 @@ #include "source/common/network/connection_impl_base.h" -#include "source/common/network/common_connection_filter_states.h" - namespace Envoy { namespace Network { @@ -30,6 +28,10 @@ void ConnectionImplBase::removeConnectionCallbacks(ConnectionCallbacks& callback } } +OptRef ConnectionImplBase::trackedStream() const { + return streamInfo(); +} + void ConnectionImplBase::hashKey(std::vector& hash) const { addIdToHashKey(hash, id()); } void ConnectionImplBase::setConnectionStats(const ConnectionStats& stats) { @@ -42,10 +44,6 @@ void ConnectionImplBase::setDelayedCloseTimeout(std::chrono::milliseconds timeou delayed_close_timeout_ = timeout; } -ExecutionContext* ConnectionImplBase::executionContext() const { - return getConnectionExecutionContext(*this); -} - void ConnectionImplBase::initializeDelayedCloseTimer() { const auto timeout = delayed_close_timeout_.count(); ASSERT(delayed_close_timer_ == nullptr && timeout > 0); diff --git a/source/common/network/connection_impl_base.h b/source/common/network/connection_impl_base.h index 4b0104adbecd..32660da1b127 100644 --- a/source/common/network/connection_impl_base.h +++ b/source/common/network/connection_impl_base.h @@ -29,7 +29,8 @@ class ConnectionImplBase : public FilterManagerConnection, void setConnectionStats(const ConnectionStats& stats) override; void setDelayedCloseTimeout(std::chrono::milliseconds timeout) override; - ExecutionContext* executionContext() const override; + // ScopeTrackedObject + OptRef trackedStream() const override; protected: void initializeDelayedCloseTimer(); diff --git a/source/common/runtime/runtime_features.cc b/source/common/runtime/runtime_features.cc index 0d09985c9383..54f067e2c63a 100644 --- a/source/common/runtime/runtime_features.cc +++ b/source/common/runtime/runtime_features.cc @@ -115,9 +115,6 @@ RUNTIME_GUARD(envoy_restart_features_use_fast_protobuf_hash); // Begin false flags. Most of them should come with a TODO to flip true. -// Execution context is optional and must be enabled explicitly. -// See https://github.com/envoyproxy/envoy/issues/32012. -FALSE_RUNTIME_GUARD(envoy_restart_features_enable_execution_context); // Sentinel and test flag. FALSE_RUNTIME_GUARD(envoy_reloadable_features_test_feature_false); // TODO(paul-r-gall) Make this enabled by default after additional soak time. diff --git a/test/common/common/BUILD b/test/common/common/BUILD index 591dc9a8a6cc..d15e11f00fc6 100644 --- a/test/common/common/BUILD +++ b/test/common/common/BUILD @@ -596,8 +596,14 @@ envoy_benchmark_test( envoy_cc_test( name = "execution_context_test", srcs = ["execution_context_test.cc"], + copts = [ + "-DENVOY_ENABLE_EXECUTION_CONTEXT", + ], rbe_pool = "2core", deps = [ "//envoy/common:execution_context", + "//source/common/api:api_lib", + "//test/mocks:common_lib", + "//test/mocks/stream_info:stream_info_mocks", ], ) diff --git a/test/common/common/execution_context_test.cc b/test/common/common/execution_context_test.cc index 25d886314e1f..8aed2f00eaf8 100644 --- a/test/common/common/execution_context_test.cc +++ b/test/common/common/execution_context_test.cc @@ -1,5 +1,15 @@ +#include + #include "envoy/common/execution_context.h" +#include "source/common/api/api_impl.h" +#include "source/common/common/scope_tracker.h" + +#include "test/mocks/common.h" +#include "test/mocks/stream_info/mocks.h" +#include "test/test_common/utility.h" + +#include "gmock/gmock.h" #include "gtest/gtest.h" namespace Envoy { @@ -27,45 +37,104 @@ class TestExecutionContext : public ExecutionContext { int activation_generations_ = 0; }; -TEST(ExecutionContextTest, NullContext) { - ScopedExecutionContext scoped_context(nullptr); - EXPECT_TRUE(scoped_context.isNull()); +class ExecutionContextTest : public testing::Test { +public: + ExecutionContextTest() { + ON_CALL(tracked_object_, trackedStream()) + .WillByDefault(testing::Return(OptRef(stream_info_))); + } - ScopedExecutionContext scoped_context2; - EXPECT_TRUE(scoped_context2.isNull()); + void setWithoutContext() { + context_ = nullptr; + stream_info_.filter_state_ = std::make_shared( + StreamInfo::FilterState::LifeSpan::Connection); + } + void setWithContext() { + context_ = std::make_shared(); + stream_info_.filter_state_ = std::make_shared( + StreamInfo::FilterState::LifeSpan::Connection); + stream_info_.filter_state_->setData(kConnectionExecutionContextFilterStateName, context_, + StreamInfo::FilterState::StateType::ReadOnly, + StreamInfo::FilterState::LifeSpan::Connection); + } + + testing::NiceMock stream_info_; + testing::NiceMock tracked_object_; + std::shared_ptr context_{}; +}; + +TEST_F(ExecutionContextTest, NullContext) { + { + ScopedExecutionContext scoped_context(nullptr); + EXPECT_TRUE(scoped_context.isNull()); + } + { + ScopedExecutionContext scoped_context; + EXPECT_TRUE(scoped_context.isNull()); + } + { + setWithoutContext(); + ScopedExecutionContext scoped_context(&tracked_object_); + EXPECT_TRUE(scoped_context.isNull()); + } } -TEST(ExecutionContextTest, NestedScopes) { - TestExecutionContext context; - EXPECT_EQ(context.activationDepth(), 0); - EXPECT_EQ(context.activationGenerations(), 0); +TEST_F(ExecutionContextTest, NestedScopes) { + setWithContext(); + + EXPECT_EQ(context_->activationDepth(), 0); + EXPECT_EQ(context_->activationGenerations(), 0); { - ScopedExecutionContext scoped_context(&context); - EXPECT_EQ(context.activationDepth(), 1); - EXPECT_EQ(context.activationGenerations(), 1); + ScopedExecutionContext scoped_context(&tracked_object_); + EXPECT_EQ(context_->activationDepth(), 1); + EXPECT_EQ(context_->activationGenerations(), 1); { - ScopedExecutionContext nested_scoped_context(&context); - EXPECT_EQ(context.activationDepth(), 2); - EXPECT_EQ(context.activationGenerations(), 1); + ScopedExecutionContext nested_scoped_context(&tracked_object_); + EXPECT_EQ(context_->activationDepth(), 2); + EXPECT_EQ(context_->activationGenerations(), 1); } - EXPECT_EQ(context.activationDepth(), 1); - EXPECT_EQ(context.activationGenerations(), 1); + EXPECT_EQ(context_->activationDepth(), 1); + EXPECT_EQ(context_->activationGenerations(), 1); } - EXPECT_EQ(context.activationDepth(), 0); - EXPECT_EQ(context.activationGenerations(), 1); + EXPECT_EQ(context_->activationDepth(), 0); + EXPECT_EQ(context_->activationGenerations(), 1); } -TEST(ExecutionContextTest, DisjointScopes) { - TestExecutionContext context; +TEST_F(ExecutionContextTest, DisjointScopes) { + setWithContext(); for (int i = 1; i < 5; i++) { - ScopedExecutionContext scoped_context(&context); - EXPECT_EQ(context.activationDepth(), 1); - EXPECT_EQ(context.activationGenerations(), i); + ScopedExecutionContext scoped_context(&tracked_object_); + EXPECT_EQ(context_->activationDepth(), 1); + EXPECT_EQ(context_->activationGenerations(), i); + } + + EXPECT_EQ(context_->activationDepth(), 0); +} + +TEST_F(ExecutionContextTest, InScopeTrackerScopeState) { + + Api::ApiPtr api(Api::createApiForTest()); + Event::DispatcherPtr dispatcher(api->allocateDispatcher("test_thread")); + EXPECT_CALL(tracked_object_, trackedStream()) + .Times(2) + .WillRepeatedly(testing::Return(OptRef(stream_info_))); + + setWithContext(); + EXPECT_EQ(context_->activationDepth(), 0); + EXPECT_EQ(context_->activationGenerations(), 0); + { + ScopeTrackerScopeState scope(&tracked_object_, *dispatcher); + EXPECT_EQ(context_->activationDepth(), 1); + EXPECT_EQ(context_->activationGenerations(), 1); } - EXPECT_EQ(context.activationDepth(), 0); + EXPECT_EQ(context_->activationDepth(), 0); + EXPECT_EQ(context_->activationGenerations(), 1); + + setWithoutContext(); + { ScopeTrackerScopeState scope(&tracked_object_, *dispatcher); } } } // namespace Envoy diff --git a/test/common/common/scope_tracker_test.cc b/test/common/common/scope_tracker_test.cc index 7d928b18019e..76722b08de81 100644 --- a/test/common/common/scope_tracker_test.cc +++ b/test/common/common/scope_tracker_test.cc @@ -14,15 +14,7 @@ namespace Envoy { using testing::_; -class ScopeTrackerScopeStateTest : public testing::Test { -protected: - void setExecutionContextEnabled(bool enabled) { - ScopeTrackerScopeState::executionContextEnabled() = enabled; - } -}; - -TEST_F(ScopeTrackerScopeStateTest, ShouldManageTrackedObjectOnDispatcherStack) { - setExecutionContextEnabled(false); +TEST(ScopeTrackerScopeStateTest, ShouldManageTrackedObjectOnDispatcherStack) { Api::ApiPtr api(Api::createApiForTest()); Event::DispatcherPtr dispatcher(api->allocateDispatcher("test_thread")); MockScopeTrackedObject tracked_object; @@ -40,13 +32,4 @@ TEST_F(ScopeTrackerScopeStateTest, ShouldManageTrackedObjectOnDispatcherStack) { static_cast(dispatcher.get())->onFatalError(std::cerr); } -TEST_F(ScopeTrackerScopeStateTest, ExecutionContextEnabled) { - setExecutionContextEnabled(true); - Api::ApiPtr api(Api::createApiForTest()); - Event::DispatcherPtr dispatcher(api->allocateDispatcher("test_thread")); - MockScopeTrackedObject tracked_object; - EXPECT_CALL(tracked_object, executionContext()); - ScopeTrackerScopeState scope(&tracked_object, *dispatcher); -} - } // namespace Envoy diff --git a/test/common/network/BUILD b/test/common/network/BUILD index 16a6dc85fa9f..7b0a24d69f54 100644 --- a/test/common/network/BUILD +++ b/test/common/network/BUILD @@ -82,7 +82,6 @@ envoy_cc_test( "//source/common/common:empty_string", "//source/common/event:dispatcher_includes", "//source/common/event:dispatcher_lib", - "//source/common/network:common_connection_filter_states_lib", "//source/common/network:connection_lib", "//source/common/network:listen_socket_lib", "//source/common/network:utility_lib", diff --git a/test/common/network/connection_impl_test.cc b/test/common/network/connection_impl_test.cc index 11c769ead5a7..f5cc602f07a6 100644 --- a/test/common/network/connection_impl_test.cc +++ b/test/common/network/connection_impl_test.cc @@ -15,7 +15,6 @@ #include "source/common/common/utility.h" #include "source/common/event/dispatcher_impl.h" #include "source/common/network/address_impl.h" -#include "source/common/network/common_connection_filter_states.h" #include "source/common/network/connection_impl.h" #include "source/common/network/io_socket_handle_impl.h" #include "source/common/network/listen_socket_impl.h" @@ -68,16 +67,6 @@ class MockInternalListenerManager : public InternalListenerManager { MOCK_METHOD(InternalListenerOptRef, findByAddress, (const Address::InstanceConstSharedPtr&), ()); }; -class NoopConnectionExecutionContext : public ExecutionContext { -public: - NoopConnectionExecutionContext() = default; - ~NoopConnectionExecutionContext() override = default; - -protected: - void activate() override {} - void deactivate() override {} -}; - TEST(RawBufferSocket, TestBasics) { TransportSocketPtr raw_buffer_socket(Network::Test::createRawBufferSocket()); EXPECT_FALSE(raw_buffer_socket->ssl()); @@ -348,27 +337,6 @@ TEST_P(ConnectionImplTest, SetSslConnection) { disconnect(false); } -TEST_P(ConnectionImplTest, SetGetExecutionContextFilterState) { - setUpBasicConnection(); - connect(); - - EXPECT_EQ(getConnectionExecutionContext(*client_connection_), nullptr); - - const StreamInfo::FilterStateSharedPtr& filter_state = - client_connection_->streamInfo().filterState(); - auto connection_execution_context = std::make_unique(); - const NoopConnectionExecutionContext* context_pointer = - connection_execution_context.get(); // Not owned. - auto filter_state_object = std::make_shared( - std::move(connection_execution_context)); - filter_state->setData(kConnectionExecutionContextFilterStateName, filter_state_object, - StreamInfo::FilterState::StateType::ReadOnly, - StreamInfo::FilterState::LifeSpan::Connection); - - EXPECT_EQ(getConnectionExecutionContext(*client_connection_), context_pointer); - disconnect(true); -} - TEST_P(ConnectionImplTest, GetCongestionWindow) { setUpBasicConnection(); connect(); diff --git a/test/mocks/common.h b/test/mocks/common.h index 246aec0f0068..d5ee5c66078d 100644 --- a/test/mocks/common.h +++ b/test/mocks/common.h @@ -103,7 +103,7 @@ inline bool operator==(const StringViewSaver& saver, const char* str) { class MockScopeTrackedObject : public ScopeTrackedObject { public: MOCK_METHOD(void, dumpState, (std::ostream&, int), (const)); - MOCK_METHOD(ExecutionContext*, executionContext, (), (const)); + MOCK_METHOD(OptRef, trackedStream, (), (const)); }; namespace ConnectionPool { diff --git a/test/mocks/network/connection.h b/test/mocks/network/connection.h index 9c8b7bcbfe44..ba776f5f35c9 100644 --- a/test/mocks/network/connection.h +++ b/test/mocks/network/connection.h @@ -96,7 +96,7 @@ class MockConnectionBase { (uint64_t bandwidth_bits_per_sec, std::chrono::microseconds rtt), ()); \ MOCK_METHOD(absl::optional, congestionWindowInBytes, (), (const)); \ MOCK_METHOD(void, dumpState, (std::ostream&, int), (const)); \ - MOCK_METHOD(ExecutionContext*, executionContext, (), (const)); + MOCK_METHOD(OptRef, trackedStream, (), (const)); class MockConnection : public Connection, public MockConnectionBase { public: diff --git a/test/mocks/network/mocks.h b/test/mocks/network/mocks.h index 483ba8404f28..7174b21667f6 100644 --- a/test/mocks/network/mocks.h +++ b/test/mocks/network/mocks.h @@ -411,7 +411,6 @@ class MockConnectionSocket : public ConnectionSocket { MOCK_METHOD(absl::optional, lastRoundTripTime, ()); MOCK_METHOD(absl::optional, congestionWindowInBytes, (), (const)); MOCK_METHOD(void, dumpState, (std::ostream&, int), (const)); - MOCK_METHOD(ExecutionContext*, executionContext, (), (const)); IoHandlePtr io_handle_; std::shared_ptr connection_info_provider_;