Skip to content

Commit

Permalink
Merge pull request apache#163 from sun-rui/SPARKR-153_1
Browse files Browse the repository at this point in the history
[SPARKR-153] phase 1: implement fold() and aggregate().
  • Loading branch information
concretevitamin committed Feb 15, 2015
2 parents c91ede2 + 141723e commit 5836650
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
exportClasses("RDD")
exportClasses("Broadcast")
exportMethods(
"aggregateRDD",
"cache",
"checkpoint",
"cogroup",
Expand All @@ -17,6 +18,7 @@ exportMethods(
"filterRDD",
"flatMap",
"flatMapValues",
"fold",
"foreach",
"foreachPartition",
"fullOuterJoin",
Expand Down
67 changes: 67 additions & 0 deletions pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -1374,6 +1374,73 @@ setMethod("top",
takeOrderedElem(rdd, num, FALSE)
})

#' Fold an RDD using a given associative function and a neutral "zero value".
#'
#' Aggregate the elements of each partition, and then the results for all the
#' partitions, using a given associative function and a neutral "zero value".
#'
#' @param rdd An RDD.
#' @param zeroValue A neutral "zero value".
#' @param op An associative function for the folding operation.
#' @return The folding result.
#' @rdname fold
#' @seealso reduce
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5))
#' fold(rdd, 0, "+") # 15
#'}
setGeneric("fold", function(rdd, zeroValue, op) { standardGeneric("fold") })

#' @rdname fold
#' @aliases fold,RDD,RDD-method
setMethod("fold",
signature(rdd = "RDD", zeroValue = "ANY", op = "ANY"),
function(rdd, zeroValue, op) {
aggregateRDD(rdd, zeroValue, op, op)
})

#' Aggregate an RDD using the given combine functions and a neutral "zero value".
#'
#' Aggregate the elements of each partition, and then the results for all the
#' partitions, using given combine functions and a neutral "zero value".
#'
#' @param rdd An RDD.
#' @param zeroValue A neutral "zero value".
#' @param seqOp A function to aggregate the RDD elements. It may return a different
#' result type from the type of the RDD elements.
#' @param combOp A function to aggregate results of seqOp.
#' @return The aggregation result.
#' @rdname aggregateRDD
#' @seealso reduce
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1, 2, 3, 4))
#' zeroValue <- list(0, 0)
#' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
#' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
#' aggregateRDD(rdd, zeroValue, seqOp, combOp) # list(10, 4)
#'}
setGeneric("aggregateRDD", function(rdd, zeroValue, seqOp, combOp) { standardGeneric("aggregateRDD") })

#' @rdname aggregateRDD
#' @aliases aggregateRDD,RDD,RDD-method
setMethod("aggregateRDD",
signature(rdd = "RDD", zeroValue = "ANY", seqOp = "ANY", combOp = "ANY"),
function(rdd, zeroValue, seqOp, combOp) {
partitionFunc <- function(part) {
Reduce(seqOp, part, zeroValue)
}

partitionList <- collect(lapplyPartition(rdd, partitionFunc),
flatten = FALSE)
Reduce(combOp, partitionList, zeroValue)
})

############ Shuffle Functions ############

#' Partition an RDD by key
Expand Down
22 changes: 22 additions & 0 deletions pkg/inst/tests/test_rdd.R
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,28 @@ test_that("top() on RDDs", {
expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:3])
})

test_that("fold() on RDDs", {
actual <- fold(rdd, 0, "+")
expect_equal(actual, Reduce("+", nums, 0))

rdd <- parallelize(sc, list())
actual <- fold(rdd, 0, "+")
expect_equal(actual, 0)
})

test_that("aggregateRDD() on RDDs", {
rdd <- parallelize(sc, list(1, 2, 3, 4))
zeroValue <- list(0, 0)
seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp)
expect_equal(actual, list(10, 4))

rdd <- parallelize(sc, list())
actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp)
expect_equal(actual, list(0, 0))
})

test_that("keys() on RDDs", {
keys <- keys(intRdd)
actual <- collect(keys)
Expand Down
43 changes: 43 additions & 0 deletions pkg/man/aggregateRDD.Rd
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
% Generated by roxygen2 (4.0.2): do not edit by hand
\docType{methods}
\name{aggregateRDD}
\alias{aggregateRDD}
\alias{aggregateRDD,RDD,RDD-method}
\alias{aggregateRDD,RDD-method}
\title{Aggregate an RDD using the given combine functions and a neutral "zero value".}
\usage{
aggregateRDD(rdd, zeroValue, seqOp, combOp)

\S4method{aggregateRDD}{RDD}(rdd, zeroValue, seqOp, combOp)
}
\arguments{
\item{rdd}{An RDD.}

\item{zeroValue}{A neutral "zero value".}

\item{seqOp}{A function to aggregate the RDD elements. It may return a different
result type from the type of the RDD elements.}

\item{combOp}{A function to aggregate results of seqOp.}
}
\value{
The aggregation result.
}
\description{
Aggregate the elements of each partition, and then the results for all the
partitions, using given combine functions and a neutral "zero value".
}
\examples{
\dontrun{
sc <- sparkR.init()
rdd <- parallelize(sc, list(1, 2, 3, 4))
zeroValue <- list(0, 0)
seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
aggregateRDD(rdd, zeroValue, seqOp, combOp) # list(10, 4)
}
}
\seealso{
reduce
}

37 changes: 37 additions & 0 deletions pkg/man/fold.Rd
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
% Generated by roxygen2 (4.0.2): do not edit by hand
\docType{methods}
\name{fold}
\alias{fold}
\alias{fold,RDD,RDD-method}
\alias{fold,RDD-method}
\title{Fold an RDD using a given associative function and a neutral "zero value".}
\usage{
fold(rdd, zeroValue, op)

\S4method{fold}{RDD}(rdd, zeroValue, op)
}
\arguments{
\item{rdd}{An RDD.}

\item{zeroValue}{A neutral "zero value".}

\item{op}{An associative function for the folding operation.}
}
\value{
The folding result.
}
\description{
Aggregate the elements of each partition, and then the results for all the
partitions, using a given associative function and a neutral "zero value".
}
\examples{
\dontrun{
sc <- sparkR.init()
rdd <- parallelize(sc, list(1, 2, 3, 4, 5))
fold(rdd, 0, "+") # 15
}
}
\seealso{
reduce
}

0 comments on commit 5836650

Please sign in to comment.