Skip to content

Commit

Permalink
network: Udp socket implementation. (envoyproxy#5108)
Browse files Browse the repository at this point in the history
Signed-off-by: Jojy G Varghese <jojy_varghese@apple.com>
Signed-off-by: Fred Douglas <fredlas@google.com>
  • Loading branch information
conqerAtapple authored and fredlas committed Mar 5, 2019
1 parent 4193d38 commit b131f40
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 81 deletions.
28 changes: 13 additions & 15 deletions source/common/network/listen_socket_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,28 @@ void ListenSocketImpl::setListenSocketOptions(const Network::Socket::OptionsShar
}
}

TcpListenSocket::TcpListenSocket(const Address::InstanceConstSharedPtr& address,
const Network::Socket::OptionsSharedPtr& options,
bool bind_to_port)
: ListenSocketImpl(address->socket(Address::SocketType::Stream), address) {
RELEASE_ASSERT(fd_ != -1, "");

// TODO(htuch): This might benefit from moving to SocketOptionImpl.
int on = 1;
int rc = setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
RELEASE_ASSERT(rc != -1, "");

void ListenSocketImpl::setupSocket(const Network::Socket::OptionsSharedPtr& options,
bool bind_to_port) {
setListenSocketOptions(options);

if (bind_to_port) {
doBind();
}
}

TcpListenSocket::TcpListenSocket(int fd, const Address::InstanceConstSharedPtr& address,
const Network::Socket::OptionsSharedPtr& options)
: ListenSocketImpl(fd, address) {
setListenSocketOptions(options);
template <>
void NetworkListenSocket<
NetworkSocketTrait<Address::SocketType::Stream>>::setPrebindSocketOptions() {
// TODO(htuch): This might benefit from moving to SocketOptionImpl.
int on = 1;
int rc = setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
RELEASE_ASSERT(rc != -1, "");
}

template <>
void NetworkListenSocket<
NetworkSocketTrait<Address::SocketType::Datagram>>::setPrebindSocketOptions() {}

UdsListenSocket::UdsListenSocket(const Address::InstanceConstSharedPtr& address)
: ListenSocketImpl(address->socket(Address::SocketType::Stream), address) {
RELEASE_ASSERT(fd_ != -1, "");
Expand Down
39 changes: 34 additions & 5 deletions source/common/network/listen_socket_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,52 @@ class ListenSocketImpl : public SocketImpl {
ListenSocketImpl(int fd, const Address::InstanceConstSharedPtr& local_address)
: SocketImpl(fd, local_address) {}

void setupSocket(const Network::Socket::OptionsSharedPtr& options, bool bind_to_port);
void doBind();
void setListenSocketOptions(const Network::Socket::OptionsSharedPtr& options);
};

/**
* Wraps a unix socket.
*/
class TcpListenSocket : public ListenSocketImpl {
template <Address::SocketType T> struct NetworkSocketTrait {};

template <> struct NetworkSocketTrait<Address::SocketType::Stream> {
static constexpr Address::SocketType type = Address::SocketType::Stream;
};

template <> struct NetworkSocketTrait<Address::SocketType::Datagram> {
static constexpr Address::SocketType type = Address::SocketType::Datagram;
};

template <typename T> class NetworkListenSocket : public ListenSocketImpl {
public:
TcpListenSocket(const Address::InstanceConstSharedPtr& address,
const Network::Socket::OptionsSharedPtr& options, bool bind_to_port);
TcpListenSocket(int fd, const Address::InstanceConstSharedPtr& address,
const Network::Socket::OptionsSharedPtr& options);
NetworkListenSocket(const Address::InstanceConstSharedPtr& address,
const Network::Socket::OptionsSharedPtr& options, bool bind_to_port)
: ListenSocketImpl(address->socket(T::type), address) {
RELEASE_ASSERT(fd_ != -1, "");

setPrebindSocketOptions();

setupSocket(options, bind_to_port);
}

NetworkListenSocket(int fd, const Address::InstanceConstSharedPtr& address,
const Network::Socket::OptionsSharedPtr& options)
: ListenSocketImpl(fd, address) {
setListenSocketOptions(options);
}

protected:
void setPrebindSocketOptions();
};

using TcpListenSocket = NetworkListenSocket<NetworkSocketTrait<Address::SocketType::Stream>>;
typedef std::unique_ptr<TcpListenSocket> TcpListenSocketPtr;

using UdpListenSocket = NetworkListenSocket<NetworkSocketTrait<Address::SocketType::Datagram>>;
typedef std::unique_ptr<UdpListenSocket> UdpListenSocketPtr;

class UdsListenSocket : public ListenSocketImpl {
public:
UdsListenSocket(const Address::InstanceConstSharedPtr& address);
Expand Down
55 changes: 44 additions & 11 deletions source/common/network/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@ namespace Envoy {
namespace Network {

const std::string Utility::TCP_SCHEME = "tcp://";
const std::string Utility::UDP_SCHEME = "udp://";
const std::string Utility::UNIX_SCHEME = "unix://";

Address::InstanceConstSharedPtr Utility::resolveUrl(const std::string& url) {
if (urlIsTcpScheme(url)) {
return parseInternetAddressAndPort(url.substr(TCP_SCHEME.size()));
} else if (urlIsUdpScheme(url)) {
return parseInternetAddressAndPort(url.substr(UDP_SCHEME.size()));
} else if (urlIsUnixScheme(url)) {
return Address::InstanceConstSharedPtr{
new Address::PipeInstance(url.substr(UNIX_SCHEME.size()))};
Expand All @@ -50,30 +53,42 @@ Address::InstanceConstSharedPtr Utility::resolveUrl(const std::string& url) {
}
}

bool Utility::urlIsTcpScheme(const std::string& url) { return url.find(TCP_SCHEME) == 0; }
bool Utility::urlIsTcpScheme(const std::string& url) {
return StringUtil::startsWith(url.c_str(), TCP_SCHEME, true);
}

bool Utility::urlIsUnixScheme(const std::string& url) { return url.find(UNIX_SCHEME) == 0; }
bool Utility::urlIsUdpScheme(const std::string& url) {
return StringUtil::startsWith(url.c_str(), UDP_SCHEME, true);
}

std::string Utility::hostFromTcpUrl(const std::string& url) {
if (!urlIsTcpScheme(url)) {
throw EnvoyException(fmt::format("expected TCP scheme, got: {}", url));
bool Utility::urlIsUnixScheme(const std::string& url) {
return StringUtil::startsWith(url.c_str(), UNIX_SCHEME, true);
}

namespace {

std::string hostFromUrl(const std::string& url, const std::string& scheme,
const std::string& scheme_name) {
if (url.find(scheme) != 0) {
throw EnvoyException(fmt::format("expected {} scheme, got: {}", scheme_name, url));
}

size_t colon_index = url.find(':', TCP_SCHEME.size());
size_t colon_index = url.find(':', scheme.size());

if (colon_index == std::string::npos) {
throw EnvoyException(fmt::format("malformed url: {}", url));
}

return url.substr(TCP_SCHEME.size(), colon_index - TCP_SCHEME.size());
return url.substr(scheme.size(), colon_index - scheme.size());
}

uint32_t Utility::portFromTcpUrl(const std::string& url) {
if (!urlIsTcpScheme(url)) {
throw EnvoyException(fmt::format("expected TCP scheme, got: {}", url));
uint32_t portFromUrl(const std::string& url, const std::string& scheme,
const std::string& scheme_name) {
if (url.find(scheme) != 0) {
throw EnvoyException(fmt::format("expected {} scheme, got: {}", scheme_name, url));
}

size_t colon_index = url.find(':', TCP_SCHEME.size());
size_t colon_index = url.find(':', scheme.size());

if (colon_index == std::string::npos) {
throw EnvoyException(fmt::format("malformed url: {}", url));
Expand All @@ -88,6 +103,24 @@ uint32_t Utility::portFromTcpUrl(const std::string& url) {
}
}

} // namespace

std::string Utility::hostFromTcpUrl(const std::string& url) {
return hostFromUrl(url, TCP_SCHEME, "TCP");
}

uint32_t Utility::portFromTcpUrl(const std::string& url) {
return portFromUrl(url, TCP_SCHEME, "TCP");
}

std::string Utility::hostFromUdpUrl(const std::string& url) {
return hostFromUrl(url, UDP_SCHEME, "UDP");
}

uint32_t Utility::portFromUdpUrl(const std::string& url) {
return portFromUrl(url, UDP_SCHEME, "UDP");
}

Address::InstanceConstSharedPtr Utility::parseInternetAddress(const std::string& ip_address,
uint16_t port, bool v6only) {
sockaddr_in sa4;
Expand Down
22 changes: 22 additions & 0 deletions source/common/network/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ typedef std::list<PortRange> PortRangeList;
class Utility {
public:
static const std::string TCP_SCHEME;
static const std::string UDP_SCHEME;
static const std::string UNIX_SCHEME;

/**
Expand All @@ -50,6 +51,13 @@ class Utility {
*/
static bool urlIsTcpScheme(const std::string& url);

/**
* Match a URL to the UDP scheme
* @param url supplies the URL to match.
* @return bool true if the URL matches the UDP scheme, false otherwise.
*/
static bool urlIsUdpScheme(const std::string& url);

/**
* Match a URL to the Unix scheme
* @param url supplies the Unix to match.
Expand All @@ -71,6 +79,20 @@ class Utility {
*/
static uint32_t portFromTcpUrl(const std::string& url);

/**
* Parses the host from a UDP URL
* @param the URL to parse host from
* @return std::string the parsed host
*/
static std::string hostFromUdpUrl(const std::string& url);

/**
* Parses the port from a UDP URL
* @param the URL to parse port from
* @return uint32_t the parsed port
*/
static uint32_t portFromUdpUrl(const std::string& url);

/**
* Parse an internet host address (IPv4 or IPv6) and create an Instance from it. The address must
* not include a port number. Throws EnvoyException if unable to parse the address.
Expand Down
126 changes: 77 additions & 49 deletions test/common/network/listen_socket_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,66 +16,94 @@ using testing::Return;
namespace Envoy {
namespace Network {

template <Network::Address::SocketType Type>
class ListenSocketImplTest : public testing::TestWithParam<Address::IpVersion> {
protected:
ListenSocketImplTest() : version_(GetParam()) {}
const Address::IpVersion version_;

template <typename... Args>
std::unique_ptr<ListenSocketImpl> createListenSocketPtr(Args&&... args) {
using NetworkSocketTraitType = NetworkSocketTrait<Type>;

return std::make_unique<NetworkListenSocket<NetworkSocketTraitType>>(
std::forward<Args>(args)...);
}

void testBindSpecificPort() {
auto addr_fd = Network::Test::bindFreeLoopbackPort(version_, Address::SocketType::Stream);
auto addr = addr_fd.first;
EXPECT_LE(0, addr_fd.second);

// Confirm that we got a reasonable address and port.
ASSERT_EQ(Address::Type::Ip, addr->type());
ASSERT_EQ(version_, addr->ip()->version());
ASSERT_LT(0U, addr->ip()->port());

// Release the socket and re-bind it.
// WARNING: This test has a small but real risk of flaky behavior if another thread or process
// should bind to our assigned port during the interval between closing the fd and re-binding.
// TODO(jamessynge): Consider adding a loop or other such approach to this test so that a
// bind failure (in the TcpListenSocket ctor) once isn't considered an error.
EXPECT_EQ(0, close(addr_fd.second));

auto option = std::make_unique<MockSocketOption>();
auto options = std::make_shared<std::vector<Network::Socket::OptionConstSharedPtr>>();
EXPECT_CALL(*option, setOption(_, envoy::api::v2::core::SocketOption::STATE_PREBIND))
.WillOnce(Return(true));
options->emplace_back(std::move(option));
auto socket1 = createListenSocketPtr(addr, options, true);
// TODO (conqerAtapple): This is unfortunate. We should be able to templatize this
// instead of if block.
if (NetworkSocketTrait<Type>::type == Address::SocketType::Stream) {
EXPECT_EQ(0, listen(socket1->fd(), 0));
}

EXPECT_EQ(addr->ip()->port(), socket1->localAddress()->ip()->port());
EXPECT_EQ(addr->ip()->addressAsString(), socket1->localAddress()->ip()->addressAsString());

auto option2 = std::make_unique<MockSocketOption>();
auto options2 = std::make_shared<std::vector<Network::Socket::OptionConstSharedPtr>>();
EXPECT_CALL(*option2, setOption(_, envoy::api::v2::core::SocketOption::STATE_PREBIND))
.WillOnce(Return(true));
options2->emplace_back(std::move(option2));
// The address and port are bound already, should throw exception.
EXPECT_THROW(createListenSocketPtr(addr, options2, true), EnvoyException);

// Test the case of a socket with fd and given address and port.
auto socket3 = createListenSocketPtr(dup(socket1->fd()), addr, nullptr);
EXPECT_EQ(addr->asString(), socket3->localAddress()->asString());
}

void testBindPortZero() {
auto loopback = Network::Test::getCanonicalLoopbackAddress(version_);
auto socket = createListenSocketPtr(loopback, nullptr, true);
EXPECT_EQ(Address::Type::Ip, socket->localAddress()->type());
EXPECT_EQ(version_, socket->localAddress()->ip()->version());
EXPECT_EQ(loopback->ip()->addressAsString(), socket->localAddress()->ip()->addressAsString());
EXPECT_GT(socket->localAddress()->ip()->port(), 0U);
}
};

INSTANTIATE_TEST_CASE_P(IpVersions, ListenSocketImplTest,
using ListenSocketImplTestTcp = ListenSocketImplTest<Network::Address::SocketType::Stream>;
using ListenSocketImplTestUdp = ListenSocketImplTest<Network::Address::SocketType::Datagram>;

INSTANTIATE_TEST_CASE_P(IpVersions, ListenSocketImplTestTcp,
testing::ValuesIn(TestEnvironment::getIpVersionsForTest()),
TestUtility::ipTestParamsToString);

INSTANTIATE_TEST_CASE_P(IpVersions, ListenSocketImplTestUdp,
testing::ValuesIn(TestEnvironment::getIpVersionsForTest()),
TestUtility::ipTestParamsToString);

TEST_P(ListenSocketImplTest, BindSpecificPort) {
// Pick a free port.
auto addr_fd = Network::Test::bindFreeLoopbackPort(version_, Address::SocketType::Stream);
auto addr = addr_fd.first;
EXPECT_LE(0, addr_fd.second);

// Confirm that we got a reasonable address and port.
ASSERT_EQ(Address::Type::Ip, addr->type());
ASSERT_EQ(version_, addr->ip()->version());
ASSERT_LT(0U, addr->ip()->port());

// Release the socket and re-bind it.
// WARNING: This test has a small but real risk of flaky behavior if another thread or process
// should bind to our assigned port during the interval between closing the fd and re-binding.
// TODO(jamessynge): Consider adding a loop or other such approach to this test so that a
// bind failure (in the TcpListenSocket ctor) once isn't considered an error.
EXPECT_EQ(0, close(addr_fd.second));

auto option = std::make_unique<MockSocketOption>();
auto options = std::make_shared<std::vector<Network::Socket::OptionConstSharedPtr>>();
EXPECT_CALL(*option, setOption(_, envoy::api::v2::core::SocketOption::STATE_PREBIND))
.WillOnce(Return(true));
options->emplace_back(std::move(option));
TcpListenSocket socket1(addr, options, true);
EXPECT_EQ(0, listen(socket1.fd(), 0));
EXPECT_EQ(addr->ip()->port(), socket1.localAddress()->ip()->port());
EXPECT_EQ(addr->ip()->addressAsString(), socket1.localAddress()->ip()->addressAsString());

auto option2 = std::make_unique<MockSocketOption>();
auto options2 = std::make_shared<std::vector<Network::Socket::OptionConstSharedPtr>>();
EXPECT_CALL(*option2, setOption(_, envoy::api::v2::core::SocketOption::STATE_PREBIND))
.WillOnce(Return(true));
options2->emplace_back(std::move(option2));
// The address and port are bound already, should throw exception.
EXPECT_THROW(Network::TcpListenSocket socket2(addr, options2, true), EnvoyException);

// Test the case of a socket with fd and given address and port.
TcpListenSocket socket3(dup(socket1.fd()), addr, nullptr);
EXPECT_EQ(addr->asString(), socket3.localAddress()->asString());
}
TEST_P(ListenSocketImplTestTcp, BindSpecificPort) { testBindSpecificPort(); }

TEST_P(ListenSocketImplTestUdp, BindSpecificPort) { testBindSpecificPort(); }

// Validate that we get port allocation when binding to port zero.
TEST_P(ListenSocketImplTest, BindPortZero) {
auto loopback = Network::Test::getCanonicalLoopbackAddress(version_);
TcpListenSocket socket(loopback, nullptr, true);
EXPECT_EQ(Address::Type::Ip, socket.localAddress()->type());
EXPECT_EQ(version_, socket.localAddress()->ip()->version());
EXPECT_EQ(loopback->ip()->addressAsString(), socket.localAddress()->ip()->addressAsString());
EXPECT_GT(socket.localAddress()->ip()->port(), 0U);
}
TEST_P(ListenSocketImplTestTcp, BindPortZero) { testBindPortZero(); }

TEST_P(ListenSocketImplTestUdp, BindPortZero) { testBindPortZero(); }

} // namespace Network
} // namespace Envoy
Loading

0 comments on commit b131f40

Please sign in to comment.