diff --git a/.github/workflows/_precheck_publish.yml b/.github/workflows/_precheck_publish.yml index 5276446302ad..86181a7bc3d6 100644 --- a/.github/workflows/_precheck_publish.yml +++ b/.github/workflows/_precheck_publish.yml @@ -56,7 +56,7 @@ jobs: --config=cache-envoy-engflow --config=bes-envoy-engflow rbe: false - runs-on: envoy-arm64-medium + runs-on: envoy-arm64-large timeout-minutes: 180 - target: docs name: Docs diff --git a/api/envoy/config/core/v3/protocol.proto b/api/envoy/config/core/v3/protocol.proto index eda87d9408d5..e566278600ab 100644 --- a/api/envoy/config/core/v3/protocol.proto +++ b/api/envoy/config/core/v3/protocol.proto @@ -209,7 +209,7 @@ message AlternateProtocolsCacheOptions { repeated string canonical_suffixes = 5; } -// [#next-free-field: 7] +// [#next-free-field: 8] message HttpProtocolOptions { option (udpa.annotations.versioning).previous_message_type = "envoy.api.v2.core.HttpProtocolOptions"; @@ -259,11 +259,28 @@ message HttpProtocolOptions { // `. google.protobuf.Duration max_connection_duration = 3; - // The maximum number of headers. If unconfigured, the default - // maximum number of request headers allowed is 100. Requests that exceed this limit will receive - // a 431 response for HTTP/1.x and cause a stream reset for HTTP/2. + // The maximum number of headers (request headers if configured on HttpConnectionManager, + // response headers when configured on a cluster). + // If unconfigured, the default maximum number of headers allowed is 100. + // Downstream requests that exceed this limit will receive a 431 response for HTTP/1.x and cause a stream + // reset for HTTP/2. + // Upstream responses that exceed this limit will result in a 503 response. google.protobuf.UInt32Value max_headers_count = 2 [(validate.rules).uint32 = {gte: 1}]; + // The maximum size of response headers. + // If unconfigured, the default is 60 KiB, except for HTTP/1 response headers which have a default + // of 80KiB. + // Responses that exceed this limit will result in a 503 response. + // In Envoy, this setting is only valid when configured on an upstream cluster, not on the + // :ref:`HTTP Connection Manager + // `. + // + // Note: currently some protocol codecs impose limits on the maximum size of a single header: + // HTTP/2 (when using nghttp2) limits a single header to around 100kb. + // HTTP/3 limits a single header to around 1024kb. + google.protobuf.UInt32Value max_response_headers_kb = 7 + [(validate.rules).uint32 = {lte: 8192 gt: 0}]; + // Total duration to keep alive an HTTP request/response stream. If the time limit is reached the stream will be // reset independent of any other timeouts. If not specified, this value is not set. google.protobuf.Duration max_stream_duration = 4; diff --git a/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto b/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto index d1a27657f1b1..13a24ad9fcd7 100644 --- a/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto +++ b/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto @@ -271,18 +271,20 @@ message ExternalProcessor { // The default value is 5000 milliseconds (5 seconds) if not specified. google.protobuf.Duration deferred_close_timeout = 19; - // [#not-implemented-hide:] // Send body to the side stream server once it arrives without waiting for the header response from that server. // It only works for STREAMED body processing mode. For any other body processing modes, it is ignored. - // // The server has two options upon receiving a header request: - // 1. Instant Response: Send the header response as soon as the header request is received. - // 2. Delayed Response: Wait for the body before sending any response. - // If the server chooses the second option, it has two further choices: - // 2.1 Separate Responses: Send the header response first, followed by separate body responses. - // 2.2 Combined Response: Include both the header response and the first chunk of the body response - // in a single body response message, followed by the remaining body responses. + // + // 1. Instant Response: send the header response as soon as the header request is received. + // + // 2. Delayed Response: wait for the body before sending any response. + // // In all scenarios, the header-body ordering must always be maintained. + // + // If enabled Envoy will ignore the + // :ref:`mode_override ` + // value that the server sends in the header response. This is because Envoy may have already + // sent the body to the server, prior to processing the header response. bool send_body_without_waiting_for_header_response = 21; // When :ref:`allow_mode_override diff --git a/api/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto b/api/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto index 4cbbbc20d3fb..3d438ae87881 100644 --- a/api/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto +++ b/api/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto @@ -494,6 +494,10 @@ message HttpConnectionManager { // The maximum request headers size for incoming connections. // If unconfigured, the default max request headers allowed is 60 KiB. // Requests that exceed this limit will receive a 431 response. + // + // Note: currently some protocol codecs impose limits on the maximum size of a single header: + // HTTP/2 (when using nghttp2) limits a single header to around 100kb. + // HTTP/3 limits a single header to around 1024kb. google.protobuf.UInt32Value max_request_headers_kb = 29 [(validate.rules).uint32 = {lte: 8192 gt: 0}]; diff --git a/api/envoy/service/ext_proc/v3/external_processor.proto b/api/envoy/service/ext_proc/v3/external_processor.proto index 5f0d66e65735..6ae58c3c7248 100644 --- a/api/envoy/service/ext_proc/v3/external_processor.proto +++ b/api/envoy/service/ext_proc/v3/external_processor.proto @@ -180,7 +180,10 @@ message ProcessingResponse { // It is also ignored by Envoy when the ext_proc filter config // :ref:`allow_mode_override // ` - // is set to false. + // is set to false, or + // :ref:`send_body_without_waiting_for_header_response + // ` + // is set to true. envoy.extensions.filters.http.ext_proc.v3.ProcessingMode mode_override = 9; // When ext_proc server receives a request message, in case it needs more @@ -285,9 +288,6 @@ message CommonResponse { // Instructions on how to manipulate the headers. When responding to an // HttpBody request, header mutations will only take effect if // the current processing mode for the body is BUFFERED. - // [#comment:TODO(yanjunxiang-google) rephrase last sentence once send_body_without_waiting_for_header_response is not hidden: - // the current processing mode for the body is: 1) BUFFERED; 2) or STREAMED and - // the :ref:`send_body_without_waiting_for_header_response ` is enabled.] HeaderMutation header_mutation = 2; // Replace the body of the last message sent to the remote server on this diff --git a/bazel/BUILD b/bazel/BUILD index 77da1e1fce10..011b1c88a334 100644 --- a/bazel/BUILD +++ b/bazel/BUILD @@ -517,6 +517,11 @@ config_setting( values = {"define": "perf_annotation=enabled"}, ) +config_setting( + name = "enable_execution_context", + values = {"define": "execution_context=enabled"}, +) + config_setting( name = "enable_perf_tracing", values = {"define": "perf_tracing=enabled"}, diff --git a/bazel/envoy_internal.bzl b/bazel/envoy_internal.bzl index b84d93ef7a5b..015659851c1b 100644 --- a/bazel/envoy_internal.bzl +++ b/bazel/envoy_internal.bzl @@ -125,6 +125,7 @@ def envoy_copts(repository, test = False): envoy_select_static_extension_registration(["-DENVOY_STATIC_EXTENSION_REGISTRATION"], repository) + \ envoy_select_disable_logging(["-DENVOY_DISABLE_LOGGING"], repository) + \ _envoy_select_perf_annotation(["-DENVOY_PERF_ANNOTATION"]) + \ + _envoy_select_execution_context() + \ _envoy_select_perfetto(["-DENVOY_PERFETTO"]) + \ envoy_select_google_grpc(["-DENVOY_GOOGLE_GRPC"], repository) + \ envoy_select_signal_trace(["-DENVOY_HANDLE_SIGNALS"], repository) + \ @@ -190,6 +191,12 @@ def _envoy_select_perf_annotation(xs): "//conditions:default": [], }) +def _envoy_select_execution_context(): + return select({ + "@envoy//bazel:enable_execution_context": ["-DENVOY_ENABLE_EXECUTION_CONTEXT"], + "//conditions:default": [], + }) + def _envoy_select_perfetto(xs): return select({ "@envoy//bazel:enable_perf_tracing": xs, diff --git a/changelogs/current.yaml b/changelogs/current.yaml index ffffa0f1d12e..72fca5a24c44 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -54,6 +54,10 @@ behavior_changes: `. This change can be disabled by setting the runtime guard flag ``envoy.reloadable_features.filter_access_loggers_first`` to ``false``. +- area: monitoring + change: | + Removed runtime feature flag ``envoy.restart_features.enable_execution_context``. The execution context feature + now could be enabled only by setting compile option ``--define=execution_context=enabled``. minor_behavior_changes: # *Changes that may cause incompatibilities for some users, but should not for most* @@ -77,6 +81,10 @@ minor_behavior_changes: change: | When Lua script executes httpCall, backpressure is exercised when receiving body from downstream client. This behavior can be reverted by setting the runtime guard ``envoy.reloadable_features.lua_flow_control_while_http_call`` to false. +- area: ext_proc + change: | + Added support for :ref:`send_body_without_waiting_for_header_response + `. - area: http change: | Modified the authority header value validator to allow the same characters as oghttp2 @@ -292,6 +300,10 @@ new_features: change: | Added full feature absl::FormatTime() support to the DateFormatter. This allows the timepoint formatters (like ``%START_TIME%``) to use ``%E#S``, ``%E*S``, ``%E#f`` and ``%E*f`` to format the subsecond part of the timepoint. +- area: http + change: | + Added configuration setting for the :ref:`maximum size of response headers + ` in responses. - area: http_11_proxy change: | Added the option to configure the transport socket via locality or endpoint metadata. diff --git a/docs/root/intro/arch_overview/advanced/attributes.rst b/docs/root/intro/arch_overview/advanced/attributes.rst index eae665abf3b5..f3f8e6fb84c0 100644 --- a/docs/root/intro/arch_overview/advanced/attributes.rst +++ b/docs/root/intro/arch_overview/advanced/attributes.rst @@ -93,6 +93,7 @@ Response attributes are only available after the request completes. response.trailers, "map", All response trailers indexed by the lower-cased trailer name response.size, int, Size of the response body response.total_size, int, Total size of the response including the approximate uncompressed size of the headers and the trailers + response.backend_latency, duration, Duration between the first byte sent to and the last byte received from the upstream backend Connection attributes --------------------- diff --git a/envoy/common/BUILD b/envoy/common/BUILD index 3c8680576e4b..12c71b554bbf 100644 --- a/envoy/common/BUILD +++ b/envoy/common/BUILD @@ -124,15 +124,19 @@ envoy_cc_library( envoy_cc_library( name = "execution_context", hdrs = ["execution_context.h"], - deps = [":pure_lib"], + deps = [ + ":pure_lib", + ":scope_tracker_interface", + ], ) envoy_cc_library( name = "scope_tracker_interface", hdrs = ["scope_tracker.h"], deps = [ - ":execution_context", + ":optref_lib", ":pure_lib", + "//envoy/stream_info:stream_info_interface", ], ) diff --git a/envoy/common/execution_context.h b/envoy/common/execution_context.h index fb227a05af41..b723a221bcae 100644 --- a/envoy/common/execution_context.h +++ b/envoy/common/execution_context.h @@ -3,22 +3,25 @@ #include #include "envoy/common/pure.h" +#include "envoy/common/scope_tracker.h" +#include "envoy/stream_info/stream_info.h" #include "source/common/common/non_copyable.h" namespace Envoy { +#ifdef ENVOY_ENABLE_EXECUTION_CONTEXT + +static constexpr absl::string_view kConnectionExecutionContextFilterStateName = + "envoy.network.connection_execution_context"; + class ScopedExecutionContext; // ExecutionContext can be inherited by subclasses to represent arbitrary information associated // with the execution of a piece of code. activate/deactivate are called when the said execution // starts/ends. For an example usage, please see // https://github.com/envoyproxy/envoy/issues/32012. -class ExecutionContext : NonCopyable { -public: - ExecutionContext() = default; - virtual ~ExecutionContext() = default; - +class ExecutionContext : public StreamInfo::FilterState::Object, NonCopyable { protected: // Called when the current thread starts to run code on behalf of the owner of this object. // protected because it should only be called by ScopedExecutionContext. @@ -43,7 +46,8 @@ class ExecutionContext : NonCopyable { class ScopedExecutionContext : NonCopyable { public: ScopedExecutionContext() : ScopedExecutionContext(nullptr) {} - ScopedExecutionContext(ExecutionContext* context) : context_(context) { + ScopedExecutionContext(const ScopeTrackedObject* object) + : context_(object != nullptr ? getExecutionContext(object->trackedStream()) : nullptr) { if (context_ != nullptr) { context_->activate(); } @@ -62,7 +66,18 @@ class ScopedExecutionContext : NonCopyable { bool isNull() const { return context_ == nullptr; } private: + ExecutionContext* getExecutionContext(OptRef info) { + if (!info.has_value()) { + return nullptr; + } + const auto* const_context = info->filterState().getDataReadOnly( + kConnectionExecutionContextFilterStateName); + return const_cast(const_context); + } + ExecutionContext* context_; }; +#endif + } // namespace Envoy diff --git a/envoy/common/scope_tracker.h b/envoy/common/scope_tracker.h index f96425541513..e846f284e412 100644 --- a/envoy/common/scope_tracker.h +++ b/envoy/common/scope_tracker.h @@ -2,16 +2,17 @@ #include -#include "envoy/common/execution_context.h" +#include "envoy/common/optref.h" #include "envoy/common/pure.h" +#include "envoy/stream_info/stream_info.h" namespace Envoy { /* * An interface for tracking the scope of work. Implementors of this interface * can be registered to the dispatcher when they're active on the stack. If a - * fatal error occurs while they were active, the dumpState method will be - * called. + * fatal error occurs while they were active, the dumpState() method will be + * called to output the active state. * * Currently this is only used for the L4 network connection and L7 stream. */ @@ -20,9 +21,11 @@ class ScopeTrackedObject { virtual ~ScopeTrackedObject() = default; /** - * If the tracked object has a ExecutionContext, returns it. Returns nullptr otherwise. + * Return the tracked stream info that related to the scope tracked object (L4 + * network connection or L7 stream). + * @return optional reference to stream info of stream (L4 connection or L7 stream). */ - virtual ExecutionContext* executionContext() const { return nullptr; } + virtual OptRef trackedStream() const { return {}; } /** * Dump debug state of the object in question to the provided ostream. diff --git a/envoy/network/BUILD b/envoy/network/BUILD index f40b97d53073..95ab4e4677e5 100644 --- a/envoy/network/BUILD +++ b/envoy/network/BUILD @@ -25,6 +25,7 @@ envoy_cc_library( ":filter_interface", ":listen_socket_interface", "//envoy/buffer:buffer_interface", + "//envoy/common:scope_tracker_interface", "//envoy/event:deferred_deletable", "//envoy/ssl:connection_interface", "//envoy/stream_info:stream_info_interface", @@ -174,7 +175,6 @@ envoy_cc_library( deps = [ ":io_handle_interface", ":socket_interface", - "//envoy/common:scope_tracker_interface", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", ], ) diff --git a/envoy/network/listen_socket.h b/envoy/network/listen_socket.h index e600d2d0e42c..08f4a4d5a672 100644 --- a/envoy/network/listen_socket.h +++ b/envoy/network/listen_socket.h @@ -7,7 +7,6 @@ #include "envoy/common/exception.h" #include "envoy/common/pure.h" -#include "envoy/common/scope_tracker.h" #include "envoy/config/core/v3/base.pb.h" #include "envoy/network/address.h" #include "envoy/network/io_handle.h" @@ -26,7 +25,7 @@ namespace Network { * TODO(jrajahalme): Hide internals (e.g., fd) from listener filters by providing callbacks filters * may need (set/getsockopt(), peek(), recv(), etc.) */ -class ConnectionSocket : public virtual Socket, public virtual ScopeTrackedObject { +class ConnectionSocket : public virtual Socket { public: /** * Set detected transport protocol (e.g. RAW_BUFFER, TLS). @@ -83,6 +82,16 @@ class ConnectionSocket : public virtual Socket, public virtual ScopeTrackedObjec * return value is cwnd(in packets) times the connection's MSS. */ virtual absl::optional congestionWindowInBytes() const PURE; + + /** + * Dump debug state of the object in question to the provided ostream. + * + * This is called on Envoy fatal errors, so should do minimal memory allocation. + * + * @param os the ostream to output to. + * @param indent_level how far to indent, for pretty-printed classes and subclasses. + */ + virtual void dumpState(std::ostream& os, int indent_level = 0) const PURE; }; using ConnectionSocketPtr = std::unique_ptr; diff --git a/envoy/upstream/upstream.h b/envoy/upstream/upstream.h index e307de9471f7..ca704d02fdb6 100644 --- a/envoy/upstream/upstream.h +++ b/envoy/upstream/upstream.h @@ -1068,6 +1068,11 @@ class ClusterInfo : public Http::FilterChainFactory { */ virtual uint32_t maxResponseHeadersCount() const PURE; + /** + * @return uint32_t the maximum total size of response headers in KB. + */ + virtual absl::optional maxResponseHeadersKb() const PURE; + /** * @return the human readable name of the cluster. */ diff --git a/source/common/common/scope_tracker.h b/source/common/common/scope_tracker.h index dfe994704245..9deb53381d78 100644 --- a/source/common/common/scope_tracker.h +++ b/source/common/common/scope_tracker.h @@ -18,9 +18,7 @@ namespace Envoy { class ScopeTrackerScopeState { public: ScopeTrackerScopeState(const ScopeTrackedObject* object, Event::ScopeTracker& tracker) - : registered_object_(object), - scoped_execution_context_(executionContextEnabled() ? object->executionContext() : nullptr), - tracker_(tracker) { + : registered_object_(object), tracker_(tracker) { tracker_.pushTrackedObject(registered_object_); } @@ -36,14 +34,13 @@ class ScopeTrackerScopeState { private: friend class ScopeTrackerScopeStateTest; - static bool& executionContextEnabled() { - static bool enabled = - Runtime::runtimeFeatureEnabled("envoy.restart_features.enable_execution_context"); - return enabled; - } + const ScopeTrackedObject* registered_object_; - ScopedExecutionContext scoped_execution_context_; Event::ScopeTracker& tracker_; + +#ifdef ENVOY_ENABLE_EXECUTION_CONTEXT + ScopedExecutionContext scoped_execution_context_{registered_object_}; +#endif }; } // namespace Envoy diff --git a/source/common/http/BUILD b/source/common/http/BUILD index f0dc9190b85c..cf716abd8789 100644 --- a/source/common/http/BUILD +++ b/source/common/http/BUILD @@ -392,7 +392,6 @@ envoy_cc_library( "//source/common/config:utility_lib", "//source/common/http/http1:codec_lib", "//source/common/http/http2:codec_lib", - "//source/common/network:common_connection_filter_states_lib", "//source/common/network:proxy_protocol_filter_state_lib", "//source/common/network:utility_lib", "//source/common/quic:quic_server_factory_stub_lib", diff --git a/source/common/http/codec_client.cc b/source/common/http/codec_client.cc index 2f74974d983d..77fb57a814ea 100644 --- a/source/common/http/codec_client.cc +++ b/source/common/http/codec_client.cc @@ -282,13 +282,14 @@ CodecClientProd::CodecClientProd(CodecType type, Network::ClientConnectionPtr&& } codec_ = std::make_unique( *connection_, host->cluster().http1CodecStats(), *this, host->cluster().http1Settings(), - host->cluster().maxResponseHeadersCount(), proxied); + host->cluster().maxResponseHeadersKb(), host->cluster().maxResponseHeadersCount(), proxied); break; } case CodecType::HTTP2: codec_ = std::make_unique( *connection_, *this, host->cluster().http2CodecStats(), random_generator, - host->cluster().http2Options(), Http::DEFAULT_MAX_REQUEST_HEADERS_KB, + host->cluster().http2Options(), + host->cluster().maxResponseHeadersKb().value_or(Http::DEFAULT_MAX_REQUEST_HEADERS_KB), host->cluster().maxResponseHeadersCount(), Http2::ProdNghttp2SessionFactory::get()); break; case CodecType::HTTP3: { @@ -296,7 +297,8 @@ CodecClientProd::CodecClientProd(CodecType type, Network::ClientConnectionPtr&& auto& quic_session = dynamic_cast(*connection_); codec_ = std::make_unique( quic_session, *this, host->cluster().http3CodecStats(), host->cluster().http3Options(), - Http::DEFAULT_MAX_REQUEST_HEADERS_KB, host->cluster().maxResponseHeadersCount()); + host->cluster().maxResponseHeadersKb().value_or(Http::DEFAULT_MAX_REQUEST_HEADERS_KB), + host->cluster().maxResponseHeadersCount()); // Initialize the session after max request header size is changed in above http client // connection creation. quic_session.Initialize(); diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index aef542307ff1..d4f73ccef45d 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -45,7 +45,6 @@ #include "source/common/http/user_agent.h" #include "source/common/http/utility.h" #include "source/common/local_reply/local_reply.h" -#include "source/common/network/common_connection_filter_states.h" #include "source/common/network/proxy_protocol_filter_state.h" #include "source/common/stream_info/stream_info_impl.h" #include "source/common/tracing/http_tracer_impl.h" @@ -218,10 +217,9 @@ class ConnectionManagerImpl : Logger::Loggable, } // ScopeTrackedObject - ExecutionContext* executionContext() const override { - return getConnectionExecutionContext(connection_manager_.read_callbacks_->connection()); + OptRef trackedStream() const override { + return filter_manager_.trackedStream(); } - void dumpState(std::ostream& os, int indent_level = 0) const override { const char* spaces = spacesForLevel(indent_level); os << spaces << "ActiveStream " << this << DUMP_MEMBER(stream_id_); diff --git a/source/common/http/filter_manager.h b/source/common/http/filter_manager.h index f1698144b390..1798106f864a 100644 --- a/source/common/http/filter_manager.h +++ b/source/common/http/filter_manager.h @@ -651,6 +651,7 @@ class FilterManager : public ScopeTrackedObject, } // ScopeTrackedObject + OptRef trackedStream() const override { return streamInfo(); } void dumpState(std::ostream& os, int indent_level = 0) const override { const char* spaces = spacesForLevel(indent_level); os << spaces << "FilterManager " << this << DUMP_MEMBER(state_.has_1xx_headers_) diff --git a/source/common/http/http1/BUILD b/source/common/http/http1/BUILD index 0e31c65919c8..564dfd31f354 100644 --- a/source/common/http/http1/BUILD +++ b/source/common/http/http1/BUILD @@ -60,7 +60,6 @@ envoy_cc_library( "//source/common/http:headers_lib", "//source/common/http:status_lib", "//source/common/http:utility_lib", - "//source/common/network:common_connection_filter_states_lib", "//source/common/runtime:runtime_features_lib", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", ], diff --git a/source/common/http/http1/codec_impl.cc b/source/common/http/http1/codec_impl.cc index 5e054058a996..aea743c5f67f 100644 --- a/source/common/http/http1/codec_impl.cc +++ b/source/common/http/http1/codec_impl.cc @@ -24,7 +24,6 @@ #include "source/common/http/http1/header_formatter.h" #include "source/common/http/http1/legacy_parser_impl.h" #include "source/common/http/utility.h" -#include "source/common/network/common_connection_filter_states.h" #include "source/common/runtime/runtime_features.h" #include "absl/container/fixed_array.h" @@ -976,8 +975,8 @@ void ConnectionImpl::onResetStreamBase(StreamResetReason reason) { onResetStream(reason); } -ExecutionContext* ConnectionImpl::executionContext() const { - return getConnectionExecutionContext(connection_); +OptRef ConnectionImpl::trackedStream() const { + return connection_.trackedStream(); } void ConnectionImpl::dumpState(std::ostream& os, int indent_level) const { @@ -1428,9 +1427,11 @@ void ServerConnectionImpl::ActiveRequest::dumpState(std::ostream& os, int indent ClientConnectionImpl::ClientConnectionImpl(Network::Connection& connection, CodecStats& stats, ConnectionCallbacks&, const Http1Settings& settings, + absl::optional max_response_headers_kb, const uint32_t max_response_headers_count, bool passing_through_proxy) - : ConnectionImpl(connection, stats, settings, MessageType::Response, MAX_RESPONSE_HEADERS_KB, + : ConnectionImpl(connection, stats, settings, MessageType::Response, + max_response_headers_kb.value_or(MAX_RESPONSE_HEADERS_KB), max_response_headers_count), owned_output_buffer_(connection.dispatcher().getWatermarkFactory().createBuffer( [&]() -> void { this->onBelowLowWatermark(); }, diff --git a/source/common/http/http1/codec_impl.h b/source/common/http/http1/codec_impl.h index 26e3bfe82d9f..0efd6cdc7f7d 100644 --- a/source/common/http/http1/codec_impl.h +++ b/source/common/http/http1/codec_impl.h @@ -272,7 +272,7 @@ class ConnectionImpl : public virtual Connection, Envoy::Http::Status codec_status_; // ScopeTrackedObject - ExecutionContext* executionContext() const override; + OptRef trackedStream() const override; void dumpState(std::ostream& os, int indent_level) const override; protected: @@ -590,6 +590,7 @@ class ClientConnectionImpl : public ClientConnection, public ConnectionImpl { public: ClientConnectionImpl(Network::Connection& connection, CodecStats& stats, ConnectionCallbacks& callbacks, const Http1Settings& settings, + absl::optional max_response_headers_kb, const uint32_t max_response_headers_count, bool passing_through_proxy = false); // Http::ClientConnection diff --git a/source/common/http/http2/BUILD b/source/common/http/http2/BUILD index 281377db831e..34b9dadd3868 100644 --- a/source/common/http/http2/BUILD +++ b/source/common/http/http2/BUILD @@ -56,7 +56,6 @@ envoy_cc_library( "//source/common/http:headers_lib", "//source/common/http:status_lib", "//source/common/http:utility_lib", - "//source/common/network:common_connection_filter_states_lib", "//source/common/runtime:runtime_features_lib", "@com_github_google_quiche//:http2_adapter", "@com_google_absl//absl/algorithm", diff --git a/source/common/http/http2/codec_impl.cc b/source/common/http/http2/codec_impl.cc index 92ce3565409d..d707a6072590 100644 --- a/source/common/http/http2/codec_impl.cc +++ b/source/common/http/http2/codec_impl.cc @@ -25,7 +25,6 @@ #include "source/common/http/headers.h" #include "source/common/http/http2/codec_stats.h" #include "source/common/http/utility.h" -#include "source/common/network/common_connection_filter_states.h" #include "source/common/runtime/runtime_features.h" #include "absl/cleanup/cleanup.h" @@ -2066,8 +2065,8 @@ ConnectionImpl::ClientHttp2Options::ClientHttp2Options( #endif } -ExecutionContext* ConnectionImpl::executionContext() const { - return getConnectionExecutionContext(connection_); +OptRef ConnectionImpl::trackedStream() const { + return connection_.trackedStream(); } void ConnectionImpl::dumpState(std::ostream& os, int indent_level) const { diff --git a/source/common/http/http2/codec_impl.h b/source/common/http/http2/codec_impl.h index e7603e24d690..f93f33477348 100644 --- a/source/common/http/http2/codec_impl.h +++ b/source/common/http/http2/codec_impl.h @@ -179,7 +179,7 @@ class ConnectionImpl : public virtual Connection, } // ScopeTrackedObject - ExecutionContext* executionContext() const override; + OptRef trackedStream() const override; void dumpState(std::ostream& os, int indent_level) const override; protected: diff --git a/source/common/network/BUILD b/source/common/network/BUILD index 7eefab3fa379..bb9b174fff67 100644 --- a/source/common/network/BUILD +++ b/source/common/network/BUILD @@ -80,7 +80,6 @@ envoy_cc_library( srcs = ["connection_impl_base.cc"], hdrs = ["connection_impl_base.h"], deps = [ - ":common_connection_filter_states_lib", ":connection_socket_lib", ":filter_manager_lib", "//envoy/common:scope_tracker_interface", @@ -624,14 +623,3 @@ envoy_cc_library( "//envoy/network:filter_interface", ], ) - -envoy_cc_library( - name = "common_connection_filter_states_lib", - srcs = ["common_connection_filter_states.cc"], - hdrs = ["common_connection_filter_states.h"], - deps = [ - "//envoy/common:execution_context", - "//envoy/network:connection_interface", - "//envoy/stream_info:filter_state_interface", - ], -) diff --git a/source/common/network/common_connection_filter_states.cc b/source/common/network/common_connection_filter_states.cc deleted file mode 100644 index 6aef205ac184..000000000000 --- a/source/common/network/common_connection_filter_states.cc +++ /dev/null @@ -1,14 +0,0 @@ -#include "source/common/network/common_connection_filter_states.h" - -namespace Envoy { -namespace Network { - -ExecutionContext* getConnectionExecutionContext(const Network::Connection& connection) { - const ConnectionExecutionContextFilterState* filter_state = - connection.streamInfo().filterState().getDataReadOnly( - kConnectionExecutionContextFilterStateName); - return filter_state == nullptr ? nullptr : filter_state->executionContext(); -} - -} // namespace Network -} // namespace Envoy diff --git a/source/common/network/common_connection_filter_states.h b/source/common/network/common_connection_filter_states.h deleted file mode 100644 index 4de4e97647d1..000000000000 --- a/source/common/network/common_connection_filter_states.h +++ /dev/null @@ -1,34 +0,0 @@ -#pragma once - -#include "envoy/common/execution_context.h" -#include "envoy/network/connection.h" -#include "envoy/stream_info/filter_state.h" - -namespace Envoy { -namespace Network { - -static constexpr absl::string_view kConnectionExecutionContextFilterStateName = - "envoy.network.connection_execution_context"; - -// ConnectionExecutionContextFilterState is an optional connection-level filter state that goes by -// the name kConnectionExecutionContextFilterStateName. It owns a ExecutionContext, whose -// activate/deactivate methods will be called when a thread starts/finishes running code on behalf -// of the corresponding connection. -class ConnectionExecutionContextFilterState : public Envoy::StreamInfo::FilterState::Object { -public: - // It is safe, although useless, to set execution_context to nullptr. - explicit ConnectionExecutionContextFilterState( - std::unique_ptr execution_context) - : execution_context_(std::move(execution_context)) {} - - ExecutionContext* executionContext() const { return execution_context_.get(); } - -private: - std::unique_ptr execution_context_; -}; - -// Returns the ExecutionContext of a connection, if any. Or nullptr if not found. -ExecutionContext* getConnectionExecutionContext(const Network::Connection& connection); - -} // namespace Network -} // namespace Envoy diff --git a/source/common/network/connection_impl.h b/source/common/network/connection_impl.h index a66ffeb1866c..237f027e9311 100644 --- a/source/common/network/connection_impl.h +++ b/source/common/network/connection_impl.h @@ -148,6 +148,7 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback // ScopeTrackedObject void dumpState(std::ostream& os, int indent_level) const override; + DetectedCloseType detectedCloseType() const override { return detected_close_type_; } protected: diff --git a/source/common/network/connection_impl_base.cc b/source/common/network/connection_impl_base.cc index e0586db6a832..e047afd0382e 100644 --- a/source/common/network/connection_impl_base.cc +++ b/source/common/network/connection_impl_base.cc @@ -1,7 +1,5 @@ #include "source/common/network/connection_impl_base.h" -#include "source/common/network/common_connection_filter_states.h" - namespace Envoy { namespace Network { @@ -30,6 +28,10 @@ void ConnectionImplBase::removeConnectionCallbacks(ConnectionCallbacks& callback } } +OptRef ConnectionImplBase::trackedStream() const { + return streamInfo(); +} + void ConnectionImplBase::hashKey(std::vector& hash) const { addIdToHashKey(hash, id()); } void ConnectionImplBase::setConnectionStats(const ConnectionStats& stats) { @@ -42,10 +44,6 @@ void ConnectionImplBase::setDelayedCloseTimeout(std::chrono::milliseconds timeou delayed_close_timeout_ = timeout; } -ExecutionContext* ConnectionImplBase::executionContext() const { - return getConnectionExecutionContext(*this); -} - void ConnectionImplBase::initializeDelayedCloseTimer() { const auto timeout = delayed_close_timeout_.count(); ASSERT(delayed_close_timer_ == nullptr && timeout > 0); diff --git a/source/common/network/connection_impl_base.h b/source/common/network/connection_impl_base.h index 4b0104adbecd..32660da1b127 100644 --- a/source/common/network/connection_impl_base.h +++ b/source/common/network/connection_impl_base.h @@ -29,7 +29,8 @@ class ConnectionImplBase : public FilterManagerConnection, void setConnectionStats(const ConnectionStats& stats) override; void setDelayedCloseTimeout(std::chrono::milliseconds timeout) override; - ExecutionContext* executionContext() const override; + // ScopeTrackedObject + OptRef trackedStream() const override; protected: void initializeDelayedCloseTimer(); diff --git a/source/common/protobuf/utility.h b/source/common/protobuf/utility.h index 501cc723a872..2d427e7d9e7a 100644 --- a/source/common/protobuf/utility.h +++ b/source/common/protobuf/utility.h @@ -23,6 +23,12 @@ #define PROTOBUF_GET_WRAPPED_OR_DEFAULT(message, field_name, default_value) \ ((message).has_##field_name() ? (message).field_name().value() : (default_value)) +// Obtain the value of a wrapped field (e.g. google.protobuf.UInt32Value) if set. Otherwise, return +// absl::nullopt. +#define PROTOBUF_GET_OPTIONAL_WRAPPED(message, field_name) \ + ((message).has_##field_name() ? absl::make_optional((message).field_name().value()) \ + : absl::nullopt) + // Obtain the value of a wrapped field (e.g. google.protobuf.UInt32Value) if set. Otherwise, throw // a EnvoyException. diff --git a/source/common/runtime/runtime_features.cc b/source/common/runtime/runtime_features.cc index 0d09985c9383..54f067e2c63a 100644 --- a/source/common/runtime/runtime_features.cc +++ b/source/common/runtime/runtime_features.cc @@ -115,9 +115,6 @@ RUNTIME_GUARD(envoy_restart_features_use_fast_protobuf_hash); // Begin false flags. Most of them should come with a TODO to flip true. -// Execution context is optional and must be enabled explicitly. -// See https://github.com/envoyproxy/envoy/issues/32012. -FALSE_RUNTIME_GUARD(envoy_restart_features_enable_execution_context); // Sentinel and test flag. FALSE_RUNTIME_GUARD(envoy_reloadable_features_test_feature_false); // TODO(paul-r-gall) Make this enabled by default after additional soak time. diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index bfee8e6b2baa..195d2ba59acc 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -1229,6 +1229,8 @@ ClusterInfoImpl::ClusterInfoImpl( http_protocol_options_->common_http_protocol_options_, max_headers_count, runtime_.snapshot().getInteger(Http::MaxResponseHeadersCountOverrideKey, Http::DEFAULT_MAX_HEADERS_COUNT))), + max_response_headers_kb_(PROTOBUF_GET_OPTIONAL_WRAPPED( + http_protocol_options_->common_http_protocol_options_, max_response_headers_kb)), type_(config.type()), drain_connections_on_host_removal_(config.ignore_health_on_host_removal()), connection_pool_per_downstream_connection_( diff --git a/source/common/upstream/upstream_impl.h b/source/common/upstream/upstream_impl.h index 47d32468571d..a3d43f7b969e 100644 --- a/source/common/upstream/upstream_impl.h +++ b/source/common/upstream/upstream_impl.h @@ -913,6 +913,9 @@ class ClusterInfoImpl : public ClusterInfo, bool maintenanceMode() const override; uint64_t maxRequestsPerConnection() const override { return max_requests_per_connection_; } uint32_t maxResponseHeadersCount() const override { return max_response_headers_count_; } + absl::optional maxResponseHeadersKb() const override { + return max_response_headers_kb_; + } const std::string& name() const override { return name_; } const std::string& observabilityName() const override { if (observability_name_ != nullptr) { @@ -1126,6 +1129,7 @@ class ClusterInfoImpl : public ClusterInfo, // overhead via alignment const uint32_t per_connection_buffer_limit_bytes_; const uint32_t max_response_headers_count_; + const absl::optional max_response_headers_kb_; const envoy::config::cluster::v3::Cluster::DiscoveryType type_; const bool drain_connections_on_host_removal_ : 1; const bool connection_pool_per_downstream_connection_ : 1; diff --git a/source/extensions/filters/common/expr/context.cc b/source/extensions/filters/common/expr/context.cc index cc30fb794a95..10438dff4df2 100644 --- a/source/extensions/filters/common/expr/context.cc +++ b/source/extensions/filters/common/expr/context.cc @@ -184,6 +184,15 @@ absl::optional ResponseWrapper::operator[](CelValue key) const { return CelValue::CreateString(&details.value()); } return {}; + } else if (value == BackendLatency) { + Envoy::StreamInfo::TimingUtility timing(info_); + const auto last_upstream_rx_byte_received = timing.lastUpstreamRxByteReceived(); + const auto first_upstream_tx_byte_sent = timing.firstUpstreamTxByteSent(); + if (last_upstream_rx_byte_received.has_value() && first_upstream_tx_byte_sent.has_value()) { + return CelValue::CreateDuration(absl::FromChrono(last_upstream_rx_byte_received.value() - + first_upstream_tx_byte_sent.value())); + } + return {}; } return {}; } diff --git a/source/extensions/filters/common/expr/context.h b/source/extensions/filters/common/expr/context.h index 7e5f5d1defec..867776641ee2 100644 --- a/source/extensions/filters/common/expr/context.h +++ b/source/extensions/filters/common/expr/context.h @@ -8,6 +8,7 @@ #include "source/common/http/headers.h" #include "source/common/runtime/runtime_features.h" #include "source/common/singleton/const_singleton.h" +#include "source/common/stream_info/utility.h" #include "eval/public/cel_value.h" #include "eval/public/containers/container_backed_list_impl.h" @@ -47,6 +48,7 @@ constexpr absl::string_view CodeDetails = "code_details"; constexpr absl::string_view Trailers = "trailers"; constexpr absl::string_view Flags = "flags"; constexpr absl::string_view GrpcStatus = "grpc_status"; +constexpr absl::string_view BackendLatency = "backend_latency"; // Per-request or per-connection metadata constexpr absl::string_view Metadata = "metadata"; diff --git a/source/extensions/filters/http/ext_proc/client.h b/source/extensions/filters/http/ext_proc/client.h index 54493c094fe3..413bbcac7730 100644 --- a/source/extensions/filters/http/ext_proc/client.h +++ b/source/extensions/filters/http/ext_proc/client.h @@ -48,6 +48,8 @@ class ExternalProcessorClient { const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, const Http::AsyncClient::StreamOptions& options, Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) PURE; + virtual ExternalProcessorStream* stream() PURE; + virtual void setStream(ExternalProcessorStream* stream) PURE; }; using ExternalProcessorClientPtr = std::unique_ptr; diff --git a/source/extensions/filters/http/ext_proc/client_impl.h b/source/extensions/filters/http/ext_proc/client_impl.h index 745bd3f167c8..8ef177cda00c 100644 --- a/source/extensions/filters/http/ext_proc/client_impl.h +++ b/source/extensions/filters/http/ext_proc/client_impl.h @@ -32,10 +32,15 @@ class ExternalProcessorClientImpl : public ExternalProcessorClient { const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, const Http::AsyncClient::StreamOptions& options, Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) override; + ExternalProcessorStream* stream() override { return stream_; } + void setStream(ExternalProcessorStream* stream) override { stream_ = stream; } private: Grpc::AsyncClientManager& client_manager_; Stats::Scope& scope_; + // The gRPC stream to the external processor, which will be opened + // when it's time to send the first message. + ExternalProcessorStream* stream_ = nullptr; }; class ExternalProcessorStreamImpl : public ExternalProcessorStream, diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 83dde06362e2..160f51910ff3 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -189,6 +189,8 @@ FilterConfig::FilterConfig( deferred_close_timeout_(PROTOBUF_GET_MS_OR_DEFAULT(config, deferred_close_timeout, DEFAULT_DEFERRED_CLOSE_TIMEOUT_MS)), message_timeout_(message_timeout), max_message_timeout_ms_(max_message_timeout_ms), + send_body_without_waiting_for_header_response_( + config.send_body_without_waiting_for_header_response()), stats_(generateStats(stats_prefix, config.stat_prefix(), scope)), processing_mode_(config.processing_mode()), mutation_checker_(config.mutation_rules(), context.regexEngine()), @@ -339,7 +341,7 @@ Filter::StreamOpenState Filter::openStream() { ENVOY_LOG(debug, "External processing is completed when trying to open the gRPC stream"); return StreamOpenState::IgnoreError; } - if (!stream_) { + if (!client_->stream()) { ENVOY_LOG(debug, "Opening gRPC stream to external processor"); Http::AsyncClient::ParentContext grpc_context; @@ -354,32 +356,33 @@ Filter::StreamOpenState Filter::openStream() { if (processing_complete_) { // Stream failed while starting and either onGrpcError or onGrpcClose was already called - // Asserts that `stream_` is nullptr since it is not valid to be used any further + // Asserts that `stream_object` is nullptr since it is not valid to be used any further // beyond this point. ASSERT(stream_object == nullptr); return sent_immediate_response_ ? StreamOpenState::Error : StreamOpenState::IgnoreError; } stats_.streams_started_.inc(); - stream_ = config_->threadLocalStreamManager().store(std::move(stream_object), config_->stats(), - config_->deferredCloseTimeout()); + ExternalProcessorStream* stream = config_->threadLocalStreamManager().store( + std::move(stream_object), config_->stats(), config_->deferredCloseTimeout()); + client_->setStream(stream); // For custom access logging purposes. Applicable only for Envoy gRPC as Google gRPC does not // have a proper implementation of streamInfo. if (grpc_service_.has_envoy_grpc() && logging_info_ != nullptr) { - logging_info_->setClusterInfo(stream_->streamInfo().upstreamClusterInfo()); + logging_info_->setClusterInfo(client_->stream()->streamInfo().upstreamClusterInfo()); } } return StreamOpenState::Ok; } void Filter::closeStream() { - if (stream_) { + if (client_->stream()) { ENVOY_LOG(debug, "Calling close on stream"); - if (stream_->close()) { + if (client_->stream()->close()) { stats_.streams_closed_.inc(); } - config_->threadLocalStreamManager().erase(stream_); - stream_ = nullptr; + config_->threadLocalStreamManager().erase(client_->stream()); + client_->setStream(nullptr); } else { ENVOY_LOG(debug, "Stream already closed"); } @@ -387,7 +390,8 @@ void Filter::closeStream() { void Filter::deferredCloseStream() { ENVOY_LOG(debug, "Calling deferred close on stream"); - config_->threadLocalStreamManager().deferredErase(stream_, filter_callbacks_->dispatcher()); + config_->threadLocalStreamManager().deferredErase(client_->stream(), + filter_callbacks_->dispatcher()); } void Filter::onDestroy() { @@ -405,8 +409,8 @@ void Filter::onDestroy() { // closure is deferred upon filter destruction with a timer. // First, release the referenced filter resource. - if (stream_ != nullptr) { - stream_->notifyFilterDestroy(); + if (client_->stream() != nullptr) { + client_->stream()->notifyFilterDestroy(); } // Second, perform stream deferred closure. @@ -436,7 +440,7 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state, state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(), ProcessorState::CallbackState::HeadersCallback); ENVOY_LOG(debug, "Sending headers message"); - stream_->send(std::move(req), false); + client_->stream()->send(std::move(req), false); stats_.stream_msgs_sent_.inc(); state.setPaused(true); return FilterHeadersStatus::StopIteration; @@ -493,16 +497,22 @@ FilterDataStatus Filter::onData(ProcessorState& state, Buffer::Instance& data, b } if (state.callbackState() == ProcessorState::CallbackState::HeadersCallback) { - ENVOY_LOG(trace, "Header processing still in progress -- holding body data"); - // We don't know what to do with the body until the response comes back. - // We must buffer it in case we need it when that happens. - // Raise a watermark to prevent a buffer overflow until the response comes back. - // When end_stream is true, we need to StopIterationAndWatermark as well to stop the - // ActiveStream from returning error when the last chunk added to stream buffer exceeds the - // buffer limit. - state.setPaused(true); - state.requestWatermark(); - return FilterDataStatus::StopIterationAndWatermark; + if (state.bodyMode() == ProcessingMode::STREAMED && + config_->sendBodyWithoutWaitingForHeaderResponse()) { + ENVOY_LOG(trace, "Sending body data even header processing is still in progress as body mode " + "is STREAMED and send_body_without_waiting_for_header_response is enabled"); + } else { + ENVOY_LOG(trace, "Header processing still in progress -- holding body data"); + // We don't know what to do with the body until the response comes back. + // We must buffer it in case we need it when that happens. + // Raise a watermark to prevent a buffer overflow until the response comes back. + // When end_stream is true, we need to StopIterationAndWatermark as well to stop the + // ActiveStream from returning error when the last chunk added to stream buffer exceeds the + // buffer limit. + state.setPaused(true); + state.requestWatermark(); + return FilterDataStatus::StopIterationAndWatermark; + } } FilterDataStatus result; @@ -564,11 +574,13 @@ FilterDataStatus Filter::onData(ProcessorState& state, Buffer::Instance& data, b // Need to first enqueue the data into the chunk queue before sending. auto req = setupBodyChunk(state, data, end_stream); state.enqueueStreamingChunk(data, end_stream); - sendBodyChunk(state, ProcessorState::CallbackState::StreamedBodyCallback, req); - - // At this point we will continue, but with no data, because that will come later - if (end_stream) { - // But we need to stop iteration for the last chunk because it's our last chance to do stuff + // If the current state is HeadersCallback, stays in that state. + if (state.callbackState() == ProcessorState::CallbackState::HeadersCallback) { + sendBodyChunk(state, ProcessorState::CallbackState::HeadersCallback, req); + } else { + sendBodyChunk(state, ProcessorState::CallbackState::StreamedBodyCallback, req); + } + if (end_stream || state.callbackState() == ProcessorState::CallbackState::HeadersCallback) { state.setPaused(true); result = FilterDataStatus::StopIterationNoBuffer; } else { @@ -661,7 +673,7 @@ Filter::sendHeadersInObservabilityMode(Http::RequestOrResponseHeaderMap& headers ProcessingRequest req = buildHeaderRequest(state, headers, end_stream, /*observability_mode=*/true); ENVOY_LOG(debug, "Sending headers message in observability mode"); - stream_->send(std::move(req), false); + client_->stream()->send(std::move(req), false); stats_.stream_msgs_sent_.inc(); return FilterHeadersStatus::Continue; @@ -686,7 +698,7 @@ Http::FilterDataStatus Filter::sendDataInObservabilityMode(Buffer::Instance& dat // Set up the the body chunk and send. auto req = setupBodyChunk(state, data, end_stream); req.set_observability_mode(true); - stream_->send(std::move(req), false); + client_->stream()->send(std::move(req), false); stats_.stream_msgs_sent_.inc(); ENVOY_LOG(debug, "Sending body message in ObservabilityMode"); } else if (state.bodyMode() != ProcessingMode::NONE) { @@ -878,7 +890,7 @@ void Filter::sendBodyChunk(ProcessorState& state, ProcessorState::CallbackState ProcessingRequest& req) { state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(), new_state); - stream_->send(std::move(req), false); + client_->stream()->send(std::move(req), false); stats_.stream_msgs_sent_.inc(); } @@ -894,20 +906,21 @@ void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(), ProcessorState::CallbackState::TrailersCallback); ENVOY_LOG(debug, "Sending trailers message"); - stream_->send(std::move(req), false); + client_->stream()->send(std::move(req), false); stats_.stream_msgs_sent_.inc(); } void Filter::logGrpcStreamInfo() { - if (stream_ != nullptr && logging_info_ != nullptr && grpc_service_.has_envoy_grpc()) { - const auto& upstream_meter = stream_->streamInfo().getUpstreamBytesMeter(); + if (client_->stream() != nullptr && logging_info_ != nullptr && grpc_service_.has_envoy_grpc()) { + const auto& upstream_meter = client_->stream()->streamInfo().getUpstreamBytesMeter(); if (upstream_meter != nullptr) { logging_info_->setBytesSent(upstream_meter->wireBytesSent()); logging_info_->setBytesReceived(upstream_meter->wireBytesReceived()); } // Only set upstream host in logging info once. if (logging_info_->upstreamHost() == nullptr) { - logging_info_->setUpstreamHost(stream_->streamInfo().upstreamInfo()->upstreamHost()); + logging_info_->setUpstreamHost( + client_->stream()->streamInfo().upstreamInfo()->upstreamHost()); } } } @@ -1068,9 +1081,11 @@ void Filter::onReceiveMessage(std::unique_ptr&& r) { // Update processing mode now because filter callbacks check it // and the various "handle" methods below may result in callbacks // being invoked in line. This only happens when filter has allow_mode_override - // set to true and filter is waiting for header processing response. + // set to true, send_body_without_waiting_for_header_response set to false, + // and filter is waiting for header processing response. // Otherwise, the response mode_override proto field is ignored. - if (config_->allowModeOverride() && inHeaderProcessState() && response->has_mode_override()) { + if (config_->allowModeOverride() && !config_->sendBodyWithoutWaitingForHeaderResponse() && + inHeaderProcessState() && response->has_mode_override()) { bool mode_override_allowed = true; const auto& mode_overide = response->mode_override(); // First, check if mode override allow-list is configured diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index a908a1f6f775..ce5a9ed4b195 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -216,6 +216,10 @@ class FilterConfig { uint32_t maxMessageTimeout() const { return max_message_timeout_ms_; } + bool sendBodyWithoutWaitingForHeaderResponse() const { + return send_body_without_waiting_for_header_response_; + } + const ExtProcFilterStats& stats() const { return stats_; } const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& processingMode() const { @@ -283,6 +287,7 @@ class FilterConfig { const std::chrono::milliseconds deferred_close_timeout_; const std::chrono::milliseconds message_timeout_; const uint32_t max_message_timeout_ms_; + const bool send_body_without_waiting_for_header_response_; ExtProcFilterStats stats_; const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode processing_mode_; @@ -486,10 +491,6 @@ class Filter : public Logger::Loggable, DecodingProcessorState decoding_state_; EncodingProcessorState encoding_state_; - // The gRPC stream to the external processor, which will be opened - // when it's time to send the first message. - ExternalProcessorStream* stream_ = nullptr; - // Set to true when no more messages need to be sent to the processor. // This happens when the processor has closed the stream, or when it has // failed. diff --git a/source/extensions/filters/http/ext_proc/processor_state.cc b/source/extensions/filters/http/ext_proc/processor_state.cc index 7b773d391524..b0c569c7d146 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.cc +++ b/source/extensions/filters/http/ext_proc/processor_state.cc @@ -81,24 +81,57 @@ bool ProcessorState::restartMessageTimer(const uint32_t message_timeout_ms) { } } +void ProcessorState::sendBufferedDataInStreamedMode(bool end_stream) { + // Process the data being buffered in streaming mode. + // Move the current buffer into the queue for remote processing and clear the buffered data. + if (hasBufferedData()) { + Buffer::OwnedImpl buffered_chunk; + modifyBufferedData([&buffered_chunk](Buffer::Instance& data) { buffered_chunk.move(data); }); + ENVOY_LOG(debug, "Sending a chunk of buffered data ({})", buffered_chunk.length()); + // Need to first enqueue the data into the chunk queue before sending. + auto req = filter_.setupBodyChunk(*this, buffered_chunk, end_stream); + enqueueStreamingChunk(buffered_chunk, end_stream); + filter_.sendBodyChunk(*this, ProcessorState::CallbackState::StreamedBodyCallback, req); + } + if (queueBelowLowLimit()) { + clearWatermark(); + } +} + +absl::Status ProcessorState::processHeaderMutation(const CommonResponse& common_response) { + ENVOY_LOG(debug, "Applying header mutations"); + const auto mut_status = MutationUtils::applyHeaderMutations( + common_response.header_mutation(), *headers_, + common_response.status() == CommonResponse::CONTINUE_AND_REPLACE, + filter_.config().mutationChecker(), filter_.stats().rejected_header_mutations_, + shouldRemoveContentLength()); + return mut_status; +} + +ProcessorState::CallbackState +ProcessorState::getCallbackStateAfterHeaderResp(const CommonResponse& common_response) const { + if (bodyMode() == ProcessingMode::STREAMED && + filter_.config().sendBodyWithoutWaitingForHeaderResponse() && !chunk_queue_.empty() && + (common_response.status() != CommonResponse::CONTINUE_AND_REPLACE)) { + return ProcessorState::CallbackState::StreamedBodyCallback; + } + return ProcessorState::CallbackState::Idle; +} + absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& response) { if (callback_state_ == CallbackState::HeadersCallback) { ENVOY_LOG(debug, "applying headers response. body mode = {}", ProcessingMode::BodySendMode_Name(body_mode_)); const auto& common_response = response.response(); if (common_response.has_header_mutation()) { - const auto mut_status = MutationUtils::applyHeaderMutations( - common_response.header_mutation(), *headers_, - common_response.status() == CommonResponse::CONTINUE_AND_REPLACE, - filter_.config().mutationChecker(), filter_.stats().rejected_header_mutations_, - shouldRemoveContentLength()); + const auto mut_status = processHeaderMutation(common_response); if (!mut_status.ok()) { return mut_status; } } clearRouteCache(common_response); - onFinishProcessorCall(Grpc::Status::Ok); + onFinishProcessorCall(Grpc::Status::Ok, getCallbackStateAfterHeaderResp(common_response)); if (common_response.status() == CommonResponse::CONTINUE_AND_REPLACE) { ENVOY_LOG(debug, "Replacing complete message"); @@ -119,6 +152,9 @@ absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& respon }); } } + + // In case any data left over in the chunk queue, clear them. + clearStreamingChunk(); // Once this message is received, we won't send anything more on this request // or response to the processor. Clear flags to make sure. body_mode_ = ProcessingMode::NONE; @@ -129,17 +165,26 @@ absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& respon // Fall through if there was never a body in the first place. ENVOY_LOG(debug, "The message had no body"); } else if (complete_body_available_ && body_mode_ != ProcessingMode::NONE) { - // If we get here, then all the body data came in before the header message - // was complete, and the server wants the body. It doesn't matter whether the - // processing mode is buffered, streamed, or partially buffered. - if (bufferedData()) { - // Get here, no_body_ = false, and complete_body_available_ = true, the end_stream - // flag of decodeData() can be determined by whether the trailers are received. - // Also, bufferedData() is not nullptr means decodeData() is called, even though - // the data can be an empty chunk. - auto req = filter_.setupBodyChunk(*this, *bufferedData(), !trailers_available_); - filter_.sendBodyChunk(*this, ProcessorState::CallbackState::BufferedBodyCallback, req); - clearWatermark(); + if (callback_state_ != CallbackState::StreamedBodyCallback) { + // If we get here, then all the body data came in before the header message + // was complete, and the server wants the body. It doesn't matter whether the + // processing mode is buffered, streamed, or partially buffered. + if (bufferedData()) { + // Get here, no_body_ = false, and complete_body_available_ = true, the end_stream + // flag of decodeData() can be determined by whether the trailers are received. + // Also, bufferedData() is not nullptr means decodeData() is called, even though + // the data can be an empty chunk. + auto req = filter_.setupBodyChunk(*this, *bufferedData(), !trailers_available_); + filter_.sendBodyChunk(*this, ProcessorState::CallbackState::BufferedBodyCallback, req); + clearWatermark(); + return absl::OkStatus(); + } + } else { + // StreamedBodyCallback state. There is pending body response. + // Check whether there is buffered data. If there is, send them. + // Do not continue filter chain here so the pending body response have chance to be + // served. + sendBufferedDataInStreamedMode(!trailers_available_); return absl::OkStatus(); } } else if (body_mode_ == ProcessingMode::BUFFERED) { @@ -149,22 +194,7 @@ absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& respon clearWatermark(); return absl::OkStatus(); } else if (body_mode_ == ProcessingMode::STREAMED) { - if (hasBufferedData()) { - // We now know that we need to process what we have buffered in streaming mode. - // Move the current buffer into the queue for remote processing and clear the - // buffered data. - Buffer::OwnedImpl buffered_chunk; - modifyBufferedData( - [&buffered_chunk](Buffer::Instance& data) { buffered_chunk.move(data); }); - ENVOY_LOG(debug, "Sending first chunk using buffered data ({})", buffered_chunk.length()); - // Need to first enqueue the data into the chunk queue before sending. - auto req = filter_.setupBodyChunk(*this, buffered_chunk, false); - enqueueStreamingChunk(buffered_chunk, false); - filter_.sendBodyChunk(*this, ProcessorState::CallbackState::StreamedBodyCallback, req); - } - if (queueBelowLowLimit()) { - clearWatermark(); - } + sendBufferedDataInStreamedMode(false); continueIfNecessary(); return absl::OkStatus(); } else if (body_mode_ == ProcessingMode::BUFFERED_PARTIAL) { @@ -226,12 +256,7 @@ absl::Status ProcessorState::handleBodyResponse(const BodyResponse& response) { if (callback_state_ == CallbackState::BufferedBodyCallback) { if (common_response.has_header_mutation()) { if (headers_ != nullptr) { - ENVOY_LOG(debug, "Applying header mutations to buffered body message"); - const auto mut_status = MutationUtils::applyHeaderMutations( - common_response.header_mutation(), *headers_, - common_response.status() == CommonResponse::CONTINUE_AND_REPLACE, - filter_.config().mutationChecker(), filter_.stats().rejected_header_mutations_, - shouldRemoveContentLength()); + const auto mut_status = processHeaderMutation(common_response); if (!mut_status.ok()) { return mut_status; } @@ -291,12 +316,7 @@ absl::Status ProcessorState::handleBodyResponse(const BodyResponse& response) { ENVOY_BUG(chunk != nullptr, "Bad partial body callback state"); if (common_response.has_header_mutation()) { if (headers_ != nullptr) { - ENVOY_LOG(debug, "Applying header mutations to buffered body message"); - const auto mut_status = MutationUtils::applyHeaderMutations( - common_response.header_mutation(), *headers_, - common_response.status() == CommonResponse::CONTINUE_AND_REPLACE, - filter_.config().mutationChecker(), filter_.stats().rejected_header_mutations_, - shouldRemoveContentLength()); + const auto mut_status = processHeaderMutation(common_response); if (!mut_status.ok()) { return mut_status; } @@ -523,6 +543,13 @@ const QueuedChunk& ChunkQueue::consolidate() { return chunk; } +void ChunkQueue::clear() { + if (queue_.size() > 1) { + received_data_.drain(received_data_.length()); + queue_.clear(); + } +} + } // namespace ExternalProcessing } // namespace HttpFilters } // namespace Extensions diff --git a/source/extensions/filters/http/ext_proc/processor_state.h b/source/extensions/filters/http/ext_proc/processor_state.h index 01e9bc57ae59..51d8aa791891 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.h +++ b/source/extensions/filters/http/ext_proc/processor_state.h @@ -40,6 +40,7 @@ class ChunkQueue { uint32_t bytesEnqueued() const { return bytes_enqueued_; } bool empty() const { return queue_.empty(); } void push(Buffer::Instance& data, bool end_stream); + void clear(); QueuedChunkPtr pop(Buffer::OwnedImpl& out_data); const QueuedChunk& consolidate(); Buffer::OwnedImpl& receivedData() { return received_data_; } @@ -272,6 +273,12 @@ class ProcessorState : public Logger::Loggable { private: virtual void clearRouteCache(const envoy::service::ext_proc::v3::CommonResponse&) {} + void sendBufferedDataInStreamedMode(bool end_stream); + absl::Status + processHeaderMutation(const envoy::service::ext_proc::v3::CommonResponse& common_response); + void clearStreamingChunk() { chunk_queue_.clear(); } + CallbackState getCallbackStateAfterHeaderResp( + const envoy::service::ext_proc::v3::CommonResponse& common_response) const; }; class DecodingProcessorState : public ProcessorState { diff --git a/source/extensions/filters/network/http_connection_manager/config.cc b/source/extensions/filters/network/http_connection_manager/config.cc index 563ece44c917..757c2439d422 100644 --- a/source/extensions/filters/network/http_connection_manager/config.cc +++ b/source/extensions/filters/network/http_connection_manager/config.cc @@ -443,6 +443,12 @@ HttpConnectionManagerConfig::HttpConnectionManagerConfig( idle_timeout_ = absl::nullopt; } + if (config.common_http_protocol_options().has_max_response_headers_kb()) { + creation_status = absl::InvalidArgumentError( + fmt::format("Error: max_response_headers_kb cannot be set on http_connection_manager.")); + return; + } + if (config.strip_any_host_port() && config.strip_matching_host_port()) { creation_status = absl::InvalidArgumentError(fmt::format( "Error: Only one of `strip_matching_host_port` or `strip_any_host_port` can be set.")); diff --git a/test/common/common/BUILD b/test/common/common/BUILD index 591dc9a8a6cc..d15e11f00fc6 100644 --- a/test/common/common/BUILD +++ b/test/common/common/BUILD @@ -596,8 +596,14 @@ envoy_benchmark_test( envoy_cc_test( name = "execution_context_test", srcs = ["execution_context_test.cc"], + copts = [ + "-DENVOY_ENABLE_EXECUTION_CONTEXT", + ], rbe_pool = "2core", deps = [ "//envoy/common:execution_context", + "//source/common/api:api_lib", + "//test/mocks:common_lib", + "//test/mocks/stream_info:stream_info_mocks", ], ) diff --git a/test/common/common/execution_context_test.cc b/test/common/common/execution_context_test.cc index 25d886314e1f..8aed2f00eaf8 100644 --- a/test/common/common/execution_context_test.cc +++ b/test/common/common/execution_context_test.cc @@ -1,5 +1,15 @@ +#include + #include "envoy/common/execution_context.h" +#include "source/common/api/api_impl.h" +#include "source/common/common/scope_tracker.h" + +#include "test/mocks/common.h" +#include "test/mocks/stream_info/mocks.h" +#include "test/test_common/utility.h" + +#include "gmock/gmock.h" #include "gtest/gtest.h" namespace Envoy { @@ -27,45 +37,104 @@ class TestExecutionContext : public ExecutionContext { int activation_generations_ = 0; }; -TEST(ExecutionContextTest, NullContext) { - ScopedExecutionContext scoped_context(nullptr); - EXPECT_TRUE(scoped_context.isNull()); +class ExecutionContextTest : public testing::Test { +public: + ExecutionContextTest() { + ON_CALL(tracked_object_, trackedStream()) + .WillByDefault(testing::Return(OptRef(stream_info_))); + } - ScopedExecutionContext scoped_context2; - EXPECT_TRUE(scoped_context2.isNull()); + void setWithoutContext() { + context_ = nullptr; + stream_info_.filter_state_ = std::make_shared( + StreamInfo::FilterState::LifeSpan::Connection); + } + void setWithContext() { + context_ = std::make_shared(); + stream_info_.filter_state_ = std::make_shared( + StreamInfo::FilterState::LifeSpan::Connection); + stream_info_.filter_state_->setData(kConnectionExecutionContextFilterStateName, context_, + StreamInfo::FilterState::StateType::ReadOnly, + StreamInfo::FilterState::LifeSpan::Connection); + } + + testing::NiceMock stream_info_; + testing::NiceMock tracked_object_; + std::shared_ptr context_{}; +}; + +TEST_F(ExecutionContextTest, NullContext) { + { + ScopedExecutionContext scoped_context(nullptr); + EXPECT_TRUE(scoped_context.isNull()); + } + { + ScopedExecutionContext scoped_context; + EXPECT_TRUE(scoped_context.isNull()); + } + { + setWithoutContext(); + ScopedExecutionContext scoped_context(&tracked_object_); + EXPECT_TRUE(scoped_context.isNull()); + } } -TEST(ExecutionContextTest, NestedScopes) { - TestExecutionContext context; - EXPECT_EQ(context.activationDepth(), 0); - EXPECT_EQ(context.activationGenerations(), 0); +TEST_F(ExecutionContextTest, NestedScopes) { + setWithContext(); + + EXPECT_EQ(context_->activationDepth(), 0); + EXPECT_EQ(context_->activationGenerations(), 0); { - ScopedExecutionContext scoped_context(&context); - EXPECT_EQ(context.activationDepth(), 1); - EXPECT_EQ(context.activationGenerations(), 1); + ScopedExecutionContext scoped_context(&tracked_object_); + EXPECT_EQ(context_->activationDepth(), 1); + EXPECT_EQ(context_->activationGenerations(), 1); { - ScopedExecutionContext nested_scoped_context(&context); - EXPECT_EQ(context.activationDepth(), 2); - EXPECT_EQ(context.activationGenerations(), 1); + ScopedExecutionContext nested_scoped_context(&tracked_object_); + EXPECT_EQ(context_->activationDepth(), 2); + EXPECT_EQ(context_->activationGenerations(), 1); } - EXPECT_EQ(context.activationDepth(), 1); - EXPECT_EQ(context.activationGenerations(), 1); + EXPECT_EQ(context_->activationDepth(), 1); + EXPECT_EQ(context_->activationGenerations(), 1); } - EXPECT_EQ(context.activationDepth(), 0); - EXPECT_EQ(context.activationGenerations(), 1); + EXPECT_EQ(context_->activationDepth(), 0); + EXPECT_EQ(context_->activationGenerations(), 1); } -TEST(ExecutionContextTest, DisjointScopes) { - TestExecutionContext context; +TEST_F(ExecutionContextTest, DisjointScopes) { + setWithContext(); for (int i = 1; i < 5; i++) { - ScopedExecutionContext scoped_context(&context); - EXPECT_EQ(context.activationDepth(), 1); - EXPECT_EQ(context.activationGenerations(), i); + ScopedExecutionContext scoped_context(&tracked_object_); + EXPECT_EQ(context_->activationDepth(), 1); + EXPECT_EQ(context_->activationGenerations(), i); + } + + EXPECT_EQ(context_->activationDepth(), 0); +} + +TEST_F(ExecutionContextTest, InScopeTrackerScopeState) { + + Api::ApiPtr api(Api::createApiForTest()); + Event::DispatcherPtr dispatcher(api->allocateDispatcher("test_thread")); + EXPECT_CALL(tracked_object_, trackedStream()) + .Times(2) + .WillRepeatedly(testing::Return(OptRef(stream_info_))); + + setWithContext(); + EXPECT_EQ(context_->activationDepth(), 0); + EXPECT_EQ(context_->activationGenerations(), 0); + { + ScopeTrackerScopeState scope(&tracked_object_, *dispatcher); + EXPECT_EQ(context_->activationDepth(), 1); + EXPECT_EQ(context_->activationGenerations(), 1); } - EXPECT_EQ(context.activationDepth(), 0); + EXPECT_EQ(context_->activationDepth(), 0); + EXPECT_EQ(context_->activationGenerations(), 1); + + setWithoutContext(); + { ScopeTrackerScopeState scope(&tracked_object_, *dispatcher); } } } // namespace Envoy diff --git a/test/common/common/scope_tracker_test.cc b/test/common/common/scope_tracker_test.cc index 7d928b18019e..76722b08de81 100644 --- a/test/common/common/scope_tracker_test.cc +++ b/test/common/common/scope_tracker_test.cc @@ -14,15 +14,7 @@ namespace Envoy { using testing::_; -class ScopeTrackerScopeStateTest : public testing::Test { -protected: - void setExecutionContextEnabled(bool enabled) { - ScopeTrackerScopeState::executionContextEnabled() = enabled; - } -}; - -TEST_F(ScopeTrackerScopeStateTest, ShouldManageTrackedObjectOnDispatcherStack) { - setExecutionContextEnabled(false); +TEST(ScopeTrackerScopeStateTest, ShouldManageTrackedObjectOnDispatcherStack) { Api::ApiPtr api(Api::createApiForTest()); Event::DispatcherPtr dispatcher(api->allocateDispatcher("test_thread")); MockScopeTrackedObject tracked_object; @@ -40,13 +32,4 @@ TEST_F(ScopeTrackerScopeStateTest, ShouldManageTrackedObjectOnDispatcherStack) { static_cast(dispatcher.get())->onFatalError(std::cerr); } -TEST_F(ScopeTrackerScopeStateTest, ExecutionContextEnabled) { - setExecutionContextEnabled(true); - Api::ApiPtr api(Api::createApiForTest()); - Event::DispatcherPtr dispatcher(api->allocateDispatcher("test_thread")); - MockScopeTrackedObject tracked_object; - EXPECT_CALL(tracked_object, executionContext()); - ScopeTrackerScopeState scope(&tracked_object, *dispatcher); -} - } // namespace Envoy diff --git a/test/common/http/codec_impl_fuzz_test.cc b/test/common/http/codec_impl_fuzz_test.cc index 8554be417b01..bca32e84542e 100644 --- a/test/common/http/codec_impl_fuzz_test.cc +++ b/test/common/http/codec_impl_fuzz_test.cc @@ -616,7 +616,7 @@ void codecFuzz(const test::common::http::CodecImplFuzzTestCase& input, HttpVersi } else { client = std::make_unique( client_connection, Http1::CodecStats::atomicGet(http1_stats, scope), client_callbacks, - client_http1settings, max_response_headers_count); + client_http1settings, max_request_headers_kb, max_response_headers_count); } if (http2) { diff --git a/test/common/http/http1/codec_impl_test.cc b/test/common/http/http1/codec_impl_test.cc index 1fca99b17e7b..0dca1c8abc16 100644 --- a/test/common/http/http1/codec_impl_test.cc +++ b/test/common/http/http1/codec_impl_test.cc @@ -2544,7 +2544,8 @@ class Http1ClientConnectionImplTest : public Http1CodecTestBase { public: void initialize() { codec_ = std::make_unique( - connection_, http1CodecStats(), callbacks_, codec_settings_, max_response_headers_count_, + connection_, http1CodecStats(), callbacks_, codec_settings_, max_response_headers_kb_, + max_response_headers_count_, /* passing_through_proxy=*/false); } @@ -2564,6 +2565,7 @@ class Http1ClientConnectionImplTest : public Http1CodecTestBase { protected: Stats::TestUtil::TestStore store_; uint32_t max_response_headers_count_{Http::DEFAULT_MAX_HEADERS_COUNT}; + uint32_t max_response_headers_kb_{Http::Http1::MAX_RESPONSE_HEADERS_KB}; }; void Http1ClientConnectionImplTest::testClientAllowChunkedContentLength( @@ -2571,8 +2573,9 @@ void Http1ClientConnectionImplTest::testClientAllowChunkedContentLength( // Response validation is not implemented in UHV yet #ifndef ENVOY_ENABLE_UHV codec_settings_.allow_chunked_length_ = allow_chunked_length; - codec_ = std::make_unique( - connection_, http1CodecStats(), callbacks_, codec_settings_, max_response_headers_count_); + codec_ = std::make_unique(connection_, http1CodecStats(), callbacks_, + codec_settings_, max_response_headers_kb_, + max_response_headers_count_); NiceMock response_decoder; Http::RequestEncoder& request_encoder = codec_->newStream(response_decoder); @@ -3706,6 +3709,28 @@ TEST_P(Http1ClientConnectionImplTest, LargeResponseHeadersAccepted) { std::string long_header = "big: " + std::string(79 * 1024, 'q') + "\r\n"; buffer = Buffer::OwnedImpl(long_header); status = codec_->dispatch(buffer); + EXPECT_TRUE(status.ok()); +} + +// Tests that the size of response headers for HTTP/1 can be configured higher than the default of +// 80kB. +TEST_P(Http1ClientConnectionImplTest, LargeResponseHeadersAcceptedConfigurable) { + constexpr uint32_t size_limit_kb = 85; + max_response_headers_kb_ = size_limit_kb; + initialize(); + + NiceMock response_decoder; + Http::RequestEncoder& request_encoder = codec_->newStream(response_decoder); + TestRequestHeaderMapImpl headers{{":method", "GET"}, {":path", "/"}, {":authority", "host"}}; + EXPECT_TRUE(request_encoder.encodeHeaders(headers, true).ok()); + + Buffer::OwnedImpl buffer("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n"); + auto status = codec_->dispatch(buffer); + EXPECT_TRUE(status.ok()); + std::string long_header = "big: " + std::string((size_limit_kb - 1) * 1024, 'q') + "\r\n"; + buffer = Buffer::OwnedImpl(long_header); + status = codec_->dispatch(buffer); + EXPECT_TRUE(status.ok()); } // Regression test for CVE-2019-18801. Large method headers should not trigger @@ -3792,6 +3817,7 @@ TEST_P(Http1ClientConnectionImplTest, ManyResponseHeadersAccepted) { // Response already contains one header. buffer = Buffer::OwnedImpl(createHeaderOrTrailerFragment(150) + "\r\n"); status = codec_->dispatch(buffer); + EXPECT_TRUE(status.ok()); } TEST_P(Http1ClientConnectionImplTest, TestResponseSplit0) { @@ -3821,8 +3847,9 @@ TEST_P(Http1ClientConnectionImplTest, TestResponseSplitAllowChunkedLength100) { TEST_P(Http1ClientConnectionImplTest, VerifyResponseHeaderTrailerMapMaxLimits) { codec_settings_.allow_chunked_length_ = true; codec_settings_.enable_trailers_ = true; - codec_ = std::make_unique( - connection_, http1CodecStats(), callbacks_, codec_settings_, max_response_headers_count_); + codec_ = std::make_unique(connection_, http1CodecStats(), callbacks_, + codec_settings_, max_response_headers_kb_, + max_response_headers_count_); NiceMock response_decoder; Http::RequestEncoder& request_encoder = codec_->newStream(response_decoder); diff --git a/test/common/http/http1/http1_connection_fuzz_test.cc b/test/common/http/http1/http1_connection_fuzz_test.cc index 9dae7af6c7b9..37eb6fc64e4a 100644 --- a/test/common/http/http1/http1_connection_fuzz_test.cc +++ b/test/common/http/http1/http1_connection_fuzz_test.cc @@ -45,7 +45,7 @@ class Http1Harness { client_ = std::make_unique( mock_client_connection_, Http1::CodecStats::atomicGet(http1_stats_, *stats_store_.rootScope()), - mock_client_callbacks_, client_settings_, Http::DEFAULT_MAX_HEADERS_COUNT); + mock_client_callbacks_, client_settings_, absl::nullopt, Http::DEFAULT_MAX_HEADERS_COUNT); Status status = client_->dispatch(payload); } diff --git a/test/common/network/BUILD b/test/common/network/BUILD index 16a6dc85fa9f..7b0a24d69f54 100644 --- a/test/common/network/BUILD +++ b/test/common/network/BUILD @@ -82,7 +82,6 @@ envoy_cc_test( "//source/common/common:empty_string", "//source/common/event:dispatcher_includes", "//source/common/event:dispatcher_lib", - "//source/common/network:common_connection_filter_states_lib", "//source/common/network:connection_lib", "//source/common/network:listen_socket_lib", "//source/common/network:utility_lib", diff --git a/test/common/network/connection_impl_test.cc b/test/common/network/connection_impl_test.cc index 11c769ead5a7..f5cc602f07a6 100644 --- a/test/common/network/connection_impl_test.cc +++ b/test/common/network/connection_impl_test.cc @@ -15,7 +15,6 @@ #include "source/common/common/utility.h" #include "source/common/event/dispatcher_impl.h" #include "source/common/network/address_impl.h" -#include "source/common/network/common_connection_filter_states.h" #include "source/common/network/connection_impl.h" #include "source/common/network/io_socket_handle_impl.h" #include "source/common/network/listen_socket_impl.h" @@ -68,16 +67,6 @@ class MockInternalListenerManager : public InternalListenerManager { MOCK_METHOD(InternalListenerOptRef, findByAddress, (const Address::InstanceConstSharedPtr&), ()); }; -class NoopConnectionExecutionContext : public ExecutionContext { -public: - NoopConnectionExecutionContext() = default; - ~NoopConnectionExecutionContext() override = default; - -protected: - void activate() override {} - void deactivate() override {} -}; - TEST(RawBufferSocket, TestBasics) { TransportSocketPtr raw_buffer_socket(Network::Test::createRawBufferSocket()); EXPECT_FALSE(raw_buffer_socket->ssl()); @@ -348,27 +337,6 @@ TEST_P(ConnectionImplTest, SetSslConnection) { disconnect(false); } -TEST_P(ConnectionImplTest, SetGetExecutionContextFilterState) { - setUpBasicConnection(); - connect(); - - EXPECT_EQ(getConnectionExecutionContext(*client_connection_), nullptr); - - const StreamInfo::FilterStateSharedPtr& filter_state = - client_connection_->streamInfo().filterState(); - auto connection_execution_context = std::make_unique(); - const NoopConnectionExecutionContext* context_pointer = - connection_execution_context.get(); // Not owned. - auto filter_state_object = std::make_shared( - std::move(connection_execution_context)); - filter_state->setData(kConnectionExecutionContextFilterStateName, filter_state_object, - StreamInfo::FilterState::StateType::ReadOnly, - StreamInfo::FilterState::LifeSpan::Connection); - - EXPECT_EQ(getConnectionExecutionContext(*client_connection_), context_pointer); - disconnect(true); -} - TEST_P(ConnectionImplTest, GetCongestionWindow) { setUpBasicConnection(); connect(); diff --git a/test/extensions/filters/common/expr/context_test.cc b/test/extensions/filters/common/expr/context_test.cc index 733c2413ce2c..c766d47e258d 100644 --- a/test/extensions/filters/common/expr/context_test.cc +++ b/test/extensions/filters/common/expr/context_test.cc @@ -411,6 +411,14 @@ TEST(Context, ResponseAttributes) { EXPECT_FALSE(value.has_value()); } + { + info.setUpstreamInfo(std::make_shared()); + StreamInfo::UpstreamTiming& upstream_timing = info.upstreamInfo()->upstreamTiming(); + upstream_timing.onFirstUpstreamTxByteSent(info.timeSource()); + upstream_timing.onLastUpstreamRxByteReceived(info.timeSource()); + EXPECT_TRUE(response[CelValue::CreateStringView(BackendLatency)].has_value()); + } + { Http::TestResponseHeaderMapImpl header_map{{header_name, "a"}, {grpc_status, "7"}}; Http::TestResponseTrailerMapImpl trailer_map{{trailer_name, "b"}}; diff --git a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc index 697a773a1ebb..e229fc88b850 100644 --- a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc @@ -4652,6 +4652,174 @@ TEST_P(ExtProcIntegrationTest, SidestreamPushbackUpstreamObservabilityMode) { verifyDownstreamResponse(*response, 200); } +TEST_P(ExtProcIntegrationTest, SendBodyBeforeHeaderRespStreamedBasicTest) { + proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::STREAMED); + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_response_body_mode(ProcessingMode::STREAMED); + proto_config_.set_send_body_without_waiting_for_header_response(true); + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequestWithBody("hello world", [](Http::HeaderMap& headers) { + headers.addCopy(LowerCaseString("x-remove-this"), "yes"); + }); + processRequestHeadersMessage( + *grpc_upstreams_[0], true, [](const HttpHeaders& headers, HeadersResponse& headers_resp) { + Http::TestRequestHeaderMapImpl expected_request_headers{ + {":scheme", "http"}, {":method", "POST"}, {"host", "host"}, + {":path", "/"}, {"x-remove-this", "yes"}, {"x-forwarded-proto", "http"}}; + EXPECT_THAT(headers.headers(), HeaderProtosEqual(expected_request_headers)); + + auto response_header_mutation = headers_resp.mutable_response()->mutable_header_mutation(); + auto* mut1 = response_header_mutation->add_set_headers(); + mut1->mutable_header()->set_key("x-new-header"); + mut1->mutable_header()->set_raw_value("new"); + response_header_mutation->add_remove_headers("x-remove-this"); + return true; + }); + processRequestBodyMessage( + *grpc_upstreams_[0], false, [](const HttpBody& body, BodyResponse& body_resp) { + EXPECT_TRUE(body.end_of_stream()); + EXPECT_EQ(body.body(), "hello world"); + auto* body_mut = body_resp.mutable_response()->mutable_body_mutation(); + body_mut->set_body("replaced body"); + return true; + }); + + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + EXPECT_THAT(upstream_request_->headers(), HasNoHeader("x-remove-this")); + EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("x-new-header", "new")); + EXPECT_EQ(upstream_request_->body().toString(), "replaced body"); + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request_->encodeData(100, true); + processResponseHeadersMessage( + *grpc_upstreams_[0], false, [](const HttpHeaders& headers, HeadersResponse&) { + Http::TestRequestHeaderMapImpl expected_response_headers{{":status", "200"}}; + EXPECT_THAT(headers.headers(), HeaderProtosEqual(expected_response_headers)); + return true; + }); + processResponseBodyMessage(*grpc_upstreams_[0], false, absl::nullopt); + verifyDownstreamResponse(*response, 200); +} + +TEST_P(ExtProcIntegrationTest, SendBodyAndTrailerBeforeHeaderRespStreamedMoreDataTest) { + proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::STREAMED); + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_response_body_mode(ProcessingMode::STREAMED); + proto_config_.mutable_processing_mode()->set_response_trailer_mode(ProcessingMode::SEND); + proto_config_.set_send_body_without_waiting_for_header_response(true); + + initializeConfig(); + HttpIntegrationTest::initialize(); + codec_client_ = makeHttpConnection(lookupPort("http")); + Http::TestRequestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + + auto encoder_decoder = codec_client_->startRequest(headers); + request_encoder_ = &encoder_decoder.first; + auto response = std::move(encoder_decoder.second); + codec_client_->sendData(*request_encoder_, "hello world", false); + processRequestHeadersMessage(*grpc_upstreams_[0], true, absl::nullopt); + processRequestBodyMessage(*grpc_upstreams_[0], false, absl::nullopt); + codec_client_->sendData(*request_encoder_, "foo-bar", true); + processRequestBodyMessage(*grpc_upstreams_[0], false, absl::nullopt); + + handleUpstreamRequestWithTrailer(); + processResponseHeadersMessage(*grpc_upstreams_[0], false, absl::nullopt); + processResponseBodyMessage(*grpc_upstreams_[0], false, absl::nullopt); + processResponseTrailersMessage(*grpc_upstreams_[0], false, absl::nullopt); + verifyDownstreamResponse(*response, 200); +} + +TEST_P(ExtProcIntegrationTest, ServerWaitForBodyBeforeSendsHeaderRespStreamedTest) { + config_helper_.setBufferLimits(1024, 1024); + proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::STREAMED); + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + proto_config_.set_send_body_without_waiting_for_header_response(true); + + initializeConfig(); + HttpIntegrationTest::initialize(); + codec_client_ = makeHttpConnection(lookupPort("http")); + Http::TestRequestHeaderMapImpl default_headers; + HttpTestUtility::addDefaultHeaders(default_headers); + + auto encoder_decoder = codec_client_->startRequest(default_headers); + request_encoder_ = &encoder_decoder.first; + auto response = std::move(encoder_decoder.second); + // Downstream client sending 16k data. + const std::string body_sent(16 * 1024, 's'); + codec_client_->sendData(*request_encoder_, body_sent, true); + + // The ext_proc server receives the headers. + ProcessingRequest header_request; + ASSERT_TRUE(grpc_upstreams_[0]->waitForHttpConnection(*dispatcher_, processor_connection_)); + ASSERT_TRUE(processor_connection_->waitForNewStream(*dispatcher_, processor_stream_)); + ASSERT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, header_request)); + ASSERT_TRUE(header_request.has_request_headers()); + + // The ext_proc server receives 16 chunks of body, each chunk size is 1k. + std::string body_received; + bool end_stream = false; + uint32_t total_body_msg_count = 0; + while (!end_stream) { + ProcessingRequest body_request; + ASSERT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, body_request)); + ASSERT_TRUE(body_request.has_request_body()); + body_received = absl::StrCat(body_received, body_request.request_body().body()); + end_stream = body_request.request_body().end_of_stream(); + total_body_msg_count++; + } + EXPECT_TRUE(end_stream); + EXPECT_EQ(body_received, body_sent); + + // The ext_proc server sends back the header response. + processor_stream_->startGrpcStream(); + ProcessingResponse response_header; + auto* header_resp = response_header.mutable_request_headers(); + auto header_mutation = header_resp->mutable_response()->mutable_header_mutation(); + auto* mut = header_mutation->add_set_headers(); + mut->mutable_header()->set_key("x-new-header"); + mut->mutable_header()->set_raw_value("new"); + processor_stream_->sendGrpcMessage(response_header); + + // The ext_proc server sends back the body response. + const std::string body_upstream(total_body_msg_count, 'r'); + while (total_body_msg_count) { + ProcessingResponse response_body; + auto* body_resp = response_body.mutable_request_body(); + auto* body_mut = body_resp->mutable_response()->mutable_body_mutation(); + body_mut->set_body("r"); + processor_stream_->sendGrpcMessage(response_body); + total_body_msg_count--; + } + + handleUpstreamRequest(); + EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("x-new-header", "new")); + EXPECT_EQ(upstream_request_->body().toString(), body_upstream); + verifyDownstreamResponse(*response, 200); +} + +TEST_P(ExtProcIntegrationTest, SendBodyBeforeHeaderRespStreamedNotSendTrailerTest) { + proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::STREAMED); + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_response_body_mode(ProcessingMode::STREAMED); + proto_config_.set_send_body_without_waiting_for_header_response(true); + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequestWithBodyAndTrailer("hello world"); + processRequestHeadersMessage(*grpc_upstreams_[0], true, absl::nullopt); + processRequestBodyMessage(*grpc_upstreams_[0], false, absl::nullopt); + handleUpstreamRequest(100); + processResponseHeadersMessage(*grpc_upstreams_[0], false, absl::nullopt); + processResponseBodyMessage(*grpc_upstreams_[0], false, absl::nullopt); + verifyDownstreamResponse(*response, 200); +} + TEST_P(ExtProcIntegrationTest, SendHeaderBodyNotSendTrailerTest) { proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND); proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::STREAMED); diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index 008974567edc..edb3c2b66492 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -2567,6 +2567,51 @@ TEST_F(HttpFilterTest, ProcessingModeOverrideResponseHeaders) { EXPECT_EQ(1, config_->stats().streams_closed_.value()); } +// Set allow_mode_override in filter config to be true. +// Set send_body_without_waiting_for_header_response to be true +// In such case, the mode_override in the response will be ignored. +TEST_F(HttpFilterTest, DisableResponseModeOverrideBySendBodyFlag) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SEND" + response_header_mode: "SEND" + allow_mode_override: true + send_body_without_waiting_for_header_response: true + )EOF"); + + EXPECT_EQ(filter_->config().allowModeOverride(), true); + EXPECT_EQ(filter_->config().sendBodyWithoutWaitingForHeaderResponse(), true); + EXPECT_EQ(filter_->config().processingMode().response_header_mode(), ProcessingMode::SEND); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, true)); + + // When ext_proc server sends back the request header response, it contains the + // mode_override for the response_header_mode to be SKIP. + processRequestHeaders( + false, [](const HttpHeaders&, ProcessingResponse& response, HeadersResponse&) { + response.mutable_mode_override()->set_response_header_mode(ProcessingMode::SKIP); + }); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, true)); + + // Verify such mode_override is ignored. The response header is still sent to the ext_proc server. + processResponseHeaders(false, [](const HttpHeaders& header_resp, ProcessingResponse&, + HeadersResponse&) { + EXPECT_TRUE(header_resp.end_of_stream()); + TestRequestHeaderMapImpl expected_response{{":status", "200"}, {"content-type", "text/plain"}}; + EXPECT_THAT(header_resp.headers(), HeaderProtosEqual(expected_response)); + }); + + TestRequestHeaderMapImpl final_expected_response{{":status", "200"}, + {"content-type", "text/plain"}}; + EXPECT_THAT(&response_headers_, HeaderMapEqualIgnoreOrder(&final_expected_response)); + filter_->onDestroy(); +} + // Leaving the allow_mode_override in filter config to be default, which is false. // In such case, the mode_override in the response will be ignored. TEST_F(HttpFilterTest, DisableResponseModeOverride) { @@ -3960,6 +4005,283 @@ TEST_F(HttpFilterTest, EmitDynamicMetadataUseLast) { filter_->onDestroy(); } +TEST_F(HttpFilterTest, HeaderRespReceivedBeforeBody) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SKIP" + response_header_mode: "SEND" + response_body_mode: "STREAMED" + send_body_without_waiting_for_header_response: true + )EOF"); + + EXPECT_EQ(config_->sendBodyWithoutWaitingForHeaderResponse(), true); + + // Create synthetic HTTP request + HttpTestUtility::addDefaultHeaders(request_headers_); + request_headers_.setMethod("POST"); + request_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, true)); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + bool encoding_watermarked = false; + setUpEncodingWatermarking(encoding_watermarked); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + // Header response arrives before any body data. + processResponseHeaders(false, absl::nullopt); + + Buffer::OwnedImpl want_response_body; + Buffer::OwnedImpl got_response_body; + EXPECT_CALL(encoder_callbacks_, injectEncodedDataToFilterChain(_, _)) + .WillRepeatedly(Invoke( + [&got_response_body](Buffer::Instance& data, Unused) { got_response_body.move(data); })); + + for (int i = 0; i < 5; i++) { + Buffer::OwnedImpl resp_chunk; + TestUtility::feedBufferWithRandomCharacters(resp_chunk, 100); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(resp_chunk, false)); + } + + // Send body responses + for (int i = 0; i < 5; i++) { + processResponseBody( + [&want_response_body, i](const HttpBody&, ProcessingResponse&, BodyResponse& resp) { + auto* body_mut = resp.mutable_response()->mutable_body_mutation(); + std::string new_body = absl::StrCat(" ", std::to_string(i), " "); + body_mut->set_body(new_body); + want_response_body.add(new_body); + }, + false); + } + + // Send the last empty request chunk. + Buffer::OwnedImpl last_resp_chunk; + EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(last_resp_chunk, true)); + processResponseBody(absl::nullopt, true); + + // The two buffers should match. + EXPECT_EQ(want_response_body.toString(), got_response_body.toString()); + EXPECT_FALSE(encoding_watermarked); + EXPECT_EQ(config_->stats().spurious_msgs_received_.value(), 0); + filter_->onDestroy(); +} + +TEST_F(HttpFilterTest, HeaderRespReceivedAfterBodySent) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SKIP" + response_header_mode: "SEND" + response_body_mode: "STREAMED" + send_body_without_waiting_for_header_response: true + )EOF"); + + EXPECT_EQ(config_->sendBodyWithoutWaitingForHeaderResponse(), true); + + // Create synthetic HTTP request + HttpTestUtility::addDefaultHeaders(request_headers_); + request_headers_.setMethod("POST"); + request_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, true)); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + bool encoding_watermarked = false; + setUpEncodingWatermarking(encoding_watermarked); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + + Buffer::OwnedImpl want_response_body; + Buffer::OwnedImpl got_response_body; + EXPECT_CALL(encoder_callbacks_, injectEncodedDataToFilterChain(_, _)) + .WillRepeatedly(Invoke( + [&got_response_body](Buffer::Instance& data, Unused) { got_response_body.move(data); })); + + for (int i = 0; i < 5; i++) { + Buffer::OwnedImpl resp_chunk; + TestUtility::feedBufferWithRandomCharacters(resp_chunk, 100); + EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(resp_chunk, false)); + } + + // Header response arrives after some amount of body data sent. + auto response = std::make_unique(); + (void)response->mutable_response_headers(); + stream_callbacks_->onReceiveMessage(std::move(response)); + + // Three body responses follows the header response. + for (int i = 0; i < 2; i++) { + processResponseBody( + [&want_response_body, i](const HttpBody&, ProcessingResponse&, BodyResponse& resp) { + auto* body_mut = resp.mutable_response()->mutable_body_mutation(); + std::string new_body = absl::StrCat(" ", std::to_string(i), " "); + body_mut->set_body(new_body); + want_response_body.add(new_body); + }, + false); + } + + // Now sends the rest of the body chunks to the server. + for (int i = 5; i < 10; i++) { + Buffer::OwnedImpl resp_chunk; + TestUtility::feedBufferWithRandomCharacters(resp_chunk, 100); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(resp_chunk, false)); + } + + // Send body responses + for (int i = 2; i < 10; i++) { + processResponseBody( + [&want_response_body, i](const HttpBody&, ProcessingResponse&, BodyResponse& resp) { + auto* body_mut = resp.mutable_response()->mutable_body_mutation(); + std::string new_body = absl::StrCat(" ", std::to_string(i), " "); + body_mut->set_body(new_body); + want_response_body.add(new_body); + }, + false); + } + + // Send the last empty request chunk. + Buffer::OwnedImpl last_resp_chunk; + EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(last_resp_chunk, true)); + processResponseBody(absl::nullopt, true); + + // The two buffers should match. + EXPECT_EQ(want_response_body.toString(), got_response_body.toString()); + EXPECT_FALSE(encoding_watermarked); + EXPECT_EQ(config_->stats().spurious_msgs_received_.value(), 0); + filter_->onDestroy(); +} + +TEST_F(HttpFilterTest, HeaderRespWithStatusContinueAndReplace) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SKIP" + response_header_mode: "SEND" + response_body_mode: "STREAMED" + send_body_without_waiting_for_header_response: true + )EOF"); + + EXPECT_EQ(config_->sendBodyWithoutWaitingForHeaderResponse(), true); + + // Create synthetic HTTP request + HttpTestUtility::addDefaultHeaders(request_headers_); + request_headers_.setMethod("POST"); + request_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, true)); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + for (int i = 0; i < 5; i++) { + Buffer::OwnedImpl resp_chunk; + TestUtility::feedBufferWithRandomCharacters(resp_chunk, 100); + EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(resp_chunk, false)); + } + + Buffer::OwnedImpl resp_buffer; + setUpEncodingBuffering(resp_buffer, true); + // Header response arrives with status CONTINUE_AND_REPLACE after some amount of body data sent. + auto response = std::make_unique(); + auto* hdrs_resp = response->mutable_response_headers(); + hdrs_resp->mutable_response()->set_status(CommonResponse::CONTINUE_AND_REPLACE); + hdrs_resp->mutable_response()->mutable_body_mutation()->set_body("Hello, World!"); + stream_callbacks_->onReceiveMessage(std::move(response)); + + // Ensure buffered data was updated + EXPECT_EQ(resp_buffer.toString(), "Hello, World!"); + + // Since we did CONTINUE_AND_REPLACE, later data is cleared + Buffer::OwnedImpl resp_data_1("test"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(resp_data_1, false)); + EXPECT_EQ(resp_data_1.length(), 0); + + EXPECT_EQ(config_->stats().spurious_msgs_received_.value(), 0); + filter_->onDestroy(); +} + +TEST_F(HttpFilterTest, StreamedTestInBothDirection) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SEND" + request_body_mode: "STREAMED" + response_header_mode: "SEND" + response_body_mode: "STREAMED" + send_body_without_waiting_for_header_response: true + )EOF"); + + EXPECT_EQ(config_->sendBodyWithoutWaitingForHeaderResponse(), true); + + // Create synthetic HTTP request + HttpTestUtility::addDefaultHeaders(request_headers_); + request_headers_.setMethod("POST"); + request_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + for (int i = 0; i < 5; i++) { + Buffer::OwnedImpl resp_chunk; + TestUtility::feedBufferWithRandomCharacters(resp_chunk, 100); + EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(resp_chunk, false)); + } + // Send the last empty request chunk. + Buffer::OwnedImpl last_req_chunk; + EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(last_req_chunk, true)); + // Header response arrives + auto req_response = std::make_unique(); + (void)req_response->mutable_request_headers(); + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + stream_callbacks_->onReceiveMessage(std::move(req_response)); + + // Data response arrives + for (int i = 0; i < 5; i++) { + processRequestBody(absl::nullopt, false); + } + processRequestBody(absl::nullopt, false); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + bool encoding_watermarked = false; + setUpEncodingWatermarking(encoding_watermarked); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + + for (int i = 0; i < 7; i++) { + Buffer::OwnedImpl resp_chunk; + TestUtility::feedBufferWithRandomCharacters(resp_chunk, 100); + EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(resp_chunk, false)); + } + + auto resp_response = std::make_unique(); + (void)resp_response->mutable_response_headers(); + stream_callbacks_->onReceiveMessage(std::move(resp_response)); + + // Send body responses + for (int i = 0; i < 7; i++) { + processResponseBody(absl::nullopt, false); + } + + // Send the last empty request chunk. + Buffer::OwnedImpl last_resp_chunk; + EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(last_resp_chunk, true)); + processResponseBody(absl::nullopt, true); + + EXPECT_EQ(config_->stats().spurious_msgs_received_.value(), 0); + filter_->onDestroy(); +} + // Verify if ext_proc filter is in the upstream filter chain, and if the ext_proc server // sends back response with clear_route_cache set to true, it is ignored. TEST_F(HttpFilterTest, ClearRouteCacheHeaderMutationUpstreamIgnored) { diff --git a/test/extensions/filters/http/ext_proc/mock_server.cc b/test/extensions/filters/http/ext_proc/mock_server.cc index 25286be792c3..29637f793f83 100644 --- a/test/extensions/filters/http/ext_proc/mock_server.cc +++ b/test/extensions/filters/http/ext_proc/mock_server.cc @@ -5,7 +5,13 @@ namespace Extensions { namespace HttpFilters { namespace ExternalProcessing { -MockClient::MockClient() = default; +MockClient::MockClient() { + EXPECT_CALL(*this, stream()).WillRepeatedly(testing::Invoke([this]() { return stream_; })); + + EXPECT_CALL(*this, setStream(testing::_)) + .WillRepeatedly( + testing::Invoke([this](ExternalProcessorStream* stream) -> void { stream_ = stream; })); +} MockClient::~MockClient() = default; MockStream::MockStream() = default; diff --git a/test/extensions/filters/http/ext_proc/mock_server.h b/test/extensions/filters/http/ext_proc/mock_server.h index d0b0389b0fd4..12c9d7308a93 100644 --- a/test/extensions/filters/http/ext_proc/mock_server.h +++ b/test/extensions/filters/http/ext_proc/mock_server.h @@ -17,6 +17,10 @@ class MockClient : public ExternalProcessorClient { (ExternalProcessorCallbacks&, const Grpc::GrpcServiceConfigWithHashKey&, const Envoy::Http::AsyncClient::StreamOptions&, Envoy::Http::StreamFilterSidestreamWatermarkCallbacks&)); + MOCK_METHOD(ExternalProcessorStream*, stream, ()); + MOCK_METHOD(void, setStream, (ExternalProcessorStream * stream)); + + ExternalProcessorStream* stream_ = nullptr; }; class MockStream : public ExternalProcessorStream { diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD b/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD index f42d0981a65a..66a015876a44 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD @@ -1,7 +1,6 @@ load( "//bazel:envoy_build_system.bzl", "envoy_cc_fuzz_test", - "envoy_cc_mock", "envoy_package", "envoy_proto_library", ) @@ -10,16 +9,6 @@ licenses(["notice"]) # Apache 2 envoy_package() -envoy_cc_mock( - name = "ext_proc_mocks", - hdrs = ["mocks.h"], - tags = ["skip_on_windows"], - deps = [ - "//source/extensions/filters/http/ext_proc:client_interface", - "@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto", - ], -) - envoy_proto_library( name = "ext_proc_unit_test_fuzz_proto", srcs = ["ext_proc_unit_test_fuzz.proto"], @@ -37,10 +26,10 @@ envoy_cc_fuzz_test( rbe_pool = "2core", tags = ["skip_on_windows"], deps = [ - ":ext_proc_mocks", ":ext_proc_unit_test_fuzz_proto_cc_proto", "//source/extensions/filters/http/ext_proc:config", "//test/extensions/filters/http/common/fuzz:http_filter_fuzzer_lib", + "//test/extensions/filters/http/ext_proc:mock_server_lib", "//test/mocks/http:http_mocks", "//test/mocks/network:network_mocks", "//test/mocks/server:server_factory_context_mocks", diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc index df321fe20487..25c6b3ac2d76 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc @@ -1,8 +1,8 @@ #include "source/extensions/filters/http/ext_proc/ext_proc.h" #include "test/extensions/filters/http/common/fuzz/http_filter_fuzzer.h" +#include "test/extensions/filters/http/ext_proc/mock_server.h" #include "test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.pb.validate.h" -#include "test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h" #include "test/fuzz/fuzz_runner.h" #include "test/mocks/http/mocks.h" #include "test/mocks/network/mocks.h" @@ -69,6 +69,14 @@ DEFINE_PROTO_FUZZER( return; } + // Limiting the max supported request body size to 128k. + if (input.request().has_proto_body()) { + const uint32_t max_body_size = 128 * 1024; + if (input.request().proto_body().message().value().size() > max_body_size) { + return; + } + } + static FuzzerMocks mocks; NiceMock stats_store; @@ -88,7 +96,7 @@ DEFINE_PROTO_FUZZER( return; } - MockClient* client = new MockClient(); + ExternalProcessing::MockClient* client = new ExternalProcessing::MockClient(); std::unique_ptr filter = std::make_unique( config, ExternalProcessing::ExternalProcessorClientPtr{client}, proto_config.grpc_service()); filter->setDecoderFilterCallbacks(mocks.decoder_callbacks_); @@ -100,7 +108,7 @@ DEFINE_PROTO_FUZZER( const Envoy::Http::AsyncClient::StreamOptions&, Envoy::Http::StreamFilterSidestreamWatermarkCallbacks&) -> ExternalProcessing::ExternalProcessorStreamPtr { - auto stream = std::make_unique(); + auto stream = std::make_unique(); EXPECT_CALL(*stream, send(_, _)) .WillRepeatedly(Invoke([&](envoy::service::ext_proc::v3::ProcessingRequest&&, bool) -> void { diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h b/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h deleted file mode 100644 index 49ff067dd353..000000000000 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h +++ /dev/null @@ -1,44 +0,0 @@ -#pragma once - -#include "envoy/service/ext_proc/v3/external_processor.pb.h" - -#include "source/extensions/filters/http/ext_proc/client.h" - -#include "gmock/gmock.h" - -namespace Envoy { -namespace Extensions { -namespace HttpFilters { -namespace ExtProc { -namespace UnitTestFuzz { - -class MockStream : public ExternalProcessing::ExternalProcessorStream { -public: - MockStream() = default; - ~MockStream() override = default; - - MOCK_METHOD(void, send, - (envoy::service::ext_proc::v3::ProcessingRequest && request, bool end_stream)); - MOCK_METHOD(bool, close, ()); - MOCK_METHOD(const StreamInfo::StreamInfo&, streamInfo, (), (const override)); - MOCK_METHOD(StreamInfo::StreamInfo&, streamInfo, ()); - MOCK_METHOD(void, notifyFilterDestroy, ()); -}; - -class MockClient : public ExternalProcessing::ExternalProcessorClient { -public: - MockClient() = default; - ~MockClient() override = default; - - MOCK_METHOD(ExternalProcessing::ExternalProcessorStreamPtr, start, - (ExternalProcessing::ExternalProcessorCallbacks & callbacks, - const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, - const Envoy::Http::AsyncClient::StreamOptions&, - Envoy::Http::StreamFilterSidestreamWatermarkCallbacks&)); -}; - -} // namespace UnitTestFuzz -} // namespace ExtProc -} // namespace HttpFilters -} // namespace Extensions -} // namespace Envoy diff --git a/test/extensions/filters/network/http_connection_manager/config_test.cc b/test/extensions/filters/network/http_connection_manager/config_test.cc index 5f39f9935a21..06fa2cf5c194 100644 --- a/test/extensions/filters/network/http_connection_manager/config_test.cc +++ b/test/extensions/filters/network/http_connection_manager/config_test.cc @@ -996,6 +996,27 @@ TEST_F(HttpConnectionManagerConfigTest, MaxRequestHeaderCountConfigurable) { EXPECT_EQ(200, config.maxRequestHeadersCount()); } +// Check that max response header size is invalid on HCM. +TEST_F(HttpConnectionManagerConfigTest, MaxResponseHeaderKbInvalid) { + const std::string yaml_string = R"EOF( + stat_prefix: ingress_http + common_http_protocol_options: + max_response_headers_kb: 200 + route_config: + name: local_route + http_filters: + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + )EOF"; + + HttpConnectionManagerConfig config(parseHttpConnectionManagerFromYaml(yaml_string), context_, + date_provider_, route_config_provider_manager_, + &scoped_routes_config_provider_manager_, tracer_manager_, + filter_config_provider_manager_, creation_status_); + EXPECT_FALSE(creation_status_.ok()); +} + // Checking that default max_requests_per_connection is 0. TEST_F(HttpConnectionManagerConfigTest, DefaultMaxRequestPerConnection) { const std::string yaml_string = R"EOF( diff --git a/test/integration/http_integration.cc b/test/integration/http_integration.cc index d8bc506c181e..c640f452141b 100644 --- a/test/integration/http_integration.cc +++ b/test/integration/http_integration.cc @@ -266,6 +266,7 @@ IntegrationCodecClientPtr HttpIntegrationTest::makeHttpConnection(uint32_t port) IntegrationCodecClientPtr HttpIntegrationTest::makeRawHttpConnection( Network::ClientConnectionPtr&& conn, absl::optional http2_options, + absl::optional common_http_options, bool wait_till_connected) { std::shared_ptr cluster{new NiceMock()}; cluster->max_response_headers_count_ = 200; @@ -286,6 +287,10 @@ IntegrationCodecClientPtr HttpIntegrationTest::makeRawHttpConnection( cluster->http2_options_ = http2_options.value(); cluster->http1_settings_.enable_trailers_ = true; + if (common_http_options.has_value()) { + cluster->common_http_protocol_options_ = common_http_options.value(); + } + if (!disable_client_header_validation_) { cluster->header_validator_factory_ = IntegrationUtil::makeHeaderValidationFactory( ::envoy::extensions::http::header_validators::envoy_default::v3::HeaderValidatorConfig()); @@ -1448,7 +1453,72 @@ void HttpIntegrationTest::testLargeRequestHeaders(uint32_t size, uint32_t count, } else { IntegrationStreamDecoderPtr response = codec_client_->makeHeaderOnlyRequest(big_headers); RELEASE_ASSERT(response->waitForEndStream(timeout), - fmt::format("unexpected timeout after ", timeout.count(), " ms")); + fmt::format("unexpected timeout after {}ms", timeout.count())); + EXPECT_TRUE(response->complete()); + EXPECT_EQ("200", response->headers().getStatusValue()); + } + if (count > max_count) { + EXPECT_THAT(waitForAccessLog(access_log_name_), HasSubstr("too_many_headers")); + } +} + +void HttpIntegrationTest::testLargeResponseHeaders(uint32_t size, uint32_t count, uint32_t max_size, + uint32_t max_count, + std::chrono::milliseconds timeout) { + autonomous_upstream_ = true; + useAccessLog("%RESPONSE_CODE_DETAILS%"); + // `size` parameter dictates the size of each header that will be added to the response and + // `count` parameter is the number of headers to be added. The actual request byte size will + // exceed `size` due to the keys and other headers. The actual request header count will exceed + // `count` by four due to default headers. + + config_helper_.addConfigModifier([&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + ConfigHelper::HttpProtocolOptions protocol_options; + auto* http_protocol_options = protocol_options.mutable_common_http_protocol_options(); + http_protocol_options->mutable_max_response_headers_kb()->set_value(max_size); + http_protocol_options->mutable_max_headers_count()->set_value(max_count); + + ConfigHelper::setProtocolOptions(*bootstrap.mutable_static_resources()->mutable_clusters(0), + protocol_options); + }); + + // This test is validating upstream response headers, but the test client will fail to receive the + // request from Envoy if its limits aren't increased. + envoy::config::core::v3::HttpProtocolOptions client_protocol_options; + client_protocol_options.mutable_max_response_headers_kb()->set_value(max_size); + client_protocol_options.mutable_max_headers_count()->set_value(max_count); + + Http::TestRequestHeaderMapImpl big_headers(default_response_headers_); + + // Already added four headers. + for (unsigned int i = 0; i < count; i++) { + big_headers.addCopy(std::to_string(i), std::string(size * 1024, 'a')); + } + + initialize(); + codec_client_ = makeRawHttpConnection(makeClientConnection(lookupPort("http")), absl::nullopt, + client_protocol_options); + reinterpret_cast(fake_upstreams_.front().get()) + ->setResponseHeaders(std::make_unique(big_headers)); + + if (size >= max_size || count > max_count) { + // header size includes keys too, so expect rejection when equal + auto encoder_decoder = codec_client_->startRequest(default_request_headers_); + auto response = std::move(encoder_decoder.second); + + if (downstream_protocol_ == Http::CodecType::HTTP1) { + ASSERT_TRUE(codec_client_->waitForDisconnect()); + ASSERT_TRUE(response->complete()); + EXPECT_EQ("431", response->headers().getStatusValue()); + } else { + ASSERT_TRUE(response->waitForReset()); + codec_client_->close(); + } + } else { + IntegrationStreamDecoderPtr response = + codec_client_->makeHeaderOnlyRequest(default_request_headers_); + RELEASE_ASSERT(response->waitForEndStream(timeout), + fmt::format("unexpected timeout after {}ms", timeout.count())); EXPECT_TRUE(response->complete()); EXPECT_EQ("200", response->headers().getStatusValue()); } diff --git a/test/integration/http_integration.h b/test/integration/http_integration.h index 06f2dd15a65c..55adabba2eb4 100644 --- a/test/integration/http_integration.h +++ b/test/integration/http_integration.h @@ -158,10 +158,12 @@ class HttpIntegrationTest : public BaseIntegrationTest { IntegrationCodecClientPtr makeHttpConnection(uint32_t port); // Makes a http connection object without checking its connected state. - virtual IntegrationCodecClientPtr - makeRawHttpConnection(Network::ClientConnectionPtr&& conn, - absl::optional http2_options, - bool wait_till_connected = true); + virtual IntegrationCodecClientPtr makeRawHttpConnection( + Network::ClientConnectionPtr&& conn, + absl::optional http2_options, + absl::optional common_http_options = + absl::nullopt, + bool wait_till_connected = true); // Makes a downstream network connection object based on client codec version. Network::ClientConnectionPtr makeClientConnectionWithOptions( uint32_t port, const Network::ConnectionSocket::OptionsSharedPtr& options) override; @@ -271,13 +273,13 @@ class HttpIntegrationTest : public BaseIntegrationTest { void testRouterUpstreamResponseBeforeRequestComplete(uint32_t status_code = 0); void testTwoRequests(bool force_network_backup = false); - void testLargeHeaders(Http::TestRequestHeaderMapImpl request_headers, - Http::TestRequestTrailerMapImpl request_trailers, uint32_t size, - uint32_t max_size); void testLargeRequestUrl(uint32_t url_size, uint32_t max_headers_size); void testLargeRequestHeaders(uint32_t size, uint32_t count, uint32_t max_size = 60, uint32_t max_count = 100, std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); + void testLargeResponseHeaders(uint32_t size, uint32_t count, uint32_t max_size = 60, + uint32_t max_count = 100, + std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); void testLargeRequestTrailers(uint32_t size, uint32_t max_size = 60); void testManyRequestHeaders(std::chrono::milliseconds time = TestUtility::DefaultTimeout); diff --git a/test/integration/overload_integration_test.cc b/test/integration/overload_integration_test.cc index d3a62cdb2ff9..d5e808a88832 100644 --- a/test/integration/overload_integration_test.cc +++ b/test/integration/overload_integration_test.cc @@ -403,6 +403,7 @@ TEST_P(OverloadScaledTimerIntegrationTest, HTTP3CloseIdleHttpConnectionsDuringHa test_server_->waitForGaugeGe("overload.envoy.overload_actions.reduce_timeouts.scale_percent", 50); // Create an HTTP connection without finishing the handshake. codec_client_ = makeRawHttpConnection(makeClientConnection((lookupPort("http"))), absl::nullopt, + absl::nullopt, /*wait_till_connected=*/false); EXPECT_FALSE(codec_client_->connected()); @@ -418,8 +419,9 @@ TEST_P(OverloadScaledTimerIntegrationTest, HTTP3CloseIdleHttpConnectionsDuringHa 100); // Create another HTTP connection without finishing handshake. - IntegrationCodecClientPtr codec_client2 = makeRawHttpConnection( - makeClientConnection((lookupPort("http"))), absl::nullopt, /*wait_till_connected=*/false); + IntegrationCodecClientPtr codec_client2 = + makeRawHttpConnection(makeClientConnection((lookupPort("http"))), absl::nullopt, + absl::nullopt, /*wait_till_connected=*/false); EXPECT_FALSE(codec_client2->connected()); // Advancing past the minimum time and wait for the proxy to notice and close both connections. timeSystem().advanceTimeWait(std::chrono::seconds(3)); diff --git a/test/integration/protocol_integration_test.cc b/test/integration/protocol_integration_test.cc index 3625b0b58d51..921e5ba80925 100644 --- a/test/integration/protocol_integration_test.cc +++ b/test/integration/protocol_integration_test.cc @@ -2380,11 +2380,52 @@ TEST_P(DownstreamProtocolIntegrationTest, LargeRequestHeadersAccepted) { testLargeRequestHeaders(100, 1, 8192, 100); } -TEST_P(DownstreamProtocolIntegrationTest, ManyLargeRequestHeadersAccepted) { +TEST_P(ProtocolIntegrationTest, ManyLargeRequestHeadersAccepted) { // Send 70 headers each of size 100 kB with limit 8192 kB (8 MB) and 100 headers. testLargeRequestHeaders(100, 70, 8192, 100, TestUtility::DefaultTimeout); } +namespace { +uint32_t adjustMaxSingleHeaderSizeForCodecLimits(uint32_t size, + const HttpProtocolTestParams& params) { + if (params.http2_implementation == Http2Impl::Nghttp2 && + (params.downstream_protocol == Http::CodecType::HTTP2 || + params.upstream_protocol == Http::CodecType::HTTP2)) { + // nghttp2 has a hard-coded, unconfigurable limit of 64k for a header in it's header + // decompressor, so this test will always fail when using that codec. + // Reduce the size so that it can pass and receive some test coverage. + return 100; + } else if (params.downstream_protocol == Http::CodecType::HTTP3 || + params.upstream_protocol == Http::CodecType::HTTP3) { + // QUICHE has a hard-coded limit of 1024KiB in it's QPACK decoder. + // Reduce the size so that it can pass and receive some test coverage. + return 1023; + } + + return size; +} +} // namespace + +// Test a single header of the maximum allowed size. +TEST_P(ProtocolIntegrationTest, VeryLargeRequestHeadersAccepted) { + uint32_t size = adjustMaxSingleHeaderSizeForCodecLimits(8191, GetParam()); + + testLargeRequestHeaders(size, 1, 8192, 100, TestUtility::DefaultTimeout); +} + +// Test a single header of the maximum allowed size. +TEST_P(ProtocolIntegrationTest, ManyLargeResponseHeadersAccepted) { + // Send 70 headers each of size 100 kB with limit 8192 kB (8 MB) and 100 headers. + testLargeResponseHeaders(100, 70, 8192, 100, TestUtility::DefaultTimeout); +} + +// Test a single header of the maximum allowed size. +TEST_P(ProtocolIntegrationTest, VeryLargeResponseHeadersAccepted) { + uint32_t size = adjustMaxSingleHeaderSizeForCodecLimits(8191, GetParam()); + + testLargeResponseHeaders(size, 1, 8192, 100, TestUtility::DefaultTimeout); +} + TEST_P(DownstreamProtocolIntegrationTest, ManyRequestHeadersRejected) { // Send 101 empty headers with limit 60 kB and 100 headers. testLargeRequestHeaders(0, 101, 60, 80); diff --git a/test/integration/quic_http_integration_test.cc b/test/integration/quic_http_integration_test.cc index efc3e6b9224e..8be734d4747b 100644 --- a/test/integration/quic_http_integration_test.cc +++ b/test/integration/quic_http_integration_test.cc @@ -251,12 +251,15 @@ class QuicHttpIntegrationTestBase : public HttpIntegrationTest { return session; } - IntegrationCodecClientPtr - makeRawHttpConnection(Network::ClientConnectionPtr&& conn, - absl::optional http2_options, - bool wait_till_connected = true) override { + IntegrationCodecClientPtr makeRawHttpConnection( + Network::ClientConnectionPtr&& conn, + absl::optional http2_options, + absl::optional common_http_options = + absl::nullopt, + bool wait_till_connected = true) override { ENVOY_LOG(debug, "Creating a new client {}", conn->connectionInfoProvider().localAddress()->asStringView()); + ASSERT(!common_http_options.has_value(), "Not implemented"); return makeRawHttp3Connection(std::move(conn), http2_options, wait_till_connected); } diff --git a/test/mocks/common.h b/test/mocks/common.h index 246aec0f0068..d5ee5c66078d 100644 --- a/test/mocks/common.h +++ b/test/mocks/common.h @@ -103,7 +103,7 @@ inline bool operator==(const StringViewSaver& saver, const char* str) { class MockScopeTrackedObject : public ScopeTrackedObject { public: MOCK_METHOD(void, dumpState, (std::ostream&, int), (const)); - MOCK_METHOD(ExecutionContext*, executionContext, (), (const)); + MOCK_METHOD(OptRef, trackedStream, (), (const)); }; namespace ConnectionPool { diff --git a/test/mocks/network/connection.h b/test/mocks/network/connection.h index 9c8b7bcbfe44..ba776f5f35c9 100644 --- a/test/mocks/network/connection.h +++ b/test/mocks/network/connection.h @@ -96,7 +96,7 @@ class MockConnectionBase { (uint64_t bandwidth_bits_per_sec, std::chrono::microseconds rtt), ()); \ MOCK_METHOD(absl::optional, congestionWindowInBytes, (), (const)); \ MOCK_METHOD(void, dumpState, (std::ostream&, int), (const)); \ - MOCK_METHOD(ExecutionContext*, executionContext, (), (const)); + MOCK_METHOD(OptRef, trackedStream, (), (const)); class MockConnection : public Connection, public MockConnectionBase { public: diff --git a/test/mocks/network/mocks.h b/test/mocks/network/mocks.h index 483ba8404f28..7174b21667f6 100644 --- a/test/mocks/network/mocks.h +++ b/test/mocks/network/mocks.h @@ -411,7 +411,6 @@ class MockConnectionSocket : public ConnectionSocket { MOCK_METHOD(absl::optional, lastRoundTripTime, ()); MOCK_METHOD(absl::optional, congestionWindowInBytes, (), (const)); MOCK_METHOD(void, dumpState, (std::ostream&, int), (const)); - MOCK_METHOD(ExecutionContext*, executionContext, (), (const)); IoHandlePtr io_handle_; std::shared_ptr connection_info_provider_; diff --git a/test/mocks/upstream/cluster_info.cc b/test/mocks/upstream/cluster_info.cc index 380c7a7c8bcd..8177186b0b4e 100644 --- a/test/mocks/upstream/cluster_info.cc +++ b/test/mocks/upstream/cluster_info.cc @@ -96,6 +96,13 @@ MockClusterInfo::MockClusterInfo() ON_CALL(*this, extensionProtocolOptions(_)).WillByDefault(Return(extension_protocol_options_)); ON_CALL(*this, maxResponseHeadersCount()) .WillByDefault(ReturnPointee(&max_response_headers_count_)); + ON_CALL(*this, maxResponseHeadersKb()).WillByDefault(Invoke([this]() -> absl::optional { + if (common_http_protocol_options_.has_max_response_headers_kb()) { + return common_http_protocol_options_.max_response_headers_kb().value(); + } else { + return absl::nullopt; + } + })); ON_CALL(*this, maxRequestsPerConnection()) .WillByDefault(ReturnPointee(&max_requests_per_connection_)); ON_CALL(*this, trafficStats()).WillByDefault(ReturnRef(traffic_stats_)); diff --git a/test/mocks/upstream/cluster_info.h b/test/mocks/upstream/cluster_info.h index 8f5a72973f49..978f9c5826aa 100644 --- a/test/mocks/upstream/cluster_info.h +++ b/test/mocks/upstream/cluster_info.h @@ -128,6 +128,7 @@ class MockClusterInfo : public ClusterInfo { (const)); MOCK_METHOD(bool, maintenanceMode, (), (const)); MOCK_METHOD(uint32_t, maxResponseHeadersCount, (), (const)); + MOCK_METHOD(absl::optional, maxResponseHeadersKb, (), (const)); MOCK_METHOD(uint64_t, maxRequestsPerConnection, (), (const)); MOCK_METHOD(const std::string&, name, (), (const)); MOCK_METHOD(const std::string&, observabilityName, (), (const));