diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e7d53a0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +*.class +*.log + +# sbt specific +dist/* +target/ +lib_managed/ +src_managed/ +project/boot/ +project/plugins/project/ + +# Kafka +logs/* diff --git a/.sbtopts b/.sbtopts new file mode 100644 index 0000000..f38f1cb --- /dev/null +++ b/.sbtopts @@ -0,0 +1,2 @@ +-J-Xmx512m +-Djava.awt.headless=true diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..5be4bbe --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,3 @@ +# 0.1.0 (May 27, 2014) + +* Initial release. Integrates Kafka 0.8.1.1 with Storm 0.9.1-incubating. diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..952b8cf --- /dev/null +++ b/LICENSE @@ -0,0 +1,13 @@ +Copyright © 2014 Michael G. Noll + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/README.md b/README.md new file mode 100644 index 0000000..e668b2e --- /dev/null +++ b/README.md @@ -0,0 +1,804 @@ +# kafka-storm-starter + +Code examples that show how to integrate +[Apache Kafka](http://kafka.apache.org/) 0.8+ (latest stable) with +[Apache Storm](http://storm.incubator.apache.org/) 0.9+ (latest stable), +while using [Apache Avro](http://avro.apache.org/) as the data serialization format. + +--- + +Table of Contents + +* Quick start +* Features +* Implementation details +* Development + * Build requirements + * Building the code + * Running the tests + * Creating code coverage reports + * Packaging the code + * IDE support +* FAQ + * Kafka + * Storm +* Known issues and limitations + * Upstream code + * kafka-storm-starter code +* Change log +* Contributing +* License +* References + * Wirbelsturm + * Kafka + * Storm + * Avro + * Kryo + +--- + + + + +# Quick start + +## Show me! + + $ ./sbt test + +This command launches our test suite. + +Notably it will run end-to-end tests of Kafka, Storm, and Kafka-Storm integration. See this abridged version of the +test output: + +``` +[...other tests removed...] 
+ +[info] KafkaSpec: +[info] Kafka +[info] - should synchronously send and receive a Tweet in Avro format +[info] + Given a ZooKeeper instance +[info] + And a Kafka broker instance +[info] + And some tweets +[info] + And a single-threaded Kafka consumer group +[info] + When I start a synchronous Kafka producer that sends the tweets in Avro binary format +[info] + Then the consumer app should receive the tweets +[info] - should asynchronously send and receive a Tweet in Avro format +[info] + Given a ZooKeeper instance +[info] + And a Kafka broker instance +[info] + And some tweets +[info] + And a single-threaded Kafka consumer group +[info] + When I start an asynchronous Kafka producer that sends the tweets in Avro binary format +[info] + Then the consumer app should receive the tweets +[info] StormSpec: +[info] Storm +[info] - should start a local cluster +[info] + Given no cluster +[info] + When I start a LocalCluster instance +[info] + Then the local cluster should start properly +[info] - should run a basic topology +[info] + Given a local cluster +[info] + And a wordcount topology +[info] + And the input words alice, bob, joe, alice +[info] + When I submit the topology +[info] + Then the topology should properly count the words +[info] KafkaStormSpec: +[info] Feature: AvroDecoderBolt[T] +[info] Scenario: User creates a Storm topology that uses AvroDecoderBolt +[info] Given a ZooKeeper instance +[info] And a Kafka broker instance +[info] And a Storm topology that uses AvroDecoderBolt and that reads tweets from topic testing-input and writes them as-is to topic testing-output +[info] And some tweets +[info] And a synchronous Kafka producer app that writes to the topic testing-input +[info] And a single-threaded Kafka consumer app that reads from topic testing-output +[info] And a Storm topology configuration that registers an Avro Kryo decorator for Tweet +[info] When I run the Storm topology +[info] And I use the Kafka producer app to Avro-encode the tweets and sent them to Kafka +[info] Then the Kafka consumer app should receive the decoded, original tweets from the Storm topology +[info] Feature: AvroScheme[T] for Kafka spout +[info] Scenario: User creates a Storm topology that uses AvroScheme in Kafka spout +[info] Given a ZooKeeper instance +[info] And a Kafka broker instance +[info] And a Storm topology that uses AvroScheme and that reads tweets from topic testing-input and writes them as-is to topic testing-output +[info] And some tweets +[info] And a synchronous Kafka producer app that writes to the topic testing-input +[info] And a single-threaded Kafka consumer app that reads from topic testing-output +[info] And a Storm topology configuration that registers an Avro Kryo decorator for Tweet +[info] When I run the Storm topology +[info] And I use the Kafka producer app to Avro-encode the tweets and sent them to Kafka +[info] Then the Kafka consumer app should receive the decoded, original tweets from the Storm topology +[info] Run completed in 21 seconds, 852 milliseconds. +[info] Total number of tests run: 25 +[info] Suites: completed 8, aborted 0 +[info] Tests: succeeded 25, failed 0, canceled 0, ignored 0, pending 0 +[info] All tests passed. +[success] Total time: 22 s, completed May 23, 2014 12:31:09 PM +``` + + +## Show me one more time! + + $ ./sbt run + +This command launches [KafkaStormDemo](src/main/scala/com/miguno/kafkastorm/storm/KafkaStormDemo.scala). This demo +starts in-memory instances of ZooKeeper, Kafka, and Storm. 
It then runs a demo Storm topology that connects to and +reads from the Kafka instance. + +You will see output similar to the following (some parts removed to improve readability): + +``` +7031 [Thread-19] INFO backtype.storm.daemon.worker - Worker 3f7f1a51-5c9e-43a5-b431-e39a7272215e for storm kafka-storm-starter-1-1400839826 on daa60807-d440-4b45-94fc-8dd7798453d2:1027 has finished loading +7033 [Thread-29-kafka-spout] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=127.0.0.1:9092}} +7050 [Thread-29-kafka-spout] INFO backtype.storm.daemon.executor - Opened spout kafka-spout:(1) +7051 [Thread-29-kafka-spout] INFO backtype.storm.daemon.executor - Activating spout kafka-spout:(1) +7051 [Thread-29-kafka-spout] INFO storm.kafka.ZkCoordinator - Refreshing partition manager connections +7065 [Thread-29-kafka-spout] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=127.0.0.1:9092}} +7066 [Thread-29-kafka-spout] INFO storm.kafka.ZkCoordinator - Deleted partition managers: [] +7066 [Thread-29-kafka-spout] INFO storm.kafka.ZkCoordinator - New partition managers: [Partition{host=127.0.0.1:9092, partition=0}] +7083 [Thread-29-kafka-spout] INFO storm.kafka.PartitionManager - Read partition information from: /kafka-spout/kafka-storm-starter/partition_0 --> null +7100 [Thread-29-kafka-spout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset +7105 [Thread-29-kafka-spout] INFO storm.kafka.PartitionManager - Starting Kafka 127.0.0.1:0 from offset 18 +7106 [Thread-29-kafka-spout] INFO storm.kafka.ZkCoordinator - Finished refreshing +7126 [Thread-29-kafka-spout] INFO storm.kafka.PartitionManager - Committing offset for Partition{host=127.0.0.1:9092, partition=0} +7126 [Thread-29-kafka-spout] INFO storm.kafka.PartitionManager - Committed offset 18 for Partition{host=127.0.0.1:9092, partition=0} for topology: 47e82e34-fb36-427e-bde6-8cd971db2527 +9128 [Thread-29-kafka-spout] INFO storm.kafka.PartitionManager - Committing offset for Partition{host=127.0.0.1:9092, partition=0} +9129 [Thread-29-kafka-spout] INFO storm.kafka.PartitionManager - Committed offset 18 for Partition{host=127.0.0.1:9092, partition=0} for topology: 47e82e34-fb36-427e-bde6-8cd971db2527 +``` + +At this point Storm is connected to Kafka (more precisely: to the `testing` topic in Kafka). The last few lines from +above -- "Committing offset ..." --- will be repeated again and again, because a) this demo Storm topology only reads +from the Kafka topic but it does nothing to the data that was read and b) because we are not sending any data to the +Kafka topic. + +Note that this example will actually run _two_ in-memory instances of ZooKeeper: the first (listening at +`127.0.0.1:2181/tcp`) is used by the Kafka instance, the second (listening at `127.0.0.1:2000/tcp`) is automatically +started and used by the in-memory Storm cluster. This is because, when running in local aka in-memory mode, Storm does +not allow you to reconfigure or disable its own ZooKeeper instance (see the [Storm FAQ](#FAQ-Storm) below for further +information). + +**To stop the demo application you must kill or `Ctrl-C` the process in the terminal.** + +You can use [KafkaStormDemo](src/main/scala/com/miguno/kafkastorm/storm/KafkaStormDemo.scala) as a starting point to +create your own, "real" Storm topologies that read from a "real" Kafka, Storm, and ZooKeeper infrastructure. 
An easy
way to get started with such an infrastructure is by deploying Kafka, Storm, and ZooKeeper via a tool such as
[Wirbelsturm](https://github.com/miguno/wirbelsturm).




# Features

What features do we showcase in kafka-storm-starter? Note that we focus on showcasing, and not necessarily on being
"production ready".

* How to integrate Kafka and Storm.
* How to use [Avro](http://avro.apache.org/) with Kafka and Storm.
* Kafka standalone code examples
  * [KafkaProducerApp](src/main/scala/com/miguno/kafkastorm/kafka/KafkaProducerApp.scala):
    A simple Kafka producer app for writing Avro-encoded data into Kafka.
    [KafkaSpec](src/test/scala/com/miguno/kafkastorm/integration/KafkaSpec.scala) puts this producer to use and shows
    how to use Twitter Bijection to Avro-encode the messages being sent to Kafka.
  * [KafkaConsumer](src/main/scala/com/miguno/kafkastorm/kafka/KafkaConsumer.scala):
    A simple Kafka consumer app for reading Avro-encoded data from Kafka.
    [KafkaSpec](src/test/scala/com/miguno/kafkastorm/integration/KafkaSpec.scala) puts this consumer to use and shows
    how to use Twitter Bijection to Avro-decode the messages being read from Kafka.
* Storm standalone code examples
  * [AvroDecoderBolt[T]](src/main/scala/com/miguno/kafkastorm/storm/AvroDecoderBolt.scala):
    An `AvroDecoderBolt[T <: org.apache.avro.specific.SpecificRecordBase]` that can be parameterized with the type of
    the Avro record `T` it will deserialize its data to (i.e. no need to write another decoder bolt just because the
    bolt needs to handle a different Avro schema).
  * [AvroScheme[T]](src/main/scala/com/miguno/kafkastorm/storm/AvroScheme.scala):
    An `AvroScheme[T <: org.apache.avro.specific.SpecificRecordBase]` scheme, i.e. a custom
    `backtype.storm.spout.Scheme` to auto-deserialize a spout's incoming data. The scheme can be parameterized with
    the type of the Avro record `T` it will deserialize its data to (i.e. no need to write another scheme just
    because the scheme needs to handle a different Avro schema).
    * You can opt to configure a spout (such as the Kafka spout) with `AvroScheme` if you want to perform the Avro
      decoding step directly in the spout instead of placing an `AvroDecoderBolt` after the Kafka spout. You may
      want to profile your topology to see which of the two approaches works best for your use case.
  * [TweetAvroKryoDecorator](src/main/scala/com/miguno/kafkastorm/storm/TweetAvroKryoDecorator.scala):
    A custom `backtype.storm.serialization.IKryoDecorator`, i.e. a custom
    [Kryo serializer for Storm](http://storm.incubator.apache.org/documentation/Serialization.html).
    * Unfortunately we have not figured out a way to implement a parameterized `AvroKryoDecorator[T]` variant yet.
      (A "straightforward" approach we tried -- similar to the other parameterized components -- compiled fine but
      failed at runtime when running the tests.) Code contributions are welcome!
* Kafka and Storm integration
  * [AvroKafkaSinkBolt[T]](src/main/scala/com/miguno/kafkastorm/storm/AvroKafkaSinkBolt.scala):
    An `AvroKafkaSinkBolt[T <: org.apache.avro.specific.SpecificRecordBase]` that can be parameterized with the type
    of the Avro record `T` it will serialize its data to before sending the encoded data to Kafka (i.e. no
    need to write another Kafka sink bolt just because the bolt needs to handle a different Avro schema).
+ * Storm topologies that read Avro-encoded data from Kafka: + [KafkaStormDemo](src/main/scala/com/miguno/kafkastorm/storm/KafkaStormDemo.scala) and + [KafkaStormSpec](src/test/scala/com/miguno/kafkastorm/integration/KafkaStormSpec.scala) + * A Storm topology that writes Avro-encoded data to Kafka: + [KafkaStormSpec](src/test/scala/com/miguno/kafkastorm/integration/KafkaStormSpec.scala) +* Unit testing + * [AvroDecoderBoltSpec](src/test/scala/com/miguno/kafkastorm/storm/AvroDecoderBoltSpec.scala) + * [AvroSchemeSpec](src/test/scala/com/miguno/kafkastorm/storm/AvroSchemeSpec.scala) + * And more under [src/test/scala](src/test/scala/com/miguno/kafkastorm/). +* Integration testing + * [KafkaSpec](src/test/scala/com/miguno/kafkastorm/integration/KafkaSpec.scala): + Tests for Kafka, which launch and run against in-memory instances of Kafka and ZooKeeper. + * [StormSpec](src/test/scala/com/miguno/kafkastorm/integration/StormSpec.scala): + Tests for Storm, which launch and run against in-memory instances of Storm and ZooKeeper. + * [KafkaStormSpec](src/test/scala/com/miguno/kafkastorm/integration/KafkaStormSpec.scala): + Tests for integrating Storm and Kafka, which launch and run against in-memory instances of Kafka, Storm, and + ZooKeeper. + + + + +# Implementation details + +* We use [Twitter Bijection](https://github.com/twitter/bijection) for Avro encoding and decoding. +* We use [Twitter Chill](https://github.com/twitter/chill/) (which in turn uses Bijection) to implement a + [custom Kryo serializer for Storm](src/main/scala/com/miguno/kafkastorm/storm/TweetAvroKryoDecorator.scala) that + handles our Avro-derived Java class `Tweet` from [twitter.avsc](src/main/avro/twitter.avsc). +* Unit and integration tests are implemented with [ScalaTest](http://scalatest.org/). +* We use [ZooKeeper 3.3.4](https://zookeeper.apache.org/) instead of the latest version 3.4.5. + See section _Known issues_ below for why we do that. +* We use the Kafka spout [wurstmeister/storm-kafka-0.8-plus](https://github.com/wurstmeister/storm-kafka-0.8-plus). + Unfortunately that spout is not yet released for Scala 2.10. For that reason [@miguno](https://github.com/miguno/) + has [forked and branched](https://github.com/miguno/storm-kafka-0.8-plus/tree/miguno_clojars) the code to add Scala + 2.10 support, and released such a version to [Clojars](https://clojars.org/com.miguno/storm-kafka-0.8-plus_2.10). + See [build.sbt](build.sbt) for details. + * _Once Storm 0.9.2 is released we will migrate to the new_ + _[Kafka spout that ships with Storm](https://github.com/apache/incubator-storm/tree/master/external/storm-kafka)_ + _(which is based on the spout developed by wurstmeister)._ + + + + +# Development + + + + +## Git setup: git-flow + +This project follows the [git-flow](https://github.com/nvie/gitflow) approach. This means, for instance, that: + +* The branch `develop` is used for integration of the "next release". +* The branch `master` is used for bringing forth production releases. + +See [git-flow](https://github.com/nvie/gitflow) and the introduction article +[Why aren't you using git-flow?](http://jeffkreeftmeijer.com/2010/why-arent-you-using-git-flow/) for details. + + + + +## Build requirements + +* [Scala](http://www.scala-lang.org/) 2.10.4 +* [sbt](http://www.scala-sbt.org/) 0.13.2 +* Oracle Java JDK 6 (version 6 is still recommended for use with Kafka and Storm) + * The code _in this project_ works with Java 7, too. However, some dependencies we use are not published for Java 7 + yet. 

### Install on Mac OS X

_The instructions below assume you have [Homebrew](http://brew.sh/) installed on your Mac._

First, install Oracle Java JDK 6 for Mac:

* [Java 6 for Mac OS X](http://support.apple.com/downloads/DL1572/en_US/JavaForOSX2013-05.dmg) aka
  "Java for OS X 2013-005". This will give you Java 1.6.0_65.

Then, install Scala and sbt:

    $ brew update
    $ brew install scala210 sbt


### Install on RHEL/CentOS 6

First, install Oracle Java JDK 6:

* Follow [these instructions](http://www.if-not-true-then-false.com/2010/install-sun-oracle-java-jdk-jre-6-on-fedora-centos-red-hat-rhel/)
  (untested!).
  * Note: As a RHEL 6 user you may have access to a ready-to-use RPM package of Oracle JDK 6 in your existing yum
    repositories as part of the RedHat Network (RHN). If so, you do not need to follow the instructions in the link
    above. Instead, you only need to run e.g. `sudo yum install java-1.6.0-sun-devel`
    ([details](https://access.redhat.com/site/documentation/en-US/JBoss_Enterprise_Application_Platform/5/html/Installation_Guide/appe-install_jdk.html)).

Then, install Scala and sbt:

    $ sudo yum install http://www.scala-lang.org/files/archive/scala-2.10.4.rpm
    $ sudo yum install http://dl.bintray.com/sbt/rpm/sbt-0.13.2.rpm


See [Download Scala 2.10.4](http://www.scala-lang.org/download/2.10.4.html) and
[Installing sbt](http://www.scala-sbt.org/release/docs/Getting-Started/Setup.html) for details.


### Install on Ubuntu/Debian

First, install Oracle JDK 6:

* Follow [these instructions](http://linuxg.net/how-to-install-oracle-java-jdk-678-on-ubuntu-13-04-12-10-12-04/)
  (untested!). Note that by following these instructions you will install Oracle JDK/JRE from a third-party PPA package
  repository (`ppa:webupd8team/java`, managed by [webupd8](http://www.webupd8.org/)). Unfortunately Oracle does not
  provide official apt repositories for Ubuntu, and the Ubuntu team was required to remove "their" Oracle JDK/JRE
  packages from the Ubuntu repositories because of licensing issues with Oracle.

Then, install Scala and sbt:

    $ wget http://www.scala-lang.org/files/archive/scala-2.10.4.deb
    $ sudo dpkg -i scala-2.10.4.deb
    $ wget http://dl.bintray.com/sbt/debian/sbt-0.13.2.deb
    $ sudo dpkg -i sbt-0.13.2.deb

See [Download Scala 2.10.4](http://www.scala-lang.org/download/2.10.4.html) and
[Installing sbt](http://www.scala-sbt.org/release/docs/Getting-Started/Setup.html) for details.




## Building the code

    $ ./sbt clean compile

If you only want to (re)generate the Java classes from the Avro schemas:

    $ ./sbt avro:generate

Generated Java sources are stored under `target/scala-*/src_managed/main/compiled_avro/`.




## Running the tests

    $ ./sbt clean test

Here are some examples that demonstrate how you can run only a certain subset of tests:

    # Use `-l` to exclude tests by tag:
    # Run all tests WITH THE EXCEPTION of those tagged as integration tests
    $ ./sbt "test-only * -- -l com.miguno.kafkastorm.integration.IntegrationTest"

    # Use `-n` to include tests by tag (and skip all tests that lack the tag):
    # Run ONLY tests tagged as integration tests
    $ ./sbt "test-only * -- -n com.miguno.kafkastorm.integration.IntegrationTest"

    # Run only the tests in suite AvroSchemeSpec:
    $ ./sbt "test-only com.miguno.kafkastorm.storm.AvroSchemeSpec"

    # You can also combine the examples above, of course.
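
For reference, such a tag is just a ScalaTest `Tag` object that individual tests opt into. The sketch below is
illustrative only -- the object and suite names are made up, and only the tag string matches the commands above:

```scala
import org.scalatest.{FunSpec, Tag}

// Illustrative tag object; its name matches the tag string used in the
// `test-only` commands above.
object ExampleIntegrationTest extends Tag("com.miguno.kafkastorm.integration.IntegrationTest")

class ExampleSpec extends FunSpec {

  // This test is skipped when the tag is excluded via `-l`, and runs when the
  // tag is included via `-n` (or when no tag filters are given at all).
  it("should only run when integration tests are enabled", ExampleIntegrationTest) {
    assert(1 + 1 === 2)
  }

}
```

With a tag like this in place, the `-l`/`-n` flags shown above exclude or include the tagged tests.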

Test reports in JUnit XML format are written to `target/test-reports/junitxml/*.xml`. Make sure that your actual build
steps run the `./sbt test` task, otherwise the JUnit XML reports will not be generated (note that `./sbt scoverage:test`
unfortunately _will not_ generate the JUnit XML reports).

Integration with CI servers:

* Jenkins integration:
  * Configure the build job.
  * Go to _Post-build Actions_.
  * Add a post-build action for _Publish JUnit test result report_.
  * In the _Test report XMLs_ field add the pattern `**/target/test-reports/junitxml/*.xml`.
  * Now each build of your job will have a _Test Result_ link.
* TeamCity integration:
  * Edit the build configuration.
  * Select configuration step 3, _Build steps_.
  * Under _Additional Build Features_ add a new build feature.
  * Use the following build feature configuration:
    * Report type: Ant JUnit
    * Monitoring rules: `target/test-reports/junitxml/*.xml`
  * Now each build of your job will have a _Tests_ tab.

Further details are available at:

* How to tag tests in ScalaTest: [Tagging your tests](http://www.scalatest.org/user_guide/tagging_your_tests)
* How to selectively run tests: [Using ScalaTest with sbt](http://www.scalatest.org/user_guide/using_scalatest_with_sbt)
  and [How to Run Tagged Scala Tests with SBT and ScalaTest](http://code.hootsuite.com/tagged-tests-with-sbt/)




## Creating code coverage reports

We are using [sbt-scoverage](https://github.com/scoverage/sbt-scoverage) to create code coverage reports for unit tests.

Run the unit tests via:

    $ ./sbt clean scoverage:test

* An HTML report will be created at `target/scala-2.10/scoverage-report/index.html`.
* XML reports will be created at:
  * `./target/scala-2.10/coverage-report/cobertura.xml`
  * `./target/scala-2.10/scoverage-report/scoverage.xml`

Integration with CI servers:

* Jenkins integration:
  * Configure the build.
  * Go to _Post-build Actions_.
  * Add a post-build action for _Publish Cobertura Coverage Report_.
  * In the _Cobertura xml report pattern_ field add the pattern `**/target/scala-2.10/coverage-report/cobertura.xml`.
  * Now each build of your job will have a _Coverage Report_ link.
* TeamCity integration:
  * Edit the build configuration.
  * Select configuration step 1, _General settings_.
  * In the _Artifact Paths_ field add the path `target/scala-2.10/scoverage-report/** => coberturareport/`.
  * Now each build of your job will have a _Cobertura Coverage Report_ tab.




## Packaging the code

To create a normal ("slim") jar:

    $ ./sbt clean package

    >>> Generates `target/scala-2.10/kafka-storm-starter_2.10-0.1.0-SNAPSHOT.jar`

To create a fat jar, which includes any dependencies of kafka-storm-starter:

    $ ./sbt assembly

    >>> Generates `target/scala-2.10/kafka-storm-starter-assembly-0.1.0-SNAPSHOT.jar`

To create a scaladoc/javadoc jar:

    $ ./sbt packageDoc

    >>> Generates `target/scala-2.10/kafka-storm-starter_2.10-0.1.0-SNAPSHOT-javadoc.jar`

To create a sources jar:

    $ ./sbt packageSrc

    >>> Generates `target/scala-2.10/kafka-storm-starter_2.10-0.1.0-SNAPSHOT-sources.jar`

To create API docs:

    $ ./sbt doc

    >>> Generates `target/scala-2.10/api/*` (HTML files)




## IDE support

### IntelliJ IDEA

kafka-storm-starter integrates the [sbt-idea plugin](https://github.com/mpeltonen/sbt-idea).
Use the following command
to build IDEA project files:

    $ ./sbt gen-idea

You can then open kafka-storm-starter as a project in IDEA via _File > Open..._, selecting the top-level directory
of kafka-storm-starter.

**Important note:** There is a bug when using the sbt plugins for Avro and for IntelliJ IDEA in combination. The sbt
plugin for Avro reads the Avro `*.avsc` schemas stored under `src/main/avro` and generates the corresponding Java
classes, which it stores under `target/scala-2.10/src_managed/main/compiled_avro` (in the case of kafka-storm-starter,
a `Tweet.java` class will be generated from the Avro schema [twitter.avsc](src/main/avro/twitter.avsc)). The latter
path must be added to IDEA's _Source Folders_ setting, which will happen automatically for you. However, the
aforementioned bug will add a second, incorrect path to _Source Folders_, too, which will cause IDEA to complain about
not being able to find the Avro-generated Java classes (here: the `Tweet` class).

Until this bug is fixed upstream you can use the following workaround, which you must perform every time you run
`./sbt gen-idea`:

1. In IntelliJ IDEA open the project structure for kafka-storm-starter via _File > Project Structure..._.
2. Under _Project settings_ on the left-hand side select _Modules_.
3. Select the _Sources_ tab on the right-hand side.
4. Remove the problematic `target/scala-2.10/src_managed/main/compiled_avro/com` entry from the _Source Folders_ listing
   (the source folders are colored in light-blue). Note the trailing `.../com`, which comes from
   `com.miguno.avro.Tweet` in the [twitter.avsc](src/main/avro/twitter.avsc) Avro schema.
5. Click OK.

See also this screenshot (click to enlarge):

[![Fix bug in IntelliJ IDEA when using Avro](images/IntelliJ-IDEA-Avro-bug_400x216.png?raw=true)](images/IntelliJ-IDEA-Avro-bug.png?raw=true)


### Eclipse

kafka-storm-starter integrates the [sbt-eclipse plugin](https://github.com/typesafehub/sbteclipse). Use the following
command to build Eclipse project files:

    $ ./sbt eclipse

Then use the _Import Wizard_ in Eclipse to import _Existing Projects into Workspace_.




# FAQ




## Kafka

### Where do the unit tests store broker logs in the local filesystem?

The in-memory Kafka instances that are launched by the unit tests store their Kafka "log" files (i.e. the files that
contain the messages that are being sent to the Kafka topics) under `/tmp/kafka-logs/`.

You may need to manually remove this directory in case you want to start from a clean state. At the moment the unit
tests do not remove this directory for you.

### ZooKeeper exceptions "KeeperException: NoNode for /[ZK path]" logged at INFO level

In short, you can normally safely ignore those errors -- there is a reason they are logged at INFO level and not at
ERROR level.

As described in the mailing list thread [Zookeeper exceptions](http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201204.mbox/%3CCAFbh0Q3BxaAkyBq1_yUHhUkkhxX4RBQZPAA2pkR4U9+m4VY8nA@mail.gmail.com%3E):

"The reason you see those NoNode error code is the following. Every time we want to create a new [ZK] path, say
`/brokers/ids/1`, we try to create it directly. If this fails because the parent path doesn't exist, we try to create
the parent path first. This will happen recursively. However, the `NoNode` error should show up only once, not every
time a broker is started (assuming ZK data hasn't been cleaned up)."

+ +A similar answer was given in the thread +[Clean up kafka environment](http://grokbase.com/t/kafka/users/137qgfyga0/clean-up-kafka-environmet): + +"These info messages show up when Kafka tries to create new consumer groups. While trying to create the children of +`/consumers/[group]`, if the parent path doesn't exist, the zookeeper server logs these messages. Kafka internally +handles these cases correctly by first creating the parent node." + + + + +## Storm + +### Storm `LocalCluster` and ZooKeeper + +[LocalCluster](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/LocalCluster.clj) +starts an embedded ZooKeeper instance listening at `localhost:2000/tcp`. If a different process is already bound to +`2000/tcp`, then Storm will increment the embedded ZooKeeper's port until it finds a free port (`2000` -> `2001` -> +`2002`, and so on). `LocalCluster` then reads the Storm defaults and overrides some of Storm's configuration (see the +`mk-local-storm-cluster` function in +[testing.clj](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/testing.clj) and +the `mk-inprocess-zookeeper` function in +[zookeeper.clj](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/zookeeper.clj) +for details): + + STORM-CLUSTER-MODE "local" + STORM-ZOOKEEPER-PORT zk-port + STORM-ZOOKEEPER-SERVERS ["localhost"]} + +where `zk-port` is the final port chosen. + +As of May 2014 it is not possible to launch a local Storm cluster via `LocalCluster` without its own embedded ZooKeeper. +Likewise it is not possible to control on which port the embedded ZooKeeper process will listen -- it will always follow +the `2000/tcp` based algorithm above to set the port. A JIRA ticket was opened to untangle this hard wiring between +`LocalCluster` and ZooKeeper, cf. +[STORM-213: Decouple In-Process ZooKeeper from LocalCluster](https://issues.apache.org/jira/browse/STORM-213). + + + + +# Known issues and limitations + +This section lists known issues and limitations a) for the upstream projects such as Storm and Kafka, and b) for our +own code. + + + + +## Upstream code + +### Kryo version conflict in Storm + +_Note: This problem is resolved in the upcoming 0.9.2 version of Storm._ + +There is a Kryo version conflict between Storm 0.9.1 (uses Kryo 2.17) and Twitter Chill (uses Kryo 2.21). + +In this code project we use the workaround to exclude Kryo (2.21) from the Twitter Chill dependency, but this may not +be a universal workaround. Twitter have apparently run into data corruption issues with Kryo 2.17, and for that reason +have built their own version of Storm using Kryo 2.21. +See [CHILL-173: Kryo version conflict between Chill and Storm 0.9.1-incubating causes Avro serialization to fail](https://github.com/twitter/chill/issues/173) +for details. + + +### ZooKeeper throws InstanceAlreadyExistsException during tests + +You will see the following exception when running the integration tests, which you can safely ignore: + + [2014-03-07 11:56:59,250] WARN Failed to register with JMX (org.apache.zookeeper.server.ZooKeeperServer) + javax.management.InstanceAlreadyExistsException: org.apache.ZooKeeperService:name0=StandaloneServer_port-1 + +The root cause is that in-memory ZooKeeper instances have a hardcoded JMX setup. And because we cannot prevent Storm's +`LocalCluster` to start its own ZooKeeper instance alongside "ours" (see FAQ section above), there will be two ZK +instances trying to use the same JMX setup. 
Since the JMX setup is not relevant for our testing, the exception can be
safely ignored, albeit we'd prefer to come up with a proper fix, of course.


### ZooKeeper version 3.3.x recommended for use with Storm 0.9.1 and Kafka 0.8.x

_Note: The upcoming version 0.9.2 of Storm uses ZooKeeper 3.4.5._

At the time of writing, neither Storm (<= 0.9.1) nor Kafka (<= 0.8.1.1) is officially compatible with ZooKeeper 3.4.x
yet, which is the latest stable version of ZooKeeper. Instead the use of ZooKeeper 3.3.x is recommended.

So which version of ZooKeeper should you pick, particularly if you are already running a ZooKeeper cluster for other
parts of your infrastructure (such as a Hadoop cluster)?

**The TL;DR version is:** Try using ZooKeeper 3.4.5 for both Kafka and Storm, but see the caveats and workarounds
below. If you do run into problems, consider downgrading to ZooKeeper 3.3.6. If that fails, too, try 3.3.4. In the
worst case use separate ZooKeeper clusters/versions for Storm (3.3.3) and Kafka (3.3.4).

**The longer version is:** Storm versions up to and including 0.9.1 want ZK 3.3.3, but the upcoming 0.9.2 version
relies on ZooKeeper 3.4.x.
[All current versions of Kafka still prefer ZK 3.3.4](https://kafka.apache.org/documentation.html#zkversion).
Generally speaking though, the best 3.3.x version of ZooKeeper is 3.3.6, which is the latest stable 3.3.x version. This
is because 3.3.6 fixed a number of serious bugs that could lead to data corruption.

_Tip: You can verify against which ZK version the code in this project is actually built by running_
_`./sbt dependency-graph`._

**The really long version is:** In the _code and tests_ of this project we cannot use ZK 3.4.x just yet because Storm
0.9.1 is not 100% compatible with ZK 3.4.x. For instance, Storm will throw errors if you try to run a Storm
`LocalCluster` (for unit testing) against ZK 3.4.x. At the same time, and somewhat surprisingly, you can run a "real"
Storm cluster against ZK 3.4.x. For instance, Netflix have reportedly been using ZK 3.4.5 in production for some
time.

* Storm and ZooKeeper: Storm versions up to and including 0.9.1 are built against ZooKeeper 3.3.3 because of Storm's
  dependency on [Netflix Curator 1.0.1](https://github.com/Netflix/curator). These versions of ZooKeeper and Curator
  are very old, and the upcoming Storm 0.9.2 therefore switches to Apache Curator 2.4.0 with ZooKeeper 3.4.x.
* Kafka and ZooKeeper: LinkedIn recommend the use of ZK 3.3.x but warn against the use of 3.3.3 because that
  version has known serious issues regarding ephemeral node deletion and session expirations. For these reasons
  LinkedIn run ZK 3.3.4 in production.
  See [ZooKeeper version](https://kafka.apache.org/documentation.html#zkversion) in the Kafka documentation.
  Lastly, there is an open Kafka JIRA ticket that covers upgrading Kafka to ZK 3.4.5, see
  [KAFKA-854: Upgrade dependencies for 0.8](https://issues.apache.org/jira/browse/KAFKA-854).
* Storm and Cloudera CDH 4.5:
  * [Storm cannot run in combination with a recent Hadoop/HBase version](http://mail-archives.apache.org/mod_mbox/storm-user/201402.mbox/%3CCADoiZqom8Wuzi9uiqT4d01cTNn2r_nOmXyZyCSqEko-vOyrQBA@mail.gmail.com%3E)
    -- The author ran into problems when using Storm in combination with Cloudera CDH 4.
It looks as if he is trying + to build a code project that lists both Storm and Hadoop/HBase as its dependencies (similar to how we combine + Storm with Kafka), and due to that runs into ZooKeeper version conflicts as CDH 4 runs ZooKeeper 3.4.5. +* If in a production environment you run into problems when using ZooKeeper 3.4.5 with Storm <= 0.9.1, you can try + a [workaround using Google jarjar](https://groups.google.com/forum/#!topic/storm-user/TVVF_jqvD_A) in order to + deploy ZooKeeper 3.4.5 alongside Storm's/Curator's hard dependency on ZooKeeper 3.3.3. + [Another user reported](http://grokbase.com/t/gg/storm-user/134f2tw5gx/recommended-zookeeper-version-for-storm-0-8-2) + that he uses ZK 3.4.5 in production and ZK 3.3.3 for local testing by not including ZooKeeper in the uber jar + and putting the correct ZK version in the CLASSPATH at runtime. + [STORM-70: Use ZooKeeper 3.4.5](https://issues.apache.org/jira/browse/STORM-70). + + + + +## kafka-storm-starter code + +* Some code in kafka-storm-starter does not look like idiomatic Scala code. While sometimes this may be our own fault, + there is one area where we cannot easily prevent this from happening: When the underlying Java APIs (here: the Java + API of Storm) do not lend themselves to a more Scala-like code style. You can see this, for instance, in the way we + wire the spouts and bolts of a topology. One alternative, of course, would be to create Scala-fied wrappers but this + seemed inappropriate for this project. +* We are using `Thread.sleep()` in some tests instead of more intelligent approaches. To prevent transient failures we + may thus want to improve those tests. In Kafka's test suites, for instance, tests are using `waitUntilTrue()` to + detect more reliably when to proceed (or fail/timeout) with the next step. See the related discussion in the + [review request 19696 for KAFKA-1317](https://reviews.apache.org/r/19696/#comment71202). +* We noticed that the tests may fail when using Oracle/Sun JDK 1.6.0_24. Later versions (e.g. 1.6.0_31) work fine. + + + + +# Change log + +See [CHANGELOG](CHANGELOG.md). + + + + +# Contributing to kafka-storm-starter + +Code contributions, bug reports, feature requests etc. are all welcome. + +If you are new to GitHub please read [Contributing to a project](https://help.github.com/articles/fork-a-repo) for how +to send patches and pull requests to kafka-storm-starter. + + + + +# License + +Copyright © 2014 Michael G. Noll + +See [LICENSE](LICENSE) for licensing information. + + + + +# References + + + + +## Wirbelsturm + +Want to perform 1-click deployments of Kafka clusters and/or Storm clusters (with a Graphite instance, with Redis, +with...)? Take a look at [Wirbelsturm](https://github.com/miguno/wirbelsturm), with which you can deploy such +environments locally and to Amazon AWS. + + + + +## Kafka + +Unit testing: + +* [buildlackey/cep/kafka-0.8.x](https://github.com/buildlackey/cep/tree/master/kafka-0.8.x) + -- A simple Kafka producer/consumer example with in-memory Kafka and Zookeeper instances. For a number of reasons + we opted not to use that code. We list it in this section in case someone else may find it helpful. 
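
For comparison, the in-memory approach taken in this project boils down to the `KafkaEmbedded` helper (included in
this diff) plus an in-memory ZooKeeper server. The following is only a rough sketch, assuming curator-test's
`TestingServer` (the `curator-test` artifact is declared in `build.sbt`) and omitting the assertions a real test
would make:

```scala
import java.util.Properties
import com.netflix.curator.test.TestingServer
import com.miguno.kafkastorm.kafka.KafkaEmbedded

// Sketch: start in-memory ZooKeeper + Kafka, do some work, then shut both down.
object EmbeddedKafkaExample extends App {
  val zookeeper = new TestingServer(2181) // in-memory ZooKeeper from curator-test

  val brokerConfig = new Properties
  brokerConfig.put("zookeeper.connect", zookeeper.getConnectString)

  val kafka = new KafkaEmbedded(brokerConfig)
  kafka.start()

  // ...a real test would now run producers/consumers against `kafka.brokerList`
  //    and assert on the messages they send and receive...

  kafka.stop()
  zookeeper.stop()
}
```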
+ + + + +## Storm + +Storm in general: + +* [Storm FAQ](http://storm.incubator.apache.org/documentation/FAQ.html) +* [Config (Java API)](http://storm.incubator.apache.org/apidocs/backtype/storm/Config.html) +* [Understanding the Internal Message Buffers of Storm](http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/) +* [Sending Metrics From Storm to Graphite](http://www.michael-noll.com/blog/2013/11/06/sending-metrics-from-storm-to-graphite/) + +Unit testing: + +* [TestingApiDemo.java](https://github.com/xumingming/storm-lib/blob/master/src/jvm/storm/TestingApiDemo.java) + -- Demonstrates in Java how to use Storm's built-in testing API. Unfortunately the code is more than a year old and + not well documented. + * Note that `backtype.storm.Testing` is apparently not well suited to test Trident topologies. + See [Any Java example to write test cases for storm Transactional topology](https://groups.google.com/forum/#!msg/storm-user/nZs2NwNqqn8/CjKaZK7eRFsJ) + (Mar 2013) for details. +* [MockOutputCollector](https://gist.github.com/k2xl/1782187) + -- Code example on how to implement a mock `OutputCollector` for unit testing. +* [Testing the logic of Storm topologies](https://groups.google.com/forum/#!topic/storm-user/Magc5-vt2Hg) + -- Discussion in the old storm-user mailing list, Dec 2011 +* [buildlackey/cep/storm-kafka](https://github.com/buildlackey/cep/tree/master/storm%2Bkafka) + -- Kafka spout integration test with an in-memory Storm cluster (`LocalCluster`), and in-memory Kafka and Zookeeper + instances. For a number of reasons we opted not to use that code. We list it in this section in case someone else + may find it helpful. +* [buildlackey/cep/esper+storm+kafka](https://github.com/buildlackey/cep/tree/master/esper%2Bstorm%2Bkafka) + -- Example illustrating a Kafka consumer spout, a Kafka producer bolt, and an Esper streaming query bolt +* [schleyfox/storm-test](https://github.com/schleyfox/storm-test) + -- Test utilities for Storm (in Clojure). + +Kafka spout [wurstmeister/storm-kafka-0.8-plus](https://github.com/wurstmeister/storm-kafka-0.8-plus): + +* [Example code on how to use the spout](https://github.com/wurstmeister/storm-kafka-0.8-plus-test) + +Kafka spout [HolmesNL/kafka-spout](https://github.com/HolmesNL/kafka-spout), written by the +[Netherlands Forensics Institute](http://forensicinstitute.nl): + +* [Main documentation](https://github.com/HolmesNL/kafka-spout/wiki) +* [KafkaSpout.java](https://github.com/HolmesNL/kafka-spout/blob/develop/src/main/java/nl/minvenj/nfi/storm/kafka/KafkaSpout.java) + -- Helpful to understand how the spout works. +* [ConfigUtils.java](https://github.com/HolmesNL/kafka-spout/blob/develop/src/main/java/nl/minvenj/nfi/storm/kafka/util/ConfigUtils.java) + -- Helpful to understand how the Kafka spout can be configured. 
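
To make the spout references above a bit more concrete, here is a rough sketch of how the storm-kafka-0.8-plus spout
is typically wired into a topology. Treat it as illustrative only: the `storm.kafka` class names match the demo log
output shown in the Quick start section, and the topic, ZK root, and ids echo that output, but the exact values are
examples rather than this project's actual configuration:

```scala
import backtype.storm.{Config, LocalCluster}
import backtype.storm.spout.SchemeAsMultiScheme
import backtype.storm.topology.TopologyBuilder
import storm.kafka.{KafkaSpout, SpoutConfig, StringScheme, ZkHosts}

// Illustrative wiring of the storm-kafka-0.8-plus spout; all values are examples.
object KafkaSpoutWiringExample extends App {
  val zkHosts = new ZkHosts("127.0.0.1:2181") // ZooKeeper used by the Kafka brokers
  val topic = "testing"
  val zkRoot = "/kafka-spout"                 // ZK path under which the spout stores offsets
  val spoutId = "kafka-storm-starter"         // consumer id within that ZK path
  val spoutConfig = new SpoutConfig(zkHosts, topic, zkRoot, spoutId)
  spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme)

  val builder = new TopologyBuilder
  builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig))
  // ...`setBolt(...)` calls, e.g. for an AvroDecoderBolt, would follow here...

  val cluster = new LocalCluster
  cluster.submitTopology("kafka-storm-starter", new Config, builder.createTopology())
}
```

In this project the scheme would typically be an `AvroScheme[Tweet]` rather than the plain `StringScheme` shown here.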
+ + + + +## Avro + +Twitter Bijection: + +* [SpecificAvroCodecsSpecification.scala](https://github.com/twitter/bijection/blob/develop/bijection-avro/src/test/scala/com/twitter/bijection/avro/SpecificAvroCodecsSpecification.scala) + -- How to use Bijection for Avro's `Specific*` API (which is what you would usually do) +* [GenericAvroCodecsSpecification.scala](https://github.com/twitter/bijection/blob/develop/bijection-avro/src/test/scala/com/twitter/bijection/avro/GenericAvroCodecsSpecification.scala) + -- How to use Bijection for Avro's `Generic*` API + +Kafka: + +* [How to use Kafka and Avro](http://stackoverflow.com/questions/8298308/how-to-use-kafka-and-avro) + + + + +## Kryo + +* [AdamKryoRegistrator.java](https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/serialization/AdamKryoRegistrator.scala) + -- example on how to register serializers with Kyro +* Twitter Chill examples on how to create Avro-based serializers for Kryo: + * [AvroSerializerSpec.scala](https://github.com/twitter/chill/blob/develop/chill-avro/src/test/scala/com/twitter/chill/avro/AvroSerializerSpec.scala) + * [BijectionEnrichedKryo.scala](https://github.com/twitter/chill/blob/develop/chill-bijection/src/main/scala/com/twitter/chill/BijectionEnrichedKryo.scala) diff --git a/assembly.sbt b/assembly.sbt new file mode 100644 index 0000000..fd69c77 --- /dev/null +++ b/assembly.sbt @@ -0,0 +1,16 @@ +import AssemblyKeys._ + +assemblySettings + +// Any customized settings must be written here, i.e. after 'assemblySettings' above. +// See https://github.com/sbt/sbt-assembly for available parameters. + +// Include "provided" dependencies back to run/test tasks' classpath. +// See: +// https://github.com/sbt/sbt-assembly#-provided-configuration +// http://stackoverflow.com/a/21803413/3827 +// +// In our case, the Storm dependency must be set to "provided (cf. `build.sbt`) because, when deploying and launching +// our Storm topology code "for real" to a distributed Storm cluster, Storm wants us to exclude the Storm dependencies +// (jars) as they are provided [no pun intended] by the Storm cluster. +run in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run)) diff --git a/build.sbt b/build.sbt new file mode 100644 index 0000000..e7ac3bc --- /dev/null +++ b/build.sbt @@ -0,0 +1,89 @@ +organization := "com.miguno.kafkastorm" + +name := "kafka-storm-starter" + +scalaVersion := "2.10.4" + +seq(sbtavro.SbtAvro.avroSettings : _*) + +// Configure the desired Avro version. sbt-avro automatically injects a libraryDependency. +(version in avroConfig) := "1.7.6" + +// Look for *.avsc etc. files in src/test/avro +(sourceDirectory in avroConfig) <<= (sourceDirectory in Compile)(_ / "avro") + +(stringType in avroConfig) := "String" + +// https://github.com/jrudolph/sbt-dependency-graph +net.virtualvoid.sbt.graph.Plugin.graphSettings + +resolvers ++= Seq( + "typesafe-repository" at "http://repo.typesafe.com/typesafe/releases/", + "clojars-repository" at "https://clojars.org/repo", + // For retrieving Kafka release artifacts directly from Apache. The artifacts are also available via Maven Central. + "Apache releases" at "https://repository.apache.org/content/repositories/releases/" +) + +libraryDependencies ++= Seq( + "com.twitter" %% "bijection-core" % "0.6.2", + "com.twitter" %% "bijection-avro" % "0.6.2", + // Chill uses Kryo 2.21, which is not fully compatible with 2.17 (used by Storm). 
+ // We must exclude the newer Kryo version, otherwise we run into the problem described at + // https://github.com/thinkaurelius/titan/issues/301. + // + // TODO: Once Storm 0.9.2 is released we can update our dependencies to use Chill as-is (without excludes) because + // Storm then uses Kryo 2.21 (via Carbonite 1.3.3) just like Chill does. + "com.twitter" %% "chill" % "0.3.6" + exclude("com.esotericsoftware.kryo", "kryo"), + "com.twitter" % "chill-avro" % "0.3.6" + exclude("com.esotericsoftware.kryo", "kryo"), + "com.twitter" %% "chill-bijection" % "0.3.6" + exclude("com.esotericsoftware.kryo", "kryo"), + // The excludes of jms, jmxtools and jmxri are required as per https://issues.apache.org/jira/browse/KAFKA-974. + // The exclude of slf4j-simple is because it overlaps with our use of logback with slf4j facade; without the exclude + // we get slf4j warnings and logback's configuration is not picked up. + "org.apache.kafka" % "kafka_2.10" % "0.8.1.1" + exclude("javax.jms", "jms") + exclude("com.sun.jdmk", "jmxtools") + exclude("com.sun.jmx", "jmxri") + exclude("org.slf4j", "slf4j-simple"), + "org.apache.storm" % "storm-core" % "0.9.1-incubating" % "provided" + exclude("org.slf4j", "log4j-over-slf4j"), + // We exclude curator-framework because storm-kafka-0.8-plus recently switched from curator 1.0.1 to 1.3.3, which + // pulls in a newer version of ZooKeeper with which Storm 0.9.1 is not yet compatible. + // + // TODO: Remove the exclude once Storm 0.9.2 is released, because that version depends on a newer version (3.4.x) of + // ZooKeeper. + "com.miguno" %% "storm-kafka-0.8-plus" % "0.5.0-SNAPSHOT" + exclude("com.netflix.curator", "curator-framework"), + "com.netflix.curator" % "curator-test" % "1.0.1", + "com.101tec" % "zkclient" % "0.4", + // Logback with slf4j facade + "ch.qos.logback" % "logback-classic" % "1.1.2", + "ch.qos.logback" % "logback-core" % "1.1.2", + "org.slf4j" % "slf4j-api" % "1.7.7", + // Test dependencies + "org.scalatest" %% "scalatest" % "2.1.6" % "test", + "org.mockito" % "mockito-all" % "1.9.5" % "test" +) + +// Required IntelliJ workaround. This tells `sbt gen-idea` to include scala-reflect as a compile dependency (and not +// merely as a test dependency), which we need for TypeTag usage. 
+libraryDependencies <+= (scalaVersion)("org.scala-lang" % "scala-reflect" % _) + +scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature") + +publishArtifact in Test := false + +parallelExecution in Test := false + +// Write test results to file in JUnit XML format +testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-u", "target/test-reports/junitxml") + +// Write test results to console/stdout +testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-o") + +// See https://github.com/scoverage/scalac-scoverage-plugin +ScoverageSbtPlugin.instrumentSettings + +mainClass in (Compile,run) := Some("com.miguno.kafkastorm.storm.KafkaStormDemo") diff --git a/images/IntelliJ-IDEA-Avro-bug.png b/images/IntelliJ-IDEA-Avro-bug.png new file mode 100644 index 0000000..455bb09 Binary files /dev/null and b/images/IntelliJ-IDEA-Avro-bug.png differ diff --git a/images/IntelliJ-IDEA-Avro-bug_400x216.png b/images/IntelliJ-IDEA-Avro-bug_400x216.png new file mode 100644 index 0000000..b5de7ef Binary files /dev/null and b/images/IntelliJ-IDEA-Avro-bug_400x216.png differ diff --git a/project/assembly.sbt b/project/assembly.sbt new file mode 100644 index 0000000..54c3252 --- /dev/null +++ b/project/assembly.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2") diff --git a/project/build.properties b/project/build.properties new file mode 100644 index 0000000..8ac605a --- /dev/null +++ b/project/build.properties @@ -0,0 +1 @@ +sbt.version=0.13.2 diff --git a/project/build.sbt b/project/build.sbt new file mode 100644 index 0000000..ccfbb29 --- /dev/null +++ b/project/build.sbt @@ -0,0 +1,2 @@ +// https://github.com/sbt/sbt-release +addSbtPlugin("com.github.gseitz" % "sbt-release" % "0.8.3") diff --git a/project/plugins.sbt b/project/plugins.sbt new file mode 100644 index 0000000..a0fcddd --- /dev/null +++ b/project/plugins.sbt @@ -0,0 +1,20 @@ +resolvers ++= Seq( + "sbt-plugin-releases-repo" at "http://repo.scala-sbt.org/scalasbt/sbt-plugin-releases", + "sbt-idea-repository" at "http://mpeltonen.github.io/maven/" +) + +// https://github.com/mpeltonen/sbt-idea +addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.6.0") + +// https://github.com/typesafehub/sbteclipse +addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.5.0") + +// https://github.com/cavorite/sbt-avro +addSbtPlugin("com.cavorite" % "sbt-avro" % "0.3.2") + +// https://github.com/jrudolph/sbt-dependency-graph +addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.4") + +// See https://github.com/scoverage/scalac-scoverage-plugin +// and https://github.com/scoverage/sbt-scoverage +addSbtPlugin("org.scoverage" %% "sbt-scoverage" % "0.98.2") diff --git a/sbt b/sbt new file mode 100755 index 0000000..15a6f76 --- /dev/null +++ b/sbt @@ -0,0 +1,457 @@ +#!/usr/bin/env bash +# +# A more capable sbt runner, coincidentally also called sbt. +# Author: Paul Phillips + +# todo - make this dynamic +declare -r sbt_release_version="0.13.2" +declare -r sbt_unreleased_version="0.13.5-RC1" +declare -r buildProps="project/build.properties" + +declare sbt_jar sbt_dir sbt_create sbt_version +declare scala_version java_home sbt_explicit_version +declare verbose noshare batch trace_level log_level +declare sbt_saved_stty + +echoerr () { echo >&2 "$@"; } +vlog () { [[ -n "$verbose" ]] && echoerr "$@"; } + +# spaces are possible, e.g. 
sbt.version = 0.13.0 +build_props_sbt () { + [[ -r "$buildProps" ]] && \ + grep '^sbt\.version' "$buildProps" | tr '=' ' ' | awk '{ print $2; }' +} + +update_build_props_sbt () { + local ver="$1" + local old="$(build_props_sbt)" + + [[ -r "$buildProps" ]] && [[ "$ver" != "$old" ]] && { + perl -pi -e "s/^sbt\.version\b.*\$/sbt.version=${ver}/" "$buildProps" + grep -q '^sbt.version[ =]' "$buildProps" || printf "\nsbt.version=%s\n" "$ver" >> "$buildProps" + + vlog "!!!" + vlog "!!! Updated file $buildProps setting sbt.version to: $ver" + vlog "!!! Previous value was: $old" + vlog "!!!" + } +} + +set_sbt_version () { + sbt_version="${sbt_explicit_version:-$(build_props_sbt)}" + [[ -n "$sbt_version" ]] || sbt_version=$sbt_release_version + export sbt_version +} + +# restore stty settings (echo in particular) +onSbtRunnerExit() { + [[ -n "$sbt_saved_stty" ]] || return + vlog "" + vlog "restoring stty: $sbt_saved_stty" + stty "$sbt_saved_stty" + unset sbt_saved_stty +} + +# save stty and trap exit, to ensure echo is reenabled if we are interrupted. +trap onSbtRunnerExit EXIT +sbt_saved_stty="$(stty -g 2>/dev/null)" +vlog "Saved stty: $sbt_saved_stty" + +# this seems to cover the bases on OSX, and someone will +# have to tell me about the others. +get_script_path () { + local path="$1" + [[ -L "$path" ]] || { echo "$path" ; return; } + + local target="$(readlink "$path")" + if [[ "${target:0:1}" == "/" ]]; then + echo "$target" + else + echo "${path%/*}/$target" + fi +} + +die() { + echo "Aborting: $@" + exit 1 +} + +make_url () { + version="$1" + + case "$version" in + 0.7.*) echo "http://simple-build-tool.googlecode.com/files/sbt-launch-0.7.7.jar" ;; + 0.10.* ) echo "$sbt_launch_repo/org.scala-tools.sbt/sbt-launch/$version/sbt-launch.jar" ;; + 0.11.[12]) echo "$sbt_launch_repo/org.scala-tools.sbt/sbt-launch/$version/sbt-launch.jar" ;; + *) echo "$sbt_launch_repo/org.scala-sbt/sbt-launch/$version/sbt-launch.jar" ;; + esac +} + +init_default_option_file () { + local overriding_var="${!1}" + local default_file="$2" + if [[ ! -r "$default_file" && "$overriding_var" =~ ^@(.*)$ ]]; then + local envvar_file="${BASH_REMATCH[1]}" + if [[ -r "$envvar_file" ]]; then + default_file="$envvar_file" + fi + fi + echo "$default_file" +} + +declare -r cms_opts="-XX:+CMSClassUnloadingEnabled -XX:+UseConcMarkSweepGC" +declare -r jit_opts="-XX:ReservedCodeCacheSize=256m -XX:+TieredCompilation" +declare -r default_jvm_opts="-XX:MaxPermSize=384m -Xms512m -Xmx1536m -Xss2m $jit_opts $cms_opts" +declare -r noshare_opts="-Dsbt.global.base=project/.sbtboot -Dsbt.boot.directory=project/.boot -Dsbt.ivy.home=project/.ivy" +declare -r latest_28="2.8.2" +declare -r latest_29="2.9.3" +declare -r latest_210="2.10.4" +declare -r latest_211="2.11.0" + +declare -r script_path="$(get_script_path "$BASH_SOURCE")" +declare -r script_name="${script_path##*/}" + +# some non-read-onlies set with defaults +declare java_cmd="java" +declare sbt_opts_file="$(init_default_option_file SBT_OPTS .sbtopts)" +declare jvm_opts_file="$(init_default_option_file JVM_OPTS .jvmopts)" +declare sbt_launch_repo="http://typesafe.artifactoryonline.com/typesafe/ivy-releases" + +# pull -J and -D options to give to java. 
+declare -a residual_args +declare -a java_args +declare -a scalac_args +declare -a sbt_commands + +# args to jvm/sbt via files or environment variables +declare -a extra_jvm_opts extra_sbt_opts + +# if set, use JAVA_HOME over java found in path +[[ -e "$JAVA_HOME/bin/java" ]] && java_cmd="$JAVA_HOME/bin/java" + +# directory to store sbt launchers +declare sbt_launch_dir="$HOME/.sbt/launchers" +[[ -d "$sbt_launch_dir" ]] || mkdir -p "$sbt_launch_dir" +[[ -w "$sbt_launch_dir" ]] || sbt_launch_dir="$(mktemp -d -t sbt_extras_launchers.XXXXXX)" + +build_props_scala () { + if [[ -r "$buildProps" ]]; then + versionLine="$(grep '^build.scala.versions' "$buildProps")" + versionString="${versionLine##build.scala.versions=}" + echo "${versionString%% .*}" + fi +} + +execRunner () { + # print the arguments one to a line, quoting any containing spaces + vlog "# Executing command line:" && { + for arg; do + if [[ -n "$arg" ]]; then + if printf "%s\n" "$arg" | grep -q ' '; then + printf >&2 "\"%s\"\n" "$arg" + else + printf >&2 "%s\n" "$arg" + fi + fi + done + vlog "" + } + + if [[ -n "$batch" ]]; then + exec /dev/null; then + curl --fail --silent "$url" --output "$jar" + elif which wget >/dev/null; then + wget --quiet -O "$jar" "$url" + fi + } && [[ -r "$jar" ]] +} + +acquire_sbt_jar () { + sbt_url="$(jar_url "$sbt_version")" + sbt_jar="$(jar_file "$sbt_version")" + + [[ -r "$sbt_jar" ]] || download_url "$sbt_url" "$sbt_jar" +} + +usage () { + cat < display stack traces with a max of frames (default: -1, traces suppressed) + -no-colors disable ANSI color codes + -sbt-create start sbt even if current directory contains no sbt project + -sbt-dir path to global settings/plugins directory (default: ~/.sbt/) + -sbt-boot path to shared boot directory (default: ~/.sbt/boot in 0.11+) + -ivy path to local Ivy repository (default: ~/.ivy2) + -no-share use all local caches; no sharing + -offline put sbt in offline mode + -jvm-debug Turn on JVM debugging, open at the given port. + -batch Disable interactive mode + -prompt Set the sbt prompt; in expr, 's' is the State and 'e' is Extracted + + # sbt version (default: sbt.version from $buildProps if present, otherwise $sbt_release_version) + -sbt-version use the specified version of sbt (default: $sbt_release_version) + -sbt-jar use the specified jar as the sbt launcher + -sbt-launch-dir directory to hold sbt launchers (default: ~/.sbt/launchers) + -sbt-launch-repo repo url for downloading sbt launcher jar (default: $sbt_launch_repo) + + # scala version (default: as chosen by sbt) + -28 use $latest_28 + -29 use $latest_29 + -210 use $latest_210 + -211 use $latest_211 + -scala-home use the scala build at the specified directory + -scala-version use the specified version of scala + -binary-version use the specified scala version when searching for dependencies + + # java version (default: java from PATH, currently $(java -version 2>&1 | grep version)) + -java-home alternate JAVA_HOME + + # passing options to the jvm - note it does NOT use JAVA_OPTS due to pollution + # The default set is used if JVM_OPTS is unset and no -jvm-opts file is found + $default_jvm_opts + JVM_OPTS environment variable holding either the jvm args directly, or + the reference to a file containing jvm args if given path is prepended by '@' (e.g. '@/etc/jvmopts') + Note: "@"-file is overridden by local '.jvmopts' or '-jvm-opts' argument. 
+ -jvm-opts file containing jvm args (if not given, .jvmopts in project root is used if present) + -Dkey=val pass -Dkey=val directly to the jvm + -J-X pass option -X directly to the jvm (-J is stripped) + + # passing options to sbt, OR to this runner + SBT_OPTS environment variable holding either the sbt args directly, or + the reference to a file containing sbt args if given path is prepended by '@' (e.g. '@/etc/sbtopts') + Note: "@"-file is overridden by local '.sbtopts' or '-sbt-opts' argument. + -sbt-opts file containing sbt args (if not given, .sbtopts in project root is used if present) + -S-X add -X to sbt's scalacOptions (-S is stripped) +EOM +} + +addJava () { + vlog "[addJava] arg = '$1'" + java_args=( "${java_args[@]}" "$1" ) +} +addSbt () { + vlog "[addSbt] arg = '$1'" + sbt_commands=( "${sbt_commands[@]}" "$1" ) +} +addScalac () { + vlog "[addScalac] arg = '$1'" + scalac_args=( "${scalac_args[@]}" "$1" ) +} +addResidual () { + vlog "[residual] arg = '$1'" + residual_args=( "${residual_args[@]}" "$1" ) +} +addResolver () { + addSbt "set resolvers += $1" +} +addDebugger () { + addJava "-Xdebug" + addJava "-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=$1" +} +setScalaVersion () { + [[ "$1" == *"-SNAPSHOT" ]] && addResolver 'Resolver.sonatypeRepo("snapshots")' + addSbt "++ $1" +} + +process_args () +{ + require_arg () { + local type="$1" + local opt="$2" + local arg="$3" + + if [[ -z "$arg" ]] || [[ "${arg:0:1}" == "-" ]]; then + die "$opt requires <$type> argument" + fi + } + while [[ $# -gt 0 ]]; do + case "$1" in + -h|-help) usage; exit 1 ;; + -v) verbose=true && shift ;; + -d) addSbt "--debug" && shift ;; + -w) addSbt "--warn" && shift ;; + -q) addSbt "--error" && shift ;; + -trace) require_arg integer "$1" "$2" && trace_level="$2" && shift 2 ;; + -ivy) require_arg path "$1" "$2" && addJava "-Dsbt.ivy.home=$2" && shift 2 ;; + -no-colors) addJava "-Dsbt.log.noformat=true" && shift ;; + -no-share) noshare=true && shift ;; + -sbt-boot) require_arg path "$1" "$2" && addJava "-Dsbt.boot.directory=$2" && shift 2 ;; + -sbt-dir) require_arg path "$1" "$2" && sbt_dir="$2" && shift 2 ;; + -debug-inc) addJava "-Dxsbt.inc.debug=true" && shift ;; + -offline) addSbt "set offline := true" && shift ;; + -jvm-debug) require_arg port "$1" "$2" && addDebugger "$2" && shift 2 ;; + -batch) batch=true && shift ;; + -prompt) require_arg "expr" "$1" "$2" && addSbt "set shellPrompt in ThisBuild := (s => { val e = Project.extract(s) ; $2 })" && shift 2 ;; + + -sbt-create) sbt_create=true && shift ;; + -sbt-jar) require_arg path "$1" "$2" && sbt_jar="$2" && shift 2 ;; + -sbt-version) require_arg version "$1" "$2" && sbt_explicit_version="$2" && shift 2 ;; + -sbt-dev) sbt_explicit_version="$sbt_unreleased_version" && shift ;; +-sbt-launch-dir) require_arg path "$1" "$2" && sbt_launch_dir="$2" && shift 2 ;; +-sbt-launch-repo) require_arg path "$1" "$2" && sbt_launch_repo="$2" && shift 2 ;; + -scala-version) require_arg version "$1" "$2" && setScalaVersion "$2" && shift 2 ;; +-binary-version) require_arg version "$1" "$2" && addSbt "set scalaBinaryVersion in ThisBuild := \"$2\"" && shift 2 ;; + -scala-home) require_arg path "$1" "$2" && addSbt "set every scalaHome := Some(file(\"$2\"))" && shift 2 ;; + -java-home) require_arg path "$1" "$2" && java_cmd="$2/bin/java" && shift 2 ;; + -sbt-opts) require_arg path "$1" "$2" && sbt_opts_file="$2" && shift 2 ;; + -jvm-opts) require_arg path "$1" "$2" && jvm_opts_file="$2" && shift 2 ;; + + -D*) addJava "$1" && shift ;; + -J*) addJava "${1:2}" && shift 
;; + -S*) addScalac "${1:2}" && shift ;; + -28) setScalaVersion "$latest_28" && shift ;; + -29) setScalaVersion "$latest_29" && shift ;; + -210) setScalaVersion "$latest_210" && shift ;; + -211) setScalaVersion "$latest_211" && shift ;; + + *) addResidual "$1" && shift ;; + esac + done +} + +# process the direct command line arguments +process_args "$@" + +# skip #-styled comments and blank lines +readConfigFile() { + while read line; do + [[ $line =~ ^# ]] || [[ -z $line ]] || echo "$line" + done < "$1" +} + +# if there are file/environment sbt_opts, process again so we +# can supply args to this runner +if [[ -r "$sbt_opts_file" ]]; then + vlog "Using sbt options defined in file $sbt_opts_file" + while read opt; do extra_sbt_opts+=("$opt"); done < <(readConfigFile "$sbt_opts_file") +elif [[ -n "$SBT_OPTS" && ! ("$SBT_OPTS" =~ ^@.*) ]]; then + vlog "Using sbt options defined in variable \$SBT_OPTS" + extra_sbt_opts=( $SBT_OPTS ) +else + vlog "No extra sbt options have been defined" +fi + +[[ -n "${extra_sbt_opts[*]}" ]] && process_args "${extra_sbt_opts[@]}" + +# reset "$@" to the residual args +set -- "${residual_args[@]}" +argumentCount=$# + +# set sbt version +set_sbt_version + +# only exists in 0.12+ +setTraceLevel() { + case "$sbt_version" in + "0.7."* | "0.10."* | "0.11."* ) echoerr "Cannot set trace level in sbt version $sbt_version" ;; + *) addSbt "set every traceLevel := $trace_level" ;; + esac +} + +# set scalacOptions if we were given any -S opts +[[ ${#scalac_args[@]} -eq 0 ]] || addSbt "set scalacOptions in ThisBuild += \"${scalac_args[@]}\"" + +# Update build.properties on disk to set explicit version - sbt gives us no choice +[[ -n "$sbt_explicit_version" ]] && update_build_props_sbt "$sbt_explicit_version" +vlog "Detected sbt version $sbt_version" + +[[ -n "$scala_version" ]] && vlog "Overriding scala version to $scala_version" + +# no args - alert them there's stuff in here +(( argumentCount > 0 )) || { + vlog "Starting $script_name: invoke with -help for other options" + residual_args=( shell ) +} + +# verify this is an sbt dir or -create was given +[[ -r ./build.sbt || -d ./project || -n "$sbt_create" ]] || { + cat < Unit) { + val topicCountMap = Map(topic -> numThreads) + val valueDecoder = new DefaultDecoder + val keyDecoder = valueDecoder + val consumerMap = consumerConnector.createMessageStreams(topicCountMap, keyDecoder, valueDecoder) + val consumerThreads = consumerMap.get(topic) match { + case Some(streams) => streams.view.zipWithIndex map { + case (stream, threadId) => + new ConsumerTask(stream, new ConsumerTaskContext(threadId), f) + } + case _ => Seq() + } + consumerThreads foreach executor.submit + } + + def shutdown() { + consumerConnector.shutdown() + executor.shutdown() + } + + Runtime.getRuntime.addShutdownHook(new Thread() { + override def run() { + shutdown() + } + }) + +} + +class ConsumerTask[K, V, C <: ConsumerTaskContext](stream: KafkaStream[K, V], context: C, + f: (MessageAndMetadata[K, V], C) => Unit) extends Runnable with Logging { + + override def run() { + info(s"Consumer thread ${context.threadId} started") + stream foreach { + case msg: MessageAndMetadata[_, _] => + trace(s"Thread ${context.threadId} received message: " + msg) + f(msg, context) + case _ => trace(s"Received unexpected message type from broker") + } + info(s"Shutting down consumer thread ${context.threadId}") + } + +} + +case class ConsumerTaskContext(threadId: Int) \ No newline at end of file diff --git a/src/main/scala/com/miguno/kafkastorm/kafka/KafkaEmbedded.scala 
b/src/main/scala/com/miguno/kafkastorm/kafka/KafkaEmbedded.scala new file mode 100644 index 0000000..734e5c4 --- /dev/null +++ b/src/main/scala/com/miguno/kafkastorm/kafka/KafkaEmbedded.scala @@ -0,0 +1,68 @@ +package com.miguno.kafkastorm.kafka + +import java.util.Properties +import kafka.server.{KafkaServerStartable, KafkaConfig} +import kafka.utils.Logging + +/** + * Runs an in-memory, "embedded" instance of a Kafka broker, which listens at `127.0.0.1:9092`. + * + * Requires a running ZooKeeper instance to connect to. By default, it expects a ZooKeeper instance running at + * `127.0.0.1:2181`. You can specify a different ZooKeeper instance by setting the `zookeeper.connect` parameter in the + * broker's configuration. + * + * @param config Broker configuration settings. + */ +class KafkaEmbedded(config: Properties = new Properties) extends Logging { + + private val defaultZkConnect = "127.0.0.1:2181" + + private val effectiveConfig = { + val c = new Properties + c.load(this.getClass.getResourceAsStream("/broker-defaults.properties")) + c.putAll(config) + c + } + + private val kafkaConfig = new KafkaConfig(effectiveConfig) + private val kafka = new KafkaServerStartable(kafkaConfig) + + /** + * This broker's `metadata.broker.list` value. Example: `127.0.0.1:9092`. + * + * You can use this to tell Kafka producers and consumers how to connect to this instance. + */ + val brokerList = kafka.serverConfig.hostName + ":" + kafka.serverConfig.port + + /** + * The ZooKeeper connection string aka `zookeeper.connect`. + */ + val zookeeperConnect = { + val zkConnectLookup = Option(effectiveConfig.getProperty("zookeeper.connect")) + zkConnectLookup match { + case Some(zkConnect) => zkConnect + case _ => + warn(s"zookeeper.connect is not configured -- falling back to default setting $defaultZkConnect") + defaultZkConnect + } + } + + /** + * Start the broker. + */ + def start() { + debug(s"Starting embedded Kafka broker at $brokerList (using ZooKeeper server at $zookeeperConnect) ...") + kafka.startup() + debug("Embedded Kafka broker startup completed") + } + + /** + * Stop the broker. + */ + def stop() { + debug("Shutting down embedded Kafka broker...") + kafka.shutdown() + debug("Embedded Kafka broker shutdown completed") + } + +} \ No newline at end of file diff --git a/src/main/scala/com/miguno/kafkastorm/kafka/KafkaProducerApp.scala b/src/main/scala/com/miguno/kafkastorm/kafka/KafkaProducerApp.scala new file mode 100644 index 0000000..7b30db7 --- /dev/null +++ b/src/main/scala/com/miguno/kafkastorm/kafka/KafkaProducerApp.scala @@ -0,0 +1,77 @@ +package com.miguno.kafkastorm.kafka + +import kafka.producer.{KeyedMessage, ProducerConfig, Producer} +import java.util.Properties + +/** + * Demonstrates how to implement a simple Kafka producer application to send data to Kafka. + * + * Don't read too much into the actual implementation of this class. Its sole purpose is to showcase the use of the + * Kafka API. + * + * @param topic The Kafka topic to send data to. + * @param brokerList Value for Kafka's `metadata.broker.list` setting. + * @param producerConfig Additional producer configuration settings. 
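+ *
+ * @example A minimal usage sketch (the topic name and broker address below are assumptions for illustration only):
+ * {{{
+ * val config = new java.util.Properties
+ * config.put("request.required.acks", "1")
+ * val producerApp = new KafkaProducerApp("testing", "127.0.0.1:9092", config)
+ * producerApp.send("hello, world".getBytes)
+ * }}}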
+ */
+case class KafkaProducerApp(
+    val topic: String,
+    val brokerList: String,
+    producerConfig: Properties = new Properties
+    ) {
+
+  private val producer = {
+    val effectiveConfig = {
+      val c = new Properties
+      c.load(this.getClass.getResourceAsStream("/producer-defaults.properties"))
+      c.putAll(producerConfig)
+      c.put("metadata.broker.list", brokerList)
+      c
+    }
+    new Producer[Array[Byte], Array[Byte]](new ProducerConfig(effectiveConfig))
+  }
+
+  // The configuration field of the wrapped producer is immutable (including its nested fields), so it's safe to expose
+  // it directly.
+  val config = producer.config
+
+  private def toMessage(key: Option[Array[Byte]], value: Array[Byte]): KeyedMessage[Array[Byte], Array[Byte]] =
+    key match {
+      case Some(key) => new KeyedMessage(topic, key, value)
+      case _ => new KeyedMessage(topic, value)
+    }
+
+  def send(key: Array[Byte], value: Array[Byte]): Unit = producer.send(toMessage(Some(key), value))
+
+  def send(value: Array[Byte]): Unit = producer.send(toMessage(None, value))
+
+}
+
+/**
+ * Creates KafkaProducerApp instances.
+ *
+ * We require such a factory because of how Storm and notably
+ * [[http://storm.incubator.apache.org/documentation/Serialization.html serialization within Storm]] work.
+ * Without such a factory we cannot properly unit test Storm bolts that need to write to Kafka.
+ *
+ * Preferably we would simply pass a Kafka producer directly to a Storm bolt. During testing we could then mock this
+ * collaborator. However, this intuitive approach fails at (Storm) runtime because Kafka producers are not serializable.
+ * The alternative approach of instantiating the Kafka producer from within the bolt (e.g. using a `@transient lazy val`
+ * field) does work at runtime but prevents us from verifying the correct interaction between our bolt's code and its
+ * collaborator, the Kafka producer, because we cannot easily mock the producer in this setup. The chosen approach of
+ * the factory method, while introducing some level of unwanted indirection and complexity, is a pragmatic way to
+ * make our Storm code work correctly at runtime and to keep it testable.
+ *
+ * @param topic The Kafka topic to send data to.
+ * @param brokerList Value for Kafka's `metadata.broker.list` setting.
+ * @param config Additional producer configuration settings.
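+ *
+ * @example A sketch of the intended pattern (topic name and broker address are assumptions for illustration):
+ * {{{
+ * // When wiring the topology:
+ * val producerAppFactory = new BaseKafkaProducerAppFactory("testing-output", "127.0.0.1:9092")
+ * val kafkaSinkBolt = new AvroKafkaSinkBolt[Tweet](producerAppFactory)
+ *
+ * // At runtime, each bolt instance then creates its own producer in `prepare()` via
+ * // `producerFactory.newInstance()`.
+ * }}}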
+ */
+abstract class KafkaProducerAppFactory(topic: String, brokerList: String, config: Properties) extends Serializable {
+  def newInstance(): KafkaProducerApp
+}
+
+class BaseKafkaProducerAppFactory(topic: String, brokerList: String, config: Properties = new Properties)
+  extends KafkaProducerAppFactory(topic, brokerList, config) {
+
+  override def newInstance() = new KafkaProducerApp(topic, brokerList, config)
+
+}
\ No newline at end of file
diff --git a/src/main/scala/com/miguno/kafkastorm/storm/AvroDecoderBolt.scala b/src/main/scala/com/miguno/kafkastorm/storm/AvroDecoderBolt.scala
new file mode 100644
index 0000000..5390e7a
--- /dev/null
+++ b/src/main/scala/com/miguno/kafkastorm/storm/AvroDecoderBolt.scala
@@ -0,0 +1,106 @@
+package com.miguno.kafkastorm.storm
+
+import backtype.storm.topology.base.BaseBasicBolt
+import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
+import backtype.storm.tuple.{Fields, Tuple, Values}
+import com.google.common.base.Throwables
+import com.twitter.bijection.avro.SpecificAvroCodecs
+import com.twitter.bijection.Injection
+import org.apache.avro.specific.SpecificRecordBase
+import org.slf4j.{Logger, LoggerFactory}
+import scala.util.{Try, Failure, Success}
+
+/**
+ * A binaryAvro->pojoAvro converter bolt.
+ *
+ * This bolt expects incoming data in Avro-encoded binary format, serialized according to the Avro schema of `T`. It
+ * will deserialize the incoming data into a `T` pojo, and emit this pojo to downstream consumers. As such this bolt
+ * can be considered the Storm equivalent of Twitter Bijection's `Injection.invert[T, Array[Byte]](bytes)` for
+ * Avro data.
+ *
+ * By using this bolt you don't need to write a new decoder bolt just because your topology needs to handle a
+ * different Avro schema.
+ *
+ * @example {{{
+ * import backtype.storm.topology.TopologyBuilder
+ * import com.miguno.avro.Tweet
+ *
+ * val builder = new TopologyBuilder
+ * // ...spout is set up here...
+ * val decoderBolt = new AvroDecoderBolt[Tweet]
+ * builder.setBolt(decoderBoltId, decoderBolt).shuffleGrouping(spoutId) // or whatever grouping you need
+ * }}}
+ *
+ * @param inputField The name of the field in the input tuple to read from. (Default: "bytes")
+ * @param outputField The name of the field in the output tuple to write to. (Default: "pojo")
+ * @tparam T The type of the Avro record (e.g. a `Tweet`) based on the underlying Avro schema being used. Must be
+ * a subclass of Avro's `SpecificRecordBase`.
+ */
+class AvroDecoderBolt[T <: SpecificRecordBase : Manifest](
+    inputField: String = "bytes",
+    outputField: String = "pojo")
+  extends BaseBasicBolt {
+
+  // Note: Ideally we would like to use TypeTag's instead of Manifest's here. Doing so would only require replacing
+  // `manifest[T]` with `typeOf[T]`, and adding AvroDecoderBolt[T : TypeTag]. Unfortunately there is a known
+  // serialization bug in Scala's TypeTag implementation that will trigger runtime exceptions when submitting/running
+  // this class in a Storm topology.
+  //
+  // See "SI-5919: Type tags (and Exprs as well) should be serializable" (https://issues.scala-lang.org/browse/SI-5919)
+  val tpe = manifest[T]
+
+  // Must be transient because Logger is not serializable
+  @transient lazy private val log: Logger = LoggerFactory.getLogger(classOf[AvroDecoderBolt[T]])
+
+  // Must be transient because Injection is not serializable. Must be implicit because that's how Injection works.
+ @transient lazy implicit private val specificAvroBinaryInjection: Injection[T, Array[Byte]] = + SpecificAvroCodecs.toBinary[T] + + override def execute(tuple: Tuple, collector: BasicOutputCollector) { + val readTry = Try(tuple.getBinaryByField(inputField)) + readTry match { + case Success(bytes) if bytes != null => decodeAndSinkToKafka(bytes, collector) + case Success(_) => log.error("Reading from input tuple returned null") + case Failure(e) => log.error("Could not read from input tuple: " + Throwables.getStackTraceAsString(e)) + } + } + + private def decodeAndSinkToKafka(bytes: Array[Byte], collector: BasicOutputCollector) { + require(bytes != null, "bytes must not be null") + val decodeTry = Injection.invert[T, Array[Byte]](bytes) + decodeTry match { + case Success(pojo) => + log.debug("Binary data decoded into pojo: " + pojo) + collector.emit(new Values(pojo)) + case Failure(e) => log.error("Could not decode binary data: " + Throwables.getStackTraceAsString(e)) + } + } + + override def declareOutputFields(declarer: OutputFieldsDeclarer) { + declarer.declare(new Fields(outputField)) + } + +} + +object AvroDecoderBolt { + + /** + * Factory method for Java interoperability. + * + * @example {{{ + * // in Java + * AvroDecoderBolt decoderBolt = AvroDecoderBolt.ofType(Tweet.class); + * }}} + * + * @param cls + * @tparam T + * @return + */ + def ofType[T <: SpecificRecordBase](cls: java.lang.Class[T]) = { + val manifest = Manifest.classType[T](cls) + newInstance[T](manifest) + } + + private def newInstance[T <: SpecificRecordBase : Manifest] = new AvroDecoderBolt[T] + +} \ No newline at end of file diff --git a/src/main/scala/com/miguno/kafkastorm/storm/AvroKafkaSinkBolt.scala b/src/main/scala/com/miguno/kafkastorm/storm/AvroKafkaSinkBolt.scala new file mode 100644 index 0000000..7cd79d8 --- /dev/null +++ b/src/main/scala/com/miguno/kafkastorm/storm/AvroKafkaSinkBolt.scala @@ -0,0 +1,104 @@ +package com.miguno.kafkastorm.storm + +import backtype.storm.task.TopologyContext +import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer} +import backtype.storm.topology.base.BaseBasicBolt +import backtype.storm.tuple.{Tuple, Fields} +import com.miguno.kafkastorm.kafka.{KafkaProducerAppFactory, KafkaProducerApp} +import com.twitter.bijection.Injection +import com.twitter.bijection.avro.SpecificAvroCodecs +import java.util +import org.apache.avro.specific.SpecificRecordBase +import org.slf4j.{Logger, LoggerFactory} + +/** + * A Storm->Kafka writer bolt. + * + * This bolt expects Avro pojos of type `T` as incoming data. It will Avro-encode these pojos into a binary + * representation (bytes) according to the Avro schema of `T`, and then send these bytes to Kafka. + * + * @param producerFactory A factory to instantiate the required Kafka producer. We require such a factory because of + * unit testing and the way Storm code is (shipped and) executed in a Storm cluster. Because + * a bolt is instantiated on a different JVM we cannot simply pass the "final" Kafka producer + * directly to the bolt when we wire the topology. Instead we must enable each bolt instance to + * create its own Kafka producer when it is starting up (and this startup typically happens in a + * different JVM on a different machine). + * @param inputField The name of the field in the input tuple to read from. (Default: "pojo") + * @param outputField The name of the field in the output tuple to write to. (Default: "bytes") + * @tparam T The type of the Avro record (e.g. 
a `Tweet`) based on the underlying Avro schema being used. Must be + * a subclass of Avro's `SpecificRecordBase`. + */ +class AvroKafkaSinkBolt[T <: SpecificRecordBase : Manifest]( + producerFactory: KafkaProducerAppFactory, + inputField: String = "pojo", + outputField: String = "bytes") + extends BaseBasicBolt { + + // Note: Ideally we would like to use TypeTag's instead of Manifest's here. Doing so would only require replacing + // `manifest[T]` with `typeOf[T]`, and adding AvroKafkaSinkBolt[T : TypeTag]. Unfortunately there is a known + // serialization bug in Scala's TypeTag implementation that will trigger runtime exceptions when submitting/running + // this class in a Storm topology. + // + // See "SI-5919: Type tags (and Exprs as well) should be serializable" (https://issues.scala-lang.org/browse/SI-5919) + val tpe = manifest[T] + + // Must be transient because Logger is not serializable + @transient lazy private val log: Logger = LoggerFactory.getLogger(classOf[AvroKafkaSinkBolt[T]]) + + // Must be transient because Injection is not serializable + @transient lazy implicit private val specificAvroBinaryInjection: Injection[T, Array[Byte]] = + SpecificAvroCodecs.toBinary[T] + + // Must be transient because KafkaProducerApp is not serializable. The factory approach to instantiate a Kafka producer + // unfortunately means we must use a var combined with `prepare()` -- a val would cause a NullPointerException at + // runtime for the producer. + @transient private var producer: KafkaProducerApp = _ + + override def prepare(stormConf: util.Map[_, _], context: TopologyContext) { + producer = producerFactory.newInstance() + } + + override def execute(tuple: Tuple, collector: BasicOutputCollector) { + tuple.getValueByField(inputField) match { + case pojo: T => + val bytes = Injection[T, Array[Byte]](pojo) + log.debug("Encoded pojo " + pojo + " to Avro binary format") + producer.send(bytes) + case _ => log.error("Could not decode binary data") + } + } + + override def declareOutputFields(declarer: OutputFieldsDeclarer) { + declarer.declare(new Fields()) + } + +} + +object AvroKafkaSinkBolt { + + /** + * Factory method for Java interoperability. 
+ *
+ * @example {{{
+ * // Java example
+ * AvroKafkaSinkBolt kafkaSinkBolt = AvroKafkaSinkBolt.ofType(Tweet.class)(producerFactory, ...);
+ * }}}
+ *
+ * @param cls
+ * @tparam T
+ * @return
+ */
+  def ofType[T <: SpecificRecordBase](cls: java.lang.Class[T])(
+    producerFactory: KafkaProducerAppFactory,
+    inputFieldName: String = "pojo") = {
+    val manifest = Manifest.classType[T](cls)
+    newInstance[T](producerFactory, inputFieldName)(manifest)
+  }
+
+  private def newInstance[T <: SpecificRecordBase](
+    producerFactory: KafkaProducerAppFactory,
+    inputFieldName: String = "pojo")
+    (implicit man: Manifest[T]) =
+    new AvroKafkaSinkBolt[T](producerFactory, inputFieldName)
+
+}
\ No newline at end of file
diff --git a/src/main/scala/com/miguno/kafkastorm/storm/AvroScheme.scala b/src/main/scala/com/miguno/kafkastorm/storm/AvroScheme.scala
new file mode 100644
index 0000000..8a767e6
--- /dev/null
+++ b/src/main/scala/com/miguno/kafkastorm/storm/AvroScheme.scala
@@ -0,0 +1,80 @@
+package com.miguno.kafkastorm.storm
+
+import backtype.storm.spout.Scheme
+import backtype.storm.tuple.{Fields, Values}
+import com.twitter.bijection.Injection
+import com.twitter.bijection.avro.SpecificAvroCodecs
+import org.apache.avro.specific.SpecificRecordBase
+import scala.util.{Failure, Success}
+
+/**
+ * A custom binaryAvro->pojoAvro `backtype.storm.spout.Scheme` to auto-deserialize a spout's incoming data. You can
+ * parameterize this scheme with the Avro type `T` of the spout's expected input data.
+ *
+ * In the case of `storm.kafka.KafkaSpout`, its default scheme is Storm's `backtype.storm.spout.RawMultiScheme`,
+ * which simply returns the raw bytes of the incoming data (i.e. leaving deserialization up to you in subsequent bolts
+ * such as [[AvroDecoderBolt]]). Alternatively, you can configure the spout to use this custom scheme. If you do, then
+ * the spout will automatically deserialize its incoming data into pojos. Note that you will need to register a custom
+ * Kryo decorator for the Avro type `T`; see [[TweetAvroKryoDecorator]] for an example.
+ *
+ * @example {{{
+ * import backtype.storm.spout.SchemeAsMultiScheme
+ * import com.miguno.avro.Tweet
+ * import storm.kafka.{KafkaSpout, SpoutConfig}
+ *
+ * val spoutConfig = new SpoutConfig(...)
+ * spoutConfig.scheme = new SchemeAsMultiScheme(new AvroScheme[Tweet])
+ * val kafkaSpout = new KafkaSpout(spoutConfig)
+ * }}}
+ *
+ * @tparam T The type of the Avro record (e.g. a `Tweet`) based on the underlying Avro schema being used. Must be
+ * a subclass of Avro's `SpecificRecordBase`.
+ */
+class AvroScheme[T <: SpecificRecordBase : Manifest] extends Scheme {
+
+  // Note: Ideally we would like to use TypeTag's instead of Manifest's here. Doing so would only require replacing
+  // `manifest[T]` with `typeOf[T]`, and adding AvroScheme[T : TypeTag]. Unfortunately there is a known serialization
+  // bug in the TypeTag implementation of Scala versions <= 2.11.1 that will trigger runtime exceptions when
+  // submitting/running this class in a Storm topology.
+ // + // See "SI-5919: Type tags (and Exprs as well) should be serializable" (https://issues.scala-lang.org/browse/SI-5919) + val tpe = manifest[T] + + private val OutputFieldName = "pojo" + + @transient lazy implicit private val specificAvroBinaryInjection = SpecificAvroCodecs.toBinary[T] + + override def deserialize(bytes: Array[Byte]): java.util.List[AnyRef] = { + val result = Injection.invert[T, Array[Byte]](bytes) + result match { + case Success(pojo) => new Values(pojo) + case Failure(e) => throw new RuntimeException("Could not decode input bytes") + } + } + + override def getOutputFields() = new Fields(OutputFieldName) + +} + +object AvroScheme { + + /** + * Factory method for Java interoperability. + * + * @example {{{ + * // in Java + * AvroScheme avroScheme = AvroScheme.ofType(Tweet.class); + * }}} + * + * @param cls + * @tparam T + * @return + */ + def ofType[T <: SpecificRecordBase](cls: java.lang.Class[T]) = { + val manifest = Manifest.classType[T](cls) + newInstance[T](manifest) + } + + private def newInstance[T <: SpecificRecordBase : Manifest] = new AvroScheme[T] + +} \ No newline at end of file diff --git a/src/main/scala/com/miguno/kafkastorm/storm/KafkaStormDemo.scala b/src/main/scala/com/miguno/kafkastorm/storm/KafkaStormDemo.scala new file mode 100644 index 0000000..4a81ea9 --- /dev/null +++ b/src/main/scala/com/miguno/kafkastorm/storm/KafkaStormDemo.scala @@ -0,0 +1,121 @@ +package com.miguno.kafkastorm.storm + +import backtype.storm.{Config, LocalCluster} +import backtype.storm.generated.KillOptions +import backtype.storm.topology.TopologyBuilder +import com.miguno.kafkastorm.kafka.KafkaEmbedded +import com.miguno.kafkastorm.zookeeper.ZooKeeperEmbedded +import java.util.Properties +import kafka.admin.AdminUtils +import kafka.utils.ZKStringSerializer +import org.I0Itec.zkclient.ZkClient +import scala.concurrent.duration._ +import storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts} + +/** + * Showcases how to create a Storm topology that reads data from Kafka. Because it's a demo this topology does not + * (yet?) do anything to the input data -- it just reads, that's it. If you want to add functionality you only need to + * put one or more Storm bolts after the spout that reads from Kafka. + * + * The default setup runs the topology against an in-memory instance of Kafka (that is backed by an in-memory instance + * of ZooKeeper). Alternatively, you can also point the topology to a "real" Kafka cluster. An easy and quick way to + * deploy such a Kafka and ZooKeeper infrastructure is to use a tool such as + * [[https://github.com/miguno/wirbelsturm Wirbelsturm]]. + */ +class KafkaStormDemo(kafkaZkConnect: String, topic: String, numTopicPartitions: Int = 1, + topologyName: String = "kafka-storm-starter", runtime: Duration = 1.hour) { + + def runTopologyLocally() { + val zkHosts = new ZkHosts(kafkaZkConnect) + val topic = "testing" + val zkRoot = "/kafka-spout" + // The spout appends this id to zkRoot when composing its ZooKeeper path. You don't need a leading `/`. 
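+    // (For illustration: with the zkRoot above and the id below, the spout would keep its state under a ZooKeeper
+    // path like /kafka-spout/kafka-storm-starter.)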
+ val zkSpoutId = "kafka-storm-starter" + val kafkaConfig = new SpoutConfig(zkHosts, topic, zkRoot, zkSpoutId) + val kafkaSpout = new KafkaSpout(kafkaConfig) + val numSpoutExecutors = numTopicPartitions + val builder = new TopologyBuilder + val spoutId = "kafka-spout" + builder.setSpout(spoutId, kafkaSpout, numSpoutExecutors) + + // Showcases how to customize the topology configuration + val topologyConfiguration = { + val c = new Config + c.setDebug(false) + c.setNumWorkers(4) + c.setMaxSpoutPending(1000) + c.setMessageTimeoutSecs(60) + c.setNumAckers(0) + c.setMaxTaskParallelism(50) + c.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384: Integer) + c.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384: Integer) + c.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8: Integer) + c.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32: Integer) + c.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.05: java.lang.Double) + c + } + + // Now run the topology in a local, in-memory Storm cluster + val cluster = new LocalCluster + cluster.submitTopology(topologyName, topologyConfiguration, builder.createTopology()) + Thread.sleep(runtime.toMillis) + val killOpts = new KillOptions() + killOpts.set_wait_secs(1) + cluster.killTopologyWithOpts(topologyName, killOpts) + cluster.shutdown() + } + +} + +object KafkaStormDemo { + + private var zookeeperEmbedded: Option[ZooKeeperEmbedded] = None + private var zkClient: Option[ZkClient] = None + private var kafkaEmbedded: Option[KafkaEmbedded] = None + + def main(args: Array[String]) { + val kafkaTopic = "testing" + startZooKeeperAndKafka(kafkaTopic) + for {z <- zookeeperEmbedded} { + val topology = new KafkaStormDemo(z.connectString, kafkaTopic) + topology.runTopologyLocally() + } + stopZooKeeperAndKafka() + } + + /** + * Launches in-memory, embedded instances of ZooKeeper and Kafka so that our demo Storm topology can connect to and + * read from Kafka. 
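+   *
+   * @param topic                      The name of the Kafka topic to create.
+   * @param numTopicPartitions         The number of partitions of the topic.
+   * @param numTopicReplicationFactor  The replication factor of the topic.
+   * @param zookeeperPort              The port the embedded ZooKeeper server listens on.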
+ */ + private def startZooKeeperAndKafka(topic: String, numTopicPartitions: Int = 1, numTopicReplicationFactor: Int = 1, + zookeeperPort: Int = 2181) { + + zookeeperEmbedded = Some(new ZooKeeperEmbedded(zookeeperPort)) + for {z <- zookeeperEmbedded} { + val brokerConfig = new Properties + brokerConfig.put("zookeeper.connect", z.connectString) + kafkaEmbedded = Some(new KafkaEmbedded(brokerConfig)) + for {k <- kafkaEmbedded} { + k.start() + } + + val sessionTimeout = 30.seconds + val connectionTimeout = 30.seconds + zkClient = Some(new ZkClient(z.connectString, sessionTimeout.toMillis.toInt, connectionTimeout.toMillis.toInt, + ZKStringSerializer)) + for { + zc <- zkClient + } { + val topicConfig = new Properties + AdminUtils.createTopic(zc, topic, numTopicPartitions, numTopicReplicationFactor, topicConfig) + } + } + } + + private def stopZooKeeperAndKafka() { + for {k <- kafkaEmbedded} k.stop() + for {zc <- zkClient} zc.close() + for {z <- zookeeperEmbedded} z.stop() + } + +} \ No newline at end of file diff --git a/src/main/scala/com/miguno/kafkastorm/storm/TweetAvroKryoDecorator.scala b/src/main/scala/com/miguno/kafkastorm/storm/TweetAvroKryoDecorator.scala new file mode 100644 index 0000000..313a423 --- /dev/null +++ b/src/main/scala/com/miguno/kafkastorm/storm/TweetAvroKryoDecorator.scala @@ -0,0 +1,14 @@ +package com.miguno.kafkastorm.storm + +import backtype.storm.serialization.IKryoDecorator +import com.esotericsoftware.kryo.Kryo +import com.miguno.avro.Tweet +import com.twitter.chill.KryoSerializer +import com.twitter.chill.avro.AvroSerializer + +class TweetAvroKryoDecorator extends IKryoDecorator { + override def decorate(k: Kryo) { + k.register(classOf[Tweet], AvroSerializer.SpecificRecordSerializer[Tweet]) + KryoSerializer.registerAll(k) + } +} \ No newline at end of file diff --git a/src/main/scala/com/miguno/kafkastorm/storm/utils/StormRunner.scala b/src/main/scala/com/miguno/kafkastorm/storm/utils/StormRunner.scala new file mode 100644 index 0000000..2a71e72 --- /dev/null +++ b/src/main/scala/com/miguno/kafkastorm/storm/utils/StormRunner.scala @@ -0,0 +1,24 @@ +package com.miguno.kafkastorm.storm.utils + +import backtype.storm.{Config, StormSubmitter, LocalCluster} +import backtype.storm.generated.StormTopology +import scala.concurrent.duration._ + +/** + * Provides convenience functions to run Storm topologies locally and remotely (i.e. in a "real" Storm cluster). + */ +object StormRunner { + + def runTopologyLocally(topology: StormTopology, topologyName: String, conf: Config, runtime: Duration) { + val cluster: LocalCluster = new LocalCluster + cluster.submitTopology(topologyName, conf, topology) + Thread.sleep(runtime.toMillis) + cluster.killTopology(topologyName) + cluster.shutdown() + } + + def runTopologyRemotely(topology: StormTopology, topologyName: String, conf: Config) { + StormSubmitter.submitTopology(topologyName, conf, topology) + } + +} diff --git a/src/main/scala/com/miguno/kafkastorm/zookeeper/ZooKeeperEmbedded.scala b/src/main/scala/com/miguno/kafkastorm/zookeeper/ZooKeeperEmbedded.scala new file mode 100644 index 0000000..f7a9e7a --- /dev/null +++ b/src/main/scala/com/miguno/kafkastorm/zookeeper/ZooKeeperEmbedded.scala @@ -0,0 +1,36 @@ +package com.miguno.kafkastorm.zookeeper + +import com.netflix.curator.test.TestingServer +import kafka.utils.Logging + +/** + * Runs an in-memory, "embedded" instance of a ZooKeeper server. + * + * The ZooKeeper server instance is automatically started when you create a new instance of this class. 
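+ *
+ * @example A minimal sketch:
+ * {{{
+ * val zookeeper = new ZooKeeperEmbedded(2181)
+ * // ...point Kafka and Storm at zookeeper.connectString...
+ * zookeeper.stop()
+ * }}}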
+ * + * @param port The port (aka `clientPort`) to listen to. Default: 2181. + */ +class ZooKeeperEmbedded(port: Int) extends Logging { + + debug(s"Starting embedded ZooKeeper server on port ${port}...") + + private val server = new TestingServer(port) + + /** + * Stop the instance. + */ + def stop() { + debug("Shutting down embedded ZooKeeper server...") + server.close() + debug("Embedded ZooKeeper server shutdown completed") + } + + /** + * The ZooKeeper connection string aka `zookeeper.connect` in `hostnameOrIp:port` format. + * Example: `127.0.0.1:2181`. + * + * You can use this to e.g. tell Kafka and Storm how to connect to this instance. + */ + val connectString = server.getConnectString + +} \ No newline at end of file diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties new file mode 100644 index 0000000..25ae243 --- /dev/null +++ b/src/test/resources/log4j.properties @@ -0,0 +1,87 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +kafka.logs.dir=logs + +log4j.rootLogger=WARN, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log +log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log +log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log +log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.cleanerAppender.File=log-cleaner.log +log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH 
+log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log +log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +# Turn on all our debugging info +#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender +#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender +#log4j.logger.kafka.perf=DEBUG, kafkaAppender +#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender +#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG +log4j.logger.kafka=WARN, kafkaAppender +# Set WARN to INFO to see e.g. effective Kafka broker/consumer/producer config properties (cf. VerifiableProperties) +log4j.logger.kafka.utils=WARN, kafkaAppender + +log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender +log4j.additivity.kafka.network.RequestChannel$=false + +#log4j.logger.kafka.network.Processor=TRACE, requestAppender +#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender +#log4j.additivity.kafka.server.KafkaApis=false +log4j.logger.kafka.request.logger=WARN, requestAppender +log4j.additivity.kafka.request.logger=false + +log4j.logger.kafka.controller=TRACE, controllerAppender +log4j.additivity.kafka.controller=false + +log4j.logger.kafka.log.LogCleaner=WARN, cleanerAppender +log4j.additivity.kafka.log.LogCleaner=false +log4j.logger.kafka.log.Cleaner=WARN, cleanerAppender +log4j.additivity.kafka.log.Cleaner=false + +log4j.logger.state.change.logger=TRACE, stateChangeAppender +log4j.additivity.state.change.logger=false + +# kafka-storm-starter settings +log4j.logger.com.miguno.kafkastorm=DEBUG, stdout +# If additivity is not set to false you will see log messages for com.miguno.kafkastorm.* twice. +log4j.additivity.com.miguno.kafkastorm=false diff --git a/src/test/scala/com/miguno/kafkastorm/integration/IntegrationSuite.scala b/src/test/scala/com/miguno/kafkastorm/integration/IntegrationSuite.scala new file mode 100644 index 0000000..7e7b2b2 --- /dev/null +++ b/src/test/scala/com/miguno/kafkastorm/integration/IntegrationSuite.scala @@ -0,0 +1,9 @@ +package com.miguno.kafkastorm.integration + +import org.scalatest.Stepwise + +class IntegrationSuite extends Stepwise( + new KafkaSpec, + new StormSpec, + new KafkaStormSpec +) \ No newline at end of file diff --git a/src/test/scala/com/miguno/kafkastorm/integration/IntegrationTest.scala b/src/test/scala/com/miguno/kafkastorm/integration/IntegrationTest.scala new file mode 100644 index 0000000..c8c9ecf --- /dev/null +++ b/src/test/scala/com/miguno/kafkastorm/integration/IntegrationTest.scala @@ -0,0 +1,5 @@ +package com.miguno.kafkastorm.integration + +import org.scalatest.Tag + +object IntegrationTest extends Tag("com.miguno.kafkastorm.integration.IntegrationTest") diff --git a/src/test/scala/com/miguno/kafkastorm/integration/KafkaSpec.scala b/src/test/scala/com/miguno/kafkastorm/integration/KafkaSpec.scala new file mode 100644 index 0000000..c4697a1 --- /dev/null +++ b/src/test/scala/com/miguno/kafkastorm/integration/KafkaSpec.scala @@ -0,0 +1,219 @@ +package com.miguno.kafkastorm.integration + +import _root_.kafka.message.MessageAndMetadata +import _root_.kafka.utils.{Logging, ZKStringSerializer} +import com.miguno.avro.Tweet +import com.miguno.kafkastorm.kafka.{KafkaProducerApp, ConsumerTaskContext, KafkaConsumer, KafkaEmbedded} +import com.miguno.kafkastorm.zookeeper.ZooKeeperEmbedded +import com.twitter.bijection.Injection +import com.twitter.bijection.avro.SpecificAvroCodecs +import 
java.util.Properties +import org.I0Itec.zkclient.ZkClient +import org.scalatest._ +import scala.collection.mutable +import scala.concurrent.duration._ +import scala.language.reflectiveCalls +import kafka.admin.AdminUtils + +@DoNotDiscover +class KafkaSpec extends FunSpec with Matchers with BeforeAndAfterAll with GivenWhenThen with Logging { + + private val testTopic = "testing" + private val testTopicNumPartitions = 1 + private val testTopicReplicationFactor = 1 + private val zookeeperPort = 2181 + + private var zookeeperEmbedded: Option[ZooKeeperEmbedded] = None + private var zkClient: Option[ZkClient] = None + private var kafkaEmbedded: Option[KafkaEmbedded] = None + + implicit val specificAvroBinaryInjectionForTweet = SpecificAvroCodecs.toBinary[Tweet] + + override def beforeAll() { + // Start embedded ZooKeeper server + zookeeperEmbedded = Some(new ZooKeeperEmbedded(zookeeperPort)) + + for {z <- zookeeperEmbedded} { + // Start embedded Kafka broker + val brokerConfig = new Properties + brokerConfig.put("zookeeper.connect", z.connectString) + kafkaEmbedded = Some(new KafkaEmbedded(brokerConfig)) + for {k <- kafkaEmbedded} { + k.start() + } + + // Create test topic + val sessionTimeout = 30.seconds + val connectionTimeout = 30.seconds + zkClient = Some(new ZkClient(z.connectString, sessionTimeout.toMillis.toInt, connectionTimeout.toMillis.toInt, + ZKStringSerializer)) + for { + zc <- zkClient + } { + val topicConfig = new Properties + AdminUtils.createTopic(zc, testTopic, testTopicNumPartitions, testTopicReplicationFactor, topicConfig) + } + } + } + + override def afterAll() { + for {k <- kafkaEmbedded} k.stop() + + for { + zc <- zkClient + } { + info("ZooKeeper client: shutting down...") + zc.close() + info("ZooKeeper client: shutdown completed") + } + + for {z <- zookeeperEmbedded} z.stop() + } + + + val fixture = { + val BeginningOfEpoch = 0.seconds + val AnyTimestamp = 1234.seconds + val now = System.currentTimeMillis().millis + + new { + val t1 = new Tweet("ANY_USER_1", "ANY_TEXT_1", now.toSeconds) + val t2 = new Tweet("ANY_USER_2", "ANY_TEXT_2", BeginningOfEpoch.toSeconds) + val t3 = new Tweet("ANY_USER_3", "ANY_TEXT_3", AnyTimestamp.toSeconds) + + val messages = Seq(t1, t2, t3) + } + } + + describe("Kafka") { + + it("should synchronously send and receive a Tweet in Avro format", IntegrationTest) { + for { + z <- zookeeperEmbedded + k <- kafkaEmbedded + } { + Given("a ZooKeeper instance") + And("a Kafka broker instance") + And("some tweets") + val f = fixture + val tweets = f.messages + And("a single-threaded Kafka consumer group") + // The Kafka consumer group must be running before the first messages are being sent to the topic. 
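+        // (Otherwise, with the consumer's default `auto.offset.reset` setting of "largest", tweets published before
+        // the consumer group attaches to the topic would not be delivered to it.)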
+ val numConsumerThreads = 1 + val consumerConfig = { + val c = new Properties + c.put("group.id", "test-consumer") + c + } + val consumer = new KafkaConsumer(testTopic, z.connectString, numConsumerThreads, consumerConfig) + val actualTweets = new mutable.SynchronizedQueue[Tweet] + consumer.startConsumers( + (m: MessageAndMetadata[Array[Byte], Array[Byte]], c: ConsumerTaskContext) => { + val tweet = Injection.invert[Tweet, Array[Byte]](m.message) + for {t <- tweet} { + info(s"Consumer thread ${c.threadId}: received Tweet ${t} from partition ${m.partition} of topic ${m.topic} (offset: ${m.offset})") + actualTweets += t + } + }) + val waitForConsumerStartup = 300.millis + debug(s"Waiting $waitForConsumerStartup ms for Kafka consumer threads to launch") + Thread.sleep(waitForConsumerStartup.toMillis) + debug("Finished waiting for Kafka consumer threads to launch") + + When("I start a synchronous Kafka producer that sends the tweets in Avro binary format") + val syncProducerConfig = { + val c = new Properties + c.put("producer.type", "sync") + c.put("client.id", "test-sync-producer") + c.put("request.required.acks", "1") + c + } + val producerApp = new KafkaProducerApp(testTopic, k.brokerList, syncProducerConfig) + tweets foreach { + case tweet => { + val bytes = Injection[Tweet, Array[Byte]](tweet) + info(s"Synchronously sending Tweet $tweet to topic ${producerApp.topic}") + producerApp.send(bytes) + } + } + + Then("the consumer app should receive the tweets") + val waitForConsumerToReadStormOutput = 300.millis + debug(s"Waiting $waitForConsumerToReadStormOutput ms for Kafka consumer threads to read messages") + Thread.sleep(waitForConsumerToReadStormOutput.toMillis) + debug("Finished waiting for Kafka consumer threads to read messages") + debug("Shutting down Kafka consumer threads") + consumer.shutdown() + + actualTweets.toSeq should be(f.messages.toSeq) + } + } + + it("should asynchronously send and receive a Tweet in Avro format", IntegrationTest) { + for { + z <- zookeeperEmbedded + k <- kafkaEmbedded + } { + Given("a ZooKeeper instance") + And("a Kafka broker instance") + And("some tweets") + val f = fixture + val tweets = f.messages + And("a single-threaded Kafka consumer group") + // The Kafka consumer group must be running before the first messages are being sent to the topic. 
+ val numConsumerThreads = 1 + val consumerConfig = { + val c = new Properties + c.put("group.id", "test-consumer") + c + } + val consumer = new KafkaConsumer(testTopic, z.connectString, numConsumerThreads, consumerConfig) + val actualTweets = new mutable.SynchronizedQueue[Tweet] + consumer.startConsumers( + (m: MessageAndMetadata[Array[Byte], Array[Byte]], c: ConsumerTaskContext) => { + val tweet = Injection.invert[Tweet, Array[Byte]](m.message) + for {t <- tweet} { + info(s"Consumer thread ${c.threadId}: received Tweet ${t} from partition ${m.partition} of topic ${m.topic} (offset: ${m.offset})") + actualTweets += t + } + }) + val waitForConsumerStartup = 300.millis + debug(s"Waiting $waitForConsumerStartup ms for Kafka consumer threads to launch") + Thread.sleep(waitForConsumerStartup.toMillis) + debug("Finished waiting for Kafka consumer threads to launch") + + When("I start an asynchronous Kafka producer that sends the tweets in Avro binary format") + val syncProducerConfig = { + val c = new Properties + c.put("producer.type", "async") + c.put("client.id", "test-sync-producer") + c.put("request.required.acks", "1") + // We must set `batch.num.messages` and/or `queue.buffering.max.ms` so that the async producer will actually + // send our (typically few) test messages before the unit test finishes. + c.put("batch.num.messages", tweets.size.toString) + c + } + val producerApp = new KafkaProducerApp(testTopic, k.brokerList, syncProducerConfig) + tweets foreach { + case tweet => { + val bytes = Injection[Tweet, Array[Byte]](tweet) + info(s"Asynchronously sending Tweet $tweet to topic ${producerApp.topic}") + producerApp.send(bytes) + } + } + + Then("the consumer app should receive the tweets") + val waitForConsumerToReadStormOutput = 300.millis + debug(s"Waiting $waitForConsumerToReadStormOutput ms for Kafka consumer threads to read messages") + Thread.sleep(waitForConsumerToReadStormOutput.toMillis) + debug("Finished waiting for Kafka consumer threads to read messages") + debug("Shutting down Kafka consumer threads") + consumer.shutdown() + + actualTweets.toSeq should be(f.messages.toSeq) + } + } + + } + +} \ No newline at end of file diff --git a/src/test/scala/com/miguno/kafkastorm/integration/KafkaStormSpec.scala b/src/test/scala/com/miguno/kafkastorm/integration/KafkaStormSpec.scala new file mode 100644 index 0000000..c53f32f --- /dev/null +++ b/src/test/scala/com/miguno/kafkastorm/integration/KafkaStormSpec.scala @@ -0,0 +1,318 @@ +package com.miguno.kafkastorm.integration + +import kafka.admin.AdminUtils +import _root_.kafka.utils.{Logging, ZKStringSerializer} +import _root_.storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts} +import backtype.storm.{Testing, ILocalCluster, Config} +import backtype.storm.generated.StormTopology +import backtype.storm.spout.SchemeAsMultiScheme +import backtype.storm.testing._ +import backtype.storm.topology.TopologyBuilder +import com.miguno.avro.Tweet +import com.miguno.kafkastorm.kafka._ +import com.miguno.kafkastorm.storm.{AvroDecoderBolt, AvroKafkaSinkBolt, AvroScheme, TweetAvroKryoDecorator} +import com.miguno.kafkastorm.zookeeper.ZooKeeperEmbedded +import com.twitter.bijection.Injection +import com.twitter.bijection.avro.SpecificAvroCodecs +import java.util.Properties +import kafka.message.MessageAndMetadata +import org.I0Itec.zkclient.ZkClient +import org.scalatest._ +import scala.collection.mutable +import scala.concurrent.duration._ +import scala.language.reflectiveCalls + +/** + * This Kafka/Storm integration test code is slightly more 
complicated than the other tests in this project. This is + * due to a number of reasons, such as: the way Storm topologies are "wired" and configured, the test facilities + * exposed by Storm, and -- on a higher level -- because there are quite a number of components involved (ZooKeeper, + * Kafka producers and consumers, Storm) which must be set up, run, and terminated in the correct order. For these + * reasons the integration tests are not simple "given/when/then" style tests. + */ +@DoNotDiscover +class KafkaStormSpec extends FeatureSpec with Matchers with BeforeAndAfterAll with GivenWhenThen with Logging { + + private val inputTopic = "testing-input" + private val inputTopicNumPartitions = 1 + private val inputTopicReplicationFactor = 1 + private val outputTopic = "testing-output" + private val outputTopicNumPartitions = 1 + private val outputTopicReplicationFactor = 1 + private val zookeeperPort = 2181 + private var zookeeperEmbedded: Option[ZooKeeperEmbedded] = None + private var zkClient: Option[ZkClient] = None + private var kafkaEmbedded: Option[KafkaEmbedded] = None + + implicit val specificAvroBinaryInjectionForTweet = SpecificAvroCodecs.toBinary[Tweet] + + override def beforeAll() { + // Start embedded ZooKeeper server + zookeeperEmbedded = Some(new ZooKeeperEmbedded(zookeeperPort)) + + for {z <- zookeeperEmbedded} { + // Start embedded Kafka broker + val brokerConfig = new Properties + brokerConfig.put("zookeeper.connect", z.connectString) + kafkaEmbedded = Some(new KafkaEmbedded(brokerConfig)) + for {k <- kafkaEmbedded} { + k.start() + } + + // Create test topics + val sessionTimeout = 30.seconds + val connectionTimeout = 30.seconds + // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then createTopic() will only + // seem to work (it will return without error). Topic will exist in only ZooKeeper, and will be returned when + // listing topics, but Kafka itself does not create the topic. 
+ zkClient = Some(new ZkClient(z.connectString, sessionTimeout.toMillis.toInt, connectionTimeout.toMillis.toInt, + ZKStringSerializer)) + for { + zc <- zkClient + } { + val inputTopicConfig = new Properties + AdminUtils.createTopic(zc, inputTopic, inputTopicNumPartitions, inputTopicReplicationFactor, inputTopicConfig) + val outputTopicConfig = new Properties + AdminUtils.createTopic(zc, outputTopic, outputTopicNumPartitions, outputTopicReplicationFactor, + outputTopicConfig) + } + } + } + + override def afterAll() { + for {k <- kafkaEmbedded} k.stop() + + for { + zc <- zkClient + } { + info("ZooKeeper client: shutting down...") + zc.close() + info("ZooKeeper client: shutdown completed") + } + + for {z <- zookeeperEmbedded} z.stop() + } + + val fixture = { + val BeginningOfEpoch = 0.seconds + val AnyTimestamp = 1234.seconds + val now = System.currentTimeMillis().millis + + new { + val t1 = new Tweet("ANY_USER_1", "ANY_TEXT_1", now.toSeconds) + val t2 = new Tweet("ANY_USER_2", "ANY_TEXT_2", BeginningOfEpoch.toSeconds) + val t3 = new Tweet("ANY_USER_3", "ANY_TEXT_3", AnyTimestamp.toSeconds) + + val messages = Seq(t1, t2, t3) + } + } + + info("As a user of Storm") + info("I want to read Avro-encoded data from Kafka") + info("so that I can quickly build Kafka<->Storm data flows") + + feature("AvroDecoderBolt[T]") { + + scenario("User creates a Storm topology that uses AvroDecoderBolt", IntegrationTest) { + for { + k <- kafkaEmbedded + z <- zookeeperEmbedded + } { + Given("a ZooKeeper instance") + And("a Kafka broker instance") + And(s"a Storm topology that uses AvroDecoderBolt and that reads tweets from topic $inputTopic and writes " + + s"them as-is to topic $outputTopic") + // We create a topology instance that makes use of an Avro decoder bolt to deserialize the Kafka spout's output + // into pojos. Here, the data flow is KafkaSpout -> AvroDecoderBolt -> AvroKafkaSinkBolt. + val builder = new TopologyBuilder + val kafkaSpoutId = "kafka-spout" + val kafkaSpoutConfig = kafkaSpoutBaseConfig(z.connectString, inputTopic) + val kafkaSpout = new KafkaSpout(kafkaSpoutConfig) + val numSpoutExecutors = inputTopicNumPartitions + builder.setSpout(kafkaSpoutId, kafkaSpout, numSpoutExecutors) + + val decoderBoltId = "avro-decoder-bolt" + val decoderBolt = new AvroDecoderBolt[Tweet] + // Note: Should test messages arrive out-of-order, we may want to enforce a parallelism of 1 for this bolt. + builder.setBolt(decoderBoltId, decoderBolt).globalGrouping(kafkaSpoutId) + + val kafkaSinkBoltId = "avro-kafka-sink-bolt" + val producerAppFactory = new BaseKafkaProducerAppFactory(outputTopic, k.brokerList) + val kafkaSinkBolt = new AvroKafkaSinkBolt[Tweet](producerAppFactory) + // Note: Should test messages arrive out-of-order, we may want to enforce a parallelism of 1 for this bolt. 
+ builder.setBolt(kafkaSinkBoltId, kafkaSinkBolt).globalGrouping(decoderBoltId) + val topology = builder.createTopology() + + baseIntegrationTest(z, k, topology, inputTopic, outputTopic) + } + } + } + + feature("AvroScheme[T] for Kafka spout") { + scenario("User creates a Storm topology that uses AvroScheme in Kafka spout", IntegrationTest) { + for { + k <- kafkaEmbedded + z <- zookeeperEmbedded + } { + Given("a ZooKeeper instance") + And("a Kafka broker instance") + And(s"a Storm topology that uses AvroScheme and that reads tweets from topic $inputTopic and writes them " + + s"as-is to topic $outputTopic") + // Creates a topology instance that adds an Avro decoder "scheme" to the Kafka spout, so that the spout's output + // are ready-to-use pojos. Here, the data flow is KafkaSpout -> AvroKafkaSinkBolt. + // + // Note that Storm will still need to re-serialize the spout's pojo output to send the data across the wire to + // downstream consumers/bolts, which will then deserialize the data again. In our case we have a custom Kryo + // serializer registered with Storm to make this serde step as fast as possible. + val builder = new TopologyBuilder + val kafkaSpoutId = "kafka-spout" + val kafkaSpoutConfig = kafkaSpoutBaseConfig(z.connectString, inputTopic) + // You can provide the Kafka spout with a custom `Scheme` to deserialize incoming messages in a particular way. + // The default scheme is Storm's `backtype.storm.spout.RawMultiScheme`, which simply returns the raw bytes of the + // incoming data (i.e. leaving deserialization up to you). In this example, we configure the spout to use + // a custom scheme, AvroScheme[Tweet], which will modify the spout to automatically deserialize incoming data + // into pojos. + kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new AvroScheme[Tweet]) + val kafkaSpout = new KafkaSpout(kafkaSpoutConfig) + val numSpoutExecutors = inputTopicNumPartitions + builder.setSpout(kafkaSpoutId, kafkaSpout, numSpoutExecutors) + + val kafkaSinkBoltId = "avro-kafka-sink-bolt" + val producerAppFactory = new BaseKafkaProducerAppFactory(outputTopic, k.brokerList) + val kafkaSinkBolt = new AvroKafkaSinkBolt[Tweet](producerAppFactory) + // Note: Should test messages arrive out-of-order, we may want to enforce a parallelism of 1 for this bolt. + builder.setBolt(kafkaSinkBoltId, kafkaSinkBolt).globalGrouping(kafkaSpoutId) + val topology = builder.createTopology() + + baseIntegrationTest(z, k, topology, inputTopic, outputTopic) + } + } + } + + private def kafkaSpoutBaseConfig(zookeeperConnect: String, inputTopic: String): SpoutConfig = { + val zkHosts = new ZkHosts(zookeeperConnect) + val zkRoot = "/kafka-storm-starter-spout" + // This id is appended to zkRoot for constructing a ZK path under which the spout stores partition information. + val zkId = "kafka-spout" + // To configure the spout to read from the very beginning of the topic (auto.offset.reset = smallest), you can use + // either of the following two equivalent approaches: + // + // 1. spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime + // 2. spoutConfig.forceFromStart = true + // + // To configure the spout to read from the end of the topic (auto.offset.reset = largest), you can use either of + // the following two equivalent approaches: + // + // 1. Do nothing -- reading from the end of the topic is the default behavior. + // 2. 
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime + // + val spoutConfig = new SpoutConfig(zkHosts, inputTopic, zkRoot, zkId) + spoutConfig + } + + /** + * This method sends Avro-encoded test data into a Kafka "input" topic. This data is read from Kafka into Storm, + * which will then decode and re-encode the data, and then write the data to an "output" topic in Kafka (which is our + * means/workaround to "tap into" Storm's output, as we haven't been able yet to use Storm's built-in testing + * facilities for such integration tests). Lastly, we read the data from the "output" topic via a Kafka consumer + * group, and then compare the output data with the input data, with the latter serving the dual purpose of also + * being the expected output data. + */ + private def baseIntegrationTest(zookeeper: ZooKeeperEmbedded, kafka: KafkaEmbedded, topology: StormTopology, + inputTopic: String, outputTopic: String) { + And("some tweets") + val f = fixture + val tweets = f.messages + + And(s"a synchronous Kafka producer app that writes to the topic $inputTopic") + val kafkaSyncProducerConfig = { + val c = new Properties + c.put("producer.type", "sync") + c.put("client.id", "kafka-storm-test-sync-producer") + c.put("request.required.acks", "1") + c + } + val producerApp = new KafkaProducerApp(inputTopic, kafka.brokerList, kafkaSyncProducerConfig) + + And(s"a single-threaded Kafka consumer app that reads from topic $outputTopic") + // We start the Kafka consumer group, which (in our case) must be running before the first messages are being sent + // to the output Kafka topic. The Storm topology will write its output to this topic. We use the Kafka consumer + // group to learn which data was created by Storm, and compare this actual output data to the expected data (which + // in our case is the original input data). + val numConsumerThreads = 1 + val kafkaConsumerConfig = { + val c = new Properties + c.put("group.id", "kafka-storm-test-consumer") + c + } + val consumer = new KafkaConsumer(outputTopic, zookeeper.connectString, numConsumerThreads, kafkaConsumerConfig) + val actualTweets = new mutable.SynchronizedQueue[Tweet] + consumer.startConsumers( + (m: MessageAndMetadata[Array[Byte], Array[Byte]], c: ConsumerTaskContext) => { + val tweet = Injection.invert[Tweet, Array[Byte]](m.message()) + for {t <- tweet} { + info(s"Consumer thread ${c.threadId}: received Tweet $t from partition ${m.partition} of topic ${m.topic} " + + s"(offset: ${m.offset})") + actualTweets += t + } + }) + val waitForConsumerStartup = 300.millis + Thread.sleep(waitForConsumerStartup.toMillis) + + And("a Storm topology configuration that registers an Avro Kryo decorator for Tweet") + // We create the topology configuration here simply to clarify that it is part of the test's initial context defined + // under "Given". + val topologyConfig = { + val conf = new Config + // Use more than one worker thread. It looks as if serialization occurs only if you have actual parallelism in + // LocalCluster (i.e. numWorkers > 1). + conf.setNumWorkers(2) + // Never use Java's default serialization. This allows us to see whether Kryo serialization is properly + // configured and working for all types. + conf.setFallBackOnJavaSerialization(false) + // Serialization config, see http://storm.incubator.apache.org/documentation/Serialization.html + // Note: We haven't been able yet to come up with a KryoDecorator[Tweet] approach. 
+ conf.registerDecorator(classOf[TweetAvroKryoDecorator]) + conf + } + + When("I run the Storm topology") + val stormTestClusterParameters = { + val mkClusterParam = new MkClusterParam + mkClusterParam.setSupervisors(2) + val daemonConf = new Config + // STORM_LOCAL_MODE_ZMQ: Whether or not to use ZeroMQ for messaging in local mode. If this is set to false, then + // Storm will use a pure-Java messaging system. The purpose of this flag is to make it easy to run Storm in local + // mode by eliminating the need for native dependencies, which can be difficult to install. + daemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, false: java.lang.Boolean) + mkClusterParam.setDaemonConf(daemonConf) + mkClusterParam + } + Testing.withLocalCluster(stormTestClusterParameters, new TestJob() { + override def run(stormCluster: ILocalCluster) { + val topologyName = "storm-kafka-integration-test" + stormCluster.submitTopology(topologyName, topologyConfig, topology) + val waitForTopologyStartupMs = 3.seconds.toMillis + Thread.sleep(waitForTopologyStartupMs) + + And("I use the Kafka producer app to Avro-encode the tweets and sent them to Kafka") + // Send the test input data to Kafka. + tweets foreach { + case tweet => + val bytes = Injection[Tweet, Array[Byte]](tweet) + info(s"Synchronously sending Tweet $tweet to topic ${producerApp.topic}") + producerApp.send(bytes) + } + + val waitForStormToReadFromKafka = 1.seconds + Thread.sleep(waitForStormToReadFromKafka.toMillis) + } + }) + + Then("the Kafka consumer app should receive the decoded, original tweets from the Storm topology") + val waitForConsumerToReadStormOutput = 300.millis + Thread.sleep(waitForConsumerToReadStormOutput.toMillis) + consumer.shutdown() + actualTweets.toSeq should be(tweets.toSeq) + } + +} \ No newline at end of file diff --git a/src/test/scala/com/miguno/kafkastorm/integration/StormSpec.scala b/src/test/scala/com/miguno/kafkastorm/integration/StormSpec.scala new file mode 100644 index 0000000..cd9ea94 --- /dev/null +++ b/src/test/scala/com/miguno/kafkastorm/integration/StormSpec.scala @@ -0,0 +1,113 @@ +package com.miguno.kafkastorm.integration + +import _root_.kafka.utils.Logging +import backtype.storm.{Config, ILocalCluster, Testing} +import backtype.storm.testing._ +import backtype.storm.topology.TopologyBuilder +import backtype.storm.tuple.{Fields, Values} +import org.scalatest._ + +/** + * For more details on Storm unit testing please take a look at: + * https://github.com/xumingming/storm-lib/blob/master/src/jvm/storm/TestingApiDemo.java + */ +@DoNotDiscover +class StormSpec extends FunSpec with Matchers with BeforeAndAfterAll with GivenWhenThen with Logging { + + describe("Storm") { + + it("should start a local cluster", IntegrationTest) { + Given("no cluster") + + When("I start a LocalCluster instance") + val mkClusterParam = new MkClusterParam + mkClusterParam.setSupervisors(2) + mkClusterParam.setPortsPerSupervisor(2) + val daemonConf = new Config + daemonConf.put(Config.SUPERVISOR_ENABLE, false: java.lang.Boolean) + daemonConf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0: Integer) + mkClusterParam.setDaemonConf(daemonConf) + + // When testing your topology, you need a `LocalCluster` to run your topologies. Normally this would mean you'd + // have to perform lifecycle management of that local cluster, i.e. you'd need to create it, and after using it, + // you'd need to stop it. 
Using `Testing.withLocalCluster` you don't need to do any of this, just use the + // `cluster` provided through the param of `TestJob.run`.` + Testing.withLocalCluster(mkClusterParam, new TestJob { + override def run(stormCluster: ILocalCluster) { + Then("the local cluster should start properly") + stormCluster.getState shouldNot be(null) + } + }) + } + + it("should run a basic topology", IntegrationTest) { + Given("a local cluster") + And("a wordcount topology") + val mkClusterParam = new MkClusterParam + mkClusterParam.setSupervisors(4) + val daemonConf = new Config + daemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, false: java.lang.Boolean) + mkClusterParam.setDaemonConf(daemonConf) + + // Base topology setup + val builder = new TopologyBuilder + val spoutId = "wordSpout" + builder.setSpout(spoutId, new TestWordSpout(true), 3) + val wordCounterId = "wordCounterBolt" + builder.setBolt(wordCounterId, new TestWordCounter, 4).fieldsGrouping(spoutId, new Fields("word")) + val globalCountId = "globalCountBolt" + builder.setBolt(globalCountId, new TestGlobalCount).globalGrouping(spoutId) + val aggregatesCounterId = "aggregatesCounterBolt" + builder.setBolt(aggregatesCounterId, new TestAggregatesCounter).globalGrouping(wordCounterId) + val topology = builder.createTopology() + val completeTopologyParam = new CompleteTopologyParam + + And("the input words alice, bob, joe, alice") + val mockedSources = new MockedSources() + mockedSources.addMockData(spoutId, new Values("alice"), new Values("bob"), new Values("joe"), new Values("alice")) + completeTopologyParam.setMockedSources(mockedSources) + + // Finalize topology config + val conf = new Config + conf.setNumWorkers(2) + completeTopologyParam.setStormConf(conf) + + When("I submit the topology") + var result: Option[java.util.Map[_, _]] = None + Testing.withSimulatedTimeLocalCluster(mkClusterParam, new TestJob() { + override def run(stormCluster: ILocalCluster) { + // `completeTopology()` takes your topology, cluster, and configuration. It will mock out the spouts you + // specify, and will run the topology until it is idle and all tuples from the spouts have been either acked or + // failed, and return all the tuples that have been emitted from all the topology components. + result = Some(Testing.completeTopology(stormCluster, topology, completeTopologyParam)) + } + }) + + // We could split this `Then()` into multiple ones, each of which covering one of the `Testing.multiseteq()` calls + // below. Left as an exercise for the reader. :-) + Then("the topology should properly count the words") + // Type ascription required for Scala-Java interoperability. + val one = 1: Integer + val two = 2: Integer + val three = 3: Integer + val four = 4: Integer + + // Verify the expected behavior for each of the components (spout + bolts) in the topology by comparing + // their actual output tuples vs. the corresponding expected output tuples. 
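+      // Added note: given the mocked input alice, bob, joe, alice, the emissions checked below are the running
+      // counts produced by the test bolts -- TestWordCounter emits ("alice", 1), ("alice", 2), ("bob", 1),
+      // ("joe", 1), while TestGlobalCount and TestAggregatesCounter each emit the running totals 1, 2, 3, 4.
+      // Testing.multiseteq() compares both sides as multisets, so the checks are insensitive to the order in
+      // which parallel executors emitted the tuples.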
+ for { + r <- result + } { + Testing.multiseteq(Testing.readTuples(r, spoutId), + new Values(new Values("alice"), new Values("bob"), new Values("joe"), new Values("alice"))) should be(true) + Testing.multiseteq(Testing.readTuples(r, wordCounterId), + new Values(new Values("alice", one), new Values("alice", two), new Values("bob", one), new Values("joe", one))) should be(true) + Testing.multiseteq(Testing.readTuples(r, globalCountId), + new Values(new Values(one), new Values(two), new Values(three), new Values(four))) should be(true) + Testing.multiseteq(Testing.readTuples(r, aggregatesCounterId), + new Values(new Values(one), new Values(two), new Values(three), new Values(four))) should be(true) + } + } + + } + +} \ No newline at end of file diff --git a/src/test/scala/com/miguno/kafkastorm/kafka/KafkaProducerAppSpec.scala b/src/test/scala/com/miguno/kafkastorm/kafka/KafkaProducerAppSpec.scala new file mode 100644 index 0000000..81a05ea --- /dev/null +++ b/src/test/scala/com/miguno/kafkastorm/kafka/KafkaProducerAppSpec.scala @@ -0,0 +1,59 @@ +package com.miguno.kafkastorm.kafka + +import _root_.kafka.utils.Logging +import java.util.Properties +import org.scalatest.{FunSpec, GivenWhenThen, Matchers} + +class KafkaProducerAppSpec extends FunSpec with Matchers with GivenWhenThen with Logging { + + private val AnyTopic = "some-topic" + private val AnyBrokerList = "a:9092,b:9093" + private val AnyConfigParam = "queue.buffering.max.ms" + private val AnyConfigValue = "12345" + + describe("A KafkaProducerApp") { + + it("should let the user configure the broker list") { + Given("no app") + + When("I create an app with the broker list set to " + AnyBrokerList) + val producerApp = new KafkaProducerApp(AnyTopic, AnyBrokerList) + + Then("the Kafka producer's metadata.broker.list config parameter should be set to this value") + producerApp.config.props.getString("metadata.broker.list") should be(AnyBrokerList) + } + + it("should use the broker list constructor parameter as the authoritative setting for the broker list") { + Given("no app") + + When("I create an app with a producer config that sets the broker list to notMe:1234") + val config = { + val c = new Properties + c.put("metadata.broker.list", "notMe:1234") + c + } + And("with the constructor parameter that sets the broker list to " + AnyBrokerList) + val producerApp = new KafkaProducerApp(AnyTopic, AnyBrokerList, config) + + Then("the Kafka producer's actual broker list should be " + AnyBrokerList) + producerApp.config.props.getString("metadata.broker.list") should be(AnyBrokerList) + } + + it("should let the user customize the Kafka producer configuration") { + Given("no app") + + When(s"I create an app with a producer config that sets $AnyConfigParam to $AnyConfigValue") + val config = { + val c = new Properties + c.put(AnyConfigParam, AnyConfigValue) + c + } + val producerApp = new KafkaProducerApp(AnyTopic, AnyBrokerList, config) + + Then(s"the Kafka producer's $AnyConfigParam parameter should be to set to $AnyConfigValue") + producerApp.config.props.getString(AnyConfigParam) should be(AnyConfigValue) + } + + } + +} \ No newline at end of file diff --git a/src/test/scala/com/miguno/kafkastorm/storm/AvroDecoderBoltSpec.scala b/src/test/scala/com/miguno/kafkastorm/storm/AvroDecoderBoltSpec.scala new file mode 100644 index 0000000..fb98046 --- /dev/null +++ b/src/test/scala/com/miguno/kafkastorm/storm/AvroDecoderBoltSpec.scala @@ -0,0 +1,144 @@ +package com.miguno.kafkastorm.storm + +import backtype.storm.topology.{BasicOutputCollector, 
OutputFieldsDeclarer} +import backtype.storm.tuple.{Fields, Tuple, Values} +import com.miguno.avro.Tweet +import com.twitter.bijection.Injection +import com.twitter.bijection.avro.SpecificAvroCodecs +import org.mockito.Matchers._ +import org.mockito.Mockito.{when => mwhen, _} +import org.scalatest.{FunSpec, GivenWhenThen, Matchers} +import org.scalatest.mock.MockitoSugar +import scala.concurrent.duration._ + +class AvroDecoderBoltSpec extends FunSpec with Matchers with GivenWhenThen with MockitoSugar { + + implicit val specificAvroBinaryInjection: Injection[Tweet, Array[Byte]] = SpecificAvroCodecs.toBinary[Tweet] + + private type AnyAvroSpecificRecordBase = Tweet + + private val AnyTweet = new Tweet("ANY_USER_1", "ANY_TEXT_1", 1234.seconds.toSeconds) + private val AnyTweetInAvroBytes = Injection[Tweet, Array[Byte]](AnyTweet) + + describe("An AvroDecoderBolt") { + + it("should read by default the input field 'bytes' from incoming tuples") { + Given("no bolt") + + When("I create a bolt without customizing the input field name") + val bolt = new AvroDecoderBolt[AnyAvroSpecificRecordBase] + And("the bolt receives a tuple") + val tuple = mock[Tuple] + val collector = mock[BasicOutputCollector] + bolt.execute(tuple, collector) + + Then("the bolt should read the field 'bytes' from the tuple") + verify(tuple, times(1)).getBinaryByField("bytes") + } + + it("should let the user configure the name of the input field to read from incoming tuples") { + Given("no bolt") + + When("I create a bolt with a custom input field name 'foobar'") + val bolt = new AvroDecoderBolt[AnyAvroSpecificRecordBase](inputField = "foobar") + And("the bolt receives a tuple") + val tuple = mock[Tuple] + val collector = mock[BasicOutputCollector] + bolt.execute(tuple, collector) + + Then("the bolt should read the field 'foobar' from the tuple") + verify(tuple, times(1)).getBinaryByField("foobar") + } + + it("should deserialize binary records into pojos and send the pojos to downstream bolts") { + Given("a bolt of type Tweet") + val bolt = new AvroDecoderBolt[Tweet] + And("a Tweet record") + val tuple = mock[Tuple] + mwhen(tuple.getBinaryByField(anyString)).thenReturn(AnyTweetInAvroBytes) + + When("the bolt receives the Tweet record") + val collector = mock[BasicOutputCollector] + bolt.execute(tuple, collector) + + Then("the bolt should send the decoded Tweet pojo to downstream bolts") + verify(collector, times(1)).emit(new Values(AnyTweet)) + } + + it("should skip over tuples that contain invalid binary records") { + Given("a bolt of type Tweet") + val bolt = new AvroDecoderBolt[Tweet] + And("an invalid binary record") + val tuple = mock[Tuple] + val invalidBinaryRecord = Array[Byte](1, 2, 3, 4) + mwhen(tuple.getBinaryByField(anyString)).thenReturn(invalidBinaryRecord) + + When("the bolt receives the record") + val collector = mock[BasicOutputCollector] + bolt.execute(tuple, collector) + + Then("the bolt should not send any data to downstream bolts") + verifyZeroInteractions(collector) + } + + it("should skip over tuples for which reading fails") { + Given("a bolt") + val bolt = new AvroDecoderBolt[AnyAvroSpecificRecordBase] + And("a tuple from which one cannot read") + val tuple = mock[Tuple] + mwhen(tuple.getBinaryByField(anyString)).thenReturn(null) + + When("the bolt receives the tuple") + val collector = mock[BasicOutputCollector] + bolt.execute(tuple, collector) + + Then("the bolt should not send any data to downstream bolts") + verifyZeroInteractions(collector) + } + + it("should declare a single output field with the 
default name 'pojo'") { + Given("no bolt") + + When("I create a bolt without customizing the output field name") + val bolt = new AvroDecoderBolt[Tweet] + + Then("the bolt should declare a single output field named 'pojo'") + val declarer = mock[OutputFieldsDeclarer] + bolt.declareOutputFields(declarer) + // We use ArgumentMatcher as a workaround because Storm's Field class does not implement a proper `equals()` + // method, and Mockito relies on `equals()` for verification. Because of that the following typical approach + // does NOT work: `verify(declarer, times(1)).declare(new Fields("pojo"))`. + verify(declarer, times(1)).declare(argThat(FieldsEqualTo(new Fields("pojo")))) + } + + it("should let the user define the name of its output field") { + Given("no bolt") + + When("I create a bolt with a custom output field name") + val bolt = new AvroDecoderBolt[Tweet](outputField = "myCustomFieldName") + + Then("the bolt should declare a single output field with this custom name") + val declarer = mock[OutputFieldsDeclarer] + bolt.declareOutputFields(declarer) + verify(declarer, times(1)).declare(argThat(FieldsEqualTo(new Fields("myCustomFieldName")))) + } + + } + + describe("An AvroDecoderBolt companion object") { + + it("should create an AvroDecoderBolt for the correct type") { + Given("a companion object") + + When("I ask it to create a bolt for type Tweet") + val bolt = AvroDecoderBolt.ofType(classOf[Tweet]) + + Then("the bolt should be an AvroDecoderBolt") + bolt shouldBe an[AvroDecoderBolt[_]] + And("the bolt should be parameterized with the type Tweet") + bolt.tpe.shouldEqual(manifest[Tweet]) + } + + } + +} \ No newline at end of file diff --git a/src/test/scala/com/miguno/kafkastorm/storm/AvroKafkaSinkBoltSpec.scala b/src/test/scala/com/miguno/kafkastorm/storm/AvroKafkaSinkBoltSpec.scala new file mode 100644 index 0000000..6306234 --- /dev/null +++ b/src/test/scala/com/miguno/kafkastorm/storm/AvroKafkaSinkBoltSpec.scala @@ -0,0 +1,113 @@ +package com.miguno.kafkastorm.storm + +import backtype.storm.task.TopologyContext +import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer} +import backtype.storm.tuple.{Fields, Tuple} +import com.miguno.avro.Tweet +import com.miguno.kafkastorm.kafka.{KafkaProducerApp, KafkaProducerAppFactory} +import com.twitter.bijection.Injection +import com.twitter.bijection.avro.SpecificAvroCodecs +import java.util +import org.mockito.AdditionalMatchers +import org.mockito.Matchers.argThat +import org.mockito.Mockito.{when => mwhen, _} +import org.scalatest.{FunSpec, GivenWhenThen, Matchers} +import org.scalatest.mock.MockitoSugar +import scala.concurrent.duration._ + +class AvroKafkaSinkBoltSpec extends FunSpec with Matchers with GivenWhenThen with MockitoSugar { + + implicit val specificAvroBinaryInjection: Injection[Tweet, Array[Byte]] = SpecificAvroCodecs.toBinary[Tweet] + + private type AnyAvroSpecificRecordBase = Tweet + + private val AnyTweet = new Tweet("ANY_USER_1", "ANY_TEXT_1", 1234.seconds.toSeconds) + private val AnyTweetInAvroBytes = Injection[Tweet, Array[Byte]](AnyTweet) + private val DummyStormConf = new util.HashMap[Object, Object] + private val DummyStormContext = mock[TopologyContext] + + describe("An AvroKafkaSinkBolt") { + + it("should send pojos of the configured type to Kafka in Avro-encoded binary format") { + Given("a bolt for type Tweet") + val producerApp = mock[KafkaProducerApp] + val producerAppFactory = mock[KafkaProducerAppFactory] + mwhen(producerAppFactory.newInstance()).thenReturn(producerApp) + val bolt = new 
AvroKafkaSinkBolt[Tweet](producerAppFactory) + bolt.prepare(DummyStormConf, DummyStormContext) + + When("it receives a Tweet pojo") + val tuple = mock[Tuple] + // The `Nil: _*` is required workaround because of a known Scala-Java interop problem related to Scala's treatment + // of Java's varargs. See http://stackoverflow.com/a/13361530/1743580. + mwhen(tuple.getValueByField("pojo")).thenReturn(AnyTweet, Nil: _*) + val collector = mock[BasicOutputCollector] + bolt.execute(tuple, collector) + + Then("it should send the Avro-encoded pojo to Kafka") + // Note: The simpler Mockito variant of `verify(kafkaProducer).send(AnyTweetInAvroBytes)` is not enough because + // this variant will not verify whether the Array[Byte] parameter passed to `send()` has the correct value. + verify(producerApp).send(AdditionalMatchers.aryEq(AnyTweetInAvroBytes)) + And("it should not send any data to downstream bolts") + verifyZeroInteractions(collector) + } + + it("should ignore pojos of an unexpected type") { + Given("a bolt for type Tweet") + val producerApp = mock[KafkaProducerApp] + val producerAppFactory = mock[KafkaProducerAppFactory] + mwhen(producerAppFactory.newInstance()).thenReturn(producerApp) + val bolt = new AvroKafkaSinkBolt[Tweet](producerAppFactory) + bolt.prepare(DummyStormConf, DummyStormContext) + + When("receiving a non-Tweet pojo") + val tuple = mock[Tuple] + val invalidPojo = "I am not of the expected type!" + // The `Nil: _*` is required workaround because of a known Scala-Java interop problem related to Scala's treatment + // of Java's varargs. See http://stackoverflow.com/a/13361530/1743580. + mwhen(tuple.getValueByField("pojo")).thenReturn(invalidPojo, Nil: _*) + val collector = mock[BasicOutputCollector] + bolt.execute(tuple, collector) + + Then("it should not send any data to Kafka") + verifyZeroInteractions(producerApp) + And("it should not send any data to downstream bolts") + verifyZeroInteractions(collector) + } + + it("should not declare any output fields") { + Given("no bolt") + + When("I create a bolt") + val producerAppFactory = mock[KafkaProducerAppFactory] + val bolt = new AvroKafkaSinkBolt[AnyAvroSpecificRecordBase](producerAppFactory) + + Then("it should declare zero output fields") + val declarer = mock[OutputFieldsDeclarer] + bolt.declareOutputFields(declarer) + // We use ArgumentMatcher as a workaround because Storm's Field class does not implement a proper `equals()` + // method, and Mockito relies on `equals()` for verification. Because of that the following typical approach + // does NOT work: `verify(declarer, times(1)).declare(new Fields())`. 
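+      // Added note: FieldsEqualTo (defined later in this patch, in storm/FieldsEqualTo.scala) works around this by
+      // comparing the two Fields instances via their underlying lists of field names (Fields#toList), which is all
+      // the verification here needs.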
+ verify(declarer, times(1)).declare(argThat(FieldsEqualTo(new Fields()))) + } + + } + + describe("An AvroKafkaSinkBolt companion object") { + + it("should create an AvroKafkaSinkBolt for the correct type") { + Given("a companion object") + + When("I ask it to create a bolt for type Tweet") + val producerAppFactory = mock[KafkaProducerAppFactory] + val bolt = AvroKafkaSinkBolt.ofType(classOf[Tweet])(producerAppFactory) + + Then("the bolt should be an AvroKafkaSinkBolt") + bolt shouldBe an[AvroKafkaSinkBolt[_]] + And("the bolt should be parameterized with the type Tweet") + bolt.tpe.shouldEqual(manifest[Tweet]) + } + + } + +} \ No newline at end of file diff --git a/src/test/scala/com/miguno/kafkastorm/storm/AvroSchemeSpec.scala b/src/test/scala/com/miguno/kafkastorm/storm/AvroSchemeSpec.scala new file mode 100644 index 0000000..018f275 --- /dev/null +++ b/src/test/scala/com/miguno/kafkastorm/storm/AvroSchemeSpec.scala @@ -0,0 +1,97 @@ +package com.miguno.kafkastorm.storm + +import com.miguno.avro.Tweet +import com.twitter.bijection.Injection +import com.twitter.bijection.avro.SpecificAvroCodecs +import org.scalatest.{FunSpec, GivenWhenThen, Matchers} +import scala.collection.JavaConverters._ +import scala.concurrent.duration._ +import scala.language.reflectiveCalls + +class AvroSchemeSpec extends FunSpec with Matchers with GivenWhenThen { + + implicit val specificAvroBinaryInjectionForTweet = SpecificAvroCodecs.toBinary[Tweet] + + val fixture = { + val BeginningOfEpoch = 0.seconds + val AnyTimestamp = 1234.seconds + val now = System.currentTimeMillis().millis + + new { + val t1 = new Tweet("ANY_USER_1", "ANY_TEXT_1", now.toSeconds) + val t2 = new Tweet("ANY_USER_2", "ANY_TEXT_2", BeginningOfEpoch.toSeconds) + val t3 = new Tweet("ANY_USER_3", "ANY_TEXT_3", AnyTimestamp.toSeconds) + + val messages = Seq(t1, t2, t3) + } + } + + describe("An AvroScheme") { + + it("should have a single output field named 'pojo'") { + Given("a scheme") + val scheme = new AvroScheme + + When("I get its output fields") + val outputFields = scheme.getOutputFields() + + Then("there should only be a single field") + outputFields.size() should be(1) + + And("this field should be named 'pojo'") + outputFields.contains("pojo") should be(true) + } + + + it("should deserialize binary records of the configured type into pojos") { + Given("a scheme for type Tweet ") + val scheme = new AvroScheme[Tweet] + And("some binary-encoded Tweet records") + val f = fixture + val encodedTweets = f.messages.map(Injection[Tweet, Array[Byte]]) + + When("I deserialize the records into pojos") + val actualTweets = for { + l <- encodedTweets.map(scheme.deserialize) + tweet <- l.asScala + } yield tweet + + Then("the pojos should be equal to the original pojos") + actualTweets should be(f.messages) + } + + it("should throw a runtime exception when serialization fails") { + Given("a scheme for type Tweet ") + val scheme = new AvroScheme[Tweet] + And("an invalid binary record") + val invalidBytes = Array[Byte](1, 2, 3, 4) + + When("I deserialize the record into a pojo") + + Then("the scheme should throw a runtime exception") + val exception = intercept[RuntimeException] { + scheme.deserialize(invalidBytes) + } + And("the exception should provide a meaningful explanation") + exception.getMessage should be("Could not decode input bytes") + } + + } + + describe("An AvroScheme companion object") { + + it("should create an AvroScheme for the correct type") { + Given("a companion object") + + When("I ask it to create a scheme for type Tweet") + val 
scheme = AvroScheme.ofType(classOf[Tweet]) + + Then("the scheme should be an AvroScheme") + scheme shouldBe an[AvroScheme[_]] + And("the scheme should be parameterized with the type Tweet") + scheme.tpe.shouldEqual(manifest[Tweet]) + } + + } + +} \ No newline at end of file diff --git a/src/test/scala/com/miguno/kafkastorm/storm/FieldsEqualTo.scala b/src/test/scala/com/miguno/kafkastorm/storm/FieldsEqualTo.scala new file mode 100644 index 0000000..7f8c741 --- /dev/null +++ b/src/test/scala/com/miguno/kafkastorm/storm/FieldsEqualTo.scala @@ -0,0 +1,30 @@ +package com.miguno.kafkastorm.storm + +import backtype.storm.tuple.Fields +import org.mockito.ArgumentMatcher +import scala.collection.JavaConverters._ + +/** + * [[org.mockito.ArgumentMatcher]] for Storm's [[backtype.storm.tuple.Fields]]. + * + * @example {{{ + * // Verify that a single field named "pojo" is declared. + * verify(declarer).declare(argThat(FieldsEqualTo(new Fields("pojo")))) + * }}} + * + * ==Why this approach is required== + * We must use an ArgumentMatcher as a workaround because Storm's Field class does not implement a proper `equals()` + * method, and Mockito relies on `equals()` for verification. Because of that the following intuitive approach for + * Mockito does not work: `verify(declarer, times(1)).declare(new Fields("bytes"))`. + * @param expectedFields + */ +class FieldsEqualTo(val expectedFields: Fields) extends ArgumentMatcher[Fields] { + override def matches(o: scala.Any): Boolean = { + val fields = o.asInstanceOf[Fields].toList.asScala + fields == expectedFields.toList.asScala + } +} + +object FieldsEqualTo { + def apply(expFields: Fields) = new FieldsEqualTo(expFields) +} \ No newline at end of file diff --git a/version.sbt b/version.sbt new file mode 100644 index 0000000..e765444 --- /dev/null +++ b/version.sbt @@ -0,0 +1 @@ +version in ThisBuild := "0.1.0"
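// Appended illustration (not part of the patch): a minimal, self-contained sketch of the Avro round trip that the
// specs above rely on, using Bijection's SpecificAvroCodecs the same way the test code does. It assumes the
// generated com.miguno.avro.Tweet class and the bijection-avro dependency from this project are on the classpath;
// the object name TweetRoundTripSketch is made up for this example.
import com.miguno.avro.Tweet
import com.twitter.bijection.Injection
import com.twitter.bijection.avro.SpecificAvroCodecs

object TweetRoundTripSketch extends App {
  implicit val specificAvroBinaryInjection: Injection[Tweet, Array[Byte]] = SpecificAvroCodecs.toBinary[Tweet]

  val tweet = new Tweet("ANY_USER_1", "ANY_TEXT_1", 1234L)

  // Encode the pojo to Avro binary, as the Kafka producer side of the integration tests does.
  val bytes: Array[Byte] = Injection[Tweet, Array[Byte]](tweet)

  // Decode the bytes back into a pojo, as AvroScheme/AvroDecoderBolt and the test consumer do. Injection.invert
  // returns its result in a Try-like container, hence the for-comprehension (mirroring the consumer callback in
  // KafkaStormSpec).
  for (decoded <- Injection.invert[Tweet, Array[Byte]](bytes)) {
    println(s"Round trip successful: $decoded")
  }
}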