Skip to content

Commit

Permalink
Latest round of reviews.
Browse files Browse the repository at this point in the history
  • Loading branch information
dragos committed Jul 28, 2015
1 parent e9fb45e commit 475e346
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
* Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
*/
override protected[streaming] val rateController: Option[RateController] =
RateEstimator.create(ssc.conf).map { new ReceiverRateController(id, _) }
if (RateController.isBackPressureEnabled(ssc.conf))
RateEstimator.create(ssc.conf).map { new ReceiverRateController(id, _) }
else
None

/**
* Gets the receiver object that will be sent to the worker nodes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicLong

import scala.concurrent.{ExecutionContext, Future}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.SparkConf
import org.apache.spark.streaming.scheduler.rate.RateEstimator
import org.apache.spark.util.{ThreadUtils, Utils}

Expand Down Expand Up @@ -83,3 +83,8 @@ private[streaming] abstract class RateController(val streamUID: Int, rateEstimat
} computeAndPublish(processingEnd, elems, workDelay, waitDelay)
}
}

object RateController {
def isBackPressureEnabled(conf: SparkConf): Boolean =
conf.getBoolean("spark.streaming.backpressure.enable", false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,65 +34,70 @@ class RateControllerSuite extends TestSuiteBase {

test("rate controller publishes updates") {
val ssc = new StreamingContext(conf, batchDuration)
val dstream = new MockRateLimitDStream(ssc, Seq(Seq(1)), 1)
val output = new TestOutputStreamWithPartitions(dstream)
output.register()
runStreams(ssc, 1, 1)

eventually(timeout(2.seconds)) {
assert(dstream.publishCalls === 1)
withStreamingContext(ssc) { ssc =>
val dstream = new MockRateLimitDStream(ssc, Seq(Seq(1)), 1)
val output = new TestOutputStreamWithPartitions(dstream)
output.register()
runStreams(ssc, 1, 1)

eventually(timeout(2.seconds)) {
assert(dstream.publishCalls === 1)
}
}
}

test("receiver rate controller updates reach receivers") {
val ssc = new StreamingContext(conf, batchDuration)
withStreamingContext(ssc) { ssc =>
val dstream = new RateLimitInputDStream(ssc) {
override val rateController =
Some(new ReceiverRateController(id, new ConstantEstimator(200.0)))
}
SingletonDummyReceiver.reset()

val dstream = new RateLimitInputDStream(ssc) {
override val rateController =
Some(new ReceiverRateController(id, new ConstantEstimator(200.0)))
}
SingletonDummyReceiver.reset()

val output = new TestOutputStreamWithPartitions(dstream)
output.register()
runStreams(ssc, 2, 2)
val output = new TestOutputStreamWithPartitions(dstream)
output.register()
runStreams(ssc, 2, 2)

eventually(timeout(5.seconds)) {
assert(dstream.getCurrentRateLimit === Some(200))
eventually(timeout(5.seconds)) {
assert(dstream.getCurrentRateLimit === Some(200))
}
}
}

test("multiple rate controller updates reach receivers") {
val ssc = new StreamingContext(conf, batchDuration)
val rates = Seq(100L, 200L, 300L)
withStreamingContext(ssc) { ssc =>
val rates = Seq(100L, 200L, 300L)

val dstream = new RateLimitInputDStream(ssc) {
override val rateController =
Some(new ReceiverRateController(id, new ConstantEstimator(rates.map(_.toDouble): _*)))
}
SingletonDummyReceiver.reset()

val output = new TestOutputStreamWithPartitions(dstream)
output.register()

val observedRates = mutable.HashSet.empty[Long]

@volatile var done = false
runInBackground {
while (!done) {
try {
dstream.getCurrentRateLimit.foreach(observedRates += _)
} catch {
case NonFatal(_) => () // don't stop if the executor wasn't installed yet
val dstream = new RateLimitInputDStream(ssc) {
override val rateController =
Some(new ReceiverRateController(id, new ConstantEstimator(rates.map(_.toDouble): _*)))
}
SingletonDummyReceiver.reset()

val output = new TestOutputStreamWithPartitions(dstream)
output.register()

val observedRates = mutable.HashSet.empty[Long]

@volatile var done = false
runInBackground {
while (!done) {
try {
dstream.getCurrentRateLimit.foreach(observedRates += _)
} catch {
case NonFatal(_) => () // don't stop if the executor wasn't installed yet
}
Thread.sleep(20)
}
Thread.sleep(20)
}
}
runStreams(ssc, 4, 4)
done = true
runStreams(ssc, 4, 4)
done = true

// Long.MaxValue (essentially, no rate limit) is the initial rate limit for any Receiver
observedRates should contain theSameElementsAs (rates :+ Long.MaxValue)
// Long.MaxValue (essentially, no rate limit) is the initial rate limit for any Receiver
observedRates should contain theSameElementsAs (rates :+ Long.MaxValue)
}
}

private def runInBackground(f: => Unit): Unit = {
Expand Down

0 comments on commit 475e346

Please sign in to comment.