Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

network: Udp socket implementation. #5108

Merged
merged 9 commits into from
Dec 1, 2018
28 changes: 13 additions & 15 deletions source/common/network/listen_socket_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,28 @@ void ListenSocketImpl::setListenSocketOptions(const Network::Socket::OptionsShar
}
}

TcpListenSocket::TcpListenSocket(const Address::InstanceConstSharedPtr& address,
const Network::Socket::OptionsSharedPtr& options,
bool bind_to_port)
: ListenSocketImpl(address->socket(Address::SocketType::Stream), address) {
RELEASE_ASSERT(fd_ != -1, "");

// TODO(htuch): This might benefit from moving to SocketOptionImpl.
int on = 1;
int rc = setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
RELEASE_ASSERT(rc != -1, "");

void ListenSocketImpl::setupSocket(const Network::Socket::OptionsSharedPtr& options,
bool bind_to_port) {
setListenSocketOptions(options);

if (bind_to_port) {
doBind();
}
}

TcpListenSocket::TcpListenSocket(int fd, const Address::InstanceConstSharedPtr& address,
const Network::Socket::OptionsSharedPtr& options)
: ListenSocketImpl(fd, address) {
setListenSocketOptions(options);
template <>
void NetworkListenSocket<
NetworkSocketTrait<Address::SocketType::Stream>>::setPrebindSocketOptions() {
// TODO(htuch): This might benefit from moving to SocketOptionImpl.
int on = 1;
int rc = setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
RELEASE_ASSERT(rc != -1, "");
}

template <>
void NetworkListenSocket<
NetworkSocketTrait<Address::SocketType::Datagram>>::setPrebindSocketOptions() {}

UdsListenSocket::UdsListenSocket(const Address::InstanceConstSharedPtr& address)
: ListenSocketImpl(address->socket(Address::SocketType::Stream), address) {
RELEASE_ASSERT(fd_ != -1, "");
Expand Down
39 changes: 34 additions & 5 deletions source/common/network/listen_socket_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,52 @@ class ListenSocketImpl : public SocketImpl {
ListenSocketImpl(int fd, const Address::InstanceConstSharedPtr& local_address)
: SocketImpl(fd, local_address) {}

void setupSocket(const Network::Socket::OptionsSharedPtr& options, bool bind_to_port);
void doBind();
void setListenSocketOptions(const Network::Socket::OptionsSharedPtr& options);
};

/**
* Wraps a unix socket.
*/
class TcpListenSocket : public ListenSocketImpl {
template <Address::SocketType T> struct NetworkSocketTrait {};

template <> struct NetworkSocketTrait<Address::SocketType::Stream> {
static constexpr Address::SocketType type = Address::SocketType::Stream;
};

template <> struct NetworkSocketTrait<Address::SocketType::Datagram> {
static constexpr Address::SocketType type = Address::SocketType::Datagram;
};

template <typename T> class NetworkListenSocket : public ListenSocketImpl {
public:
TcpListenSocket(const Address::InstanceConstSharedPtr& address,
const Network::Socket::OptionsSharedPtr& options, bool bind_to_port);
TcpListenSocket(int fd, const Address::InstanceConstSharedPtr& address,
const Network::Socket::OptionsSharedPtr& options);
NetworkListenSocket(const Address::InstanceConstSharedPtr& address,
const Network::Socket::OptionsSharedPtr& options, bool bind_to_port)
: ListenSocketImpl(address->socket(T::type), address) {
RELEASE_ASSERT(fd_ != -1, "");

setPrebindSocketOptions();

setupSocket(options, bind_to_port);
}

NetworkListenSocket(int fd, const Address::InstanceConstSharedPtr& address,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to call setProtocolSpecificSocketOptions() in this variant also? Can we somehow unify the setup logic? It's a little confusing as with the constructor variants, protocol specific options, listener options, etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mattklein123 Thanks for reviewing.
Looking at the original code(master), the two constructors differ in -

  • the constructor that takes fd does not have an option to bind
  • only the constructor that takes address sets up SO_REUSEADDR.

So wanted to keep the logic same here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I suspect that's a bug and not a feature. Can you spend a little bit of time trying to figure out where the constructors are used? I would prefer we take this opportunity to unify the logic if possible?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the idea is that when an fd is present, then we don't create the listening socket and assume that the socket is already bound (to that listen fd). So in that case, we should not be applying the option SO_REUSEADDR. All other options could still be applied . Although its confusing to have flag PRE_BIND for all the options.

So maybe we should rename setProtocolSpecificSocketOptions to setPrebindSocketOptions ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense, +1 please rename.

const Network::Socket::OptionsSharedPtr& options)
: ListenSocketImpl(fd, address) {
setListenSocketOptions(options);
}

protected:
void setPrebindSocketOptions();
};

using TcpListenSocket = NetworkListenSocket<NetworkSocketTrait<Address::SocketType::Stream>>;
typedef std::unique_ptr<TcpListenSocket> TcpListenSocketPtr;

using UdpListenSocket = NetworkListenSocket<NetworkSocketTrait<Address::SocketType::Datagram>>;
typedef std::unique_ptr<UdpListenSocket> UdpListenSocketPtr;

class UdsListenSocket : public ListenSocketImpl {
public:
UdsListenSocket(const Address::InstanceConstSharedPtr& address);
Expand Down
55 changes: 44 additions & 11 deletions source/common/network/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@ namespace Envoy {
namespace Network {

const std::string Utility::TCP_SCHEME = "tcp://";
const std::string Utility::UDP_SCHEME = "udp://";
const std::string Utility::UNIX_SCHEME = "unix://";

Address::InstanceConstSharedPtr Utility::resolveUrl(const std::string& url) {
if (urlIsTcpScheme(url)) {
return parseInternetAddressAndPort(url.substr(TCP_SCHEME.size()));
} else if (urlIsUdpScheme(url)) {
return parseInternetAddressAndPort(url.substr(UDP_SCHEME.size()));
} else if (urlIsUnixScheme(url)) {
return Address::InstanceConstSharedPtr{
new Address::PipeInstance(url.substr(UNIX_SCHEME.size()))};
Expand All @@ -49,30 +52,42 @@ Address::InstanceConstSharedPtr Utility::resolveUrl(const std::string& url) {
}
}

bool Utility::urlIsTcpScheme(const std::string& url) { return url.find(TCP_SCHEME) == 0; }
bool Utility::urlIsTcpScheme(const std::string& url) {
return StringUtil::startsWith(url.c_str(), TCP_SCHEME, true);
}

bool Utility::urlIsUnixScheme(const std::string& url) { return url.find(UNIX_SCHEME) == 0; }
bool Utility::urlIsUdpScheme(const std::string& url) {
return StringUtil::startsWith(url.c_str(), UDP_SCHEME, true);
}

std::string Utility::hostFromTcpUrl(const std::string& url) {
if (!urlIsTcpScheme(url)) {
throw EnvoyException(fmt::format("expected TCP scheme, got: {}", url));
bool Utility::urlIsUnixScheme(const std::string& url) {
return StringUtil::startsWith(url.c_str(), UNIX_SCHEME, true);
}

namespace {

std::string hostFromUrl(const std::string& url, const std::string& scheme,
const std::string& scheme_name) {
if (url.find(scheme) != 0) {
throw EnvoyException(fmt::format("expected {} scheme, got: {}", scheme_name, url));
}

size_t colon_index = url.find(':', TCP_SCHEME.size());
size_t colon_index = url.find(':', scheme.size());

if (colon_index == std::string::npos) {
throw EnvoyException(fmt::format("malformed url: {}", url));
}

return url.substr(TCP_SCHEME.size(), colon_index - TCP_SCHEME.size());
return url.substr(scheme.size(), colon_index - scheme.size());
}

uint32_t Utility::portFromTcpUrl(const std::string& url) {
if (!urlIsTcpScheme(url)) {
throw EnvoyException(fmt::format("expected TCP scheme, got: {}", url));
uint32_t portFromUrl(const std::string& url, const std::string& scheme,
const std::string& scheme_name) {
if (url.find(scheme) != 0) {
throw EnvoyException(fmt::format("expected {} scheme, got: {}", scheme_name, url));
}

size_t colon_index = url.find(':', TCP_SCHEME.size());
size_t colon_index = url.find(':', scheme.size());

if (colon_index == std::string::npos) {
throw EnvoyException(fmt::format("malformed url: {}", url));
Expand All @@ -87,6 +102,24 @@ uint32_t Utility::portFromTcpUrl(const std::string& url) {
}
}

} // namespace

std::string Utility::hostFromTcpUrl(const std::string& url) {
return hostFromUrl(url, TCP_SCHEME, "TCP");
}

uint32_t Utility::portFromTcpUrl(const std::string& url) {
return portFromUrl(url, TCP_SCHEME, "TCP");
}

std::string Utility::hostFromUdpUrl(const std::string& url) {
return hostFromUrl(url, UDP_SCHEME, "UDP");
}

uint32_t Utility::portFromUdpUrl(const std::string& url) {
return portFromUrl(url, UDP_SCHEME, "UDP");
}

Address::InstanceConstSharedPtr Utility::parseInternetAddress(const std::string& ip_address,
uint16_t port, bool v6only) {
sockaddr_in sa4;
Expand Down
22 changes: 22 additions & 0 deletions source/common/network/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ typedef std::list<PortRange> PortRangeList;
class Utility {
public:
static const std::string TCP_SCHEME;
static const std::string UDP_SCHEME;
static const std::string UNIX_SCHEME;

/**
Expand All @@ -50,6 +51,13 @@ class Utility {
*/
static bool urlIsTcpScheme(const std::string& url);

/**
* Match a URL to the UDP scheme
* @param url supplies the URL to match.
* @return bool true if the URL matches the UDP scheme, false otherwise.
*/
static bool urlIsUdpScheme(const std::string& url);

/**
* Match a URL to the Unix scheme
* @param url supplies the Unix to match.
Expand All @@ -71,6 +79,20 @@ class Utility {
*/
static uint32_t portFromTcpUrl(const std::string& url);

/**
* Parses the host from a UDP URL
* @param the URL to parse host from
* @return std::string the parsed host
*/
static std::string hostFromUdpUrl(const std::string& url);

/**
* Parses the port from a UDP URL
* @param the URL to parse port from
* @return uint32_t the parsed port
*/
static uint32_t portFromUdpUrl(const std::string& url);

/**
* Parse an internet host address (IPv4 or IPv6) and create an Instance from it. The address must
* not include a port number. Throws EnvoyException if unable to parse the address.
Expand Down
126 changes: 77 additions & 49 deletions test/common/network/listen_socket_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,66 +16,94 @@ using testing::Return;
namespace Envoy {
namespace Network {

template <Network::Address::SocketType S>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s/S/SocketType or something spelled out.

class ListenSocketImplTest : public testing::TestWithParam<Address::IpVersion> {
protected:
ListenSocketImplTest() : version_(GetParam()) {}
const Address::IpVersion version_;

template <typename... Args>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, this is really interesting - new syntax to me.

I'd been suggesting something more along the lines of
testing::TestWithParam<std::tuple<Network::Address::IpVersion, Address::SocketType>>

That'd allow you to keep the various test code inline with the test cases since things like

if (NetworkSocketTrait<S>::type == Address::SocketType::Stream) {
would be more like
if (std::get<1>(GetParam() == == Address::SocketType::Stream)
so we wouldn't need all the code to be located in class functions.

While I think having test code inline with TEST_P is nice and it would reduce the diffs, I'm going to go ahead and say your current implementation solves my code duplication concerns, so if you want to leave as-is and not rewrite the tests a third time that's Ok :-)

std::unique_ptr<ListenSocketImpl> createListenSocketPtr(Args&&... args) {
using NetworkSocketTraitType = NetworkSocketTrait<S>;

return std::make_unique<NetworkListenSocket<NetworkSocketTraitType>>(
std::forward<Args>(args)...);
}

void testBindSpecificPort() {
auto addr_fd = Network::Test::bindFreeLoopbackPort(version_, Address::SocketType::Stream);
auto addr = addr_fd.first;
EXPECT_LE(0, addr_fd.second);

// Confirm that we got a reasonable address and port.
ASSERT_EQ(Address::Type::Ip, addr->type());
ASSERT_EQ(version_, addr->ip()->version());
ASSERT_LT(0U, addr->ip()->port());

// Release the socket and re-bind it.
// WARNING: This test has a small but real risk of flaky behavior if another thread or process
// should bind to our assigned port during the interval between closing the fd and re-binding.
// TODO(jamessynge): Consider adding a loop or other such approach to this test so that a
// bind failure (in the TcpListenSocket ctor) once isn't considered an error.
EXPECT_EQ(0, close(addr_fd.second));

auto option = std::make_unique<MockSocketOption>();
auto options = std::make_shared<std::vector<Network::Socket::OptionConstSharedPtr>>();
EXPECT_CALL(*option, setOption(_, envoy::api::v2::core::SocketOption::STATE_PREBIND))
.WillOnce(Return(true));
options->emplace_back(std::move(option));
auto socket1 = createListenSocketPtr(addr, options, true);
// TODO (conqerAtapple): This is unfortunate. We should be able to templatize this
// instead of if block.
if (NetworkSocketTrait<S>::type == Address::SocketType::Stream) {
EXPECT_EQ(0, listen(socket1->fd(), 0));
}

EXPECT_EQ(addr->ip()->port(), socket1->localAddress()->ip()->port());
EXPECT_EQ(addr->ip()->addressAsString(), socket1->localAddress()->ip()->addressAsString());

auto option2 = std::make_unique<MockSocketOption>();
auto options2 = std::make_shared<std::vector<Network::Socket::OptionConstSharedPtr>>();
EXPECT_CALL(*option2, setOption(_, envoy::api::v2::core::SocketOption::STATE_PREBIND))
.WillOnce(Return(true));
options2->emplace_back(std::move(option2));
// The address and port are bound already, should throw exception.
EXPECT_THROW(createListenSocketPtr(addr, options2, true), EnvoyException);

// Test the case of a socket with fd and given address and port.
auto socket3 = createListenSocketPtr(dup(socket1->fd()), addr, nullptr);
EXPECT_EQ(addr->asString(), socket3->localAddress()->asString());
}

void testBindPortZero() {
auto loopback = Network::Test::getCanonicalLoopbackAddress(version_);
auto socket = createListenSocketPtr(loopback, nullptr, true);
EXPECT_EQ(Address::Type::Ip, socket->localAddress()->type());
EXPECT_EQ(version_, socket->localAddress()->ip()->version());
EXPECT_EQ(loopback->ip()->addressAsString(), socket->localAddress()->ip()->addressAsString());
EXPECT_GT(socket->localAddress()->ip()->port(), 0U);
}
};

INSTANTIATE_TEST_CASE_P(IpVersions, ListenSocketImplTest,
using ListenSocketImplTestTcp = ListenSocketImplTest<Network::Address::SocketType::Stream>;
using ListenSocketImplTestUdp = ListenSocketImplTest<Network::Address::SocketType::Datagram>;

INSTANTIATE_TEST_CASE_P(IpVersions, ListenSocketImplTestTcp,
testing::ValuesIn(TestEnvironment::getIpVersionsForTest()),
TestUtility::ipTestParamsToString);

INSTANTIATE_TEST_CASE_P(IpVersions, ListenSocketImplTestUdp,
testing::ValuesIn(TestEnvironment::getIpVersionsForTest()),
TestUtility::ipTestParamsToString);

TEST_P(ListenSocketImplTest, BindSpecificPort) {
// Pick a free port.
auto addr_fd = Network::Test::bindFreeLoopbackPort(version_, Address::SocketType::Stream);
auto addr = addr_fd.first;
EXPECT_LE(0, addr_fd.second);

// Confirm that we got a reasonable address and port.
ASSERT_EQ(Address::Type::Ip, addr->type());
ASSERT_EQ(version_, addr->ip()->version());
ASSERT_LT(0U, addr->ip()->port());

// Release the socket and re-bind it.
// WARNING: This test has a small but real risk of flaky behavior if another thread or process
// should bind to our assigned port during the interval between closing the fd and re-binding.
// TODO(jamessynge): Consider adding a loop or other such approach to this test so that a
// bind failure (in the TcpListenSocket ctor) once isn't considered an error.
EXPECT_EQ(0, close(addr_fd.second));

auto option = std::make_unique<MockSocketOption>();
auto options = std::make_shared<std::vector<Network::Socket::OptionConstSharedPtr>>();
EXPECT_CALL(*option, setOption(_, envoy::api::v2::core::SocketOption::STATE_PREBIND))
.WillOnce(Return(true));
options->emplace_back(std::move(option));
TcpListenSocket socket1(addr, options, true);
EXPECT_EQ(0, listen(socket1.fd(), 0));
EXPECT_EQ(addr->ip()->port(), socket1.localAddress()->ip()->port());
EXPECT_EQ(addr->ip()->addressAsString(), socket1.localAddress()->ip()->addressAsString());

auto option2 = std::make_unique<MockSocketOption>();
auto options2 = std::make_shared<std::vector<Network::Socket::OptionConstSharedPtr>>();
EXPECT_CALL(*option2, setOption(_, envoy::api::v2::core::SocketOption::STATE_PREBIND))
.WillOnce(Return(true));
options2->emplace_back(std::move(option2));
// The address and port are bound already, should throw exception.
EXPECT_THROW(Network::TcpListenSocket socket2(addr, options2, true), EnvoyException);

// Test the case of a socket with fd and given address and port.
TcpListenSocket socket3(dup(socket1.fd()), addr, nullptr);
EXPECT_EQ(addr->asString(), socket3.localAddress()->asString());
}
TEST_P(ListenSocketImplTestTcp, BindSpecificPort) { testBindSpecificPort(); }

TEST_P(ListenSocketImplTestUdp, BindSpecificPort) { testBindSpecificPort(); }

// Validate that we get port allocation when binding to port zero.
TEST_P(ListenSocketImplTest, BindPortZero) {
auto loopback = Network::Test::getCanonicalLoopbackAddress(version_);
TcpListenSocket socket(loopback, nullptr, true);
EXPECT_EQ(Address::Type::Ip, socket.localAddress()->type());
EXPECT_EQ(version_, socket.localAddress()->ip()->version());
EXPECT_EQ(loopback->ip()->addressAsString(), socket.localAddress()->ip()->addressAsString());
EXPECT_GT(socket.localAddress()->ip()->port(), 0U);
}
TEST_P(ListenSocketImplTestTcp, BindPortZero) { testBindPortZero(); }

TEST_P(ListenSocketImplTestUdp, BindPortZero) { testBindPortZero(); }

} // namespace Network
} // namespace Envoy
Loading