Skip to content

Commit

Permalink
Create libuv transport (facebookincubator#212)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: facebookincubator#212

This transport is largely modeled after the existing TCP transport.
There is no support for the old style and only for the new style
algorithms (there are no createSendBuffer/createRecvBuffer functions).

This commit adds the basic functionality but more work is needed to
properly deal with errors and timeouts.

It requires libuv to be available. Either it should already be defined
as a build target, if included by another project's build, or it must
be installed and discoverable through pkg-config.

Test Plan: Imported from OSS

Reviewed By: mrshenli

Differential Revision: D17072672

Pulled By: pietern

fbshipit-source-id: 6594a8a580596e86253b3d8783978c40fafef373
  • Loading branch information
pietern authored and facebook-github-bot committed Aug 27, 2019
1 parent 767c232 commit 5afa358
Show file tree
Hide file tree
Showing 23 changed files with 3,176 additions and 10 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ set(USE_REDIS_DEFAULT OFF)
set(USE_IBVERBS_DEFAULT OFF)
set(USE_NCCL_DEFAULT OFF)
set(USE_RCCL_DEFAULT OFF)
set(USE_LIBUV_DEFAULT OFF)

# Options
option(USE_REDIS "Support using Redis for rendezvous" ${USE_REDIS_DEFAULT})
option(USE_IBVERBS "Support ibverbs transport" ${USE_IBVERBS_DEFAULT})
option(USE_NCCL "Support using NCCL for local collectives" ${USE_NCCL_DEFAULT})
option(USE_RCCL "Support using RCCL for local collectives" ${USE_RCCL_DEFAULT})
option(USE_LIBUV "Build libuv transport" ${USE_LIBUV_DEFAULT})

# Set default build type
if(NOT CMAKE_BUILD_TYPE)
Expand Down
25 changes: 25 additions & 0 deletions cmake/Dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,31 @@ if(USE_IBVERBS)
endif()
endif()

if(USE_LIBUV)
# If the Gloo build is included from another project's build, it may
# have already included libuv and we can use it directly here.
if(TARGET uv_a)
# Note: the CMake files in the libuv don't specify an include
# directory for the uv and uv_a targets. If you're including the
# Gloo build from your own project's build, and include libuv
# there as well, you may need to include the following to tack on
# the include path to the libuv targets.
#
# set_target_properties(uv_a PROPERTIES
# INTERFACE_INCLUDE_DIRECTORIES "${libuv_SOURCE_DIR}/include"
# )
#
else()
include(FindPkgConfig)
pkg_search_module(libuv REQUIRED libuv>=1.29)
add_library(uv_a INTERFACE IMPORTED)
set_target_properties(uv_a PROPERTIES
INTERFACE_INCLUDE_DIRECTORIES ${libuv_INCLUDE_DIRS}
INTERFACE_LINK_LIBRARIES ${libuv_LIBDIR}/libuv_a.a
)
endif()
endif()

if(USE_MPI)
find_package(MPI)
if(MPI_C_FOUND)
Expand Down
14 changes: 13 additions & 1 deletion gloo/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ else()
set(GLOO_HAVE_TRANSPORT_IBVERBS 0)
endif()

# Compile uv transport if libuv is available
if(USE_LIBUV)
set(GLOO_HAVE_TRANSPORT_UV 1)
else()
set(GLOO_HAVE_TRANSPORT_UV 0)
endif()

add_subdirectory(common)
add_subdirectory(mpi)
if(USE_CUDA AND USE_NCCL)
Expand Down Expand Up @@ -116,10 +123,10 @@ set(GLOO_USE_REDIS ${USE_REDIS})
set(GLOO_USE_IBVERBS ${USE_IBVERBS})
set(GLOO_USE_MPI ${USE_MPI})
set(GLOO_USE_AVX ${USE_AVX})
set(GLOO_USE_LIBUV ${USE_LIBUV})
configure_file(config.h.in config.h)

add_library(gloo ${GLOO_STATIC_OR_SHARED} ${GLOO_SRCS})
target_link_libraries(gloo PRIVATE ${gloo_DEPENDENCY_LIBS})
if(USE_CUDA)
cuda_add_library(gloo_cuda ${GLOO_CUDA_SRCS} ${GLOO_STATIC_OR_SHARED})
target_link_libraries(gloo_cuda gloo ${gloo_cuda_DEPENDENCY_LIBS})
Expand All @@ -128,6 +135,11 @@ if(USE_ROCM)
gloo_hip_add_library(gloo_hip ${GLOO_HIP_SRCS})
target_link_libraries(gloo_hip gloo)
endif()
if(USE_LIBUV)
target_link_libraries(gloo PRIVATE uv_a)
endif()

target_link_libraries(gloo PRIVATE ${gloo_DEPENDENCY_LIBS})

# Add Interface include directories that are relocatable.
target_include_directories(gloo INTERFACE $<INSTALL_INTERFACE:include>)
Expand Down
2 changes: 2 additions & 0 deletions gloo/config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ static_assert(
#cmakedefine01 GLOO_USE_IBVERBS
#cmakedefine01 GLOO_USE_MPI
#cmakedefine01 GLOO_USE_AVX
#cmakedefine01 GLOO_USE_LIBUV

#cmakedefine01 GLOO_HAVE_TRANSPORT_TCP
#cmakedefine01 GLOO_HAVE_TRANSPORT_IBVERBS
#cmakedefine01 GLOO_HAVE_TRANSPORT_UV
33 changes: 26 additions & 7 deletions gloo/test/base_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
#include "gloo/transport/tcp/device.h"
#endif

#if GLOO_HAVE_TRANSPORT_UV
#include "gloo/transport/uv/device.h"
#endif

namespace gloo {
namespace test {

Expand All @@ -50,6 +54,7 @@ class Barrier {

enum Transport {
TCP,
UV,
};

// Transports that instantiated algorithms can be tested against.
Expand All @@ -62,16 +67,26 @@ const std::vector<Transport> kTransportsForClassAlgorithms{
// preferred over the instantiated style.
const std::vector<Transport> kTransportsForFunctionAlgorithms{
Transport::TCP,
Transport::UV,
};

class BaseTest : public ::testing::Test {
protected:
virtual void SetUp() {
std::shared_ptr<::gloo::transport::Device> createDevice(
const Transport transport) {
#if GLOO_HAVE_TRANSPORT_TCP
static auto dev = ::gloo::transport::tcp::CreateDevice("localhost");
device_ = dev;
if (transport == Transport::TCP) {
static auto dev = ::gloo::transport::tcp::CreateDevice("localhost");
return dev;
}
#endif
#if GLOO_HAVE_TRANSPORT_UV
if (transport == Transport::UV) {
static auto dev = ::gloo::transport::uv::CreateDevice("localhost");
return dev;
}
#endif
GLOO_ENFORCE(device_, "No transport device!");
return std::shared_ptr<::gloo::transport::Device>();
}

void spawnThreads(int size, std::function<void(int)> fn) {
Expand Down Expand Up @@ -105,10 +120,16 @@ class BaseTest : public ::testing::Test {
int base = 2) {
Barrier barrier(size);
::gloo::rendezvous::HashStore store;

auto device = createDevice(transport);
if (!device) {
return;
}

spawnThreads(size, [&](int rank) {
auto context =
std::make_shared<::gloo::rendezvous::Context>(rank, size, base);
context->connectFullMesh(store, device_);
context->connectFullMesh(store, device);

try {
fn(context);
Expand All @@ -133,8 +154,6 @@ class BaseTest : public ::testing::Test {
}
});
}

std::shared_ptr<::gloo::transport::Device> device_;
};

template <typename T>
Expand Down
3 changes: 2 additions & 1 deletion gloo/test/context_factory_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ TEST_P(ContextStoreTest, RunAlgo) {
auto factory =
std::make_shared<::gloo::rendezvous::ContextFactory>(context);
for (int i = 0; i < repeatCount; ++i) {
auto usingContext = factory->makeContext(device_);
auto device = createDevice(Transport::TCP);
auto usingContext = factory->makeContext(device);
fn(usingContext);
}
});
Expand Down
2 changes: 1 addition & 1 deletion gloo/test/send_recv_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ INSTANTIATE_TEST_CASE_P(
SendRecvDefault,
SendRecvTest,
::testing::Combine(
::testing::Values(Transport::TCP),
::testing::Values(Transport::TCP, Transport::UV),
::testing::Values(2, 3, 4, 5, 6, 7, 8),
::testing::Values(1)));

Expand Down
4 changes: 4 additions & 0 deletions gloo/transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ if(GLOO_HAVE_TRANSPORT_IBVERBS)
add_subdirectory(ibverbs)
endif()

if(GLOO_HAVE_TRANSPORT_UV)
add_subdirectory(uv)
endif()

list(APPEND GLOO_SRCS ${GLOO_TRANSPORT_SRCS})
list(APPEND GLOO_HDRS ${GLOO_TRANSPORT_HDRS})
set(GLOO_SRCS ${GLOO_SRCS} PARENT_SCOPE)
Expand Down
19 changes: 19 additions & 0 deletions gloo/transport/uv/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
list(APPEND GLOO_TRANSPORT_SRCS
"${CMAKE_CURRENT_SOURCE_DIR}/address.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/device.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/context.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/libuv.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/pair.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/unbound_buffer.cc"
)
list(APPEND GLOO_TRANSPORT_HDRS
"${CMAKE_CURRENT_SOURCE_DIR}/address.h"
"${CMAKE_CURRENT_SOURCE_DIR}/device.h"
"${CMAKE_CURRENT_SOURCE_DIR}/context.h"
"${CMAKE_CURRENT_SOURCE_DIR}/libuv.h"
"${CMAKE_CURRENT_SOURCE_DIR}/pair.h"
"${CMAKE_CURRENT_SOURCE_DIR}/unbound_buffer.h"
)

set(GLOO_TRANSPORT_SRCS ${GLOO_TRANSPORT_SRCS} PARENT_SCOPE)
set(GLOO_TRANSPORT_HDRS ${GLOO_TRANSPORT_HDRS} PARENT_SCOPE)
21 changes: 21 additions & 0 deletions gloo/transport/uv/LICENSE.uvw
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
The MIT License (MIT)

Copyright (c) 2016-2019 Michele Caini

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copy of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copy or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
71 changes: 71 additions & 0 deletions gloo/transport/uv/address.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/**
* Copyright (c) 2019-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

#include <gloo/transport/uv/address.h>

#include <string.h>

#include <uv.h>

#include <gloo/common/logging.h>

namespace gloo {
namespace transport {
namespace uv {

Address::Address(struct sockaddr_storage ss, sequence_type seq) {
impl_.ss = std::move(ss);
impl_.seq = seq;
}

Address::Address(const std::vector<char>& bytes) {
GLOO_ENFORCE_EQ(sizeof(impl_), bytes.size());
memcpy(&impl_, bytes.data(), sizeof(impl_));
}

std::vector<char> Address::bytes() const {
std::vector<char> bytes(sizeof(impl_));
memcpy(bytes.data(), &impl_, sizeof(impl_));
return bytes;
}

std::string Address::str() const {
char str[INET6_ADDRSTRLEN + 128];
int port = 0;

str[0] = '[';
if (impl_.ss.ss_family == AF_INET) {
auto in = (struct sockaddr_in*)&impl_.ss;
uv_ip4_name(in, str + 1, sizeof(str) - 1);
port = in->sin_port;
} else if (impl_.ss.ss_family == AF_INET6) {
auto in6 = (struct sockaddr_in6*)&impl_.ss;
uv_ip6_name(in6, str + 1, sizeof(str) - 1);
port = in6->sin6_port;
} else {
snprintf(str + 1, sizeof(str) - 1, "none");
}

size_t len = strlen(str);
if (port > 0) {
len += snprintf(str + len, sizeof(str) - len, "]:%d", port);
} else {
len += snprintf(str + len, sizeof(str) - len, "]");
}

// Append sequence number if one is set.
if (impl_.seq != SIZE_MAX) {
len += snprintf(str + len, sizeof(str) - len, "$%d", impl_.seq);
}

return str;
}

} // namespace uv
} // namespace transport
} // namespace gloo
66 changes: 66 additions & 0 deletions gloo/transport/uv/address.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Copyright (c) 2019-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <sys/socket.h>

#include <gloo/transport/address.h>

namespace gloo {
namespace transport {
namespace uv {

class Address : public ::gloo::transport::Address {
public:
using sequence_type = int;

Address() {}

Address(struct sockaddr_storage ss, sequence_type seq = -1);

explicit Address(const std::vector<char>&);

virtual std::vector<char> bytes() const override;

virtual std::string str() const override;

const struct sockaddr_storage& getSockaddr() const {
return impl_.ss;
}

sequence_type getSeq() const {
return impl_.seq;
}

Address withSeq(sequence_type seq) const {
return Address(impl_.ss, seq);
}

protected:
// Encapsulate fields such that it is trivially copyable. This class
// is not trivially copyable itself (because it is a subclass?).
struct Impl {
// IP address of the listening socket.
struct sockaddr_storage ss;

// Sequence number of this address.
// If this is equal to -1, the address is assumed to
// represent the listening socket of a device. The sequence number
// must be set before it can be used by a pair.
sequence_type seq = -1;
};

static_assert(std::is_trivially_copyable<Impl>::value, "!");

Impl impl_;
};

} // namespace uv
} // namespace transport
} // namespace gloo
Loading

0 comments on commit 5afa358

Please sign in to comment.