diff --git a/api/envoy/config/listener/v3/listener.proto b/api/envoy/config/listener/v3/listener.proto index 4e0a857ce256..5461318ada01 100644 --- a/api/envoy/config/listener/v3/listener.proto +++ b/api/envoy/config/listener/v3/listener.proto @@ -247,6 +247,12 @@ message Listener { // The listener's connection balancer configuration, currently only applicable to TCP listeners. // If no configuration is specified, Envoy will not attempt to balance active connections between // worker threads. + // + // In the scenario that the listener X redirects all the connections to the listeners Y1 and Y2 + // by setting :ref:`use_original_dst ` in X + // and :ref:`bind_to_port ` to false in Y1 and Y2, + // it is recommended to disable the balance config in listener X to avoid the cost of balancing, and + // enable the balance config in Y1 and Y2 to balance the connections among the workers. ConnectionBalanceConfig connection_balance_config = 20; // When this flag is set to true, listeners set the *SO_REUSEPORT* socket option and diff --git a/api/envoy/config/listener/v4alpha/listener.proto b/api/envoy/config/listener/v4alpha/listener.proto index e2eb3a1e6065..e40dbf9058af 100644 --- a/api/envoy/config/listener/v4alpha/listener.proto +++ b/api/envoy/config/listener/v4alpha/listener.proto @@ -249,6 +249,12 @@ message Listener { // The listener's connection balancer configuration, currently only applicable to TCP listeners. // If no configuration is specified, Envoy will not attempt to balance active connections between // worker threads. + // + // In the scenario that the listener X redirects all the connections to the listeners Y1 and Y2 + // by setting :ref:`use_original_dst ` in X + // and :ref:`bind_to_port ` to false in Y1 and Y2, + // it is recommended to disable the balance config in listener X to avoid the cost of balancing, and + // enable the balance config in Y1 and Y2 to balance the connections among the workers. ConnectionBalanceConfig connection_balance_config = 20; // When this flag is set to true, listeners set the *SO_REUSEPORT* socket option and diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index 2a465da67027..19db50652b02 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -16,6 +16,9 @@ Minor Behavior Changes (require upstream 1xx or 204 responses to not have Transfer-Encoding or non-zero Content-Length headers) and ``envoy.reloadable_features.send_strict_1xx_and_204_response_headers`` (do not send 1xx or 204 responses with these headers). Both are true by default. +* listener: respect the :ref:`connection balance config ` + defined within the listener where the sockets are redirected to. Clear that field to restore the previous behavior. + Bug Fixes --------- diff --git a/generated_api_shadow/envoy/config/listener/v3/listener.proto b/generated_api_shadow/envoy/config/listener/v3/listener.proto index 4e0a857ce256..5461318ada01 100644 --- a/generated_api_shadow/envoy/config/listener/v3/listener.proto +++ b/generated_api_shadow/envoy/config/listener/v3/listener.proto @@ -247,6 +247,12 @@ message Listener { // The listener's connection balancer configuration, currently only applicable to TCP listeners. // If no configuration is specified, Envoy will not attempt to balance active connections between // worker threads. + // + // In the scenario that the listener X redirects all the connections to the listeners Y1 and Y2 + // by setting :ref:`use_original_dst ` in X + // and :ref:`bind_to_port ` to false in Y1 and Y2, + // it is recommended to disable the balance config in listener X to avoid the cost of balancing, and + // enable the balance config in Y1 and Y2 to balance the connections among the workers. ConnectionBalanceConfig connection_balance_config = 20; // When this flag is set to true, listeners set the *SO_REUSEPORT* socket option and diff --git a/generated_api_shadow/envoy/config/listener/v4alpha/listener.proto b/generated_api_shadow/envoy/config/listener/v4alpha/listener.proto index cad42c77a1ff..47611a615efb 100644 --- a/generated_api_shadow/envoy/config/listener/v4alpha/listener.proto +++ b/generated_api_shadow/envoy/config/listener/v4alpha/listener.proto @@ -252,6 +252,12 @@ message Listener { // The listener's connection balancer configuration, currently only applicable to TCP listeners. // If no configuration is specified, Envoy will not attempt to balance active connections between // worker threads. + // + // In the scenario that the listener X redirects all the connections to the listeners Y1 and Y2 + // by setting :ref:`use_original_dst ` in X + // and :ref:`bind_to_port ` to false in Y1 and Y2, + // it is recommended to disable the balance config in listener X to avoid the cost of balancing, and + // enable the balance config in Y1 and Y2 to balance the connections among the workers. ConnectionBalanceConfig connection_balance_config = 20; // When this flag is set to true, listeners set the *SO_REUSEPORT* socket option and diff --git a/source/server/active_tcp_listener.cc b/source/server/active_tcp_listener.cc index 7a5f0a64edcc..4146ed86b39a 100644 --- a/source/server/active_tcp_listener.cc +++ b/source/server/active_tcp_listener.cc @@ -69,7 +69,8 @@ ActiveTcpListener::~ActiveTcpListener() { // for now. If it becomes a problem (developers hitting this assert when using debug builds) we // can revisit. This case, if it happens, should be benign on production builds. This case is // covered in ConnectionHandlerTest::RemoveListenerDuringRebalance. - ASSERT(num_listener_connections_ == 0); + ASSERT(num_listener_connections_ == 0, fmt::format("destroyed listener {} has {} connections", + config_->name(), numConnections())); } void ActiveTcpListener::removeConnection(ActiveTcpConnection& connection) { @@ -188,14 +189,12 @@ void ActiveTcpSocket::newConnection() { if (new_listener.has_value()) { // Hands off connections redirected by iptables to the listener associated with the // original destination address. Pass 'hand_off_restored_destination_connections' as false to - // prevent further redirection as well as 'rebalanced' as true since the connection has - // already been balanced if applicable inside onAcceptWorker() when the connection was - // initially accepted. Note also that we must account for the number of connections properly - // across both listeners. + // prevent further redirection. + // Leave the new listener to decide whether to execute re-balance. + // Note also that we must account for the number of connections properly across both listeners. // TODO(mattklein123): See note in ~ActiveTcpSocket() related to making this accounting better. listener_.decNumConnections(); - new_listener.value().get().incNumConnections(); - new_listener.value().get().onAcceptWorker(std::move(socket_), false, true); + new_listener.value().get().onAcceptWorker(std::move(socket_), false, false); } else { // Set default transport protocol if none of the listener filters did it. if (socket_->detectedTransportProtocol().empty()) { @@ -250,7 +249,7 @@ void ActiveTcpListener::onAcceptWorker(Network::ConnectionSocketPtr&& socket, auto active_socket = std::make_unique(*this, std::move(socket), hand_off_restored_destination_connections); - // Create and run the filters + // Create and run the filters. config_->filterChainFactory().createListenerFilterChain(*active_socket); active_socket->continueFilterChain(true); diff --git a/source/server/active_tcp_listener.h b/source/server/active_tcp_listener.h index 01cf11b72408..c698faaa0605 100644 --- a/source/server/active_tcp_listener.h +++ b/source/server/active_tcp_listener.h @@ -30,10 +30,10 @@ using RebalancedSocketSharedPtr = std::shared_ptr; /** * Wrapper for an active tcp listener owned by this handler. */ -class ActiveTcpListener : public Network::TcpListenerCallbacks, - public ActiveListenerImplBase, - public Network::BalancedConnectionHandler, - Logger::Loggable { +class ActiveTcpListener final : public Network::TcpListenerCallbacks, + public ActiveListenerImplBase, + public Network::BalancedConnectionHandler, + Logger::Loggable { public: ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerConfig& config); ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerPtr&& listener, diff --git a/test/common/quic/BUILD b/test/common/quic/BUILD index 38d7fa11a252..b5ed243f8bbb 100644 --- a/test/common/quic/BUILD +++ b/test/common/quic/BUILD @@ -79,6 +79,7 @@ envoy_cc_test( "//source/common/quic:envoy_quic_connection_helper_lib", "//source/common/quic:envoy_quic_server_connection_lib", "//source/common/quic:envoy_quic_server_session_lib", + "//source/server:active_listener_base", "//test/mocks/http:http_mocks", "//test/mocks/http:stream_decoder_mock", "//test/mocks/network:network_mocks", diff --git a/test/integration/BUILD b/test/integration/BUILD index 8ab9096c0c19..7d0ed082a021 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -1547,13 +1547,16 @@ envoy_cc_test( "//source/common/network:connection_lib", "//source/common/network:utility_lib", "//source/extensions/filters/http/health_check:config", + "//source/extensions/filters/network/tcp_proxy:config", "//test/common/grpc:grpc_client_integration_lib", + "//test/integration/filters:address_restore_listener_filter_lib", "//test/test_common:resources_lib", "//test/test_common:utility_lib", "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", "@envoy_api//envoy/config/route/v3:pkg_cc_proto", "@envoy_api//envoy/extensions/filters/network/http_connection_manager/v3:pkg_cc_proto", + "@envoy_api//envoy/extensions/filters/network/tcp_proxy/v3:pkg_cc_proto", "@envoy_api//envoy/service/discovery/v3:pkg_cc_proto", ], ) diff --git a/test/integration/filters/BUILD b/test/integration/filters/BUILD index 5d8c073a5bb7..4722b15a3730 100644 --- a/test/integration/filters/BUILD +++ b/test/integration/filters/BUILD @@ -467,6 +467,25 @@ envoy_cc_test_library( ], ) +envoy_cc_test_library( + name = "address_restore_listener_filter_lib", + srcs = [ + "address_restore_listener_filter.cc", + ], + deps = [ + ":common_lib", + "//include/envoy/network:filter_interface", + "//include/envoy/network:listen_socket_interface", + "//include/envoy/registry", + "//include/envoy/server:filter_config_interface", + "//source/common/common:assert_lib", + "//source/common/common:minimal_logger_lib", + "//source/common/network:address_lib", + "//source/common/network:upstream_socket_options_filter_state_lib", + "//source/common/network:utility_lib", + ], +) + envoy_cc_test_library( name = "set_route_filter_lib", srcs = [ diff --git a/test/integration/filters/address_restore_listener_filter.cc b/test/integration/filters/address_restore_listener_filter.cc new file mode 100644 index 000000000000..c769994b9644 --- /dev/null +++ b/test/integration/filters/address_restore_listener_filter.cc @@ -0,0 +1,55 @@ + + +#include "envoy/network/filter.h" +#include "envoy/network/listen_socket.h" +#include "envoy/server/filter_config.h" + +#include "common/network/address_impl.h" +#include "common/network/utility.h" + +namespace Envoy { + +// The FakeOriginalDstListenerFilter restore desired local address without the dependency of OS. +class FakeOriginalDstListenerFilter : public Network::ListenerFilter { +public: + // Network::ListenerFilter + Network::FilterStatus onAccept(Network::ListenerFilterCallbacks& cb) override { + FANCY_LOG(debug, "in FakeOriginalDstListenerFilter::onAccept"); + Network::ConnectionSocket& socket = cb.socket(); + socket.addressProvider().restoreLocalAddress( + std::make_shared("127.0.0.2", 80)); + FANCY_LOG(debug, "current local socket address is {} restored = {}", + socket.addressProvider().localAddress()->asString(), + socket.addressProvider().localAddressRestored()); + return Network::FilterStatus::Continue; + } +}; + +class FakeOriginalDstListenerFilterConfigFactory + : public Server::Configuration::NamedListenerFilterConfigFactory { +public: + // NamedListenerFilterConfigFactory + Network::ListenerFilterFactoryCb createListenerFilterFactoryFromProto( + const Protobuf::Message&, + const Network::ListenerFilterMatcherSharedPtr& listener_filter_matcher, + Server::Configuration::ListenerFactoryContext&) override { + return [listener_filter_matcher](Network::ListenerFilterManager& filter_manager) -> void { + filter_manager.addAcceptFilter(listener_filter_matcher, + std::make_unique()); + }; + } + + ProtobufTypes::MessagePtr createEmptyConfigProto() override { + return ProtobufTypes::MessagePtr{new Envoy::ProtobufWkt::Struct()}; + } + + std::string name() const override { + // This fake original_dest should be used only in integration test! + return "envoy.filters.listener.original_dst"; + } +}; + +static Registry::RegisterFactory + register_; +} // namespace Envoy diff --git a/test/integration/listener_lds_integration_test.cc b/test/integration/listener_lds_integration_test.cc index e256e8a09275..2179d9b3c3e5 100644 --- a/test/integration/listener_lds_integration_test.cc +++ b/test/integration/listener_lds_integration_test.cc @@ -4,6 +4,8 @@ #include "envoy/config/route/v3/route.pb.h" #include "envoy/config/route/v3/scoped_route.pb.h" #include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h" +#include "envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.pb.h" +#include "envoy/network/connection.h" #include "envoy/service/discovery/v3/discovery.pb.h" #include "common/config/api_version.h" @@ -409,5 +411,104 @@ TEST_P(ListenerIntegrationTest, MultipleLdsUpdatesSharingListenSocketFactory) { } } +class RebalancerTest : public testing::TestWithParam, + public BaseIntegrationTest { +public: + RebalancerTest() + : BaseIntegrationTest(GetParam(), ConfigHelper::baseConfig() + R"EOF( + filter_chains: + - filters: + - name: envoy.filters.network.tcp_proxy + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy + stat_prefix: tcp_stats + cluster: cluster_0 +)EOF") {} + + void initialize() override { + config_helper_.renameListener("tcp"); + config_helper_.addConfigModifier( + [&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void { + auto& src_listener_config = *bootstrap.mutable_static_resources()->mutable_listeners(0); + src_listener_config.mutable_use_original_dst()->set_value(true); + // Note that the below original_dst is replaced by FakeOriginalDstListenerFilter at the + // link time. + src_listener_config.add_listener_filters()->set_name( + "envoy.filters.listener.original_dst"); + auto& virtual_listener_config = *bootstrap.mutable_static_resources()->add_listeners(); + virtual_listener_config = src_listener_config; + virtual_listener_config.mutable_use_original_dst()->set_value(false); + virtual_listener_config.clear_listener_filters(); + virtual_listener_config.mutable_bind_to_port()->set_value(false); + virtual_listener_config.set_name("balanced_target_listener"); + virtual_listener_config.mutable_connection_balance_config()->mutable_exact_balance(); + + // 127.0.0.2 is defined in FakeOriginalDstListenerFilter. This virtual listener does not + // listen on a passive socket so it's safe to use any ip address. + *virtual_listener_config.mutable_address()->mutable_socket_address()->mutable_address() = + "127.0.0.2"; + virtual_listener_config.mutable_address()->mutable_socket_address()->set_port_value(80); + }); + BaseIntegrationTest::initialize(); + } + + std::unique_ptr createConnectionAndWrite(const std::string& request, + std::string& response) { + Buffer::OwnedImpl buffer(request); + return std::make_unique( + lookupPort("tcp"), buffer, + [&response](Network::ClientConnection&, const Buffer::Instance& data) -> void { + response.append(data.toString()); + }, + version_, *dispatcher_); + } +}; + +struct PerConnection { + std::string response_; + std::unique_ptr client_conn_; + FakeRawConnectionPtr upstream_conn_; +}; + +// Verify the connections are distributed evenly on the 2 worker threads of the redirected +// listener. +TEST_P(RebalancerTest, RedirectConnectionIsBalancedOnDestinationListener) { + concurrency_ = 2; + int repeats = 10; + initialize(); + + // The balancer is balanced as per active connection instead of total connection. + // The below vector maintains all the connections alive. + std::vector connections; + for (uint32_t i = 0; i < repeats * concurrency_; ++i) { + connections.emplace_back(); + connections.back().client_conn_ = + createConnectionAndWrite("dummy", connections.back().response_); + connections.back().client_conn_->waitForConnection(); + ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(connections.back().upstream_conn_)); + } + for (auto& conn : connections) { + conn.client_conn_->close(); + while (!conn.client_conn_->closed()) { + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + } + } + + ASSERT_EQ(TestUtility::findCounter(test_server_->statStore(), + "listener.127.0.0.2_80.worker_0.downstream_cx_total") + + ->value(), + repeats); + ASSERT_EQ(TestUtility::findCounter(test_server_->statStore(), + "listener.127.0.0.2_80.worker_1.downstream_cx_total") + + ->value(), + repeats); +} + +INSTANTIATE_TEST_SUITE_P(IpVersions, RebalancerTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + TestUtility::ipTestParamsToString); + } // namespace } // namespace Envoy diff --git a/test/mocks/network/mocks.cc b/test/mocks/network/mocks.cc index 7a2ff7458a5c..5575eeb722bb 100644 --- a/test/mocks/network/mocks.cc +++ b/test/mocks/network/mocks.cc @@ -171,7 +171,17 @@ MockListener::MockListener() = default; MockListener::~MockListener() { onDestroy(); } -MockConnectionHandler::MockConnectionHandler() = default; +MockConnectionHandler::MockConnectionHandler() { + ON_CALL(*this, incNumConnections()).WillByDefault(Invoke([this]() { + ++num_handler_connections_; + })); + ON_CALL(*this, decNumConnections()).WillByDefault(Invoke([this]() { + --num_handler_connections_; + })); + ON_CALL(*this, numConnections()).WillByDefault(Invoke([this]() { + return num_handler_connections_; + })); +} MockConnectionHandler::~MockConnectionHandler() = default; MockIp::MockIp() = default; diff --git a/test/mocks/network/mocks.h b/test/mocks/network/mocks.h index 63c4c5e7c2ff..c102194d4016 100644 --- a/test/mocks/network/mocks.h +++ b/test/mocks/network/mocks.h @@ -421,7 +421,7 @@ class MockListener : public Listener { MOCK_METHOD(void, setRejectFraction, (UnitFloat)); }; -class MockConnectionHandler : public ConnectionHandler { +class MockConnectionHandler : public virtual ConnectionHandler { public: MockConnectionHandler(); ~MockConnectionHandler() override; @@ -441,6 +441,8 @@ class MockConnectionHandler : public ConnectionHandler { MOCK_METHOD(void, enableListeners, ()); MOCK_METHOD(void, setListenerRejectFraction, (UnitFloat), (override)); MOCK_METHOD(const std::string&, statPrefix, (), (const)); + + uint64_t num_handler_connections_{}; }; class MockIp : public Address::Ip { diff --git a/test/server/BUILD b/test/server/BUILD index 7a591987be6b..498bc7ecef0b 100644 --- a/test/server/BUILD +++ b/test/server/BUILD @@ -92,6 +92,25 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "active_tcp_listener_test", + srcs = ["active_tcp_listener_test.cc"], + deps = [ + "//source/common/common:utility_lib", + "//source/common/config:utility_lib", + "//source/common/network:address_lib", + "//source/common/network:connection_balancer_lib", + "//source/common/stats:stats_lib", + "//source/server:active_raw_udp_listener_config", + "//source/server:connection_handler_lib", + "//test/mocks/access_log:access_log_mocks", + "//test/mocks/api:api_mocks", + "//test/mocks/network:network_mocks", + "//test/test_common:network_utility_lib", + "//test/test_common:threadsafe_singleton_injector_lib", + ], +) + envoy_cc_test( name = "drain_manager_impl_test", srcs = ["drain_manager_impl_test.cc"], diff --git a/test/server/active_tcp_listener_test.cc b/test/server/active_tcp_listener_test.cc new file mode 100644 index 000000000000..cc1e2710f9f2 --- /dev/null +++ b/test/server/active_tcp_listener_test.cc @@ -0,0 +1,181 @@ +#include + +#include "envoy/network/filter.h" +#include "envoy/network/listener.h" +#include "envoy/stats/scope.h" + +#include "common/network/address_impl.h" +#include "common/network/connection_balancer_impl.h" +#include "common/network/raw_buffer_socket.h" +#include "common/network/utility.h" + +#include "server/active_tcp_listener.h" + +#include "test/mocks/api/mocks.h" +#include "test/mocks/common.h" +#include "test/mocks/network/mocks.h" +#include "test/test_common/network_utility.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::_; +using testing::Invoke; +using testing::NiceMock; +using testing::Return; +using testing::ReturnRef; + +namespace Envoy { +namespace Server { +namespace { + +class MockTcpConnectionHandler : public Network::TcpConnectionHandler, + public Network::MockConnectionHandler { +public: + MOCK_METHOD(Event::Dispatcher&, dispatcher, ()); + MOCK_METHOD(Network::BalancedConnectionHandlerOptRef, getBalancedHandlerByTag, + (uint64_t listener_tag)); + MOCK_METHOD(Network::BalancedConnectionHandlerOptRef, getBalancedHandlerByAddress, + (const Network::Address::Instance& address)); +}; +class ActiveTcpListenerTest : public testing::Test, protected Logger::Loggable { +public: + ActiveTcpListenerTest() { + EXPECT_CALL(conn_handler_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_)); + EXPECT_CALL(conn_handler_, numConnections()).Times(testing::AnyNumber()); + EXPECT_CALL(conn_handler_, statPrefix()).WillRepeatedly(ReturnRef(listener_stat_prefix_)); + listener_filter_matcher_ = std::make_shared>(); + } + + std::string listener_stat_prefix_{"listener_stat_prefix"}; + std::shared_ptr socket_factory_{ + std::make_shared()}; + NiceMock dispatcher_{"test"}; + BasicResourceLimitImpl resource_limit_; + NiceMock conn_handler_; + Network::MockListener* generic_listener_; + Network::MockListenerConfig listener_config_; + NiceMock manager_; + NiceMock filter_chain_factory_; + std::shared_ptr filter_chain_; + std::shared_ptr> listener_filter_matcher_; +}; + +// Verify that the server connection with recovered address is rebalanced at redirected listener. +TEST_F(ActiveTcpListenerTest, RedirectedRebalancer) { + NiceMock listener_config1; + NiceMock balancer1; + EXPECT_CALL(balancer1, registerHandler(_)); + EXPECT_CALL(balancer1, unregisterHandler(_)); + + Network::Address::InstanceConstSharedPtr normal_address( + new Network::Address::Ipv4Instance("127.0.0.1", 10001)); + EXPECT_CALL(*socket_factory_, localAddress()).WillRepeatedly(ReturnRef(normal_address)); + EXPECT_CALL(listener_config1, connectionBalancer()).WillRepeatedly(ReturnRef(balancer1)); + EXPECT_CALL(listener_config1, listenerScope).Times(testing::AnyNumber()); + EXPECT_CALL(listener_config1, listenerFiltersTimeout()); + EXPECT_CALL(listener_config1, continueOnListenerFiltersTimeout()); + EXPECT_CALL(listener_config1, filterChainManager()).WillRepeatedly(ReturnRef(manager_)); + EXPECT_CALL(listener_config1, openConnections()).WillRepeatedly(ReturnRef(resource_limit_)); + EXPECT_CALL(listener_config1, handOffRestoredDestinationConnections()) + .WillRepeatedly(Return(true)); + + auto mock_listener_will_be_moved1 = std::make_unique(); + auto& listener1 = *mock_listener_will_be_moved1; + auto active_listener1 = std::make_unique( + conn_handler_, std::move(mock_listener_will_be_moved1), listener_config1); + + NiceMock listener_config2; + Network::MockConnectionBalancer balancer2; + EXPECT_CALL(balancer2, registerHandler(_)); + EXPECT_CALL(balancer2, unregisterHandler(_)); + + Network::Address::InstanceConstSharedPtr alt_address( + new Network::Address::Ipv4Instance("127.0.0.2", 20002)); + EXPECT_CALL(*socket_factory_, localAddress()).WillRepeatedly(ReturnRef(alt_address)); + EXPECT_CALL(listener_config2, listenerFiltersTimeout()); + EXPECT_CALL(listener_config2, connectionBalancer()).WillRepeatedly(ReturnRef(balancer2)); + EXPECT_CALL(listener_config2, listenerScope).Times(testing::AnyNumber()); + EXPECT_CALL(listener_config2, handOffRestoredDestinationConnections()) + .WillRepeatedly(Return(false)); + EXPECT_CALL(listener_config2, continueOnListenerFiltersTimeout()); + EXPECT_CALL(listener_config2, filterChainManager()).WillRepeatedly(ReturnRef(manager_)); + EXPECT_CALL(listener_config2, openConnections()).WillRepeatedly(ReturnRef(resource_limit_)); + auto mock_listener_will_be_moved2 = std::make_unique(); + auto& listener2 = *mock_listener_will_be_moved2; + auto active_listener2 = std::make_shared( + conn_handler_, std::move(mock_listener_will_be_moved2), listener_config2); + + auto* test_filter = new NiceMock(); + EXPECT_CALL(*test_filter, destroy_()); + Network::MockConnectionSocket* accepted_socket = new NiceMock(); + bool redirected = false; + + // 1. Listener1 re-balance. Set the balance target to the the active listener itself. + EXPECT_CALL(balancer1, pickTargetHandler(_)) + .WillOnce(testing::DoAll( + testing::WithArg<0>(Invoke([](auto& target) { target.incNumConnections(); })), + ReturnRef(*active_listener1))); + + EXPECT_CALL(listener_config1, filterChainFactory()) + .WillRepeatedly(ReturnRef(filter_chain_factory_)); + + // Listener1 has a listener filter in the listener filter chain. + EXPECT_CALL(filter_chain_factory_, createListenerFilterChain(_)) + .WillRepeatedly(Invoke([&](Network::ListenerFilterManager& manager) -> bool { + // Insert the Mock filter. + if (!redirected) { + manager.addAcceptFilter(nullptr, Network::ListenerFilterPtr{test_filter}); + redirected = true; + } + return true; + })); + EXPECT_CALL(*test_filter, onAccept(_)) + .WillOnce(Invoke([&](Network::ListenerFilterCallbacks& cb) -> Network::FilterStatus { + cb.socket().addressProvider().restoreLocalAddress(alt_address); + return Network::FilterStatus::Continue; + })); + // Verify that listener1 hands off the connection by not creating network filter chain. + EXPECT_CALL(manager_, findFilterChain(_)).Times(0); + + // 2. Redirect to Listener2. + EXPECT_CALL(conn_handler_, getBalancedHandlerByAddress(_)) + .WillOnce(Return(Network::BalancedConnectionHandlerOptRef(*active_listener2))); + + // 3. Listener2 re-balance. Set the balance target to the the active listener itself. + EXPECT_CALL(balancer2, pickTargetHandler(_)) + .WillOnce(testing::DoAll( + testing::WithArg<0>(Invoke([](auto& target) { target.incNumConnections(); })), + ReturnRef(*active_listener2))); + + auto filter_factory_callback = std::make_shared>(); + auto transport_socket_factory = Network::Test::createRawBufferSocketFactory(); + filter_chain_ = std::make_shared>(); + + EXPECT_CALL(conn_handler_, incNumConnections()); + EXPECT_CALL(manager_, findFilterChain(_)).WillOnce(Return(filter_chain_.get())); + EXPECT_CALL(*filter_chain_, transportSocketFactory) + .WillOnce(testing::ReturnRef(*transport_socket_factory)); + EXPECT_CALL(*filter_chain_, networkFilterFactories).WillOnce(ReturnRef(*filter_factory_callback)); + EXPECT_CALL(listener_config2, filterChainFactory()) + .WillRepeatedly(ReturnRef(filter_chain_factory_)); + + auto* connection = new NiceMock(); + EXPECT_CALL(dispatcher_, createServerConnection_()).WillOnce(Return(connection)); + EXPECT_CALL(filter_chain_factory_, createNetworkFilterChain(_, _)).WillOnce(Return(true)); + active_listener1->onAccept(Network::ConnectionSocketPtr{accepted_socket}); + + // Verify per-listener connection stats. + EXPECT_EQ(1UL, conn_handler_.numConnections()); + + EXPECT_CALL(conn_handler_, decNumConnections()); + connection->close(Network::ConnectionCloseType::NoFlush); + + EXPECT_CALL(listener1, onDestroy()); + active_listener1.reset(); + EXPECT_CALL(listener2, onDestroy()); + active_listener2.reset(); +} +} // namespace +} // namespace Server +} // namespace Envoy