Commit 0c6e071
Merge pull request apache#157 from lythesia/master
[SPARKR-161] Support reduceByKeyLocally()
concretevitamin committed Feb 9, 2015
2 parents 343b6ab + f5038c0 commit 0c6e071
Showing 5 changed files with 175 additions and 54 deletions.
1 change: 1 addition & 0 deletions pkg/NAMESPACE
@@ -41,6 +41,7 @@ exportMethods(
            "persist",
            "reduce",
            "reduceByKey",
+           "reduceByKeyLocally",
            "rightOuterJoin",
            "sampleRDD",
            "saveAsTextFile",
146 changes: 92 additions & 54 deletions pkg/R/RDD.R
@@ -1382,26 +1382,32 @@ setMethod("groupByKey",
             groupVals <- function(part) {
               vals <- new.env()
               keys <- new.env()
+              pred <- function(item) exists(item$hash, keys)
+              appendList <- function(acc, x) {
+                addItemToAccumulator(acc, x)
+                acc
+              }
+              makeList <- function(x) {
+                acc <- initAccumulator()
+                addItemToAccumulator(acc, x)
+                acc
+              }
               # Each item in the partition is list of (K, V)
               lapply(part,
                      function(item) {
-                       hashVal <- as.character(hashCode(item[[1]]))
-                       if (exists(hashVal, vals)) {
-                         acc <- vals[[hashVal]]
-                         acc[[length(acc) + 1]] <- item[[2]]
-                         vals[[hashVal]] <- acc
-                       } else {
-                         vals[[hashVal]] <- list(item[[2]])
-                         keys[[hashVal]] <- item[[1]]
-                       }
+                       item$hash <- as.character(hashCode(item[[1]]))
+                       updateOrCreatePair(item, keys, vals, pred,
+                                          appendList, makeList)
                      })
+              # extract out data field
+              vals <- eapply(vals,
+                             function(x) {
+                               length(x$data) <- x$counter
+                               x$data
+                             })
               # Every key in the environment contains a list
               # Convert that to list(K, Seq[V])
-              grouped <- lapply(ls(vals),
-                                function(name) {
-                                  list(keys[[name]], vals[[name]])
-                                })
-              grouped
+              convertEnvsToList(keys, vals)
             }
             lapplyPartition(shuffled, groupVals)
           })
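The appendList/makeList helpers buffer each key's values in an accumulator instead of growing an R list one element at a time, which copies the list on every append (quadratic in the number of values per key). The accumulator functions themselves live elsewhere in the package; below is a minimal sketch of their assumed behavior — not the package's actual definitions — which is enough to explain the `length(x$data) <- x$counter` trim in the eapply above:

    # Sketch only: an accumulator as an environment holding a preallocated
    # list ($data) plus the number of slots actually used ($counter).
    initAccumulator <- function() {
      acc <- new.env()
      acc$counter <- 0
      acc$data <- vector("list", 16)             # preallocated storage
      acc
    }

    addItemToAccumulator <- function(acc, item) {
      if (acc$counter == length(acc$data)) {
        length(acc$data) <- 2 * length(acc$data) # double when full: amortized O(1)
      }
      acc$counter <- acc$counter + 1
      acc$data[[acc$counter]] <- item
    }

Because $data can be longer than $counter after doubling, groupVals trims each buffer back to its used length before emitting the list(K, Seq[V]) pairs.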
@@ -1442,28 +1448,78 @@ setMethod("reduceByKey",
             reduceVals <- function(part) {
               vals <- new.env()
               keys <- new.env()
+              pred <- function(item) exists(item$hash, keys)
               lapply(part,
                      function(item) {
-                       hashVal <- as.character(hashCode(item[[1]]))
-                       if (exists(hashVal, vals)) {
-                         vals[[hashVal]] <- do.call(
-                           combineFunc, list(vals[[hashVal]], item[[2]]))
-                       } else {
-                         vals[[hashVal]] <- item[[2]]
-                         keys[[hashVal]] <- item[[1]]
-                       }
+                       item$hash <- as.character(hashCode(item[[1]]))
+                       updateOrCreatePair(item, keys, vals, pred, combineFunc, identity)
                      })
-              combined <- lapply(ls(vals),
-                                 function(name) {
-                                   list(keys[[name]], vals[[name]])
-                                 })
-              combined
+              convertEnvsToList(keys, vals)
             }
             locallyReduced <- lapplyPartition(rdd, reduceVals)
             shuffled <- partitionBy(locallyReduced, numPartitions)
             lapplyPartition(shuffled, reduceVals)
           })
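reduceVals runs twice: once per input partition before the shuffle (a map-side combine that leaves at most one record per key per partition) and again after partitionBy has grouped each key onto a single partition. The final value for a key is therefore a combine of partial combines, which is why combineFunc must be associative. A small driver-side illustration in plain R, independent of any Spark objects:

    values <- list(c(2, 4), c(6, 8))                       # one key's values, across 2 partitions
    mapSide <- lapply(values, function(p) Reduce(`+`, p))  # local combine: 6 and 14
    Reduce(`+`, mapSide)                                   # post-shuffle combine: 20, the sum of all values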

+#' Merge values by key locally
+#'
+#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V),
+#' and merges the values for each key using an associative reduce function, but returns the
+#' results immediately to the driver as an R list.
+#'
+#' @param rdd The RDD to reduce by key. Should be an RDD where each element is
+#'  list(K, V) or c(K, V).
+#' @param combineFunc The associative reduce function to use.
+#' @return A list of elements of type list(K, V') where V' is the merged value for each key.
+#' @rdname reduceByKeyLocally
+#' @seealso reduceByKey
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+#' rdd <- parallelize(sc, pairs)
+#' reduced <- reduceByKeyLocally(rdd, "+")
+#' reduced # list(list(1, 6), list(1.1, 3))
+#'}
+setGeneric("reduceByKeyLocally",
+           function(rdd, combineFunc) {
+             standardGeneric("reduceByKeyLocally")
+           })
+
+#' @rdname reduceByKeyLocally
+#' @aliases reduceByKeyLocally,RDD,ANY-method
+setMethod("reduceByKeyLocally",
+          signature(rdd = "RDD", combineFunc = "ANY"),
+          function(rdd, combineFunc) {
+            reducePart <- function(part) {
+              vals <- new.env()
+              keys <- new.env()
+              pred <- function(item) exists(item$hash, keys)
+              lapply(part,
+                     function(item) {
+                       item$hash <- as.character(hashCode(item[[1]]))
+                       updateOrCreatePair(item, keys, vals, pred, combineFunc, identity)
+                     })
+              list(list(keys, vals)) # return the hash envs to avoid re-computing hashes in merge
+            }
+            mergeParts <- function(accum, x) {
+              pred <- function(item) {
+                exists(item$hash, accum[[1]])
+              }
+              lapply(ls(x[[1]]),
+                     function(name) {
+                       item <- list(x[[1]][[name]], x[[2]][[name]])
+                       item$hash <- name
+                       updateOrCreatePair(item, accum[[1]], accum[[2]], pred, combineFunc, identity)
+                     })
+              accum
+            }
+            reduced <- mapPartitions(rdd, reducePart)
+            merged <- reduce(reduced, mergeParts)
+            convertEnvsToList(merged[[1]], merged[[2]])
+          })
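reducePart returns list(list(keys, vals)) — the hash environments themselves — so that mergeParts can fold the partitions together on the driver without recomputing hashCode for every key. A minimal sketch of that fold, assuming `+` as the combine function and using named lists in place of the hash environments (the names stand in for the precomputed hashes):

    parts <- list(list("1" = 2, "1.1" = 3),  # partition 1, already reduced
                  list("1" = 4))             # partition 2
    mergeTwo <- function(accum, x) {
      for (h in names(x)) {
        accum[[h]] <- if (is.null(accum[[h]])) x[[h]] else accum[[h]] + x[[h]]
      }
      accum
    }
    Reduce(mergeTwo, parts)  # list("1" = 6, "1.1" = 3)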

#' Combine values by key
#'
#' Generic function to combine the elements for each key using a custom set of
@@ -1513,46 +1569,28 @@ setMethod("combineByKey",
             combineLocally <- function(part) {
               combiners <- new.env()
               keys <- new.env()
+              pred <- function(item) exists(item$hash, keys)
               lapply(part,
                      function(item) {
-                       k <- as.character(item[[1]])
-                       if (!exists(k, keys)) {
-                         combiners[[k]] <- do.call(createCombiner,
-                                                   list(item[[2]]))
-                         keys[[k]] <- item[[1]]
-                       } else {
-                         combiners[[k]] <- do.call(mergeValue,
-                                                   list(combiners[[k]],
-                                                        item[[2]]))
-                       }
-                     })
-              lapply(ls(keys), function(k) {
-                list(keys[[k]], combiners[[k]])
+                       item$hash <- as.character(item[[1]])
+                       updateOrCreatePair(item, keys, combiners, pred, mergeValue, createCombiner)
                      })
+              convertEnvsToList(keys, combiners)
             }
             locallyCombined <- lapplyPartition(rdd, combineLocally)
             shuffled <- partitionBy(locallyCombined, numPartitions)
             mergeAfterShuffle <- function(part) {
               combiners <- new.env()
               keys <- new.env()
+              pred <- function(item) exists(item$hash, keys)
               lapply(part,
                      function(item) {
-                       k <- as.character(item[[1]])
-                       if (!exists(k, combiners)) {
-                         combiners[[k]] <- item[[2]]
-                         keys[[k]] <- item[[1]]
-                       } else {
-                         combiners[[k]] <- do.call(mergeCombiners,
-                                                   list(combiners[[k]],
-                                                        item[[2]]))
-                       }
-                     })
-              lapply(ls(keys), function(k) {
-                list(keys[[k]], combiners[[k]])
+                       item$hash <- as.character(item[[1]])
+                       updateOrCreatePair(item, keys, combiners, pred, mergeCombiners, identity)
                      })
+              convertEnvsToList(keys, combiners)
             }
-            combined <-lapplyPartition(shuffled, mergeAfterShuffle)
-            combined
+            lapplyPartition(shuffled, mergeAfterShuffle)
          })
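For contrast with the reduce-style functions, a hedged usage sketch of combineByKey — computing a per-key mean with (sum, count) combiners. This assumes the argument order of the setMethod above (rdd, createCombiner, mergeValue, mergeCombiners, numPartitions), an sc from sparkR.init() as in the other examples, and the package's mapValues/collect helpers:

    rdd <- parallelize(sc, list(list("a", 1), list("a", 3), list("b", 5)), 2L)
    sumCount <- combineByKey(rdd,
                             function(v) list(v, 1),                                # createCombiner
                             function(acc, v) list(acc[[1]] + v, acc[[2]] + 1),     # mergeValue
                             function(a, b) list(a[[1]] + b[[1]], a[[2]] + b[[2]]), # mergeCombiners
                             2L)                                                    # numPartitions
    collect(mapValues(sumCount, function(x) x[[1]] / x[[2]]))
    # list(list("a", 2), list("b", 5)), order may vary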

############ Binary Functions #############
29 changes: 29 additions & 0 deletions pkg/R/utils.R
@@ -259,3 +259,32 @@ joinTaggedList <- function(tagged_list, cnull) {
   lists <- genCompactLists(tagged_list, cnull)
   mergeCompactLists(lists[[1]], lists[[2]])
 }
+
+# Utility function to update an existing key-value pair or create a new one,
+# as decided by a predicate. Used in the *ByKey functions.
+# params:
+#   pair: a key-value pair
+#   keys/vals: environments of keys/values, indexed by hash
+#   updateOrCreatePred: predicate telling whether the pair already exists
+#   updateFn: update or merge function for an existing pair, similar to `mergeValue` in combineByKey
+#   createFn: create function for a new pair, similar to `createCombiner` in combineByKey
+updateOrCreatePair <- function(pair, keys, vals, updateOrCreatePred, updateFn, createFn) {
+  # assumes the hash value is bound to `$hash`, with key/value at indices 1/2
+  hashVal <- pair$hash
+  key <- pair[[1]]
+  val <- pair[[2]]
+  if (updateOrCreatePred(pair)) {
+    assign(hashVal, do.call(updateFn, list(get(hashVal, envir = vals), val)), envir = vals)
+  } else {
+    assign(hashVal, do.call(createFn, list(val)), envir = vals)
+    assign(hashVal, key, envir = keys)
+  }
+}
+
+# Utility function to convert the keys and values environments into a list of list(K, V) pairs
+convertEnvsToList <- function(keys, vals) {
+  lapply(ls(keys),
+         function(name) {
+           list(keys[[name]], vals[[name]])
+         })
+}
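A quick local sketch of how the *ByKey implementations drive these two helpers, reducing list(K, V) pairs with `+` (hashCode is the hashing utility already defined in this file):

    pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
    keys <- new.env()
    vals <- new.env()
    pred <- function(item) exists(item$hash, keys)
    for (pair in pairs) {
      pair$hash <- as.character(hashCode(pair[[1]]))
      updateOrCreatePair(pair, keys, vals, pred, "+", identity)
    }
    convertEnvsToList(keys, vals)  # list(list(1, 6), list(1.1, 3)), order may vary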
13 changes: 13 additions & 0 deletions pkg/inst/tests/test_rdd.R
@@ -229,6 +229,19 @@ test_that("flatMapValues() on pairwise RDDs", {
                list(2L, 1), list(2L, 2), list(1L, 200), list(1L, 201)))
 })
 
+test_that("reduceByKeyLocally() on PairwiseRDDs", {
+  pairs <- parallelize(sc, list(list(1, 2), list(1.1, 3), list(1, 4)), 2L)
+  actual <- reduceByKeyLocally(pairs, "+")
+  expect_equal(sortKeyValueList(actual),
+               sortKeyValueList(list(list(1, 6), list(1.1, 3))))
+
+  pairs <- parallelize(sc, list(list("abc", 1.2), list(1.1, 0), list("abc", 1.3),
+                                list("bb", 5)), 4L)
+  actual <- reduceByKeyLocally(pairs, "+")
+  expect_equal(sortKeyValueList(actual),
+               sortKeyValueList(list(list("abc", 2.5), list(1.1, 0), list("bb", 5))))
+})
+
 test_that("distinct() on RDDs", {
   nums.rep2 <- rep(1:10, 2)
   rdd.rep2 <- parallelize(sc, nums.rep2, 2L)
40 changes: 40 additions & 0 deletions pkg/man/reduceByKeyLocally.Rd
@@ -0,0 +1,40 @@
+% Generated by roxygen2 (4.1.0): do not edit by hand
+% Please edit documentation in R/RDD.R
+\docType{methods}
+\name{reduceByKeyLocally}
+\alias{reduceByKeyLocally}
+\alias{reduceByKeyLocally,RDD,ANY-method}
+\alias{reduceByKeyLocally,RDD-method}
+\title{Merge values by key locally}
+\usage{
+reduceByKeyLocally(rdd, combineFunc)
+
+\S4method{reduceByKeyLocally}{RDD}(rdd, combineFunc)
+}
+\arguments{
+\item{rdd}{The RDD to reduce by key. Should be an RDD where each element is
+list(K, V) or c(K, V).}
+
+\item{combineFunc}{The associative reduce function to use.}
+}
+\value{
+A list of elements of type list(K, V') where V' is the merged value for each key.
+}
+\description{
+This function operates on RDDs where every element is of the form list(K, V) or c(K, V),
+and merges the values for each key using an associative reduce function, but returns the
+results immediately to the driver as an R list.
+}
+\examples{
+\dontrun{
+sc <- sparkR.init()
+pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+rdd <- parallelize(sc, pairs)
+reduced <- reduceByKeyLocally(rdd, "+")
+reduced # list(list(1, 6), list(1.1, 3))
+}
+}
+\seealso{
+reduceByKey
+}
+