Add bcube allreduce algorithm
Summary:
This algorithm implements a hierarchical reduce/scatter followed by a
hierarchical allgather. At every step in the hierarchy, all communication
groups must contain the same number of processes. Therefore, the total
number of processes must be factorizable.

If the number of processes is a prime number, the execution of this
algorithm is equivalent to an all-to-all reduce/scatter followed by an
all-to-all allgather. This is suboptimal if there is network locality
to take advantage of.
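
As a rough usage sketch (an illustration only: it assumes a connected
std::shared_ptr<gloo::Context> named context and the pointer-based setOutput
and setReduceFunction setters on AllreduceOptions; the buffer size and the
sum reduction are arbitrary choices):

  std::vector<float> data(1024, 1.0f);
  gloo::AllreduceOptions opts(context);
  opts.setOutput(data.data(), data.size());
  opts.setReduceFunction([](void* c, const void* a, const void* b, size_t n) {
    auto* dst = static_cast<float*>(c);
    const auto* x = static_cast<const float*>(a);
    const auto* y = static_cast<const float*>(b);
    for (size_t i = 0; i < n; i++) {
      dst[i] = x[i] + y[i];
    }
  });
  opts.setAlgorithm(gloo::AllreduceOptions::Algorithm::BCUBE);
  gloo::allreduce(opts);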

Reviewed By: mrshenli

Differential Revision: D15325539

fbshipit-source-id: c5c35f0113edf2a2f41b0dff82cbd4b309793799
pietern authored and facebook-github-bot committed Jun 19, 2019
1 parent 63dd9f3 commit 46ae6ec
Showing 4 changed files with 336 additions and 9 deletions.
295 changes: 294 additions & 1 deletion gloo/allreduce.cc
@@ -31,6 +31,12 @@ void ring(
ReduceRangeFunction reduceInputs,
BroadcastRangeFunction broadcastOutputs);

// Forward declaration of bcube algorithm implementation.
void bcube(
const detail::AllreduceOptionsImpl& opts,
ReduceRangeFunction reduceInputs,
BroadcastRangeFunction broadcastOutputs);

// Returns function that computes local reduction over inputs and
// stores it in the output for a given range in those buffers.
// This is done prior to either sending a region to a neighbor, or
@@ -123,7 +129,17 @@ void allreduce(const detail::AllreduceOptionsImpl& opts) {
return;
}

ring(opts, reduceInputs, broadcastOutputs);
switch (opts.algorithm) {
case detail::AllreduceOptionsImpl::UNSPECIFIED:
case detail::AllreduceOptionsImpl::RING:
ring(opts, reduceInputs, broadcastOutputs);
break;
case detail::AllreduceOptionsImpl::BCUBE:
bcube(opts, reduceInputs, broadcastOutputs);
break;
default:
GLOO_ENFORCE(false, "Algorithm not handled.");
}
}

void ring(
@@ -368,6 +384,283 @@ void ring(
}
}

// For a given context size and desired group size, compute the actual group
// size per step. Note that the group size is n for every step only if
// n^(#steps) == size. Otherwise, the group size for the final step differs
// from n.
std::vector<size_t> computeGroupSizePerStep(size_t size, const size_t n) {
std::vector<size_t> result;
GLOO_ENFORCE_GT(n, 1);
while (size % n == 0) {
result.push_back(n);
size /= n;
}
if (size > 1) {
result.push_back(size);
}
return result;
}
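
// For example, computeGroupSizePerStep(8, 2) yields {2, 2, 2},
// computeGroupSizePerStep(12, 2) yields {2, 2, 3} (two halving steps plus a
// final group of 3), and computeGroupSizePerStep(7, 2) yields {7}, which
// collapses the algorithm to a single all-to-all style step.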

// The bcube algorithm implements a hypercube-like strategy for reduction. It
// requires the number of processes to be factorized into per-step group sizes.
// If the number of processes is a power of 2 (every factor equal to 2), the
// algorithm is identical to recursive halving/doubling. The number of factors
// determines the number of steps, and each factor determines the number of
// processes that every process communicates with at that particular step. If
// the number of processes is prime, the factorization has a single component
// and the algorithm degenerates into a direct reduce-scatter followed by an
// allgather.
//
// For example, if #processes == 8, and we factorize as 4 * 2, the algorithm
// runs in 2 steps. In the first step, 2 groups of 4 processes exchange data
// such that all processes have 1/4th of the partial result (with process 0
// having the first quarter, 1 having the second quarter, and so forth). In the
// second step, 4 groups of 2 processes exchange their partial result such that
// all processes have 1/8th of the result. Then, the same factorization is
// followed in reverse to perform an allgather.
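//
// With that 4 * 2 factorization, the step 1 groups are {0, 1, 2, 3} and
// {4, 5, 6, 7} (peer distance 1), and the step 2 groups are {0, 4}, {1, 5},
// {2, 6}, and {3, 7} (peer distance 4).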
//
void bcube(
const detail::AllreduceOptionsImpl& opts,
ReduceRangeFunction reduceInputs,
BroadcastRangeFunction broadcastOutputs) {
const auto& context = opts.context;
const auto slot = Slot::build(kAllreduceSlotPrefix, opts.tag);
const auto elementSize = opts.elementSize;
auto& out = opts.out[0];

constexpr auto n = 2;

// Figure out the number of steps in this algorithm.
const auto groupSizePerStep = computeGroupSizePerStep(context->size, n);

struct group {
// Distance between peers in this group.
size_t peerDistance;

// Segment that this group is responsible for reducing.
size_t bufferOffset;
size_t bufferLength;

// The process ranks that are a member of this group.
std::vector<size_t> ranks;

// Upper bound on the length of the chunk for which each process holds
// the reduced values by the end of this group's reduction step.
size_t chunkLength;

// Chunk within the segment that this process is responsible for reducing.
size_t myChunkOffset;
size_t myChunkLength;
};

// Compute the details of a group at every algorithm step.
// We keep this in a vector because we iterate through it in forward order in
// the reduce/scatter phase and in backward order in the allgather phase.
std::vector<struct group> groups;
{
struct group group;
group.peerDistance = 1;
group.bufferOffset = 0;
group.bufferLength = opts.elements;
for (const size_t groupSize : groupSizePerStep) {
const size_t groupRank = (context->rank / group.peerDistance) % groupSize;
const size_t baseRank = context->rank - (groupRank * group.peerDistance);
group.ranks.reserve(groupSize);
for (size_t i = 0; i < groupSize; i++) {
group.ranks.push_back(baseRank + i * group.peerDistance);
}

// Compute the length of the chunk we're exchanging at this step.
group.chunkLength = ((group.bufferLength + (groupSize - 1)) / groupSize);

// This process is computing the reduction of the chunk positioned at
// <rank>/<size> within the current segment.
group.myChunkOffset =
group.bufferOffset + (groupRank * group.chunkLength);
group.myChunkLength = std::min(
size_t(group.chunkLength),
size_t(std::max(
int64_t(0),
int64_t(group.bufferLength) -
int64_t(groupRank * group.chunkLength))));

// Store a const copy of this group in the vector.
groups.push_back(group);

// Initialize with updated peer distance and segment offset and length.
struct group nextGroup;
nextGroup.peerDistance = group.peerDistance * groupSize;
nextGroup.bufferOffset = group.myChunkOffset;
nextGroup.bufferLength = group.myChunkLength;
std::swap(group, nextGroup);
}
}
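
// Example of the resulting groups vector (8 processes, opts.elements == 8,
// rank 1; with n == 2 the factorization is 2 * 2 * 2): step 1 has ranks
// {0, 1}, chunkLength 4, and myChunkOffset 4; step 2 has ranks {1, 3},
// chunkLength 2, and myChunkOffset 4; step 3 has ranks {1, 5}, chunkLength 1,
// and myChunkOffset 4. After the reduce/scatter phase, rank 1 therefore holds
// the fully reduced element at offset 4.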

// The chunk length is rounded up, so the maximum scratch space we need
// might be larger than the size of the output buffer. Compute the maximum
// across all steps.
size_t bufferLength = opts.elements;
for (const auto& group : groups) {
bufferLength =
std::max(bufferLength, group.ranks.size() * group.chunkLength);
}
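
// For example, with 10 elements reduced within a group of 4 processes, the
// chunkLength is (10 + 3) / 4 == 3, so the scratch space must hold
// 4 * 3 == 12 elements even though the output buffer only holds 10.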

// Allocate scratch space to receive data from peers.
const size_t bufferSize = bufferLength * elementSize;
std::unique_ptr<uint8_t[]> buffer(new uint8_t[bufferSize]);
std::unique_ptr<transport::UnboundBuffer> tmp =
context->createUnboundBuffer(buffer.get(), bufferSize);

// Reduce/scatter.
for (size_t step = 0; step < groups.size(); step++) {
const auto& group = groups[step];

// Issue receive operations for chunks from peers.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto src = group.ranks[i];
if (src == context->rank) {
continue;
}
tmp->recv(
src,
slot,
i * group.chunkLength * elementSize,
group.myChunkLength * elementSize);
}

// Issue send operations for local chunks to peers.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto dst = group.ranks[i];
if (dst == context->rank) {
continue;
}
const size_t currentChunkOffset =
group.bufferOffset + i * group.chunkLength;
const size_t currentChunkLength = std::min(
size_t(group.chunkLength),
size_t(std::max(
int64_t(0),
int64_t(group.bufferLength) - int64_t(i * group.chunkLength))));
// Compute the local reduction only in the first step of the algorithm.
// In subsequent steps, we already have a partially reduced result.
if (step == 0) {
reduceInputs(
currentChunkOffset * elementSize, currentChunkLength * elementSize);
}
out->send(
dst,
slot,
currentChunkOffset * elementSize,
currentChunkLength * elementSize);
}

// Wait for send and receive operations to complete.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto peer = group.ranks[i];
if (peer == context->rank) {
continue;
}
tmp->waitRecv();
out->waitSend();
}

// In the first step, prepare the chunk this process is responsible for
// with the reduced version of its inputs (if multiple are specified).
if (step == 0) {
reduceInputs(
group.myChunkOffset * elementSize, group.myChunkLength * elementSize);
}

// Reduce chunks from peers.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto src = group.ranks[i];
if (src == context->rank) {
continue;
}
opts.reduce(
static_cast<uint8_t*>(out->ptr) + (group.myChunkOffset * elementSize),
static_cast<const uint8_t*>(out->ptr) +
(group.myChunkOffset * elementSize),
static_cast<const uint8_t*>(tmp->ptr) +
(i * group.chunkLength * elementSize),
group.myChunkLength);
}
}

// There is one chunk that contains this process's share of the final result,
// and it can already be broadcast locally to out[1..N], if applicable.
// Doing so means that during the allgather phase we only have to broadcast
// the chunks we receive from our peers to out[1..N].
{
const auto& group = groups.back();
broadcastOutputs(
group.myChunkOffset * elementSize, group.myChunkLength * elementSize);
}

// Allgather.
for (auto it = groups.rbegin(); it != groups.rend(); it++) {
const auto& group = *it;

// Issue receive operations for reduced chunks from peers.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto src = group.ranks[i];
if (src == context->rank) {
continue;
}
const size_t currentChunkOffset =
group.bufferOffset + i * group.chunkLength;
const size_t currentChunkLength = std::min(
size_t(group.chunkLength),
size_t(std::max(
int64_t(0),
int64_t(group.bufferLength) - int64_t(i * group.chunkLength))));
out->recv(
src,
slot,
currentChunkOffset * elementSize,
currentChunkLength * elementSize);
}

// Issue send operations for this process's reduced chunk to its peers.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto dst = group.ranks[i];
if (dst == context->rank) {
continue;
}
out->send(
dst,
slot,
group.myChunkOffset * elementSize,
group.myChunkLength * elementSize);
}

// Wait for operations to complete.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto peer = group.ranks[i];
if (peer == context->rank) {
continue;
}
out->waitRecv();
out->waitSend();
}

// Broadcast result to multiple output buffers, if applicable.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto peer = group.ranks[i];
if (peer == context->rank) {
continue;
}
const size_t currentChunkOffset =
group.bufferOffset + i * group.chunkLength;
const size_t currentChunkLength = std::min(
size_t(group.chunkLength),
size_t(std::max(
int64_t(0),
int64_t(group.bufferLength) - int64_t(i * group.chunkLength))));
broadcastOutputs(
currentChunkOffset * elementSize, currentChunkLength * elementSize);
}
}
}

} // namespace

void allreduce(const AllreduceOptions& opts) {
18 changes: 17 additions & 1 deletion gloo/allreduce.h
@@ -35,14 +35,25 @@ struct AllreduceOptionsImpl {
//
using Func = std::function<void(void*, const void*, const void*, size_t)>;

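// Available allreduce algorithms. UNSPECIFIED falls back to RING (see the
// dispatch switch in allreduce.cc).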
enum Algorithm {
UNSPECIFIED = 0,
RING = 1,
BCUBE = 2,
};

explicit AllreduceOptionsImpl(const std::shared_ptr<Context>& context)
: context(context), timeout(context->getTimeout()) {}
: context(context),
timeout(context->getTimeout()),
algorithm(UNSPECIFIED) {}

std::shared_ptr<Context> context;

// End-to-end timeout for this operation.
std::chrono::milliseconds timeout;

// Algorithm selection.
Algorithm algorithm;

// Input and output buffers.
// The output is used as input if input is not specified.
std::vector<std::unique_ptr<transport::UnboundBuffer>> in;
@@ -78,10 +89,15 @@ struct AllreduceOptionsImpl {
class AllreduceOptions {
public:
using Func = detail::AllreduceOptionsImpl::Func;
using Algorithm = detail::AllreduceOptionsImpl::Algorithm;

explicit AllreduceOptions(const std::shared_ptr<Context>& context)
: impl_(context) {}

void setAlgorithm(Algorithm algorithm) {
impl_.algorithm = algorithm;
}

template <typename T>
void setInput(std::unique_ptr<transport::UnboundBuffer> buf) {
std::vector<std::unique_ptr<transport::UnboundBuffer>> bufs(1);