diff --git a/README.md b/README.md
index 4b7c7760a..dcdf1f3aa 100644
--- a/README.md
+++ b/README.md
@@ -1,10 +1,14 @@
# spark-eventhubs [![Build Status](https://travis-ci.org/hdinsight/spark-eventhubs.svg?branch=master)](https://travis-ci.org/hdinsight/spark-eventhubs)
This is the source code of EventHubsReceiver for Spark Streaming.
-[Here](https://github.com/hdinsight/spark-streaming-data-persistence-examples) is an example project that uses EventHubsReceiver to count and persist messages from Azure Eventhubs.
+[Here](https://github.com/hdinsight/spark-eventhubs/tree/master/examples) is the examples that use this library to process streaming data from Azure Eventhubs.
For latest integration of EventHubs and Spark Streaming, the document can be found [here](docs/direct_stream.md).
+## Latest Release: 2.0.4
+
+[Change Log](doc/change_log.md)
+
## Usage
### Getting Officially Released Version
diff --git a/core/pom.xml b/core/pom.xml
index cfb380a79..eeb8fc858 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -23,7 +23,7 @@
com.microsoft.azure
spark-streaming-eventhubs_connector_2.11
- 2.0.4-SNAPSHOT
+ 2.0.4
../pom.xml
spark-streaming-eventhubs_2.11
diff --git a/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubClient.scala b/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubClient.scala
index 64089d35a..9a5c821a5 100644
--- a/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubClient.scala
+++ b/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubClient.scala
@@ -23,7 +23,10 @@ private[eventhubs] trait EventHubClient extends Serializable {
* return the end point of each partition
* @return a map from eventhubName-partition to (offset, seq)
*/
- def endPointOfPartition(retryIfFail: Boolean): Option[Map[EventHubNameAndPartition, (Long, Long)]]
+ def endPointOfPartition(
+ retryIfFail: Boolean,
+ targetEventHubNameAndPartitions: List[EventHubNameAndPartition] = List()):
+ Option[Map[EventHubNameAndPartition, (Long, Long)]]
/**
* close this client
diff --git a/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubDirectDStream.scala b/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubDirectDStream.scala
index fcef12cba..532de5971 100644
--- a/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubDirectDStream.scala
+++ b/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubDirectDStream.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.eventhubs
import java.io.{IOException, ObjectInputStream}
import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
import com.microsoft.azure.eventhubs.{EventData, PartitionReceiver}
@@ -122,13 +123,39 @@ private[eventhubs] class EventHubDirectDStream private[eventhubs] (
eventHubClient.close()
}
+ private def collectPartitionsNeedingLargerProcessingRange(): List[EventHubNameAndPartition] = {
+ val partitionList = new ListBuffer[EventHubNameAndPartition]
+ if (fetchedHighestOffsetsAndSeqNums != null) {
+ for ((ehNameAndPartition, (offset, seqId)) <- fetchedHighestOffsetsAndSeqNums.offsets) {
+ if (currentOffsetsAndSeqNums.offsets(ehNameAndPartition)._2 >=
+ fetchedHighestOffsetsAndSeqNums.offsets(ehNameAndPartition)._2) {
+ partitionList += ehNameAndPartition
+ }
+ }
+ } else {
+ partitionList ++= eventhubNameAndPartitions
+ }
+ partitionList.toList
+ }
+
private def fetchLatestOffset(validTime: Time, retryIfFail: Boolean):
Option[Map[EventHubNameAndPartition, (Long, Long)]] = {
- val r = eventHubClient.endPointOfPartition(retryIfFail)
+ // check if there is any eventhubs partition which potentially has newly arrived message (
+ // the fetched highest message id is within the next batch's processing engine)
+ val demandingEhNameAndPartitions = collectPartitionsNeedingLargerProcessingRange()
+ val r = eventHubClient.endPointOfPartition(retryIfFail, demandingEhNameAndPartitions)
if (r.isDefined) {
- fetchedHighestOffsetsAndSeqNums = OffsetRecord(validTime, r.get)
+ // merge results
+ val mergedOffsets = if (fetchedHighestOffsetsAndSeqNums != null) {
+ fetchedHighestOffsetsAndSeqNums.offsets ++ r.get
+ } else {
+ r.get
+ }
+ fetchedHighestOffsetsAndSeqNums = OffsetRecord(validTime, mergedOffsets)
+ Some(fetchedHighestOffsetsAndSeqNums.offsets)
+ } else {
+ r
}
- r
}
/**
diff --git a/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubsClientWrapper.scala b/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubsClientWrapper.scala
index 507d9db63..dd5385459 100644
--- a/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubsClientWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubsClientWrapper.scala
@@ -176,8 +176,10 @@ class EventHubsClientWrapper extends Serializable with EventHubClient {
private var MAXIMUM_EVENT_RATE: Int = 0
private val DEFAULT_RECEIVER_EPOCH = -1L
- override def endPointOfPartition(retryIfFail: Boolean):
- Option[Predef.Map[EventHubNameAndPartition, (Long, Long)]] = {
+ override def endPointOfPartition(
+ retryIfFail: Boolean,
+ targetEventHubsNameAndPartitions: List[EventHubNameAndPartition]):
+ Option[Predef.Map[EventHubNameAndPartition, (Long, Long)]] = {
throw new UnsupportedOperationException("endPointOfPartition is not supported by this client" +
" yet, please use RestfulEventHubClient")
}
diff --git a/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubsUtils.scala b/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubsUtils.scala
index c67e9aa77..150b76d6e 100644
--- a/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubsUtils.scala
+++ b/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubsUtils.scala
@@ -114,10 +114,6 @@ object EventHubsUtils {
eventHubNamespace: String,
progressDir: String,
eventParams: Predef.Map[String, Predef.Map[String, String]]): EventHubDirectDStream = {
- if (!ssc.sparkContext.isLocal) {
- require(progressDir.startsWith("hdfs://") || progressDir.startsWith("adl://"),
- "we only support HDFS/ADLS based progress file storage")
- }
val newStream = new EventHubDirectDStream(ssc, eventHubNamespace, progressDir, eventParams)
newStream
}
diff --git a/core/src/main/scala/org/apache/spark/streaming/eventhubs/RestfulEventHubClient.scala b/core/src/main/scala/org/apache/spark/streaming/eventhubs/RestfulEventHubClient.scala
index 2abe702ee..374d295ae 100644
--- a/core/src/main/scala/org/apache/spark/streaming/eventhubs/RestfulEventHubClient.scala
+++ b/core/src/main/scala/org/apache/spark/streaming/eventhubs/RestfulEventHubClient.scala
@@ -17,6 +17,7 @@
package org.apache.spark.streaming.eventhubs
+import java.net.SocketTimeoutException
import java.time.Duration
import java.util.concurrent.Executors
@@ -47,7 +48,7 @@ private[eventhubs] class RestfulEventHubClient(
policyKeys: Map[String, Tuple2[String, String]],
threadNum: Int) extends EventHubClient with Logging {
- private val RETRY_INTERVAL_SECONDS = Array(2, 4, 8, 16)
+ private val RETRY_INTERVAL_SECONDS = Array(8, 16, 32, 64, 128)
// will be used to execute requests to EventHub
import Implicits.exec
@@ -87,46 +88,74 @@ private[eventhubs] class RestfulEventHubClient(
}
}
- private def queryPartitionRuntimeInfo[T](
- fromResponseBodyToResult: String => T, retryIfFail: Boolean):
- Option[Map[EventHubNameAndPartition, T]] = {
- val futures = new ListBuffer[Future[(EventHubNameAndPartition, T)]]
- for ((eventHubName, numPartitions) <- numPartitionsEventHubs;
- partitionId <- 0 until numPartitions) {
- futures += Future {
- var retryTime = 0
- var successfullyFetched = false
- var response: HttpResponse[String] = null
- val ehNameAndPartition = EventHubNameAndPartition(eventHubName, partitionId)
- while (!successfullyFetched) {
- logDebug(s"start fetching latest offset of $ehNameAndPartition")
- val urlString = fromParametersToURLString(eventHubName, partitionId)
- response = Http(urlString).
- header("Authorization",
- createSasToken(eventHubName,
- policyName = policyKeys(eventHubName)._1,
- policyKey = policyKeys(eventHubName)._2)).
+ private def composeQuery[T](
+ retryIfFail: Boolean,
+ fromResponseBodyToResult: String => T,
+ nameAndPartition: EventHubNameAndPartition):
+ Future[(EventHubNameAndPartition, T)] = {
+ Future {
+ var retryTime = 0
+ var successfullyFetched = false
+ var response: HttpResponse[String] = null
+ val ehNameAndPartition = nameAndPartition
+ val eventHubName = nameAndPartition.eventHubName
+ val partitionId = nameAndPartition.partitionId
+ while (!successfullyFetched) {
+ logDebug(s"start fetching latest offset of $ehNameAndPartition")
+ val urlString = fromParametersToURLString(eventHubName, partitionId)
+ try {
+ response = Http(urlString).header("Authorization",
+ createSasToken(eventHubName,
+ policyName = policyKeys(eventHubName)._1,
+ policyKey = policyKeys(eventHubName)._2)).
header("Content-Type", "application/atom+xml;type=entry;charset=utf-8").
timeout(connTimeoutMs = 3000, readTimeoutMs = 30000).asString
if (response.code != 200) {
if (!retryIfFail || retryTime > RETRY_INTERVAL_SECONDS.length - 1) {
val errorInfoString = s"cannot get latest offset of" +
s" $ehNameAndPartition, status code: ${response.code}, ${response.headers}" +
- s" returned error:" +
- s" ${response.body}"
+ s" returned error: ${response.body}"
logError(errorInfoString)
throw new Exception(errorInfoString)
} else {
- Thread.sleep(1000 * RETRY_INTERVAL_SECONDS(retryTime))
+ val retryInterval = 1000 * RETRY_INTERVAL_SECONDS(retryTime)
+ logError(s"cannot get connect with Event Hubs Rest Endpoint for partition" +
+ s" $ehNameAndPartition, retry after $retryInterval seconds")
+ Thread.sleep(retryInterval)
retryTime += 1
}
} else {
successfullyFetched = true
}
+ } catch {
+ case e: SocketTimeoutException =>
+ e.printStackTrace()
+ logError("Event Hubs return ReadTimeout with 30s as threshold, retrying...")
+ case e: Exception =>
+ e.printStackTrace()
+ throw e
}
- val endpointOffset = fromResponseBodyToResult(response.body)
- logDebug(s"latest offset of $ehNameAndPartition: $endpointOffset")
- (ehNameAndPartition, endpointOffset)
+ }
+ val endpointOffset = fromResponseBodyToResult(response.body)
+ logDebug(s"latest offset of $ehNameAndPartition: $endpointOffset")
+ (ehNameAndPartition, endpointOffset)
+ }
+ }
+
+ private def queryPartitionRuntimeInfo[T](
+ targetEventHubsNameAndPartitions: List[EventHubNameAndPartition],
+ fromResponseBodyToResult: String => T, retryIfFail: Boolean):
+ Option[Map[EventHubNameAndPartition, T]] = {
+ val futures = new ListBuffer[Future[(EventHubNameAndPartition, T)]]
+ if (targetEventHubsNameAndPartitions.isEmpty) {
+ for ((eventHubName, numPartitions) <- numPartitionsEventHubs;
+ partitionId <- 0 until numPartitions) {
+ futures += composeQuery(retryIfFail, fromResponseBodyToResult,
+ EventHubNameAndPartition(eventHubName, partitionId))
+ }
+ } else {
+ for (targetNameAndPartition <- targetEventHubsNameAndPartitions) {
+ futures += composeQuery(retryIfFail, fromResponseBodyToResult, targetNameAndPartition)
}
}
aggregateResults(futures.toList)
@@ -136,9 +165,12 @@ private[eventhubs] class RestfulEventHubClient(
// empty
}
- override def endPointOfPartition(retryIfFail: Boolean):
- Option[Map[EventHubNameAndPartition, (Long, Long)]] = {
- queryPartitionRuntimeInfo(fromResponseBodyToEndpoint, retryIfFail)
+ override def endPointOfPartition(
+ retryIfFail: Boolean,
+ targetEventHubsNameAndPartitions: List[EventHubNameAndPartition]):
+ Option[Map[EventHubNameAndPartition, (Long, Long)]] = {
+ queryPartitionRuntimeInfo(targetEventHubsNameAndPartitions,
+ fromResponseBodyToEndpoint, retryIfFail)
}
}
diff --git a/core/src/test/scala/org/apache/spark/streaming/eventhubs/EventHubDirectDStreamSuite.scala b/core/src/test/scala/org/apache/spark/streaming/eventhubs/EventHubDirectDStreamSuite.scala
index 23f3896ad..08816f21f 100644
--- a/core/src/test/scala/org/apache/spark/streaming/eventhubs/EventHubDirectDStreamSuite.scala
+++ b/core/src/test/scala/org/apache/spark/streaming/eventhubs/EventHubDirectDStreamSuite.scala
@@ -50,10 +50,12 @@ class EventHubDirectDStreamSuite extends EventHubTestSuiteBase with MockitoSugar
test("skip the batch when failed to fetch the latest offset of partitions") {
- val eventHubClientMock = mock[EventHubClient]
- Mockito.when(eventHubClientMock.endPointOfPartition(retryIfFail = true)).thenReturn(None)
val ehDStream = new EventHubDirectDStream(ssc, eventhubNamespace, progressRootPath.toString,
Map("eh1" -> eventhubParameters))
+ val eventHubClientMock = mock[EventHubClient]
+ Mockito.when(eventHubClientMock.endPointOfPartition(retryIfFail = true,
+ targetEventHubNameAndPartitions = ehDStream.eventhubNameAndPartitions.toList)).
+ thenReturn(None)
ehDStream.setEventHubClient(eventHubClientMock)
ssc.scheduler.start()
intercept[IllegalStateException] {
@@ -79,7 +81,7 @@ class EventHubDirectDStreamSuite extends EventHubTestSuiteBase with MockitoSugar
EventHubNameAndPartition("eh1", 2) -> (3L, 3L)))
),
operation = (inputDStream: EventHubDirectDStream) =>
- inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
+ inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutput)
testProgressTracker(eventhubNamespace,
OffsetRecord(Time(3000L), Map(EventHubNameAndPartition("eh1", 0) -> (5L, 5L),
@@ -108,7 +110,7 @@ class EventHubDirectDStreamSuite extends EventHubTestSuiteBase with MockitoSugar
),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.window(Seconds(2), Seconds(1)).map(
- eventData => eventData.getProperties.get("output").toInt + 1),
+ eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutput)
testProgressTracker(eventhubNamespace,
OffsetRecord(Time(3000L), Map(EventHubNameAndPartition("eh1", 0) -> (5L, 5L),
@@ -159,7 +161,7 @@ class EventHubDirectDStreamSuite extends EventHubTestSuiteBase with MockitoSugar
operation = (inputDStream1: EventHubDirectDStream, inputDStream2: EventHubDirectDStream) =>
inputDStream1.flatMap(eventData => eventData.getProperties.asScala).
join(inputDStream2.flatMap(eventData => eventData.getProperties.asScala)).
- map{case (key, (v1, v2)) => (key, v1.toInt + v2.toInt)},
+ map{case (key, (v1, v2)) => (key, v1.asInstanceOf[Int] + v2.asInstanceOf[Int])},
expectedOutput)
testProgressTracker("namespace1",
OffsetRecord(Time(2000L), Map(EventHubNameAndPartition("eh11", 0) -> (5L, 5L),
@@ -188,7 +190,7 @@ class EventHubDirectDStreamSuite extends EventHubTestSuiteBase with MockitoSugar
EventHubNameAndPartition("eh1", 2) -> (-1L, -1L))
)),
operation = (inputDStream: EventHubDirectDStream) =>
- inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
+ inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutput,
rddOperation = Some((rdd: RDD[Int], t: Time) => {
Array(rdd.take(1).toSeq)
@@ -219,7 +221,7 @@ class EventHubDirectDStreamSuite extends EventHubTestSuiteBase with MockitoSugar
EventHubNameAndPartition("eh1", 1) -> (3L, 3L),
EventHubNameAndPartition("eh1", 2) -> (3L, 3L)))),
operation = (inputDStream: EventHubDirectDStream) =>
- inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
+ inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutput,
messagesBeforeEmpty = 4,
numBatchesBeforeNewData = 5)
diff --git a/core/src/test/scala/org/apache/spark/streaming/eventhubs/EventHubTestSuiteBase.scala b/core/src/test/scala/org/apache/spark/streaming/eventhubs/EventHubTestSuiteBase.scala
index 1bf393a99..9a312dcf8 100644
--- a/core/src/test/scala/org/apache/spark/streaming/eventhubs/EventHubTestSuiteBase.scala
+++ b/core/src/test/scala/org/apache/spark/streaming/eventhubs/EventHubTestSuiteBase.scala
@@ -278,9 +278,9 @@ private[eventhubs] trait EventHubTestSuiteBase extends TestSuiteBase {
Whitebox.setInternalState(msg, "systemProperties", systemProperties.asInstanceOf[Any])
property match {
case p @ Tuple2(_, _) =>
- msg.getProperties.put(p._1.toString, p._2.toString)
+ msg.getProperties.put(p._1.toString, p._2.asInstanceOf[AnyRef])
case _ =>
- msg.getProperties.put("output", property.toString)
+ msg.getProperties.put("output", property.asInstanceOf[AnyRef])
}
eventDataArray(offsetSetInQueue) = msg
offsetSetInQueue += 1
diff --git a/core/src/test/scala/org/apache/spark/streaming/eventhubs/ProgressTrackingAndCheckpointSuite.scala b/core/src/test/scala/org/apache/spark/streaming/eventhubs/ProgressTrackingAndCheckpointSuite.scala
index 57964ce85..8e62a7302 100644
--- a/core/src/test/scala/org/apache/spark/streaming/eventhubs/ProgressTrackingAndCheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/streaming/eventhubs/ProgressTrackingAndCheckpointSuite.scala
@@ -69,7 +69,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
EventHubNameAndPartition("eh1", 1) -> (5L, 5L),
EventHubNameAndPartition("eh1", 2) -> (5L, 5L))),
operation = (inputDStream: EventHubDirectDStream) =>
- inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
+ inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutputBeforeRestart)
val eventHubDirectDStream = ssc.graph.getInputStreams().filter(
_.isInstanceOf[EventHubDirectDStream]).head.asInstanceOf[EventHubDirectDStream]
@@ -110,7 +110,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
EventHubNameAndPartition("eh1", 1) -> (5L, 5L),
EventHubNameAndPartition("eh1", 2) -> (5L, 5L))),
operation = (inputDStream: EventHubDirectDStream) =>
- inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
+ inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutputBeforeRestart,
expectedOutputAfterRestart)
}
@@ -193,7 +193,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
EventHubNameAndPartition("eh1", 2) -> (5L, 5L))),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.window(Seconds(2), Seconds(1)).map(
- eventData => eventData.getProperties.get("output").toInt + 1),
+ eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutputBeforeRestart,
expectedOutputAfterRestart)
}
@@ -246,7 +246,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
operation = (inputDStream1: EventHubDirectDStream, inputDStream2: EventHubDirectDStream) =>
inputDStream1.flatMap(eventData => eventData.getProperties.asScala).
join(inputDStream2.flatMap(eventData => eventData.getProperties.asScala)).
- map{case (key, (v1, v2)) => (key, v1.toInt + v2.toInt)},
+ map{case (key, (v1, v2)) => (key, v1.asInstanceOf[Int] + v2.asInstanceOf[Int])},
expectedOutputBeforeRestart,
expectedOutputAfterRestart)
}
@@ -276,7 +276,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
EventHubNameAndPartition("eh1", 2) -> (3L, 3L))
)),
operation = (inputDStream: EventHubDirectDStream) =>
- inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
+ inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutputBeforeRestart)
testProgressTracker(
@@ -307,7 +307,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
EventHubNameAndPartition("eh1", 2) -> (7L, 7L))
)),
operation = (inputDStream: EventHubDirectDStream) =>
- inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
+ inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutputAfterRestart)
testProgressTracker(
@@ -344,7 +344,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
EventHubNameAndPartition("eh1", 2) -> (3L, 3L))
)),
operation = (inputDStream: EventHubDirectDStream) =>
- inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
+ inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutputBeforeRestart)
testProgressTracker(
@@ -413,7 +413,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
EventHubNameAndPartition("eh1", 2) -> (3L, 3L))
)),
operation = (inputDStream: EventHubDirectDStream) =>
- inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
+ inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutputBeforeRestart)
testProgressTracker(
@@ -489,7 +489,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
EventHubNameAndPartition("eh1", 2) -> (3L, 3L))
)),
operation = (inputDStream: EventHubDirectDStream) =>
- inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
+ inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutput = expectedOutputBeforeRestart)
val currentCheckpointDirectory = ssc.checkpointDir
diff --git a/core/src/test/scala/org/apache/spark/streaming/eventhubs/utils/SimulatedEventHubs.scala b/core/src/test/scala/org/apache/spark/streaming/eventhubs/utils/SimulatedEventHubs.scala
index 69a3ea02e..da73a0f8a 100644
--- a/core/src/test/scala/org/apache/spark/streaming/eventhubs/utils/SimulatedEventHubs.scala
+++ b/core/src/test/scala/org/apache/spark/streaming/eventhubs/utils/SimulatedEventHubs.scala
@@ -63,8 +63,10 @@ private[eventhubs] class TestEventHubsReceiver(
private[eventhubs] class TestRestEventHubClient(
latestRecords: Map[EventHubNameAndPartition, (Long, Long)]) extends EventHubClient {
- override def endPointOfPartition(retryIfFail: Boolean):
- Option[Predef.Map[EventHubNameAndPartition, (Long, Long)]] = {
+ override def endPointOfPartition(
+ retryIfFail: Boolean,
+ targetEventHubNameAndPartitions: List[EventHubNameAndPartition] = List()):
+ Option[Predef.Map[EventHubNameAndPartition, (Long, Long)]] = {
Some(latestRecords)
}
@@ -73,8 +75,10 @@ private[eventhubs] class TestRestEventHubClient(
private[eventhubs] class FragileEventHubClient private extends EventHubClient {
- override def endPointOfPartition(retryIfFail: Boolean):
- Option[Predef.Map[EventHubNameAndPartition, (Long, Long)]] = {
+ override def endPointOfPartition(
+ retryIfFail: Boolean,
+ targetEventHubNameAndPartitions: List[EventHubNameAndPartition] = List()):
+ Option[Predef.Map[EventHubNameAndPartition, (Long, Long)]] = {
import FragileEventHubClient._
callIndex += 1
@@ -113,8 +117,10 @@ private[eventhubs] class FluctuatedEventHubClient(
private var callIndex = -1
- override def endPointOfPartition(retryIfFail: Boolean):
- Option[Predef.Map[EventHubNameAndPartition, (Long, Long)]] = {
+ override def endPointOfPartition(
+ retryIfFail: Boolean,
+ targetEventHubNameAndPartitions: List[EventHubNameAndPartition] = List()):
+ Option[Predef.Map[EventHubNameAndPartition, (Long, Long)]] = {
callIndex += 1
if (callIndex < numBatchesBeforeNewData) {
Some(latestRecords.map{
diff --git a/docs/change_log.md b/docs/change_log.md
new file mode 100644
index 000000000..4a38062ed
--- /dev/null
+++ b/docs/change_log.md
@@ -0,0 +1,24 @@
+## Change Log
+
+### 2.0.4
+
+* Enable the user to use WASB to store progress files [52](https://github.com/hdinsight/spark-eventhubs/pull/52)
+* Optimize the implementation RestfulClient to minimize the sending request number [52](https://github.com/hdinsight/spark-eventhubs/pull/52)
+* Release with scalaj jars [52](https://github.com/hdinsight/spark-eventhubs/pull/52)
+* Upgrade the Azure EventHubs Client to 0.13 [52](https://github.com/hdinsight/spark-eventhubs/pull/52)
+* Disable the user to use WASB as checkpoint when using receiver based stream [35](https://github.com/hdinsight/spark-eventhubs/pull/35)
+* Force SparkContext to shutdown when there is any exception thrown from listener (Workaround the issue that Spark swallows the exceptions thrown from listeners) [41](https://github.com/hdinsight/spark-eventhubs/pull/41)
+* Fix the ArrayOutOfRange bug when failed to get highest offsets [48](https://github.com/hdinsight/spark-eventhubs/pull/48https://github.com/hdinsight/spark-eventhubs/pull/48)
+
+#### Breaking Changes
+
+* Due to the breaking changes in EventHubsClient, EventData.properties is typed as Map instead of the original Map
+
+### 2.0.3
+
+* Fix the flaky test in receiver based stream [21](https://github.com/hdinsight/spark-eventhubs/pull/21)
+* Release Direct DStream [25](https://github.com/hdinsight/spark-eventhubs/pull/25)
+
+### 2.0.2 and previous version
+
+* Receiver based connection
\ No newline at end of file
diff --git a/examples/pom.xml b/examples/pom.xml
index 224137491..4ac20a9ff 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -23,7 +23,7 @@
com.microsoft.azure
spark-streaming-eventhubs_connector_2.11
- 2.0.4-SNAPSHOT
+ 2.0.4
../pom.xml
spark-streaming-eventhubs_examples_2.11
diff --git a/pom.xml b/pom.xml
index 691e45313..8b75a5d92 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
4.0.0
com.microsoft.azure
spark-streaming-eventhubs_connector_2.11
- 2.0.4-SNAPSHOT
+ 2.0.4
The Apache License, Version 2.0
@@ -126,6 +126,12 @@
true
+
+
+ ossrh
+ https://oss.sonatype.org/content/repositories/snapshots
+
+
@@ -185,7 +191,7 @@
com.microsoft.azure
azure-eventhubs
- 0.9.0
+ 0.13.0
org.scalaj
@@ -284,12 +290,6 @@
1.0.6
-
-
- ossrh
- https://oss.sonatype.org/content/repositories/snapshots
-
-
target/scala-${scala.binary.version}/classes
target/scala-${scala.binary.version}/test-classes
@@ -395,6 +395,7 @@
com.microsoft.azure
+ org.scalaj
org.apache.qpid
org.bouncycastle