Skip to content

Commit

Permalink
Merge pull request apache#67 from shivaram/improve-shuffle
Browse files Browse the repository at this point in the history
Use a better list append in shuffles
  • Loading branch information
concretevitamin committed Jul 15, 2014
2 parents 388e64d + 81251e2 commit 5881da7
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 3 deletions.
15 changes: 15 additions & 0 deletions pkg/inst/worker/serialize.R
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,18 @@ writeEnvironment <- function(con, e, keyValPairsSerialized = TRUE) {
}
}
}

# Fast append to a list by using an accumulator.
# http://stackoverflow.com/questions/17046336/here-we-go-again-append-an-element-to-a-list-in-r
#
# The accumulator should have three fields:
#   size    - the current capacity of `data`
#   counter - the number of items appended so far
#   data    - the backing list; only the first `counter` slots hold items
#
# This function amortizes the allocation cost by doubling the size of the
# list every time it fills up. The accumulator is updated in place and is
# not returned, so `acc` must have reference semantics (e.g. an environment
# created with new.env()); changes made to a plain list would be lost.
#
# Args:
#   acc:  the accumulator to append to.
#   item: the value to append; may be NULL.
#
# Returns `item` invisibly; callers rely on the side effect on `acc`.
addItemToAccumulator <- function(acc, item) {
  if (acc$counter == acc$size) {
    # max() guards a zero-capacity accumulator: 0 * 2 would stay 0 and the
    # list would never be preallocated again.
    acc$size <- max(1, acc$size * 2)
    length(acc$data) <- acc$size
  }
  acc$counter <- acc$counter + 1
  # Assign through `[<-` with a one-element list rather than `[[<-`:
  # `x[[i]] <- NULL` deletes slot i, which would shrink the backing list
  # below `size` whenever a NULL item is appended.
  acc$data[acc$counter] <- list(item)
  invisible(item)
}
14 changes: 11 additions & 3 deletions pkg/inst/worker/worker.R
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,14 @@ if (isEmpty != 0) {
hashVal <- do.call(execFunctionName, list(tuple[[1]]))
bucket <- as.character(hashVal %% numPartitions)
acc <- res[[bucket]]
# TODO?: http://stackoverflow.com/questions/2436688/append-an-object-to-a-list-in-r-in-amortized-constant-time
acc[[length(acc) + 1]] <- tuple
# Create a new accumulator
if (is.null(acc)) {
acc <- new.env()
acc$counter <- 0
acc$data <- list(NULL)
acc$size <- 1
}
addItemToAccumulator(acc, tuple)
res[[bucket]] <- acc
}
invisible(lapply(data, hashTupleToEnvir))
Expand All @@ -112,7 +118,9 @@ if (isEmpty != 0) {
for (name in ls(res)) {
writeInt(outputCon, 2L)
writeInt(outputCon, as.integer(name))
writeRaw(outputCon, res[[name]])
# Truncate the accumulator list to the number of elements we have
length(res[[name]]$data) <- res[[name]]$counter
writeRaw(outputCon, res[[name]]$data)
}
}
}
Expand Down

0 comments on commit 5881da7

Please sign in to comment.