
Commit f168c94

Latest review round.
dragos committed Jul 29, 2015
1 parent 5125e60 commit f168c94
Showing 4 changed files with 41 additions and 72 deletions.
@@ -33,7 +33,7 @@ import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
-import org.apache.spark.streaming.scheduler.{RateLimitInputDStream, ConstantEstimator, SingletonDummyReceiver}
+import org.apache.spark.streaming.scheduler.{RateLimitInputDStream, ConstantEstimator, SingletonTestRateReceiver}
 import org.apache.spark.util.{Clock, ManualClock, Utils}
 
 /**
@@ -401,13 +401,13 @@ class CheckpointSuite extends TestSuiteBase {
       override val rateController =
         Some(new ReceiverRateController(id, new ConstantEstimator(200.0)))
     }
-    SingletonDummyReceiver.reset()
+    SingletonTestRateReceiver.reset()
 
     val output = new TestOutputStreamWithPartitions(dstream.checkpoint(batchDuration * 2))
     output.register()
     runStreams(ssc, 5, 5)
 
-    SingletonDummyReceiver.reset()
+    SingletonTestRateReceiver.reset()
     ssc = new StreamingContext(checkpointDir)
     ssc.start()
     val outputNew = advanceTimeWithRealDelay(ssc, 2)
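
The CheckpointSuite change above keeps resetting the shared receiver singleton before the first run and again before the context is recreated from the checkpoint directory. A minimal, hypothetical sketch of why that reset matters (a plain counter object standing in for SingletonTestRateReceiver, no Spark involved) could look like this:

    import java.util.concurrent.atomic.AtomicLong

    // Hypothetical stand-in for SingletonTestRateReceiver: a JVM-wide singleton
    // whose state survives across StreamingContext restarts within one test JVM.
    object SingletonCounterSketch {
      val received = new AtomicLong(0L)
      def reset(): Unit = received.set(0L)
    }

    object ResetBetweenRunsSketch extends App {
      def runOnce(records: Int): Long = {
        (1 to records).foreach(_ => SingletonCounterSketch.received.incrementAndGet())
        SingletonCounterSketch.received.get()
      }

      SingletonCounterSketch.reset()
      assert(runOnce(5) == 5) // first run

      // Without this second reset, the run that recovers from the checkpoint
      // would start from 5 instead of 0 and the assertion would fail.
      SingletonCounterSketch.reset()
      assert(runOnce(5) == 5) // run after the restart
    }
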
@@ -30,18 +30,17 @@ import org.apache.spark.streaming.scheduler.rate.RateEstimator
 
 class RateControllerSuite extends TestSuiteBase {
 
-  override def actuallyWait: Boolean = true
+  override def useManualClock: Boolean = false
 
   test("rate controller publishes updates") {
     val ssc = new StreamingContext(conf, batchDuration)
     withStreamingContext(ssc) { ssc =>
-      val dstream = new MockRateLimitDStream(ssc, Seq(Seq(1)), 1)
-      val output = new TestOutputStreamWithPartitions(dstream)
-      output.register()
-      runStreams(ssc, 1, 1)
+      val dstream = new RateLimitInputDStream(ssc)
+      dstream.register()
+      ssc.start()
 
-      eventually(timeout(2.seconds)) {
-        assert(dstream.publishCalls === 1)
+      eventually(timeout(10.seconds)) {
+        assert(dstream.publishCalls > 0)
       }
     }
   }
@@ -53,13 +52,11 @@ class RateControllerSuite extends TestSuiteBase {
         override val rateController =
           Some(new ReceiverRateController(id, new ConstantEstimator(200.0)))
       }
-      SingletonDummyReceiver.reset()
+      dstream.register()
+      SingletonTestRateReceiver.reset()
+      ssc.start()
 
-      val output = new TestOutputStreamWithPartitions(dstream)
-      output.register()
-      runStreams(ssc, 2, 2)
-
-      eventually(timeout(5.seconds)) {
+      eventually(timeout(10.seconds)) {
         assert(dstream.getCurrentRateLimit === Some(200))
       }
     }
@@ -74,58 +71,19 @@ class RateControllerSuite extends TestSuiteBase {
         override val rateController =
           Some(new ReceiverRateController(id, new ConstantEstimator(rates.map(_.toDouble): _*)))
       }
-      SingletonDummyReceiver.reset()
-
-      val output = new TestOutputStreamWithPartitions(dstream)
-      output.register()
+      SingletonTestRateReceiver.reset()
+      dstream.register()
 
       val observedRates = mutable.HashSet.empty[Long]
+      ssc.start()
 
-      @volatile var done = false
-      runInBackground {
-        while (!done) {
-          try {
-            dstream.getCurrentRateLimit.foreach(observedRates += _)
-          } catch {
-            case NonFatal(_) => () // don't stop if the executor wasn't installed yet
-          }
-          Thread.sleep(20)
-        }
-      }
-      runStreams(ssc, 4, 4)
-      done = true
-
-      // Long.MaxValue (essentially, no rate limit) is the initial rate limit for any Receiver
-      observedRates should contain theSameElementsAs (rates :+ Long.MaxValue)
+      eventually(timeout(20.seconds)) {
+        dstream.getCurrentRateLimit.foreach(observedRates += _)
+        // Long.MaxValue (essentially, no rate limit) is the initial rate limit for any Receiver
+        observedRates should contain theSameElementsAs (rates :+ Long.MaxValue)
+      }
     }
   }
-
-  private def runInBackground(f: => Unit): Unit = {
-    new Thread {
-      override def run(): Unit = {
-        f
-      }
-    }.start()
-  }
 }
 
-/**
- * An InputDStream that counts how often its rate controller `publish` method was called.
- */
-private class MockRateLimitDStream[T: ClassTag](
-    @transient ssc: StreamingContext,
-    input: Seq[Seq[T]],
-    numPartitions: Int) extends TestInputStream[T](ssc, input, numPartitions) {
-
-  @volatile
-  var publishCalls = 0
-
-  override val rateController: Option[RateController] =
-    Some(new RateController(id, new ConstantEstimator(100.0)) {
-      override def publish(rate: Long): Unit = {
-        publishCalls += 1
-      }
-    })
-}
-
 private[streaming] class ConstantEstimator(rates: Double*) extends RateEstimator {
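
The rewritten RateControllerSuite no longer drives a manual clock or a hand-rolled background polling thread: with useManualClock set to false it simply starts the context and polls inside ScalaTest's eventually until the expected rates have been observed. A minimal, self-contained sketch of that polling pattern, with a volatile variable standing in for dstream.getCurrentRateLimit (names here are hypothetical, not Spark code), could look like this:

    import scala.collection.mutable
    import org.scalatest.Matchers._
    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.SpanSugar._

    object EventuallyPollingSketch extends App {
      // Stand-in for the receiver's current rate limit.
      @volatile var currentRateLimit: Option[Long] = None

      val rates = Seq(100L, 200L, 300L)
      val observedRates = mutable.HashSet.empty[Long]

      // A background "receiver" that picks up new rates over time,
      // starting from Long.MaxValue (essentially, no rate limit).
      new Thread {
        override def run(): Unit =
          (Long.MaxValue +: rates).foreach { r =>
            currentRateLimit = Some(r)
            Thread.sleep(200)
          }
      }.start()

      // Poll inside `eventually` instead of coordinating a polling loop with a
      // flag: the block is retried until the assertion holds or the timeout expires.
      eventually(timeout(20.seconds)) {
        currentRateLimit.foreach(observedRates += _)
        observedRates should contain theSameElementsAs (rates :+ Long.MaxValue)
      }
    }
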
@@ -64,7 +64,7 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
 
   test("scheduleReceivers: " +
     "schedule receivers evenly when there are more receivers than executors") {
-    val receivers = (0 until 6).map(new DummyReceiver(_))
+    val receivers = (0 until 6).map(new RateTestReceiver(_))
     val executors = (10000 until 10003).map(port => s"localhost:${port}")
     val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
     val numReceiversOnExecutor = mutable.HashMap[String, Int]()
@@ -79,7 +79,7 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
 
   test("scheduleReceivers: " +
     "schedule receivers evenly when there are more executors than receivers") {
-    val receivers = (0 until 3).map(new DummyReceiver(_))
+    val receivers = (0 until 3).map(new RateTestReceiver(_))
     val executors = (10000 until 10006).map(port => s"localhost:${port}")
     val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
     val numReceiversOnExecutor = mutable.HashMap[String, Int]()
@@ -94,8 +94,8 @@ }
   }
 
   test("scheduleReceivers: schedule receivers evenly when the preferredLocations are even") {
-    val receivers = (0 until 3).map(new DummyReceiver(_)) ++
-      (3 until 6).map(new DummyReceiver(_, Some("localhost")))
+    val receivers = (0 until 3).map(new RateTestReceiver(_)) ++
+      (3 until 6).map(new RateTestReceiver(_, Some("localhost")))
     val executors = (10000 until 10003).map(port => s"localhost:${port}") ++
       (10003 until 10006).map(port => s"localhost2:${port}")
     val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
@@ -121,7 +121,7 @@ }
   }
 
   test("scheduleReceivers: return empty scheduled executors if no executors") {
-    val receivers = (0 until 3).map(new DummyReceiver(_))
+    val receivers = (0 until 3).map(new RateTestReceiver(_))
     val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, Seq.empty)
     scheduledExecutors.foreach { case (receiverId, executors) =>
       assert(executors.isEmpty)
@@ -43,7 +43,7 @@ class ReceiverTrackerSuite extends TestSuiteBase {
 
     ssc.addStreamingListener(ReceiverStartedWaiter)
     ssc.scheduler.listenerBus.start(ssc.sc)
-    SingletonDummyReceiver.reset()
+    SingletonTestRateReceiver.reset()
 
     val newRateLimit = 100L
     val inputDStream = new RateLimitInputDStream(ssc)
@@ -74,17 +74,28 @@
 private[streaming] class RateLimitInputDStream(@transient ssc_ : StreamingContext)
   extends ReceiverInputDStream[Int](ssc_) {
 
-  override def getReceiver(): DummyReceiver = SingletonDummyReceiver
+  override def getReceiver(): RateTestReceiver = SingletonTestRateReceiver
 
   def getCurrentRateLimit: Option[Long] = {
     invokeExecutorMethod.getCurrentRateLimit
   }
 
+  @volatile
+  var publishCalls = 0
+
+  override val rateController: Option[RateController] = {
+    Some(new RateController(id, new ConstantEstimator(100.0)) {
+      override def publish(rate: Long): Unit = {
+        publishCalls += 1
+      }
+    })
+  }
+
   private def invokeExecutorMethod: ReceiverSupervisor = {
     val c = classOf[Receiver[_]]
     val ex = c.getDeclaredMethod("executor")
     ex.setAccessible(true)
-    ex.invoke(SingletonDummyReceiver).asInstanceOf[ReceiverSupervisor]
+    ex.invoke(SingletonTestRateReceiver).asInstanceOf[ReceiverSupervisor]
   }
 }

@@ -96,7 +107,7 @@ private[streaming] class RateLimitInputDStream(@transient ssc_ : StreamingContext)
  * @note It's necessary to be a top-level object, or else serialization would create another
  * one on the executor side and we won't be able to read its rate limit.
  */
-private[streaming] object SingletonDummyReceiver extends DummyReceiver(0) {
+private[streaming] object SingletonTestRateReceiver extends RateTestReceiver(0) {
 
   /** Reset the object to be usable in another test. */
   def reset(): Unit = {
@@ -107,7 +118,7 @@ private[streaming] object SingletonDummyReceiver extends DummyReceiver(0) {
 /**
  * Dummy receiver implementation
  */
-private[streaming] class DummyReceiver(receiverId: Int, host: Option[String] = None)
+private[streaming] class RateTestReceiver(receiverId: Int, host: Option[String] = None)
   extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
 
   setReceiverId(receiverId)
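
The @note on the singleton receiver above relies on a top-level Scala object keeping its identity across Java serialization inside a single JVM: in these local-mode tests the driver and the executor share the JVM, so the rate limit pushed to the executor-side copy is readable through the original reference. A small, hypothetical sketch of just that serialization behaviour (not Spark code) is:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

    // Hypothetical top-level singleton; Receiver is Serializable, so the real
    // SingletonTestRateReceiver is likewise serialized when shipped to the executor.
    object SingletonSketch extends Serializable {
      @volatile var rateLimit: Long = Long.MaxValue
    }

    object SerializationRoundTripSketch extends App {
      val buffer = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(buffer)
      out.writeObject(SingletonSketch)
      out.close()

      val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
      val copy = in.readObject()

      // A top-level serializable object deserializes back to the same instance
      // (the compiler emits readResolve), so state written through `copy` is
      // visible through the original reference.
      assert(copy eq SingletonSketch)
    }
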
