diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 7e8961cfaf5e..e13ab82f5fb5 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -68,6 +68,17 @@ minor_behavior_changes: change: | Simplifies integration with the codec by removing translation between nghttp2 callbacks and Http2VisitorInterface events. Guarded by ``envoy.reloadable_features.http2_skip_callback_visitor``. +- area: http3 + change: | + Use GRO (Generic Receive Offload) for reading packets from a client QUIC UDP socket. See + https://www.kernel.org/doc/html/next/networking/segmentation-offloads.html for a description of + GRO. This behavior change can be reverted by setting + ``envoy.reloadable_features.prefer_quic_client_udp_gro`` to ``false``. +- area: http3 + change: | + Disables recvmmsg (multi-message) for reading packets from a client QUIC UDP socket, if GRO + is not set or not supported. recvmsg will be used instead. This behavior change can be + reverted by setting ``envoy.reloadable_features.disallow_quic_client_udp_mmsg`` to ``false``. bug_fixes: # *Changes expected to improve the state of the world and are unlikely to have negative effects* diff --git a/source/common/network/udp_listener_impl.cc b/source/common/network/udp_listener_impl.cc index a184eaae7036..2411b860609a 100644 --- a/source/common/network/udp_listener_impl.cc +++ b/source/common/network/udp_listener_impl.cc @@ -101,7 +101,7 @@ void UdpListenerImpl::handleReadCallback() { cb_.onReadReady(); const Api::IoErrorPtr result = Utility::readPacketsFromSocket( socket_->ioHandle(), *socket_->connectionInfoProvider().localAddress(), *this, time_source_, - config_.prefer_gro_, packets_dropped_); + config_.prefer_gro_, /*allow_mmsg=*/true, packets_dropped_); if (result == nullptr) { // No error. The number of reads was limited by read rate. There are more packets to read. // Register to read more in the next event loop. diff --git a/source/common/network/utility.cc b/source/common/network/utility.cc index eb7bb2a72951..eeacd295e028 100644 --- a/source/common/network/utility.cc +++ b/source/common/network/utility.cc @@ -598,13 +598,13 @@ void passPayloadToProcessor(uint64_t bytes_read, Buffer::InstancePtr buffer, std::move(buffer), receive_time); } -Api::IoCallUint64Result Utility::readFromSocket(IoHandle& handle, - const Address::Instance& local_address, - UdpPacketProcessor& udp_packet_processor, - MonotonicTime receive_time, bool use_gro, - uint32_t* packets_dropped) { - - if (use_gro) { +Api::IoCallUint64Result +Utility::readFromSocket(IoHandle& handle, const Address::Instance& local_address, + UdpPacketProcessor& udp_packet_processor, MonotonicTime receive_time, + UdpRecvMsgMethod recv_msg_method, uint32_t* packets_dropped) { + if (recv_msg_method == UdpRecvMsgMethod::RecvMsgWithGro) { + ASSERT(Api::OsSysCallsSingleton::get().supportsUdpGro(), + "cannot use GRO when the platform doesn't support it."); Buffer::InstancePtr buffer = std::make_unique(); IoHandle::RecvMsgOutput output(1, packets_dropped); @@ -646,7 +646,9 @@ Api::IoCallUint64Result Utility::readFromSocket(IoHandle& handle, return result; } - if (handle.supportsMmsg()) { + if (recv_msg_method == UdpRecvMsgMethod::RecvMmsg) { + ASSERT(Api::OsSysCallsSingleton::get().supportsMmsg(), + "cannot use recvmmsg when the platform doesn't support it."); const auto max_rx_datagram_size = udp_packet_processor.maxDatagramSize(); // Buffer::ReservationSingleSlice is always passed by value, and can only be constructed @@ -720,24 +722,40 @@ Api::IoCallUint64Result Utility::readFromSocket(IoHandle& handle, Api::IoErrorPtr Utility::readPacketsFromSocket(IoHandle& handle, const Address::Instance& local_address, UdpPacketProcessor& udp_packet_processor, - TimeSource& time_source, bool prefer_gro, - uint32_t& packets_dropped) { + TimeSource& time_source, bool allow_gro, + bool allow_mmsg, uint32_t& packets_dropped) { + UdpRecvMsgMethod recv_msg_method = UdpRecvMsgMethod::RecvMsg; + if (allow_gro && handle.supportsUdpGro()) { + recv_msg_method = UdpRecvMsgMethod::RecvMsgWithGro; + } else if (allow_mmsg && handle.supportsMmsg()) { + recv_msg_method = UdpRecvMsgMethod::RecvMmsg; + } + // Read at least one time, and attempt to read numPacketsExpectedPerEventLoop() packets unless // this goes over MAX_NUM_PACKETS_PER_EVENT_LOOP. size_t num_packets_to_read = std::min( MAX_NUM_PACKETS_PER_EVENT_LOOP, udp_packet_processor.numPacketsExpectedPerEventLoop()); - const bool use_gro = prefer_gro && handle.supportsUdpGro(); - size_t num_reads = - use_gro ? (num_packets_to_read / NUM_DATAGRAMS_PER_RECEIVE) - : (handle.supportsMmsg() ? (num_packets_to_read / NUM_DATAGRAMS_PER_RECEIVE) - : num_packets_to_read); + size_t num_reads; + switch (recv_msg_method) { + case UdpRecvMsgMethod::RecvMsgWithGro: + num_reads = (num_packets_to_read / NUM_DATAGRAMS_PER_RECEIVE); + break; + case UdpRecvMsgMethod::RecvMmsg: + num_reads = (num_packets_to_read / NUM_DATAGRAMS_PER_RECEIVE); + break; + case UdpRecvMsgMethod::RecvMsg: + num_reads = num_packets_to_read; + break; + } // Make sure to read at least once. num_reads = std::max(1, num_reads); + do { const uint32_t old_packets_dropped = packets_dropped; const MonotonicTime receive_time = time_source.monotonicTime(); - Api::IoCallUint64Result result = Utility::readFromSocket( - handle, local_address, udp_packet_processor, receive_time, use_gro, &packets_dropped); + Api::IoCallUint64Result result = + Utility::readFromSocket(handle, local_address, udp_packet_processor, receive_time, + recv_msg_method, &packets_dropped); if (!result.ok()) { // No more to read or encountered a system error. diff --git a/source/common/network/utility.h b/source/common/network/utility.h index 1d2352549191..14aa8fe9b28b 100644 --- a/source/common/network/utility.h +++ b/source/common/network/utility.h @@ -85,6 +85,17 @@ struct ResolvedUdpSocketConfig { bool prefer_gro_; }; +// The different options for receiving UDP packet(s) from system calls. +enum class UdpRecvMsgMethod { + // The `recvmsg` system call. + RecvMsg, + // The `recvmsg` system call using GRO (generic receive offload). This is the preferred method, + // if the platform supports it. + RecvMsgWithGro, + // The `recvmmsg` system call. + RecvMmsg, +}; + /** * Common network utility routines. */ @@ -352,15 +363,15 @@ class Utility { * @param udp_packet_processor is the callback to receive the packet. * @param receive_time is the timestamp passed to udp_packet_processor for the * receive time of the packet. - * @param prefer_gro supplies whether to use GRO if the OS supports it. + * @param recv_msg_method the type of system call and socket options combination to use when + * receiving packets from the kernel. * @param packets_dropped is the output parameter for number of packets dropped in kernel. If the * caller is not interested in it, nullptr can be passed in. */ - static Api::IoCallUint64Result readFromSocket(IoHandle& handle, - const Address::Instance& local_address, - UdpPacketProcessor& udp_packet_processor, - MonotonicTime receive_time, bool use_gro, - uint32_t* packets_dropped); + static Api::IoCallUint64Result + readFromSocket(IoHandle& handle, const Address::Instance& local_address, + UdpPacketProcessor& udp_packet_processor, MonotonicTime receive_time, + UdpRecvMsgMethod recv_msg_method, uint32_t* packets_dropped); /** * Read some packets from a given UDP socket and pass the packet to a given @@ -369,7 +380,11 @@ class Utility { * @param local_address is the socket's local address used to populate port. * @param udp_packet_processor is the callback to receive the packets. * @param time_source is the time source used to generate the time stamp of the received packets. - * @param prefer_gro supplies whether to use GRO if the OS supports it. + * @param allow_gro whether to use GRO, iff the platform supports it. This function will check + * the IoHandle to ensure the platform supports GRO before using it. + * @param allow_mmsg whether to use recvmmsg, iff the platform supports it. This function will + * check the IoHandle to ensure the platform supports recvmmsg before using it. If `allow_gro` is + * true and the platform supports GRO, then it will take precedence over using recvmmsg. * @param packets_dropped is the output parameter for number of packets dropped in kernel. * Return the io error encountered or nullptr if no io error but read stopped * because of MAX_NUM_PACKETS_PER_EVENT_LOOP. @@ -384,8 +399,8 @@ class Utility { static Api::IoErrorPtr readPacketsFromSocket(IoHandle& handle, const Address::Instance& local_address, UdpPacketProcessor& udp_packet_processor, - TimeSource& time_source, bool prefer_gro, - uint32_t& packets_dropped); + TimeSource& time_source, bool allow_gro, + bool allow_mmsg, uint32_t& packets_dropped); private: static void throwWithMalformedIp(absl::string_view ip_address); diff --git a/source/common/quic/BUILD b/source/common/quic/BUILD index 2b9890ed086f..7aafa7be99f8 100644 --- a/source/common/quic/BUILD +++ b/source/common/quic/BUILD @@ -180,6 +180,7 @@ envoy_cc_library( "//envoy/http:codec_interface", "//envoy/http:persistent_quic_info_interface", "//envoy/registry", + "//source/common/runtime:runtime_lib", "//source/common/tls:ssl_socket_lib", "//source/extensions/quic/crypto_stream:envoy_quic_crypto_client_stream_lib", "@com_github_google_quiche//:quic_core_http_spdy_session_lib", @@ -362,6 +363,7 @@ envoy_cc_library( "//envoy/event:dispatcher_interface", "//source/common/network:socket_option_factory_lib", "//source/common/network:udp_packet_writer_handler_lib", + "//source/common/runtime:runtime_lib", "@com_github_google_quiche//:quic_core_connection_lib", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", ], diff --git a/source/common/quic/client_connection_factory_impl.cc b/source/common/quic/client_connection_factory_impl.cc index 3317e11463bc..08d7ce0fa05c 100644 --- a/source/common/quic/client_connection_factory_impl.cc +++ b/source/common/quic/client_connection_factory_impl.cc @@ -1,5 +1,7 @@ #include "source/common/quic/client_connection_factory_impl.h" +#include "source/common/runtime/runtime_features.h" + namespace Envoy { namespace Quic { @@ -45,7 +47,8 @@ std::unique_ptr createQuicNetworkConnection( ASSERT(!quic_versions.empty()); auto connection = std::make_unique( quic::QuicUtils::CreateRandomConnectionId(), server_addr, info_impl->conn_helper_, - info_impl->alarm_factory_, quic_versions, local_addr, dispatcher, options, generator); + info_impl->alarm_factory_, quic_versions, local_addr, dispatcher, options, generator, + Runtime::runtimeFeatureEnabled("envoy.reloadable_features.prefer_quic_client_udp_gro")); // TODO (danzh) move this temporary config and initial RTT configuration to h3 pool. quic::QuicConfig config = info_impl->quic_config_; diff --git a/source/common/quic/envoy_quic_client_connection.cc b/source/common/quic/envoy_quic_client_connection.cc index aa17972506f5..633f6822068e 100644 --- a/source/common/quic/envoy_quic_client_connection.cc +++ b/source/common/quic/envoy_quic_client_connection.cc @@ -7,6 +7,7 @@ #include "source/common/network/socket_option_factory.h" #include "source/common/network/udp_packet_writer_handler_impl.h" #include "source/common/quic/envoy_quic_utils.h" +#include "source/common/runtime/runtime_features.h" namespace Envoy { namespace Quic { @@ -30,35 +31,38 @@ EnvoyQuicClientConnection::EnvoyQuicClientConnection( const quic::ParsedQuicVersionVector& supported_versions, Network::Address::InstanceConstSharedPtr local_addr, Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options, - quic::ConnectionIdGeneratorInterface& generator) + quic::ConnectionIdGeneratorInterface& generator, const bool prefer_gro) : EnvoyQuicClientConnection( server_connection_id, helper, alarm_factory, supported_versions, dispatcher, - createConnectionSocket(initial_peer_address, local_addr, options), generator) {} + createConnectionSocket(initial_peer_address, local_addr, options, prefer_gro), generator, + prefer_gro) {} EnvoyQuicClientConnection::EnvoyQuicClientConnection( const quic::QuicConnectionId& server_connection_id, quic::QuicConnectionHelperInterface& helper, quic::QuicAlarmFactory& alarm_factory, const quic::ParsedQuicVersionVector& supported_versions, Event::Dispatcher& dispatcher, Network::ConnectionSocketPtr&& connection_socket, - quic::ConnectionIdGeneratorInterface& generator) + quic::ConnectionIdGeneratorInterface& generator, const bool prefer_gro) : EnvoyQuicClientConnection( server_connection_id, helper, alarm_factory, new EnvoyQuicPacketWriter( std::make_unique(connection_socket->ioHandle())), /*owns_writer=*/true, supported_versions, dispatcher, std::move(connection_socket), - generator) {} + generator, prefer_gro) {} EnvoyQuicClientConnection::EnvoyQuicClientConnection( const quic::QuicConnectionId& server_connection_id, quic::QuicConnectionHelperInterface& helper, quic::QuicAlarmFactory& alarm_factory, quic::QuicPacketWriter* writer, bool owns_writer, const quic::ParsedQuicVersionVector& supported_versions, Event::Dispatcher& dispatcher, Network::ConnectionSocketPtr&& connection_socket, - quic::ConnectionIdGeneratorInterface& generator) + quic::ConnectionIdGeneratorInterface& generator, const bool prefer_gro) : quic::QuicConnection(server_connection_id, quic::QuicSocketAddress(), envoyIpAddressToQuicSocketAddress( connection_socket->connectionInfoProvider().remoteAddress()->ip()), &helper, &alarm_factory, writer, owns_writer, quic::Perspective::IS_CLIENT, supported_versions, generator), - QuicNetworkConnection(std::move(connection_socket)), dispatcher_(dispatcher) {} + QuicNetworkConnection(std::move(connection_socket)), dispatcher_(dispatcher), + prefer_gro_(prefer_gro), disallow_mmsg_(Runtime::runtimeFeatureEnabled( + "envoy.reloadable_features.disallow_quic_client_udp_mmsg")) {} void EnvoyQuicClientConnection::processPacket( Network::Address::InstanceConstSharedPtr local_address, @@ -175,7 +179,7 @@ void EnvoyQuicClientConnection::probeWithNewPort(const quic::QuicSocketAddress& // The probing socket will have the same host but a different port. auto probing_socket = createConnectionSocket(connectionSocket()->connectionInfoProvider().remoteAddress(), - new_local_address, connectionSocket()->options()); + new_local_address, connectionSocket()->options(), prefer_gro_); setUpConnectionSocket(*probing_socket, delegate_); auto writer = std::make_unique( std::make_unique(probing_socket->ioHandle())); @@ -249,7 +253,7 @@ void EnvoyQuicClientConnection::onFileEvent(uint32_t events, if (connected() && (events & Event::FileReadyType::Read)) { Api::IoErrorPtr err = Network::Utility::readPacketsFromSocket( connection_socket.ioHandle(), *connection_socket.connectionInfoProvider().localAddress(), - *this, dispatcher_.timeSource(), /*prefer_gro=*/false, packets_dropped_); + *this, dispatcher_.timeSource(), prefer_gro_, !disallow_mmsg_, packets_dropped_); if (err == nullptr) { // In the case where the path validation fails, the probing socket will be closed and its IO // events are no longer interesting. diff --git a/source/common/quic/envoy_quic_client_connection.h b/source/common/quic/envoy_quic_client_connection.h index 43cbc0d12901..b2faee057fcb 100644 --- a/source/common/quic/envoy_quic_client_connection.h +++ b/source/common/quic/envoy_quic_client_connection.h @@ -60,7 +60,7 @@ class EnvoyQuicClientConnection : public quic::QuicConnection, Network::Address::InstanceConstSharedPtr local_addr, Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options, - quic::ConnectionIdGeneratorInterface& generator); + quic::ConnectionIdGeneratorInterface& generator, bool prefer_gro); EnvoyQuicClientConnection(const quic::QuicConnectionId& server_connection_id, quic::QuicConnectionHelperInterface& helper, @@ -69,7 +69,7 @@ class EnvoyQuicClientConnection : public quic::QuicConnection, const quic::ParsedQuicVersionVector& supported_versions, Event::Dispatcher& dispatcher, Network::ConnectionSocketPtr&& connection_socket, - quic::ConnectionIdGeneratorInterface& generator); + quic::ConnectionIdGeneratorInterface& generator, bool prefer_gro); // Network::UdpPacketProcessor void processPacket(Network::Address::InstanceConstSharedPtr local_address, @@ -135,7 +135,7 @@ class EnvoyQuicClientConnection : public quic::QuicConnection, const quic::ParsedQuicVersionVector& supported_versions, Event::Dispatcher& dispatcher, Network::ConnectionSocketPtr&& connection_socket, - quic::ConnectionIdGeneratorInterface& generator); + quic::ConnectionIdGeneratorInterface& generator, bool prefer_gro); void onFileEvent(uint32_t events, Network::ConnectionSocket& connection_socket); @@ -150,6 +150,8 @@ class EnvoyQuicClientConnection : public quic::QuicConnection, bool migrate_port_on_path_degrading_{false}; uint8_t num_socket_switches_{0}; size_t num_packets_with_unknown_dst_address_{0}; + const bool prefer_gro_; + const bool disallow_mmsg_; }; } // namespace Quic diff --git a/source/common/quic/envoy_quic_utils.cc b/source/common/quic/envoy_quic_utils.cc index 1d4b0bc68aa6..2d111361c45c 100644 --- a/source/common/quic/envoy_quic_utils.cc +++ b/source/common/quic/envoy_quic_utils.cc @@ -5,6 +5,7 @@ #include "envoy/common/platform.h" #include "envoy/config/core/v3/base.pb.h" +#include "source/common/api/os_sys_calls_impl.h" #include "source/common/http/utility.h" #include "source/common/network/socket_option_factory.h" #include "source/common/network/utility.h" @@ -137,7 +138,8 @@ Http::StreamResetReason quicErrorCodeToEnvoyRemoteResetReason(quic::QuicErrorCod Network::ConnectionSocketPtr createConnectionSocket(const Network::Address::InstanceConstSharedPtr& peer_addr, Network::Address::InstanceConstSharedPtr& local_addr, - const Network::ConnectionSocket::OptionsSharedPtr& options) { + const Network::ConnectionSocket::OptionsSharedPtr& options, + const bool prefer_gro) { if (local_addr == nullptr) { local_addr = Network::Utility::getLocalAddress(peer_addr->ip()->version()); } @@ -149,6 +151,9 @@ createConnectionSocket(const Network::Address::InstanceConstSharedPtr& peer_addr } connection_socket->addOptions(Network::SocketOptionFactory::buildIpPacketInfoOptions()); connection_socket->addOptions(Network::SocketOptionFactory::buildRxQueueOverFlowOptions()); + if (prefer_gro && Api::OsSysCallsSingleton::get().supportsUdpGro()) { + connection_socket->addOptions(Network::SocketOptionFactory::buildUdpGroOptions()); + } if (options != nullptr) { connection_socket->addOptions(options); } diff --git a/source/common/quic/envoy_quic_utils.h b/source/common/quic/envoy_quic_utils.h index ae63d9a78297..1dc6991ca3d3 100644 --- a/source/common/quic/envoy_quic_utils.h +++ b/source/common/quic/envoy_quic_utils.h @@ -161,7 +161,8 @@ Http::StreamResetReason quicErrorCodeToEnvoyRemoteResetReason(quic::QuicErrorCod Network::ConnectionSocketPtr createConnectionSocket(const Network::Address::InstanceConstSharedPtr& peer_addr, Network::Address::InstanceConstSharedPtr& local_addr, - const Network::ConnectionSocket::OptionsSharedPtr& options); + const Network::ConnectionSocket::OptionsSharedPtr& options, + bool prefer_gro = false); // Convert a cert in string form to X509 object. // Return nullptr if the bytes passed cannot be passed. diff --git a/source/common/runtime/runtime_features.cc b/source/common/runtime/runtime_features.cc index 0f2f5862ca25..40e526641425 100644 --- a/source/common/runtime/runtime_features.cc +++ b/source/common/runtime/runtime_features.cc @@ -38,6 +38,7 @@ RUNTIME_GUARD(envoy_reloadable_features_copy_response_code_to_downstream_stream_ RUNTIME_GUARD(envoy_reloadable_features_defer_processing_backedup_streams); RUNTIME_GUARD(envoy_reloadable_features_detect_and_raise_rst_tcp_connection); RUNTIME_GUARD(envoy_reloadable_features_dfp_mixed_scheme); +RUNTIME_GUARD(envoy_reloadable_features_disallow_quic_client_udp_mmsg); RUNTIME_GUARD(envoy_reloadable_features_dns_cache_set_first_resolve_complete); RUNTIME_GUARD(envoy_reloadable_features_edf_lb_host_scheduler_init_fix); RUNTIME_GUARD(envoy_reloadable_features_edf_lb_locality_scheduler_init_fix); @@ -74,6 +75,7 @@ RUNTIME_GUARD(envoy_reloadable_features_oauth_make_token_cookie_httponly); RUNTIME_GUARD(envoy_reloadable_features_oauth_use_standard_max_age_value); RUNTIME_GUARD(envoy_reloadable_features_oauth_use_url_encoding); RUNTIME_GUARD(envoy_reloadable_features_original_dst_rely_on_idle_timeout); +RUNTIME_GUARD(envoy_reloadable_features_prefer_quic_client_udp_gro); RUNTIME_GUARD(envoy_reloadable_features_proxy_status_mapping_more_core_response_flags); RUNTIME_GUARD(envoy_reloadable_features_proxy_status_upstream_request_timeout); RUNTIME_GUARD(envoy_reloadable_features_quic_fix_filter_manager_uaf); diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc index 7e8ba256ebc5..2b0e416a9eba 100644 --- a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc +++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc @@ -399,7 +399,8 @@ void UdpProxyFilter::UdpActiveSession::onReadReady() { uint32_t packets_dropped = 0; const Api::IoErrorPtr result = Network::Utility::readPacketsFromSocket( udp_socket_->ioHandle(), *addresses_.local_, *this, cluster_.filter_.config_->timeSource(), - cluster_.filter_.config_->upstreamSocketConfig().prefer_gro_, packets_dropped); + cluster_.filter_.config_->upstreamSocketConfig().prefer_gro_, /*allow_mmsg=*/true, + packets_dropped); if (result == nullptr) { udp_socket_->ioHandle().activateFileEvents(Event::FileReadyType::Read); return; diff --git a/source/extensions/upstreams/http/udp/upstream_request.cc b/source/extensions/upstreams/http/udp/upstream_request.cc index 1fd5de949251..fdba4ecc5962 100644 --- a/source/extensions/upstreams/http/udp/upstream_request.cc +++ b/source/extensions/upstreams/http/udp/upstream_request.cc @@ -113,7 +113,7 @@ void UdpUpstream::onSocketReadReady() { uint32_t packets_dropped = 0; const Api::IoErrorPtr result = Network::Utility::readPacketsFromSocket( socket_->ioHandle(), *socket_->connectionInfoProvider().localAddress(), *this, - dispatcher_.timeSource(), /*prefer_gro=*/true, packets_dropped); + dispatcher_.timeSource(), /*allow_gro=*/true, /*allow_mmsg=*/true, packets_dropped); if (result == nullptr) { socket_->ioHandle().activateFileEvents(Event::FileReadyType::Read); return; diff --git a/test/common/http/conn_pool_grid_test.cc b/test/common/http/conn_pool_grid_test.cc index d9b1fff1fdf6..1a44c6f8a5ac 100644 --- a/test/common/http/conn_pool_grid_test.cc +++ b/test/common/http/conn_pool_grid_test.cc @@ -1033,7 +1033,7 @@ TEST_F(ConnectivityGridTest, ConnectionCloseDuringAysnConnect) { ASSERT_EQ(0, Api::OsSysCallsSingleton::get().getifaddrs(interfaces).return_value_); } - Api::MockOsSysCalls os_sys_calls; + NiceMock os_sys_calls; TestThreadsafeSingletonInjector os_calls(&os_sys_calls); EXPECT_CALL(os_sys_calls, supportsGetifaddrs()).WillOnce(Return(supports_getifaddrs)); if (supports_getifaddrs) { diff --git a/test/common/network/udp_listener_impl_test.cc b/test/common/network/udp_listener_impl_test.cc index 6df91c150355..cc4818cfd43a 100644 --- a/test/common/network/udp_listener_impl_test.cc +++ b/test/common/network/udp_listener_impl_test.cc @@ -516,7 +516,7 @@ TEST_P(UdpListenerImplTest, UdpListenerRecvMsgError) { // Inject mocked OsSysCalls implementation to mock a read failure. Api::MockOsSysCalls os_sys_calls; TestThreadsafeSingletonInjector os_calls(&os_sys_calls); - EXPECT_CALL(os_sys_calls, supportsMmsg()).Times((2u)); + EXPECT_CALL(os_sys_calls, supportsMmsg()).Times((1u)); EXPECT_CALL(os_sys_calls, recvmsg(_, _, _)) .WillOnce(Return(Api::SysCallSizeResult{-1, SOCKET_ERROR_NOT_SUP})); diff --git a/test/common/network/utility_test.cc b/test/common/network/utility_test.cc index f4e3e3886d79..765ab7c4eafb 100644 --- a/test/common/network/utility_test.cc +++ b/test/common/network/utility_test.cc @@ -829,7 +829,12 @@ TEST(PacketLoss, LossTest) { NiceMock processor; MonotonicTime time(std::chrono::seconds(0)); uint32_t packets_dropped = 0; - Utility::readFromSocket(handle, *address, processor, time, false, &packets_dropped); + UdpRecvMsgMethod recv_msg_method = UdpRecvMsgMethod::RecvMsg; + if (Api::OsSysCallsSingleton::get().supportsMmsg()) { + recv_msg_method = UdpRecvMsgMethod::RecvMmsg; + } + + Utility::readFromSocket(handle, *address, processor, time, recv_msg_method, &packets_dropped); EXPECT_EQ(1, packets_dropped); // Send another packet. @@ -837,7 +842,7 @@ TEST(PacketLoss, LossTest) { reinterpret_cast(&storage), sizeof(storage))); // Make sure the drop count is now 2. - Utility::readFromSocket(handle, *address, processor, time, false, &packets_dropped); + Utility::readFromSocket(handle, *address, processor, time, recv_msg_method, &packets_dropped); EXPECT_EQ(2, packets_dropped); } #endif diff --git a/test/common/quic/BUILD b/test/common/quic/BUILD index 6c0fff13ff16..b488b190353d 100644 --- a/test/common/quic/BUILD +++ b/test/common/quic/BUILD @@ -190,6 +190,7 @@ envoy_cc_test( deps = [ ":test_utils_lib", "//envoy/stats:stats_macros", + "//source/common/api:os_sys_calls_lib", "//source/common/quic:client_codec_lib", "//source/common/quic:envoy_quic_alarm_factory_lib", "//source/common/quic:envoy_quic_client_connection_lib", @@ -202,6 +203,8 @@ envoy_cc_test( "//test/mocks/stats:stats_mocks", "//test/test_common:logging_lib", "//test/test_common:simulated_time_system_lib", + "//test/test_common:test_runtime_lib", + "//test/test_common:threadsafe_singleton_injector_lib", "@com_github_google_quiche//:quic_test_tools_session_peer_lib", ], ) diff --git a/test/common/quic/envoy_quic_client_session_test.cc b/test/common/quic/envoy_quic_client_session_test.cc index 1a97687d2118..6d1701e7256c 100644 --- a/test/common/quic/envoy_quic_client_session_test.cc +++ b/test/common/quic/envoy_quic_client_session_test.cc @@ -1,5 +1,6 @@ #include "envoy/stats/stats_macros.h" +#include "source/common/api/os_sys_calls_impl.h" #include "source/common/network/transport_socket_options_impl.h" #include "source/common/quic/client_codec_impl.h" #include "source/common/quic/envoy_quic_alarm_factory.h" @@ -18,6 +19,8 @@ #include "test/test_common/logging.h" #include "test/test_common/network_utility.h" #include "test/test_common/simulated_time_system.h" +#include "test/test_common/test_runtime.h" +#include "test/test_common/threadsafe_singleton_injector.h" #include "gmock/gmock.h" #include "gtest/gtest.h" @@ -29,6 +32,7 @@ using testing::_; using testing::Invoke; +using testing::Return; namespace Envoy { namespace Quic { @@ -45,7 +49,7 @@ class TestEnvoyQuicClientConnection : public EnvoyQuicClientConnection { quic::ConnectionIdGeneratorInterface& generator) : EnvoyQuicClientConnection(server_connection_id, helper, alarm_factory, &writer, false, supported_versions, dispatcher, std::move(connection_socket), - generator) { + generator, /*prefer_gro=*/true) { SetEncrypter(quic::ENCRYPTION_FORWARD_SECURE, std::make_unique(quic::ENCRYPTION_FORWARD_SECURE)); InstallDecrypter(quic::ENCRYPTION_FORWARD_SECURE, @@ -73,45 +77,50 @@ class EnvoyQuicClientSessionTest : public testing::TestWithParam( quic::test::crypto_test_utils::ProofVerifierForTesting())), quic_stat_names_(store_.symbolTable()), transport_socket_options_(std::make_shared()), - envoy_quic_session_( - quic_config_, quic_version_, - std::unique_ptr(quic_connection_), - quic::QuicServerId("example.com", 443, false), crypto_config_, *dispatcher_, - /*send_buffer_limit*/ 1024 * 1024, crypto_stream_factory_, quic_stat_names_, {}, - *store_.rootScope(), transport_socket_options_, {}), stats_({ALL_HTTP3_CODEC_STATS(POOL_COUNTER_PREFIX(store_, "http3."), POOL_GAUGE_PREFIX(store_, "http3."))}) { http3_options_.mutable_quic_protocol_options() ->mutable_num_timeouts_to_trigger_port_migration() ->set_value(1); + } + + void SetUp() override { + quic_connection_ = new TestEnvoyQuicClientConnection( + quic::test::TestConnectionId(), connection_helper_, alarm_factory_, writer_, quic_version_, + *dispatcher_, createConnectionSocket(peer_addr_, self_addr_, nullptr, /*prefer_gro=*/true), + connection_id_generator_); + + OptRef cache; + OptRef uts_factory; + envoy_quic_session_ = std::make_unique( + quic_config_, quic_version_, + std::unique_ptr(quic_connection_), + quic::QuicServerId("example.com", 443, false), crypto_config_, *dispatcher_, + /*send_buffer_limit*/ 1024 * 1024, crypto_stream_factory_, quic_stat_names_, cache, + *store_.rootScope(), transport_socket_options_, uts_factory); + http_connection_ = std::make_unique( - envoy_quic_session_, http_connection_callbacks_, stats_, http3_options_, 64 * 1024, 100); - EXPECT_EQ(time_system_.systemTime(), envoy_quic_session_.streamInfo().startTime()); - EXPECT_EQ(EMPTY_STRING, envoy_quic_session_.nextProtocol()); + *envoy_quic_session_, http_connection_callbacks_, stats_, http3_options_, 64 * 1024, 100); + EXPECT_EQ(time_system_.systemTime(), envoy_quic_session_->streamInfo().startTime()); + EXPECT_EQ(EMPTY_STRING, envoy_quic_session_->nextProtocol()); EXPECT_EQ(Http::Protocol::Http3, http_connection_->protocol()); time_system_.advanceTimeWait(std::chrono::milliseconds(1)); ON_CALL(writer_, WritePacket(_, _, _, _, _, _)) .WillByDefault(testing::Return(quic::WriteResult(quic::WRITE_STATUS_OK, 1))); - } - void SetUp() override { - envoy_quic_session_.Initialize(); - setQuicConfigWithDefaultValues(envoy_quic_session_.config()); + envoy_quic_session_->Initialize(); + setQuicConfigWithDefaultValues(envoy_quic_session_->config()); quic::test::QuicConfigPeer::SetReceivedStatelessResetToken( - envoy_quic_session_.config(), + envoy_quic_session_->config(), quic::QuicUtils::GenerateStatelessResetToken(quic::test::TestConnectionId())); - envoy_quic_session_.OnConfigNegotiated(); - envoy_quic_session_.addConnectionCallbacks(network_connection_callbacks_); - envoy_quic_session_.setConnectionStats( + envoy_quic_session_->OnConfigNegotiated(); + envoy_quic_session_->addConnectionCallbacks(network_connection_callbacks_); + envoy_quic_session_->setConnectionStats( {read_total_, read_current_, write_total_, write_current_, nullptr, nullptr}); EXPECT_EQ(&read_total_, &quic_connection_->connectionStats().read_total_); } @@ -121,7 +130,7 @@ class EnvoyQuicClientSessionTest : public testing::TestWithParamclose(Network::ConnectionCloseType::NoFlush); } peer_socket_->close(); } @@ -163,7 +172,7 @@ class EnvoyQuicClientSessionTest : public testing::TestWithParam envoy_quic_session_; Network::MockConnectionCallbacks network_connection_callbacks_; Http::MockServerConnectionCallbacks http_connection_callbacks_; testing::StrictMock read_total_; @@ -199,8 +208,8 @@ TEST_P(EnvoyQuicClientSessionTest, NewStream) { TEST_P(EnvoyQuicClientSessionTest, PacketLimits) { // We always allow for reading packets, even if there's no stream. - EXPECT_EQ(0, envoy_quic_session_.GetNumActiveStreams()); - EXPECT_EQ(16, envoy_quic_session_.numPacketsExpectedPerEventLoop()); + EXPECT_EQ(0, envoy_quic_session_->GetNumActiveStreams()); + EXPECT_EQ(16, envoy_quic_session_->numPacketsExpectedPerEventLoop()); NiceMock response_decoder; NiceMock stream_callbacks; @@ -217,8 +226,8 @@ TEST_P(EnvoyQuicClientSessionTest, PacketLimits) { })); stream.OnStreamHeaderList(/*fin=*/false, headers.uncompressed_header_bytes(), headers); // With one stream, still read 16 packets. - EXPECT_EQ(1, envoy_quic_session_.GetNumActiveStreams()); - EXPECT_EQ(16, envoy_quic_session_.numPacketsExpectedPerEventLoop()); + EXPECT_EQ(1, envoy_quic_session_->GetNumActiveStreams()); + EXPECT_EQ(16, envoy_quic_session_->numPacketsExpectedPerEventLoop()); EnvoyQuicClientStream& stream2 = sendGetRequest(response_decoder, stream_callbacks); EXPECT_CALL(response_decoder, decodeHeaders_(_, /*end_stream=*/false)) @@ -227,13 +236,13 @@ TEST_P(EnvoyQuicClientSessionTest, PacketLimits) { })); stream2.OnStreamHeaderList(/*fin=*/false, headers.uncompressed_header_bytes(), headers); // With 2 streams, read 32 packets. - EXPECT_EQ(2, envoy_quic_session_.GetNumActiveStreams()); - EXPECT_EQ(32, envoy_quic_session_.numPacketsExpectedPerEventLoop()); + EXPECT_EQ(2, envoy_quic_session_->GetNumActiveStreams()); + EXPECT_EQ(32, envoy_quic_session_->numPacketsExpectedPerEventLoop()); EXPECT_CALL(*quic_connection_, SendConnectionClosePacket(quic::QUIC_NO_ERROR, _, "Closed by application")); EXPECT_CALL(network_connection_callbacks_, onEvent(Network::ConnectionEvent::LocalClose)); - envoy_quic_session_.close(Network::ConnectionCloseType::NoFlush); + envoy_quic_session_->close(Network::ConnectionCloseType::NoFlush); } TEST_P(EnvoyQuicClientSessionTest, OnResetFrame) { @@ -246,7 +255,7 @@ TEST_P(EnvoyQuicClientSessionTest, OnResetFrame) { quic::QuicRstStreamFrame rst1(/*control_frame_id=*/1u, stream_id, quic::QUIC_ERROR_PROCESSING_STREAM, /*bytes_written=*/0u); EXPECT_CALL(stream_callbacks, onResetStream(Http::StreamResetReason::RemoteReset, _)); - envoy_quic_session_.OnRstStream(rst1); + envoy_quic_session_->OnRstStream(rst1); EXPECT_EQ( 1U, TestUtility::findCounter( @@ -263,7 +272,7 @@ TEST_P(EnvoyQuicClientSessionTest, SendResetFrame) { quic::QuicStreamId stream_id = stream.id(); EXPECT_CALL(stream_callbacks, onResetStream(Http::StreamResetReason::LocalReset, _)); EXPECT_CALL(*quic_connection_, SendControlFrame(_)); - envoy_quic_session_.ResetStream(stream_id, quic::QUIC_ERROR_PROCESSING_STREAM); + envoy_quic_session_->ResetStream(stream_id, quic::QUIC_ERROR_PROCESSING_STREAM); EXPECT_EQ( 1U, TestUtility::findCounter( @@ -276,7 +285,7 @@ TEST_P(EnvoyQuicClientSessionTest, OnGoAwayFrame) { Http::MockStreamCallbacks stream_callbacks; EXPECT_CALL(http_connection_callbacks_, onGoAway(Http::GoAwayErrorCode::NoError)); - envoy_quic_session_.OnHttp3GoAway(4u); + envoy_quic_session_->OnHttp3GoAway(4u); } TEST_P(EnvoyQuicClientSessionTest, ConnectionClose) { @@ -288,8 +297,8 @@ TEST_P(EnvoyQuicClientSessionTest, ConnectionClose) { EXPECT_CALL(network_connection_callbacks_, onEvent(Network::ConnectionEvent::RemoteClose)); quic_connection_->OnConnectionCloseFrame(frame); EXPECT_EQ(absl::StrCat(quic::QuicErrorCodeToString(error), " with details: ", error_details), - envoy_quic_session_.transportFailureReason()); - EXPECT_EQ(Network::Connection::State::Closed, envoy_quic_session_.state()); + envoy_quic_session_->transportFailureReason()); + EXPECT_EQ(Network::Connection::State::Closed, envoy_quic_session_->state()); EXPECT_EQ( 1U, TestUtility::findCounter( @@ -305,8 +314,8 @@ TEST_P(EnvoyQuicClientSessionTest, ConnectionCloseWithActiveStream) { SendConnectionClosePacket(quic::QUIC_NO_ERROR, _, "Closed by application")); EXPECT_CALL(network_connection_callbacks_, onEvent(Network::ConnectionEvent::LocalClose)); EXPECT_CALL(stream_callbacks, onResetStream(Http::StreamResetReason::ConnectionTermination, _)); - envoy_quic_session_.close(Network::ConnectionCloseType::NoFlush); - EXPECT_EQ(Network::Connection::State::Closed, envoy_quic_session_.state()); + envoy_quic_session_->close(Network::ConnectionCloseType::NoFlush); + EXPECT_EQ(Network::Connection::State::Closed, envoy_quic_session_->state()); EXPECT_TRUE(stream.write_side_closed() && stream.reading_stopped()); EXPECT_EQ(1U, TestUtility::findCounter( store_, "http3.upstream.tx.quic_connection_close_error_code_QUIC_NO_ERROR") @@ -321,8 +330,8 @@ TEST_P(EnvoyQuicClientSessionTest, HandshakeTimesOutWithActiveStream) { SendConnectionClosePacket(quic::QUIC_HANDSHAKE_FAILED, _, "fake handshake time out")); EXPECT_CALL(network_connection_callbacks_, onEvent(Network::ConnectionEvent::LocalClose)); EXPECT_CALL(stream_callbacks, onResetStream(Http::StreamResetReason::LocalConnectionFailure, _)); - envoy_quic_session_.OnStreamError(quic::QUIC_HANDSHAKE_FAILED, "fake handshake time out"); - EXPECT_EQ(Network::Connection::State::Closed, envoy_quic_session_.state()); + envoy_quic_session_->OnStreamError(quic::QUIC_HANDSHAKE_FAILED, "fake handshake time out"); + EXPECT_EQ(Network::Connection::State::Closed, envoy_quic_session_->state()); EXPECT_TRUE(stream.write_side_closed() && stream.reading_stopped()); EXPECT_EQ(1U, TestUtility::findCounter( @@ -339,8 +348,8 @@ TEST_P(EnvoyQuicClientSessionTest, ConnectionClosePopulatesQuicVersionStats) { EXPECT_CALL(network_connection_callbacks_, onEvent(Network::ConnectionEvent::RemoteClose)); quic_connection_->OnConnectionCloseFrame(frame); EXPECT_EQ(absl::StrCat(quic::QuicErrorCodeToString(error), " with details: ", error_details), - envoy_quic_session_.transportFailureReason()); - EXPECT_EQ(Network::Connection::State::Closed, envoy_quic_session_.state()); + envoy_quic_session_->transportFailureReason()); + EXPECT_EQ(Network::Connection::State::Closed, envoy_quic_session_->state()); std::string quic_version_stat_name; switch (GetParam().transport_version) { case quic::QUIC_VERSION_IETF_DRAFT_29: @@ -360,8 +369,8 @@ TEST_P(EnvoyQuicClientSessionTest, ConnectionClosePopulatesQuicVersionStats) { TEST_P(EnvoyQuicClientSessionTest, IncomingUnidirectionalReadStream) { quic::QuicStreamId stream_id = 1u; quic::QuicStreamFrame stream_frame(stream_id, false, 0, "aaa"); - envoy_quic_session_.OnStreamFrame(stream_frame); - EXPECT_FALSE(quic::test::QuicSessionPeer::IsStreamCreated(&envoy_quic_session_, stream_id)); + envoy_quic_session_->OnStreamFrame(stream_frame); + EXPECT_FALSE(quic::test::QuicSessionPeer::IsStreamCreated(envoy_quic_session_.get(), stream_id)); // IETF stream 3 is server initiated uni-directional stream. stream_id = 3u; auto payload = std::make_unique(8); @@ -371,17 +380,18 @@ TEST_P(EnvoyQuicClientSessionTest, IncomingUnidirectionalReadStream) { EXPECT_CALL(*quic_connection_, SendConnectionClosePacket(quic::QUIC_HTTP_RECEIVE_SERVER_PUSH, _, "Received server push stream")); quic::QuicStreamFrame stream_frame2(stream_id, false, 0, absl::string_view(payload.get(), 1)); - envoy_quic_session_.OnStreamFrame(stream_frame2); + envoy_quic_session_->OnStreamFrame(stream_frame2); } TEST_P(EnvoyQuicClientSessionTest, GetRttAndCwnd) { - EXPECT_GT(envoy_quic_session_.lastRoundTripTime().value(), std::chrono::microseconds(0)); + EXPECT_GT(envoy_quic_session_->lastRoundTripTime().value(), std::chrono::microseconds(0)); // Just make sure the CWND is non-zero. We don't want to make strong assertions on what the value // should be in this test, that is the job the congestion controllers' tests. - EXPECT_GT(envoy_quic_session_.congestionWindowInBytes().value(), 500); + EXPECT_GT(envoy_quic_session_->congestionWindowInBytes().value(), 500); - envoy_quic_session_.configureInitialCongestionWindow(8000000, std::chrono::microseconds(1000000)); - EXPECT_GT(envoy_quic_session_.congestionWindowInBytes().value(), + envoy_quic_session_->configureInitialCongestionWindow(8000000, + std::chrono::microseconds(1000000)); + EXPECT_GT(envoy_quic_session_->congestionWindowInBytes().value(), quic::kInitialCongestionWindow * quic::kDefaultTCPMSS); } @@ -463,7 +473,7 @@ TEST_P(EnvoyQuicClientSessionTest, StatelessResetOnProbingSocket) { // Trigger port migration. quic_connection_->OnPathDegradingDetected(); - EXPECT_TRUE(envoy_quic_session_.HasPendingPathValidation()); + EXPECT_TRUE(envoy_quic_session_->HasPendingPathValidation()); auto* path_validation_context = dynamic_cast( quic_connection_->GetPathValidationContext()); @@ -486,7 +496,7 @@ TEST_P(EnvoyQuicClientSessionTest, StatelessResetOnProbingSocket) { // fail the probing. EXPECT_CALL(network_connection_callbacks_, onEvent(Network::ConnectionEvent::RemoteClose)) .Times(0); - while (envoy_quic_session_.HasPendingPathValidation()) { + while (envoy_quic_session_->HasPendingPathValidation()) { // Running event loop to receive the STATELESS_RESET and following socket reads shouldn't cause // crash. dispatcher_->run(Event::Dispatcher::RunType::NonBlock); @@ -494,5 +504,172 @@ TEST_P(EnvoyQuicClientSessionTest, StatelessResetOnProbingSocket) { EXPECT_EQ(self_addr_->asString(), quic_connection_->self_address().ToString()); } +class MockOsSysCallsImpl : public Api::OsSysCallsImpl { +public: + MOCK_METHOD(Api::SysCallSizeResult, recvmsg, (os_fd_t socket, msghdr* msg, int flags), + (override)); + MOCK_METHOD(Api::SysCallIntResult, recvmmsg, + (os_fd_t socket, struct mmsghdr* msgvec, unsigned int vlen, int flags, + struct timespec* timeout), + (override)); + MOCK_METHOD(bool, supportsUdpGro, (), (const)); +}; + +// Ensures that the Network::Utility::readFromSocket function uses GRO. +// Only Linux platforms support GRO. +TEST_P(EnvoyQuicClientSessionTest, UsesUdpGro) { + if (!Api::OsSysCallsSingleton::get().supportsUdpGro()) { + GTEST_SKIP() << "Platform doesn't support GRO."; + } + + NiceMock os_sys_calls; + TestThreadsafeSingletonInjector singleton_injector{&os_sys_calls}; + + // Have to connect the QUIC session, so that the socket is set up so we can do I/O on it. + envoy_quic_session_->connect(); + + std::string write_data = "abc"; + Buffer::RawSlice slice; + slice.mem_ = write_data.data(); + slice.len_ = write_data.length(); + + // Make sure the option for GRO is set on the socket. +// Windows doesn't have the GRO socket options. +#if !defined(WIN32) + int sock_opt; + socklen_t sock_len = sizeof(int); + EXPECT_EQ(0, quic_connection_->connectionSocket() + ->getSocketOption(SOL_UDP, UDP_GRO, &sock_opt, &sock_len) + .return_value_); + EXPECT_EQ(1, sock_opt); +#endif + + // GRO uses `recvmsg`, not `recvmmsg`. + EXPECT_CALL(os_sys_calls, supportsUdpGro()).WillRepeatedly(Return(true)); + EXPECT_CALL(os_sys_calls, recvmmsg(_, _, _, _, _)).Times(0); + EXPECT_CALL(os_sys_calls, recvmsg(_, _, _)) + .WillOnce( + Invoke([&](os_fd_t /*socket*/, msghdr* /*msg*/, int /*flags*/) -> Api::SysCallSizeResult { + dispatcher_->exit(); + // Return an error so IoSocketHandleImpl::recvmsg() exits early, instead of trying to + // use the msghdr that would normally have been populated by recvmsg but is not + // populated by this mock. + return {-1, SOCKET_ERROR_AGAIN}; + })); + + peer_socket_->ioHandle().sendmsg(&slice, 1, 0, peer_addr_->ip(), *self_addr_); + + EXPECT_LOG_CONTAINS("trace", "starting gro recvmsg with max", + dispatcher_->run(Event::Dispatcher::RunType::RunUntilExit)); +} + +class EnvoyQuicClientSessionDisallowMmsgTest : public EnvoyQuicClientSessionTest { +public: + void SetUp() override { + EXPECT_CALL(os_sys_calls_, supportsUdpGro()).WillRepeatedly(Return(false)); + EnvoyQuicClientSessionTest::SetUp(); + } + +protected: + NiceMock os_sys_calls_; + +private: + TestThreadsafeSingletonInjector singleton_injector_{&os_sys_calls_}; +}; + +INSTANTIATE_TEST_SUITE_P(EnvoyQuicClientSessionDisallowMmsgTests, + EnvoyQuicClientSessionDisallowMmsgTest, + testing::ValuesIn(quic::CurrentSupportedHttp3Versions())); + +// Ensures that the Network::Utility::readFromSocket function uses `recvmsg` for client QUIC +// connections when GRO is not supported. +TEST_P(EnvoyQuicClientSessionDisallowMmsgTest, UsesRecvMsgWhenNoGro) { + // Have to connect the QUIC session, so that the socket is set up so we can do I/O on it. + envoy_quic_session_->connect(); + + std::string write_data = "abc"; + Buffer::RawSlice slice; + slice.mem_ = write_data.data(); + slice.len_ = write_data.length(); + +// Windows doesn't have the GRO socket options. +#if !defined(WIN32) + // Make sure the option for GRO is *not* set on the socket. + int sock_opt; + socklen_t sock_len = sizeof(int); + EXPECT_EQ(0, quic_connection_->connectionSocket() + ->getSocketOption(SOL_UDP, UDP_GRO, &sock_opt, &sock_len) + .return_value_); + EXPECT_EQ(0, sock_opt); +#endif + + // Uses `recvmsg`, not `recvmmsg`. + EXPECT_CALL(os_sys_calls_, recvmmsg(_, _, _, _, _)).Times(0); + EXPECT_CALL(os_sys_calls_, recvmsg(_, _, _)) + .WillOnce( + Invoke([&](os_fd_t /*socket*/, msghdr* /*msg*/, int /*flags*/) -> Api::SysCallSizeResult { + dispatcher_->exit(); + // Return an error so IoSocketHandleImpl::recvmsg() exits early, instead of trying to + // use the msghdr that would normally have been populated by recvmsg but is not + // populated by this mock. + return {-1, SOCKET_ERROR_AGAIN}; + })); + + peer_socket_->ioHandle().sendmsg(&slice, 1, 0, peer_addr_->ip(), *self_addr_); + + EXPECT_LOG_CONTAINS("trace", "starting recvmsg with max", + dispatcher_->run(Event::Dispatcher::RunType::RunUntilExit)); +} + +class EnvoyQuicClientSessionAllowMmsgTest : public EnvoyQuicClientSessionTest { +public: + void SetUp() override { + EXPECT_CALL(os_sys_calls_, supportsUdpGro()).WillRepeatedly(Return(false)); + + scoped_runtime_.mergeValues( + {{"envoy.reloadable_features.disallow_quic_client_udp_mmsg", "false"}}); + EnvoyQuicClientSessionTest::SetUp(); + } + +protected: + NiceMock os_sys_calls_; + +private: + TestScopedRuntime scoped_runtime_; + TestThreadsafeSingletonInjector singleton_injector_{&os_sys_calls_}; +}; + +INSTANTIATE_TEST_SUITE_P(EnvoyQuicClientSessionAllowMmsgTests, EnvoyQuicClientSessionAllowMmsgTest, + testing::ValuesIn(quic::CurrentSupportedHttp3Versions())); + +TEST_P(EnvoyQuicClientSessionAllowMmsgTest, UsesRecvMmsgWhenNoGroAndMmsgAllowed) { + if (!Api::OsSysCallsSingleton::get().supportsMmsg()) { + GTEST_SKIP() << "Platform doesn't support recvmmsg."; + } + + // Have to connect the QUIC session, so that the socket is set up so we can do I/O on it. + envoy_quic_session_->connect(); + + std::string write_data = "abc"; + Buffer::RawSlice slice; + slice.mem_ = write_data.data(); + slice.len_ = write_data.length(); + + // Make sure recvmmsg is used when GRO isn't supported. + EXPECT_CALL(os_sys_calls_, supportsUdpGro()).WillRepeatedly(Return(false)); + EXPECT_CALL(os_sys_calls_, recvmsg(_, _, _)).Times(0); + EXPECT_CALL(os_sys_calls_, recvmmsg(_, _, _, _, _)) + .WillRepeatedly(Invoke([&](os_fd_t, struct mmsghdr*, unsigned int, int, + struct timespec*) -> Api::SysCallIntResult { + dispatcher_->exit(); + return {0, SOCKET_ERROR_AGAIN}; + })); + + peer_socket_->ioHandle().sendmsg(&slice, 1, 0, peer_addr_->ip(), *self_addr_); + + EXPECT_LOG_CONTAINS("trace", "starting recvmmsg with packets", + dispatcher_->run(Event::Dispatcher::RunType::RunUntilExit)); +} + } // namespace Quic } // namespace Envoy diff --git a/test/common/quic/envoy_quic_client_stream_test.cc b/test/common/quic/envoy_quic_client_stream_test.cc index b579c6962130..4ba077ead61d 100644 --- a/test/common/quic/envoy_quic_client_stream_test.cc +++ b/test/common/quic/envoy_quic_client_stream_test.cc @@ -46,7 +46,8 @@ class EnvoyQuicClientStreamTest : public testing::Test { quic_connection_(new MockEnvoyQuicClientConnection( quic::test::TestConnectionId(), connection_helper_, alarm_factory_, &writer_, /*owns_writer=*/false, {quic_version_}, *dispatcher_, - createConnectionSocket(peer_addr_, self_addr_, nullptr), connection_id_generator_)), + createConnectionSocket(peer_addr_, self_addr_, nullptr, /*prefer_gro=*/true), + connection_id_generator_)), quic_session_(quic_config_, {quic_version_}, std::unique_ptr(quic_connection_), *dispatcher_, quic_config_.GetInitialStreamFlowControlWindowToSend() * 2, diff --git a/test/common/quic/test_utils.h b/test/common/quic/test_utils.h index 0a1a5938eab4..fdf3a1d0e587 100644 --- a/test/common/quic/test_utils.h +++ b/test/common/quic/test_utils.h @@ -75,7 +75,7 @@ class MockEnvoyQuicClientConnection : public EnvoyQuicClientConnection { quic::ConnectionIdGeneratorInterface& generator) : EnvoyQuicClientConnection(server_connection_id, helper, alarm_factory, writer, owns_writer, supported_versions, dispatcher, std::move(connection_socket), - generator) {} + generator, /*prefer_gro=*/true) {} MOCK_METHOD(quic::MessageStatus, SendMessage, (quic::QuicMessageId, absl::Span, bool)); diff --git a/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc b/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc index 02f4dfff74b0..5cff9cebd328 100644 --- a/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc +++ b/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc @@ -148,7 +148,7 @@ class UdpProxyFilterTest : public UdpProxyFilterBase { if (parent_.expect_gro_) { EXPECT_CALL(*socket_->io_handle_, supportsUdpGro()); } - EXPECT_CALL(*socket_->io_handle_, supportsMmsg()).Times(2u); + EXPECT_CALL(*socket_->io_handle_, supportsMmsg()).Times(1u); // Return the datagram. EXPECT_CALL(*socket_->io_handle_, recvmsg(_, 1, _, _)) .WillOnce( @@ -179,7 +179,6 @@ class UdpProxyFilterTest : public UdpProxyFilterBase { } })); // Return an EAGAIN result. - EXPECT_CALL(*socket_->io_handle_, supportsMmsg()); EXPECT_CALL(*socket_->io_handle_, recvmsg(_, 1, _, _)) .WillOnce(Return(ByMove( Api::IoCallUint64Result(0, Network::IoSocketError::getIoSocketEagainError())))); diff --git a/test/integration/quic_http_integration_test.cc b/test/integration/quic_http_integration_test.cc index af3420839050..e18042946688 100644 --- a/test/integration/quic_http_integration_test.cc +++ b/test/integration/quic_http_integration_test.cc @@ -73,7 +73,8 @@ class TestEnvoyQuicClientConnection : public EnvoyQuicClientConnection { bool validation_failure_on_path_response, quic::ConnectionIdGeneratorInterface& generator) : EnvoyQuicClientConnection(server_connection_id, initial_peer_address, helper, alarm_factory, - supported_versions, local_addr, dispatcher, options, generator), + supported_versions, local_addr, dispatcher, options, generator, + /*prefer_gro=*/true), dispatcher_(dispatcher), validation_failure_on_path_response_(validation_failure_on_path_response) {} @@ -837,7 +838,7 @@ TEST_P(QuicHttpIntegrationTest, PortMigration) { Network::Address::InstanceConstSharedPtr local_addr = Network::Test::getCanonicalLoopbackAddress(version_); quic_connection_->switchConnectionSocket( - createConnectionSocket(server_addr_, local_addr, nullptr)); + createConnectionSocket(server_addr_, local_addr, nullptr, /*prefer_gro=*/true)); EXPECT_NE(old_port, local_addr->ip()->port()); // Send the rest data. codec_client_->sendData(*request_encoder_, 1024u, true); @@ -866,7 +867,7 @@ TEST_P(QuicHttpIntegrationTest, PortMigration) { auto options = std::make_shared(); options->push_back(option); quic_connection_->switchConnectionSocket( - createConnectionSocket(server_addr_, local_addr, options)); + createConnectionSocket(server_addr_, local_addr, options, /*prefer_gro=*/true)); EXPECT_TRUE(codec_client_->disconnected()); cleanupUpstreamAndDownstream(); } diff --git a/test/test_common/network_utility.cc b/test/test_common/network_utility.cc index c873089811fb..40c0abfa0523 100644 --- a/test/test_common/network_utility.cc +++ b/test/test_common/network_utility.cc @@ -226,8 +226,13 @@ Api::IoCallUint64Result readFromSocket(IoHandle& handle, const Address::Instance std::list& data, uint64_t max_rx_datagram_size) { SyncPacketProcessor processor(data, max_rx_datagram_size); + UdpRecvMsgMethod recv_msg_method = UdpRecvMsgMethod::RecvMsg; + if (Api::OsSysCallsSingleton::get().supportsMmsg()) { + recv_msg_method = UdpRecvMsgMethod::RecvMmsg; + } return Network::Utility::readFromSocket(handle, local_address, processor, - MonotonicTime(std::chrono::seconds(0)), false, nullptr); + MonotonicTime(std::chrono::seconds(0)), recv_msg_method, + nullptr); } UdpSyncPeer::UdpSyncPeer(Network::Address::IpVersion version, uint64_t max_rx_datagram_size)