Skip to content

Commit

Permalink
Listener: respect the connection balancer of the redirected listener (e…
Browse files Browse the repository at this point in the history
…nvoyproxy#15842)

If listener1 redirects the connection to listener2, the balancer field in listener2 decides whether to rebalance.
Previously we rely on the rebalancing at listener1, however, the rebalance is weak because listener1 is likely to
not own any connection and the rebalance is no-op.

Risk Level: MID. Rebalance may introduce latency. User needs to clear rebalancer field of listener2 to recover the original behavior.

Fix envoyproxy#15146 envoyproxy#16113

Signed-off-by: Yuchen Dai <silentdai@gmail.com>
Signed-off-by: Gokul Nair <gnair@twitter.com>
  • Loading branch information
lambdai authored and Gokul Nair committed May 6, 2021
1 parent ef28b68 commit 0e4b716
Show file tree
Hide file tree
Showing 16 changed files with 431 additions and 14 deletions.
6 changes: 6 additions & 0 deletions api/envoy/config/listener/v3/listener.proto
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,12 @@ message Listener {
// The listener's connection balancer configuration, currently only applicable to TCP listeners.
// If no configuration is specified, Envoy will not attempt to balance active connections between
// worker threads.
//
// In the scenario that the listener X redirects all the connections to the listeners Y1 and Y2
// by setting :ref:`use_original_dst <envoy_api_field_config.listener.v3.Listener.use_original_dst>` in X
// and :ref:`bind_to_port <envoy_api_field_config.listener.v3.Listener.bind_to_port>` to false in Y1 and Y2,
// it is recommended to disable the balance config in listener X to avoid the cost of balancing, and
// enable the balance config in Y1 and Y2 to balance the connections among the workers.
ConnectionBalanceConfig connection_balance_config = 20;

// When this flag is set to true, listeners set the *SO_REUSEPORT* socket option and
Expand Down
6 changes: 6 additions & 0 deletions api/envoy/config/listener/v4alpha/listener.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ Minor Behavior Changes
(require upstream 1xx or 204 responses to not have Transfer-Encoding or non-zero Content-Length headers) and
``envoy.reloadable_features.send_strict_1xx_and_204_response_headers``
(do not send 1xx or 204 responses with these headers). Both are true by default.
* listener: respect the :ref:`connection balance config <envoy_v3_api_field_config.listener.v3.Listener.connection_balance_config>`
defined within the listener where the sockets are redirected to. Clear that field to restore the previous behavior.


Bug Fixes
---------
Expand Down
6 changes: 6 additions & 0 deletions generated_api_shadow/envoy/config/listener/v3/listener.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 7 additions & 8 deletions source/server/active_tcp_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ ActiveTcpListener::~ActiveTcpListener() {
// for now. If it becomes a problem (developers hitting this assert when using debug builds) we
// can revisit. This case, if it happens, should be benign on production builds. This case is
// covered in ConnectionHandlerTest::RemoveListenerDuringRebalance.
ASSERT(num_listener_connections_ == 0);
ASSERT(num_listener_connections_ == 0, fmt::format("destroyed listener {} has {} connections",
config_->name(), numConnections()));
}

void ActiveTcpListener::removeConnection(ActiveTcpConnection& connection) {
Expand Down Expand Up @@ -188,14 +189,12 @@ void ActiveTcpSocket::newConnection() {
if (new_listener.has_value()) {
// Hands off connections redirected by iptables to the listener associated with the
// original destination address. Pass 'hand_off_restored_destination_connections' as false to
// prevent further redirection as well as 'rebalanced' as true since the connection has
// already been balanced if applicable inside onAcceptWorker() when the connection was
// initially accepted. Note also that we must account for the number of connections properly
// across both listeners.
// prevent further redirection.
// Leave the new listener to decide whether to execute re-balance.
// Note also that we must account for the number of connections properly across both listeners.
// TODO(mattklein123): See note in ~ActiveTcpSocket() related to making this accounting better.
listener_.decNumConnections();
new_listener.value().get().incNumConnections();
new_listener.value().get().onAcceptWorker(std::move(socket_), false, true);
new_listener.value().get().onAcceptWorker(std::move(socket_), false, false);
} else {
// Set default transport protocol if none of the listener filters did it.
if (socket_->detectedTransportProtocol().empty()) {
Expand Down Expand Up @@ -250,7 +249,7 @@ void ActiveTcpListener::onAcceptWorker(Network::ConnectionSocketPtr&& socket,
auto active_socket = std::make_unique<ActiveTcpSocket>(*this, std::move(socket),
hand_off_restored_destination_connections);

// Create and run the filters
// Create and run the filters.
config_->filterChainFactory().createListenerFilterChain(*active_socket);
active_socket->continueFilterChain(true);

Expand Down
8 changes: 4 additions & 4 deletions source/server/active_tcp_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ using RebalancedSocketSharedPtr = std::shared_ptr<RebalancedSocket>;
/**
* Wrapper for an active tcp listener owned by this handler.
*/
class ActiveTcpListener : public Network::TcpListenerCallbacks,
public ActiveListenerImplBase,
public Network::BalancedConnectionHandler,
Logger::Loggable<Logger::Id::conn_handler> {
class ActiveTcpListener final : public Network::TcpListenerCallbacks,
public ActiveListenerImplBase,
public Network::BalancedConnectionHandler,
Logger::Loggable<Logger::Id::conn_handler> {
public:
ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerConfig& config);
ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerPtr&& listener,
Expand Down
1 change: 1 addition & 0 deletions test/common/quic/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ envoy_cc_test(
"//source/common/quic:envoy_quic_connection_helper_lib",
"//source/common/quic:envoy_quic_server_connection_lib",
"//source/common/quic:envoy_quic_server_session_lib",
"//source/server:active_listener_base",
"//test/mocks/http:http_mocks",
"//test/mocks/http:stream_decoder_mock",
"//test/mocks/network:network_mocks",
Expand Down
3 changes: 3 additions & 0 deletions test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -1547,13 +1547,16 @@ envoy_cc_test(
"//source/common/network:connection_lib",
"//source/common/network:utility_lib",
"//source/extensions/filters/http/health_check:config",
"//source/extensions/filters/network/tcp_proxy:config",
"//test/common/grpc:grpc_client_integration_lib",
"//test/integration/filters:address_restore_listener_filter_lib",
"//test/test_common:resources_lib",
"//test/test_common:utility_lib",
"@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/config/route/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/network/http_connection_manager/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/network/tcp_proxy/v3:pkg_cc_proto",
"@envoy_api//envoy/service/discovery/v3:pkg_cc_proto",
],
)
Expand Down
19 changes: 19 additions & 0 deletions test/integration/filters/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,25 @@ envoy_cc_test_library(
],
)

envoy_cc_test_library(
name = "address_restore_listener_filter_lib",
srcs = [
"address_restore_listener_filter.cc",
],
deps = [
":common_lib",
"//include/envoy/network:filter_interface",
"//include/envoy/network:listen_socket_interface",
"//include/envoy/registry",
"//include/envoy/server:filter_config_interface",
"//source/common/common:assert_lib",
"//source/common/common:minimal_logger_lib",
"//source/common/network:address_lib",
"//source/common/network:upstream_socket_options_filter_state_lib",
"//source/common/network:utility_lib",
],
)

envoy_cc_test_library(
name = "set_route_filter_lib",
srcs = [
Expand Down
55 changes: 55 additions & 0 deletions test/integration/filters/address_restore_listener_filter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@


#include "envoy/network/filter.h"
#include "envoy/network/listen_socket.h"
#include "envoy/server/filter_config.h"

#include "common/network/address_impl.h"
#include "common/network/utility.h"

namespace Envoy {

// The FakeOriginalDstListenerFilter restore desired local address without the dependency of OS.
class FakeOriginalDstListenerFilter : public Network::ListenerFilter {
public:
// Network::ListenerFilter
Network::FilterStatus onAccept(Network::ListenerFilterCallbacks& cb) override {
FANCY_LOG(debug, "in FakeOriginalDstListenerFilter::onAccept");
Network::ConnectionSocket& socket = cb.socket();
socket.addressProvider().restoreLocalAddress(
std::make_shared<Network::Address::Ipv4Instance>("127.0.0.2", 80));
FANCY_LOG(debug, "current local socket address is {} restored = {}",
socket.addressProvider().localAddress()->asString(),
socket.addressProvider().localAddressRestored());
return Network::FilterStatus::Continue;
}
};

class FakeOriginalDstListenerFilterConfigFactory
: public Server::Configuration::NamedListenerFilterConfigFactory {
public:
// NamedListenerFilterConfigFactory
Network::ListenerFilterFactoryCb createListenerFilterFactoryFromProto(
const Protobuf::Message&,
const Network::ListenerFilterMatcherSharedPtr& listener_filter_matcher,
Server::Configuration::ListenerFactoryContext&) override {
return [listener_filter_matcher](Network::ListenerFilterManager& filter_manager) -> void {
filter_manager.addAcceptFilter(listener_filter_matcher,
std::make_unique<FakeOriginalDstListenerFilter>());
};
}

ProtobufTypes::MessagePtr createEmptyConfigProto() override {
return ProtobufTypes::MessagePtr{new Envoy::ProtobufWkt::Struct()};
}

std::string name() const override {
// This fake original_dest should be used only in integration test!
return "envoy.filters.listener.original_dst";
}
};

static Registry::RegisterFactory<FakeOriginalDstListenerFilterConfigFactory,
Server::Configuration::NamedListenerFilterConfigFactory>
register_;
} // namespace Envoy
101 changes: 101 additions & 0 deletions test/integration/listener_lds_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include "envoy/config/route/v3/route.pb.h"
#include "envoy/config/route/v3/scoped_route.pb.h"
#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h"
#include "envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.pb.h"
#include "envoy/network/connection.h"
#include "envoy/service/discovery/v3/discovery.pb.h"

#include "common/config/api_version.h"
Expand Down Expand Up @@ -409,5 +411,104 @@ TEST_P(ListenerIntegrationTest, MultipleLdsUpdatesSharingListenSocketFactory) {
}
}

class RebalancerTest : public testing::TestWithParam<Network::Address::IpVersion>,
public BaseIntegrationTest {
public:
RebalancerTest()
: BaseIntegrationTest(GetParam(), ConfigHelper::baseConfig() + R"EOF(
filter_chains:
- filters:
- name: envoy.filters.network.tcp_proxy
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
stat_prefix: tcp_stats
cluster: cluster_0
)EOF") {}

void initialize() override {
config_helper_.renameListener("tcp");
config_helper_.addConfigModifier(
[&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void {
auto& src_listener_config = *bootstrap.mutable_static_resources()->mutable_listeners(0);
src_listener_config.mutable_use_original_dst()->set_value(true);
// Note that the below original_dst is replaced by FakeOriginalDstListenerFilter at the
// link time.
src_listener_config.add_listener_filters()->set_name(
"envoy.filters.listener.original_dst");
auto& virtual_listener_config = *bootstrap.mutable_static_resources()->add_listeners();
virtual_listener_config = src_listener_config;
virtual_listener_config.mutable_use_original_dst()->set_value(false);
virtual_listener_config.clear_listener_filters();
virtual_listener_config.mutable_bind_to_port()->set_value(false);
virtual_listener_config.set_name("balanced_target_listener");
virtual_listener_config.mutable_connection_balance_config()->mutable_exact_balance();

// 127.0.0.2 is defined in FakeOriginalDstListenerFilter. This virtual listener does not
// listen on a passive socket so it's safe to use any ip address.
*virtual_listener_config.mutable_address()->mutable_socket_address()->mutable_address() =
"127.0.0.2";
virtual_listener_config.mutable_address()->mutable_socket_address()->set_port_value(80);
});
BaseIntegrationTest::initialize();
}

std::unique_ptr<RawConnectionDriver> createConnectionAndWrite(const std::string& request,
std::string& response) {
Buffer::OwnedImpl buffer(request);
return std::make_unique<RawConnectionDriver>(
lookupPort("tcp"), buffer,
[&response](Network::ClientConnection&, const Buffer::Instance& data) -> void {
response.append(data.toString());
},
version_, *dispatcher_);
}
};

struct PerConnection {
std::string response_;
std::unique_ptr<RawConnectionDriver> client_conn_;
FakeRawConnectionPtr upstream_conn_;
};

// Verify the connections are distributed evenly on the 2 worker threads of the redirected
// listener.
TEST_P(RebalancerTest, RedirectConnectionIsBalancedOnDestinationListener) {
concurrency_ = 2;
int repeats = 10;
initialize();

// The balancer is balanced as per active connection instead of total connection.
// The below vector maintains all the connections alive.
std::vector<PerConnection> connections;
for (uint32_t i = 0; i < repeats * concurrency_; ++i) {
connections.emplace_back();
connections.back().client_conn_ =
createConnectionAndWrite("dummy", connections.back().response_);
connections.back().client_conn_->waitForConnection();
ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(connections.back().upstream_conn_));
}
for (auto& conn : connections) {
conn.client_conn_->close();
while (!conn.client_conn_->closed()) {
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
}
}

ASSERT_EQ(TestUtility::findCounter(test_server_->statStore(),
"listener.127.0.0.2_80.worker_0.downstream_cx_total")

->value(),
repeats);
ASSERT_EQ(TestUtility::findCounter(test_server_->statStore(),
"listener.127.0.0.2_80.worker_1.downstream_cx_total")

->value(),
repeats);
}

INSTANTIATE_TEST_SUITE_P(IpVersions, RebalancerTest,
testing::ValuesIn(TestEnvironment::getIpVersionsForTest()),
TestUtility::ipTestParamsToString);

} // namespace
} // namespace Envoy
12 changes: 11 additions & 1 deletion test/mocks/network/mocks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,17 @@ MockListener::MockListener() = default;

MockListener::~MockListener() { onDestroy(); }

MockConnectionHandler::MockConnectionHandler() = default;
MockConnectionHandler::MockConnectionHandler() {
ON_CALL(*this, incNumConnections()).WillByDefault(Invoke([this]() {
++num_handler_connections_;
}));
ON_CALL(*this, decNumConnections()).WillByDefault(Invoke([this]() {
--num_handler_connections_;
}));
ON_CALL(*this, numConnections()).WillByDefault(Invoke([this]() {
return num_handler_connections_;
}));
}
MockConnectionHandler::~MockConnectionHandler() = default;

MockIp::MockIp() = default;
Expand Down
4 changes: 3 additions & 1 deletion test/mocks/network/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ class MockListener : public Listener {
MOCK_METHOD(void, setRejectFraction, (UnitFloat));
};

class MockConnectionHandler : public ConnectionHandler {
class MockConnectionHandler : public virtual ConnectionHandler {
public:
MockConnectionHandler();
~MockConnectionHandler() override;
Expand All @@ -441,6 +441,8 @@ class MockConnectionHandler : public ConnectionHandler {
MOCK_METHOD(void, enableListeners, ());
MOCK_METHOD(void, setListenerRejectFraction, (UnitFloat), (override));
MOCK_METHOD(const std::string&, statPrefix, (), (const));

uint64_t num_handler_connections_{};
};

class MockIp : public Address::Ip {
Expand Down
19 changes: 19 additions & 0 deletions test/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,25 @@ envoy_cc_test(
],
)

envoy_cc_test(
name = "active_tcp_listener_test",
srcs = ["active_tcp_listener_test.cc"],
deps = [
"//source/common/common:utility_lib",
"//source/common/config:utility_lib",
"//source/common/network:address_lib",
"//source/common/network:connection_balancer_lib",
"//source/common/stats:stats_lib",
"//source/server:active_raw_udp_listener_config",
"//source/server:connection_handler_lib",
"//test/mocks/access_log:access_log_mocks",
"//test/mocks/api:api_mocks",
"//test/mocks/network:network_mocks",
"//test/test_common:network_utility_lib",
"//test/test_common:threadsafe_singleton_injector_lib",
],
)

envoy_cc_test(
name = "drain_manager_impl_test",
srcs = ["drain_manager_impl_test.cc"],
Expand Down
Loading

0 comments on commit 0e4b716

Please sign in to comment.