Skip to content

Commit

Permalink
Minor updates.
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Nov 13, 2014
1 parent ec2e95e commit d9a452c
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ public void testKafkaStream() throws InterruptedException {
suiteBase.createTopic(topic);
HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
suiteBase.produceAndSendMessage(topic,
JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
Predef.<Tuple2<String, Object>>conforms()));
JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
Predef.<Tuple2<String, Object>>conforms()));

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", suiteBase.zkAddress());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,14 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
val data = Map("a" -> 10, "b" -> 10, "c" -> 10)

var topic: String = _

var groupId: String = _
var kafkaParams: Map[String, String] = _
var ssc: StreamingContext = _
var tempDirectory: File = null

before {
setupKafka()
topic = s"test-topic-${Random.nextInt(10000)}"
groupId = s"test-consumer-${Random.nextInt(10000)}"
kafkaParams = Map(
"zookeeper.connect" -> zkAddress,
Expand All @@ -78,6 +77,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter


test("Reliable Kafka input stream with single topic") {
var topic = "test-topic"
createTopic(topic)
produceAndSendMessage(topic, data)

Expand All @@ -95,7 +95,6 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
}
}
ssc.start()

eventually(timeout(20000 milliseconds), interval(200 milliseconds)) {
// A basic process verification for ReliableKafkaReceiver.
// Verify whether received message number is equal to the sent message number.
Expand All @@ -104,26 +103,10 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
data.keys.foreach { k => assert(data(k) === result(k).toInt) }
// Verify the offset number whether it is equal to the total message number.
assert(getCommitOffset(groupId, topic, 0) === Some(29L))

}
ssc.stop()
}
/*
test("Verify the offset commit") {
// Verify the correctness of offset commit mechanism.
createTopic(topic)
produceAndSendMessage(topic, data)

// Do this to consume all the message of this group/topic.
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
stream.foreachRDD(_ => Unit)
ssc.start()
eventually(timeout(20000 milliseconds), interval(200 milliseconds)) {
}
ssc.stop()
}
*/
test("Reliable Kafka input stream with multiple topics") {
val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
topics.foreach { case (t, _) =>
Expand Down Expand Up @@ -152,7 +135,6 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
assert(zkClient != null, "Zookeeper client is not initialized")
val topicDirs = new ZKGroupTopicDirs(groupId, topic)
val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"
val offset = ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong)
offset
ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong)
}
}

0 comments on commit d9a452c

Please sign in to comment.