Add Spark Streaming example that reads from Kafka and writes to Kafka
miguno committed Sep 29, 2014
1 parent d19c408 commit 83a389a
Showing 9 changed files with 366 additions and 4 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,8 @@ IMPROVEMENTS
* Use Storm 0.9.2. This includes two notable improvements:
* We can and do use the Kafka 0.8 compatible Kafka spout included in Storm 0.9.2.
* We use ZooKeeper 3.4.5, up from 3.3.x before.
* Add Spark Streaming example, which reads from Kafka and writes to Kafka. The streaming job is functionally equivalent
to the test topologies in `KafkaStormSpec`.
* AvroKafkaSinkBolt should not declare any output fields because it writes to Kafka only, it does not emit any tuples.
* Reduce logging output when running tests to minimize distraction and confusion.
* Disable ZooKeeper reconnection attempts in the test topology of `KafkaStormSpec` to prevent the Kafka spout from
7 changes: 7 additions & 0 deletions README.md
@@ -209,6 +209,13 @@ What features do we showcase in kafka-storm-starter? Note that we focus on show
[KafkaStormSpec](src/test/scala/com/miguno/kafkastorm/integration/KafkaStormSpec.scala)
* A Storm topology that writes Avro-encoded data to Kafka:
[KafkaStormSpec](src/test/scala/com/miguno/kafkastorm/integration/KafkaStormSpec.scala)
* Kafka and Spark Streaming integration
* [KafkaSparkStreamingSpec](src/test/scala/com/miguno/kafkastorm/spark/KafkaSparkStreamingSpec.scala), a streaming
job that reads input data from Kafka and writes output data to Kafka. Demonstrates how to read from all
partitions of a topic in parallel, how to decouple the downstream parallelism from the number of partitions
(think: use 20 "threads" for processing the Kafka data even though the Kafka topic has only 5 partitions),
and how to write the output of the streaming job back into Kafka. The input and output data is in Avro format,
and we use Twitter Bijection for the serialization work.
* Unit testing
* [AvroDecoderBoltSpec](src/test/scala/com/miguno/kafkastorm/storm/AvroDecoderBoltSpec.scala)
* [AvroSchemeSpec](src/test/scala/com/miguno/kafkastorm/storm/serialization/AvroSchemeSpec.scala)
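The README entry above describes reading all partitions of a Kafka topic in parallel and then decoupling the downstream processing parallelism from the partition count. The sketch below illustrates that pattern against the Spark 1.1 / `spark-streaming-kafka` receiver API. It is a sketch only: the topic name, ZooKeeper address, consumer group, and counts are placeholders, and the project's actual, Avro/Bijection-based implementation lives in `KafkaSparkStreamingSpec`.

```scala
import kafka.serializer.DefaultDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaSparkStreamingSketch {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kafka-spark-streaming-sketch")
      .setMaster("local[4]") // local run for illustration; omit when submitting to a cluster
    val ssc = new StreamingContext(conf, Seconds(1))

    val kafkaParams = Map(
      "zookeeper.connect" -> "127.0.0.1:2181", // placeholder ZooKeeper address
      "group.id" -> "spark-streaming-sketch")  // placeholder consumer group
    val inputTopic = "testing-input"           // placeholder topic, assumed to have 5 partitions
    val numPartitionsOfInputTopic = 5
    val processingParallelism = 20

    // One receiver-based input DStream per Kafka partition, so that all partitions
    // of the input topic are read in parallel.
    val kafkaStreams = (1 to numPartitionsOfInputTopic).map { _ =>
      KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
        ssc, kafkaParams, Map(inputTopic -> 1), StorageLevel.MEMORY_ONLY_SER)
    }

    // Union the per-partition streams, then repartition: this decouples the downstream
    // parallelism (20 "threads") from the number of Kafka partitions (5).
    val unifiedStream = ssc.union(kafkaStreams).repartition(processingParallelism)

    // Stand-in for the real work: the project's example decodes Avro records via
    // Twitter Bijection and writes the results back to Kafka through a producer pool.
    unifiedStream.map { case (_, bytes) => bytes.length }.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```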
6 changes: 3 additions & 3 deletions assembly.sbt
@@ -10,9 +10,9 @@ assemblySettings
// https://github.com/sbt/sbt-assembly#-provided-configuration
// http://stackoverflow.com/a/21803413/3827
//
// In our case, the Storm dependency must be set to "provided (cf. `build.sbt`) because, when deploying and launching
// our Storm topology code "for real" to a distributed Storm cluster, Storm wants us to exclude the Storm dependencies
// (jars) as they are provided [no pun intended] by the Storm cluster.
// In our case, the Storm and Spark dependencies must be set to "provided" (cf. `build.sbt`) because, when deploying and
// launching our Storm/Spark jobs "for real" to distributed clusters, Storm/Spark want us to exclude the Storm/Spark
// dependencies (jars) as they are provided [no pun intended] by the respective clusters.
run in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))

mergeStrategy in assembly <<= (mergeStrategy in assembly) {
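For readers unfamiliar with the `"provided"` configuration that the comment above refers to, here is a minimal sbt sketch of the idiom. The precise declarations in this project's `build.sbt` are not shown in this excerpt, so treat the coordinates and scopes below as illustrative only.

```scala
// Sketch: marking cluster-provided frameworks as "provided" keeps them out of the
// fat jar built by sbt-assembly, because the Storm/Spark cluster already ships
// these jars. Versions mirror the ones appearing elsewhere in this commit.
libraryDependencies ++= Seq(
  "org.apache.storm"  %  "storm-core" % "0.9.2-incubating" % "provided",
  "org.apache.spark" %% "spark-core"  % "1.1.0"            % "provided"
)
```

The `run in Compile` override shown above then re-adds the provided jars to the local run classpath, so `sbt run` keeps working during development (cf. the sbt-assembly link at the top of the file).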
14 changes: 14 additions & 0 deletions build.sbt
@@ -22,6 +22,8 @@ resolvers ++= Seq(
"clojars-repository" at "https://clojars.org/repo"
)

val sparkVersion = "1.1.0"

libraryDependencies ++= Seq(
"com.twitter" %% "bijection-core" % "0.6.3",
"com.twitter" %% "bijection-avro" % "0.6.3",
@@ -44,11 +46,23 @@ libraryDependencies ++= Seq(
exclude("org.slf4j", "log4j-over-slf4j"),
"org.apache.storm" % "storm-kafka" % "0.9.2-incubating"
exclude("org.apache.zookeeper", "zookeeper"),
"org.apache.spark" %% "spark-core" % sparkVersion
exclude("org.apache.zookeeper", "zookeeper")
exclude("org.slf4j", "slf4j-api")
exclude("org.slf4j", "slf4j-log4j12")
exclude("org.slf4j", "jul-to-slf4j")
exclude("org.slf4j", "jcl-over-slf4j")
exclude("com.twitter", "chill_2.10")
exclude("log4j", "log4j"),
"org.apache.spark" %% "spark-streaming-kafka" % sparkVersion
exclude("org.apache.zookeeper", "zookeeper"),
"com.101tec" % "zkclient" % "0.4"
exclude("org.apache.zookeeper", "zookeeper"),
"org.apache.curator" % "curator-test" % "2.4.0"
exclude("org.jboss.netty", "netty")
exclude("org.slf4j", "slf4j-log4j12"),
"commons-io" % "commons-io" % "2.4",
"org.apache.commons" % "commons-pool2" % "2.2",
// Logback with slf4j facade
"ch.qos.logback" % "logback-classic" % "1.1.2",
"ch.qos.logback" % "logback-core" % "1.1.2",
32 changes: 32 additions & 0 deletions (new file: PooledKafkaProducerAppFactory.scala)
@@ -0,0 +1,32 @@
package com.miguno.kafkastorm.kafka

import org.apache.commons.pool2.impl.DefaultPooledObject
import org.apache.commons.pool2.{PooledObject, BasePooledObjectFactory}

/**
* An object factory for Kafka producer apps, which is used to create a pool of such producers (think: DB connection
* pool).
*
* We use this class in our Spark Streaming examples when writing data to Kafka. A pool is typically the preferred
* pattern to minimize TCP connection overhead when talking to Kafka from a Spark cluster. Another reason is to
* limit the number of TCP connections established with the Kafka cluster, so as not to strain it.
*
* See the Spark Streaming Programming Guide, section "Design Patterns for using foreachRDD" in
* [[http://spark.apache.org/docs/1.1.0/streaming-programming-guide.html#output-operations-on-dstreams Output Operations on DStreams]]
*/
// TODO: Time out / shutdown producers if they haven't been used in a while.
class PooledKafkaProducerAppFactory(val factory: KafkaProducerAppFactory)
extends BasePooledObjectFactory[KafkaProducerApp] with Serializable {

override def create(): KafkaProducerApp = factory.newInstance()

override def wrap(obj: KafkaProducerApp): PooledObject[KafkaProducerApp] = new DefaultPooledObject(obj)

// From the Commons Pool docs: "Invoked on every instance when it is being "dropped" from the pool. There is no
// guarantee that the instance being destroyed will be considered active, passive or in a generally consistent state."
override def destroyObject(p: PooledObject[KafkaProducerApp]): Unit = {
p.getObject.shutdown()
super.destroyObject(p)
}

}
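The scaladoc above points at the "Design Patterns for using foreachRDD" section of the Spark Streaming guide. The sketch below shows how such a factory is typically used: the `GenericObjectPool` wiring is standard Commons Pool 2, while the object name, `outputStream`, and the producer's `send` method are placeholders for whatever the surrounding application actually provides.

```scala
import com.miguno.kafkastorm.kafka.{KafkaProducerApp, KafkaProducerAppFactory, PooledKafkaProducerAppFactory}
import org.apache.commons.pool2.impl.GenericObjectPool

object KafkaProducerPools {

  // Wrap the factory in a Commons Pool 2 object pool so that each Spark executor
  // reuses a handful of Kafka producers instead of opening a fresh TCP connection
  // for every partition it processes.
  def create(baseFactory: KafkaProducerAppFactory): GenericObjectPool[KafkaProducerApp] = {
    val pooledFactory = new PooledKafkaProducerAppFactory(baseFactory)
    new GenericObjectPool[KafkaProducerApp](pooledFactory)
  }

}

// Usage inside the streaming job (sketch; `outputStream` is a DStream[Array[Byte]],
// `pool` is built once per executor, and `send` stands in for the producer's actual
// write method):
//
//   outputStream.foreachRDD { rdd =>
//     rdd.foreachPartition { records =>
//       val producer = pool.borrowObject()
//       records.foreach(bytes => producer.send(bytes))
//       pool.returnObject(producer)
//     }
//   }
```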
27 changes: 27 additions & 0 deletions (new file: KafkaSparkStreamingRegistrator.scala)
@@ -0,0 +1,27 @@
package com.miguno.kafkastorm.spark.serialization

import com.esotericsoftware.kryo.Kryo
import com.miguno.avro.Tweet
import com.twitter.chill.avro.AvroSerializer
import org.apache.avro.generic.GenericRecord
import org.apache.spark.serializer.KryoRegistrator

/**
* We register custom classes with Kryo, see the explanations in the
* [[http://spark.apache.org/docs/1.1.0/tuning.html#data-serialization Tuning Spark]] guide.
*
* "If you don’t register your custom classes, Kryo will still work, but it will have to store the full class name with
* each object, which is wasteful."
*/
class KafkaSparkStreamingRegistrator extends KryoRegistrator {

override def registerClasses(kryo: Kryo) {
// Registers a serializer for any generic Avro records. The kafka-storm-starter project does not yet include
// examples that work on generic Avro records, but we keep this registration for the convenience of our readers.
kryo.register(classOf[GenericRecord], AvroSerializer.GenericRecordSerializer[GenericRecord]())
// Registers a serializer specifically for the, well, specific Avro record `Tweet`
kryo.register(classOf[Tweet], AvroSerializer.SpecificRecordSerializer[Tweet])
()
}

}
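A registrator only takes effect if Spark is told to use Kryo and to load it. Below is a minimal sketch of the wiring via the standard Spark 1.1 configuration properties; the application name is a placeholder.

```scala
import com.miguno.kafkastorm.spark.serialization.KafkaSparkStreamingRegistrator
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

object SparkKryoConfigSketch {

  // Enable Kryo and point Spark at the registrator defined above. The property
  // names are the standard Spark 1.1 settings.
  def kryoEnabledConf(appName: String): SparkConf =
    new SparkConf()
      .setAppName(appName)
      .set("spark.serializer", classOf[KryoSerializer].getName)
      .set("spark.kryo.registrator", classOf[KafkaSparkStreamingRegistrator].getName)

}
```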
17 changes: 17 additions & 0 deletions src/test/resources/logback-test.xml
@@ -44,6 +44,12 @@
<appender-ref ref="ACCESS"/>
</logger>

<!-- Jetty (used by Spark) is very chatty -->
<logger name="org.eclipse.jetty" additivity="false" >
<level value="ERROR"/>
<appender-ref ref="ACCESS"/>
</logger>

<!--
Squelch CuratorFrameworkImpl to prevent the following error messages from polluting our test output:
@@ -81,4 +87,15 @@
<appender-ref ref="ACCESS"/>
</logger>

<!--
Squelch NIOServerCnxnFactory to prevent it from reporting SparkExceptions, which are expected during the shutdown phase of our
Spark Streaming examples:
org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
-->
<logger name="org.apache.zookeeper.server.NIOServerCnxnFactory" additivity="false" >
<level value="OFF"/>
<appender-ref ref="ACCESS"/>
</logger>

</configuration>
4 changes: 3 additions & 1 deletion (IntegrationSuite.scala)
@@ -1,9 +1,11 @@
package com.miguno.kafkastorm.integration

import com.miguno.kafkastorm.spark.KafkaSparkStreamingSpec
import org.scalatest.Stepwise

class IntegrationSuite extends Stepwise(
new KafkaSpec,
new StormSpec,
new KafkaStormSpec
new KafkaStormSpec,
new KafkaSparkStreamingSpec
)