Skip to content

Commit

Permalink
producer code for scala w/akka for event hubs
Browse files Browse the repository at this point in the history
  • Loading branch information
kmansel committed Jun 25, 2018
1 parent 71c79f0 commit dcd900e
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 0 deletions.
7 changes: 7 additions & 0 deletions samples/kafka/akka-scala/producer/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
name := "scala-event-hubs-producer"

version := "0.1"

scalaVersion := "2.12.6"

libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"
13 changes: 13 additions & 0 deletions samples/kafka/akka-scala/producer/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
akka.kafka.producer {
#Akka kafka producer properties can be defined here


# Properties defined by org.apache.kafka.clients.producer.ProducerConfig
# can be defined in this configuration section.
kafka-clients {
bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
}
}
41 changes: 41 additions & 0 deletions samples/kafka/akka-scala/producer/scala/ProducerMain.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import java.util.concurrent.TimeUnit.SECONDS

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, ThrottleMode}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

import scala.concurrent.duration.FiniteDuration
import scala.language.postfixOps

object ProducerMain {

def main(args: Array[String]): Unit = {
implicit val system: ActorSystem = ActorSystem.apply("akka-stream-kafka")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// grab our settings from the resources/application.conf file
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)

// topic to send the message to on event hubs
val topic = "sampleTopic"

// loop until the program reaches 100
Source(1 to 100)
.throttle(1, FiniteDuration(1, SECONDS), 1, ThrottleMode.Shaping)
.map(num => {
//construct our message here
val message = s"Akka Scala Producer Message # ${num}"
println(s"Message sent to topic - $topic - $message")
new ProducerRecord[Array[Byte], String](topic, message.getBytes, message.toString)
})
.runWith(Producer.plainSink(producerSettings))
.onComplete(_ => {
println("All messages sent!")
system.terminate()
})(scala.concurrent.ExecutionContext.global)
}
}

0 comments on commit dcd900e

Please sign in to comment.