Merge pull request apache#154 from sun-rui/SPARKR-150
[SPARKR-150] phase 1: implement sortBy() and sortByKey().
concretevitamin committed Feb 10, 2015
2 parents 0c6e071 + c7964c9 commit bd6705b
Showing 6 changed files with 234 additions and 3 deletions.
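
For orientation before the file diffs, a minimal usage sketch of the two new APIs, pieced together from the roxygen examples and tests added below (it assumes a SparkR context created with sparkR.init()):

sc <- sparkR.init()
rdd <- parallelize(sc, list(3, 2, 1))
collect(sortBy(rdd, function(x) { x }))        # list(1, 2, 3)
pairs <- parallelize(sc, list(list(3, 1), list(2, 2), list(1, 3)))
collect(sortByKey(pairs))                      # list(list(1, 3), list(2, 2), list(3, 1))
collect(sortByKey(pairs, ascending = FALSE))   # list(list(3, 1), list(2, 2), list(1, 3))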
2 changes: 2 additions & 0 deletions pkg/NAMESPACE
@@ -46,6 +46,8 @@ exportMethods(
"sampleRDD",
"saveAsTextFile",
"saveAsObjectFile",
"sortBy",
"sortByKey",
"take",
"takeSample",
"unionRDD",
100 changes: 100 additions & 0 deletions pkg/R/RDD.R
@@ -1264,6 +1264,36 @@ setMethod("flatMapValues",
flatMap(X, flatMapFunc)
})

#' Sort an RDD by the given key function.
#'
#' @param rdd An RDD to be sorted.
#' @param func A function used to compute the sort key for each element.
#' @param ascending A flag to indicate whether the sorting is ascending or descending.
#' @param numPartitions Number of partitions to create.
#' @return An RDD where all elements are sorted.
#' @rdname sortBy
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(3, 2, 1))
#' collect(sortBy(rdd, function(x) { x })) # list (1, 2, 3)
#'}
setGeneric("sortBy", function(rdd,
func,
ascending = TRUE,
numPartitions = 1L) {
standardGeneric("sortBy")
})

#' @rdname sortBy
#' @aliases sortBy,RDD,RDD-method
setMethod("sortBy",
signature(rdd = "RDD", func = "function"),
function(rdd, func, ascending = TRUE, numPartitions = SparkR::numPartitions(rdd)) {
values(sortByKey(keyBy(rdd, func), ascending, numPartitions))
})
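
Read as plain R on an ordinary list (no Spark involved), the decomposition above amounts to the following sketch; the intermediate names are illustrative only:

xs <- list(3, 2, 1)
keyed <- lapply(xs, function(x) list(x * x, x))             # keyBy(rdd, func), with a squaring key function
sorted <- keyed[order(sapply(keyed, function(p) p[[1]]))]   # sortByKey(..., ascending = TRUE)
lapply(sorted, function(p) p[[2]])                          # values(...) -> list(1, 2, 3)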

############ Shuffle Functions ############

#' Partition an RDD by key
@@ -1858,6 +1888,76 @@ setMethod("cogroup",
group.func)
})

#' Sort a (k, v) pair RDD by k.
#'
#' @param rdd A (k, v) pair RDD to be sorted.
#' @param ascending A flag to indicate whether the sorting is ascending or descending.
#' @param numPartitions Number of partitions to create.
#' @return An RDD where all (k, v) pair elements are sorted.
#' @rdname sortByKey
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(list(3, 1), list(2, 2), list(1, 3)))
#' collect(sortByKey(rdd)) # list (list(1, 3), list(2, 2), list(3, 1))
#'}
setGeneric("sortByKey", function(rdd,
ascending = TRUE,
numPartitions = 1L) {
standardGeneric("sortByKey")
})

#' @rdname sortByKey
#' @aliases sortByKey,RDD,RDD-method
setMethod("sortByKey",
signature(rdd = "RDD"),
function(rdd, ascending = TRUE, numPartitions = SparkR::numPartitions(rdd)) {
rangeBounds <- list()

if (numPartitions > 1) {
rddSize <- count(rdd)
# constant from Spark's RangePartitioner
maxSampleSize <- numPartitions * 20
fraction <- min(maxSampleSize / max(rddSize, 1), 1.0)

samples <- collect(keys(sampleRDD(rdd, FALSE, fraction, 1L)))

# Note: the built-in R sort() function only works on atomic vectors
samples <- sort(unlist(samples, recursive = FALSE), decreasing = !ascending)

if (length(samples) > 0) {
rangeBounds <- lapply(seq_len(numPartitions - 1),
function(i) {
j <- ceiling(length(samples) * i / numPartitions)
samples[j]
})
}
}

rangePartitionFunc <- function(key) {
partition <- 0

# TODO: Use binary search instead of linear search, similar to Spark
while (partition < length(rangeBounds) && key > rangeBounds[[partition + 1]]) {
partition <- partition + 1
}

if (ascending) {
partition
} else {
numPartitions - partition - 1
}
}

partitionFunc <- function(part) {
sortKeyValueList(part, decreasing = !ascending)
}

newRDD <- partitionBy(rdd, numPartitions, rangePartitionFunc)
lapplyPartition(newRDD, partitionFunc)
})
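
The sampling-based range partitioning above can be followed with a standalone sketch in plain R (numeric keys, ascending order, and made-up sample values assumed; assignPartition is only an illustrative stand-in for rangePartitionFunc):

samples <- sort(c(7, 2, 9, 4, 1, 8))            # keys collected via sampleRDD(), already sorted
numPartitions <- 3
rangeBounds <- lapply(seq_len(numPartitions - 1),
                      function(i) samples[ceiling(length(samples) * i / numPartitions)])
assignPartition <- function(key) {              # mirrors rangePartitionFunc for ascending = TRUE
  partition <- 0
  while (partition < length(rangeBounds) && key > rangeBounds[[partition + 1]]) {
    partition <- partition + 1
  }
  partition
}
sapply(c(1, 5, 9), assignPartition)             # 0 1 2 -- keys fall into successive ranges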

# TODO: Consider caching the name in the RDD's environment
#' Return an RDD's name.
#'
4 changes: 2 additions & 2 deletions pkg/R/utils.R
@@ -197,9 +197,9 @@ initAccumulator <- function() {

# Utility function to sort a list of key value pairs
# Used in unit tests
sortKeyValueList <- function(kv_list) {
sortKeyValueList <- function(kv_list, decreasing = FALSE) {
keys <- sapply(kv_list, function(x) x[[1]])
kv_list[order(keys)]
kv_list[order(keys, decreasing = decreasing)]
}
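
A quick self-contained check of the new decreasing argument, using only the helper defined above:

sortKeyValueList(list(list("b", 2), list("a", 1)))                     # list(list("a", 1), list("b", 2))
sortKeyValueList(list(list("b", 2), list("a", 1)), decreasing = TRUE)  # list(list("b", 2), list("a", 1))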

# Utility function to generate compact R lists from grouped rdd
61 changes: 60 additions & 1 deletion pkg/inst/tests/test_rdd.R
@@ -267,6 +267,17 @@ test_that("keyBy on RDDs", {
expect_equal(actual, lapply(nums, function(x) { list(func(x), x) }))
})

test_that("sortBy() on RDDs", {
sortedRdd <- sortBy(rdd, function(x) { x * x }, ascending = FALSE)
actual <- collect(sortedRdd)
expect_equal(actual, as.list(sort(nums, decreasing = TRUE)))

rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
sortedRdd2 <- sortBy(rdd2, function(x) { x * x })
actual <- collect(sortedRdd2)
expect_equal(actual, as.list(nums))
})

test_that("keys() on RDDs", {
keys <- keys(intRdd)
actual <- collect(keys)
@@ -387,6 +398,55 @@ test_that("fullOuterJoin() on pairwise RDDs", {
sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)), list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
})

test_that("sortByKey() on pairwise RDDs", {
numPairsRdd <- map(rdd, function(x) { list (x, x) })
sortedRdd <- sortByKey(numPairsRdd, ascending = FALSE)
actual <- collect(sortedRdd)
numPairs <- lapply(nums, function(x) { list (x, x) })
expect_equal(actual, sortKeyValueList(numPairs, decreasing = TRUE))

rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
numPairsRdd2 <- map(rdd2, function(x) { list (x, x) })
sortedRdd2 <- sortByKey(numPairsRdd2)
actual <- collect(sortedRdd2)
expect_equal(actual, numPairs)

# sort by string keys
l <- list(list("a", 1), list("b", 2), list("1", 3), list("d", 4), list("2", 5))
rdd3 <- parallelize(sc, l, 2L)
sortedRdd3 <- sortByKey(rdd3)
actual <- collect(sortedRdd3)
expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))

# test on the boundary cases

# boundary case 1: the RDD to be sorted has only 1 partition
rdd4 <- parallelize(sc, l, 1L)
sortedRdd4 <- sortByKey(rdd4)
actual <- collect(sortedRdd4)
expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))

# boundary case 2: the sorted RDD has only 1 partition
rdd5 <- parallelize(sc, l, 2L)
sortedRdd5 <- sortByKey(rdd5, numPartitions = 1L)
actual <- collect(sortedRdd5)
expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))

# boundary case 3: the RDD to be sorted has only 1 element
l2 <- list(list("a", 1))
rdd6 <- parallelize(sc, l2, 2L)
sortedRdd6 <- sortByKey(rdd6)
actual <- collect(sortedRdd6)
expect_equal(actual, l2)

# boundary case 4: the RDD to be sorted has 0 element
l3 <- list()
rdd7 <- parallelize(sc, l3, 2L)
sortedRdd7 <- sortByKey(rdd7)
actual <- collect(sortedRdd7)
expect_equal(actual, l3)
})

test_that("collectAsMap() on a pairwise RDD", {
rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
vals <- collectAsMap(rdd)
@@ -404,4 +464,3 @@ test_that("collectAsMap() on a pairwise RDD", {
vals <- collectAsMap(rdd)
expect_equal(vals, list(`1` = "a", `2` = "b"))
})

36 changes: 36 additions & 0 deletions pkg/man/sortBy.Rd
@@ -0,0 +1,36 @@
% Generated by roxygen2 (4.0.2): do not edit by hand
\docType{methods}
\name{sortBy}
\alias{sortBy}
\alias{sortBy,RDD,RDD-method}
\alias{sortBy,RDD,function,missingOrLogical,missingOrInteger-method}
\title{Sort an RDD by the given key function.}
\usage{
sortBy(rdd, func, ascending, numPartitions)

\S4method{sortBy}{RDD,`function`,missingOrLogical,missingOrInteger}(rdd, func,
ascending, numPartitions)
}
\arguments{
\item{rdd}{An RDD to be sorted.}

\item{func}{A function used to compute the sort key for each element.}

\item{ascending}{A flag to indicate whether the sorting is ascending or descending.}

\item{numPartitions}{Number of partitions to create.}
}
\value{
An RDD where all elements are sorted.
}
\description{
Sort an RDD by the given key function.
}
\examples{
\dontrun{
sc <- sparkR.init()
rdd <- parallelize(sc, list(3, 2, 1))
collect(sortBy(rdd, function(x) { x })) # list (1, 2, 3)
}
}

34 changes: 34 additions & 0 deletions pkg/man/sortByKey.Rd
@@ -0,0 +1,34 @@
% Generated by roxygen2 (4.0.2): do not edit by hand
\docType{methods}
\name{sortByKey}
\alias{sortByKey}
\alias{sortByKey,RDD,RDD-method}
\alias{sortByKey,RDD,missingOrLogical,missingOrInteger-method}
\title{Sort a (k, v) pair RDD by k.}
\usage{
sortByKey(rdd, ascending, numPartitions)

\S4method{sortByKey}{RDD,missingOrLogical,missingOrInteger}(rdd, ascending,
numPartitions)
}
\arguments{
\item{rdd}{A (k, v) pair RDD to be sorted.}

\item{ascending}{A flag to indicate whether the sorting is ascending or descending.}

\item{numPartitions}{Number of partitions to create.}
}
\value{
An RDD where all (k, v) pair elements are sorted.
}
\description{
Sort a (k, v) pair RDD by k.
}
\examples{
\dontrun{
sc <- sparkR.init()
rdd <- parallelize(sc, list(list(3, 1), list(2, 2), list(1, 3)))
collect(sortByKey(rdd)) # list (list(1, 3), list(2, 2), list(3, 1))
}
}
