diff --git a/README.md b/README.md
index 4b7c7760a..dcdf1f3aa 100644
--- a/README.md
+++ b/README.md
@@ -1,10 +1,14 @@
 # spark-eventhubs [![Build Status](https://travis-ci.org/hdinsight/spark-eventhubs.svg?branch=master)](https://travis-ci.org/hdinsight/spark-eventhubs)
 This is the source code of EventHubsReceiver for Spark Streaming.
 
-[Here](https://github.com/hdinsight/spark-streaming-data-persistence-examples) is an example project that uses EventHubsReceiver to count and persist messages from Azure Eventhubs.
+[Here](https://github.com/hdinsight/spark-eventhubs/tree/master/examples) are the examples that use this library to process streaming data from Azure Eventhubs.
 
 For latest integration of EventHubs and Spark Streaming, the document can be found [here](docs/direct_stream.md).
 
+## Latest Release: 2.0.4
+
+[Change Log](docs/change_log.md)
+
 ## Usage
 
 ### Getting Officially Released Version
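To make the README change concrete, here is a minimal sketch of what an application built against this release looks like. The `EventHubsUtils.createDirectStreams` name and parameter order are assumed from the `EventHubsUtils.scala` hunk further down this diff; the parameter keys and all placeholder values are illustrative, not the library's documented set (see docs/direct_stream.md for those):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.eventhubs.EventHubsUtils

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("direct-stream-sketch"), Seconds(5))
    // hypothetical per-eventhub settings; the real key names are documented in
    // docs/direct_stream.md
    val eventParams = Map(
      "eh1" -> Map(
        "eventhubs.policyname" -> "<policy-name>",
        "eventhubs.policykey" -> "<policy-key>",
        "eventhubs.partition.count" -> "32"))
    // with this release the progress directory may live on WASB as well as HDFS/ADLS,
    // since the scheme check in EventHubsUtils is removed (see the hunk further down)
    val stream = EventHubsUtils.createDirectStreams(
      ssc, "<eventhub-namespace>", "wasb:///eventhubs-progress", eventParams)
    stream.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```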
diff --git a/core/pom.xml b/core/pom.xml
index cfb380a79..eeb8fc858 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -23,7 +23,7 @@
     <groupId>com.microsoft.azure</groupId>
     <artifactId>spark-streaming-eventhubs_connector_2.11</artifactId>
-    <version>2.0.4-SNAPSHOT</version>
+    <version>2.0.4</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
   <artifactId>spark-streaming-eventhubs_2.11</artifactId>
diff --git a/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubClient.scala b/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubClient.scala
index 64089d35a..9a5c821a5 100644
--- a/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubClient.scala
+++ b/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubClient.scala
@@ -23,7 +23,10 @@ private[eventhubs] trait EventHubClient extends Serializable {
    * return the end point of each partition
    * @return a map from eventhubName-partition to (offset, seq)
    */
-  def endPointOfPartition(retryIfFail: Boolean): Option[Map[EventHubNameAndPartition, (Long, Long)]]
+  def endPointOfPartition(
+      retryIfFail: Boolean,
+      targetEventHubNameAndPartitions: List[EventHubNameAndPartition] = List()):
+    Option[Map[EventHubNameAndPartition, (Long, Long)]]
 
   /**
    * close this client
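A sketch of the widened contract: an empty target list (the default) preserves the old query-every-partition behavior, while a non-empty list restricts the query to the named partitions. The stub below is modeled on `TestRestEventHubClient` from the test utilities later in this diff; `close(): Unit` is assumed from the trait's doc comment:

```scala
// EventHubClient is private[eventhubs], so the sketch lives in the same package
package org.apache.spark.streaming.eventhubs

class StubEventHubClient extends EventHubClient {
  private val allPartitions =
    List(EventHubNameAndPartition("eh1", 0), EventHubNameAndPartition("eh1", 1))

  override def endPointOfPartition(
      retryIfFail: Boolean,
      targetEventHubNameAndPartitions: List[EventHubNameAndPartition] = List()):
    Option[Map[EventHubNameAndPartition, (Long, Long)]] = {
    // empty target list -> old behavior: answer for every partition
    val targets =
      if (targetEventHubNameAndPartitions.isEmpty) allPartitions
      else targetEventHubNameAndPartitions
    Some(targets.map(_ -> (100L, 100L)).toMap)
  }

  override def close(): Unit = {}
}

object EndpointQuerySketch extends App {
  val client = new StubEventHubClient
  // all partitions, as before this change
  println(client.endPointOfPartition(retryIfFail = false))
  // only the partitions that may have newly arrived messages
  println(client.endPointOfPartition(
    retryIfFail = false, List(EventHubNameAndPartition("eh1", 1))))
}
```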
diff --git a/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubDirectDStream.scala b/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubDirectDStream.scala
index fcef12cba..532de5971 100644
--- a/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubDirectDStream.scala
+++ b/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubDirectDStream.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.eventhubs
 import java.io.{IOException, ObjectInputStream}
 
 import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
 
 import com.microsoft.azure.eventhubs.{EventData, PartitionReceiver}
 
@@ -122,13 +123,39 @@ private[eventhubs] class EventHubDirectDStream private[eventhubs] (
     eventHubClient.close()
   }
 
+  private def collectPartitionsNeedingLargerProcessingRange(): List[EventHubNameAndPartition] = {
+    val partitionList = new ListBuffer[EventHubNameAndPartition]
+    if (fetchedHighestOffsetsAndSeqNums != null) {
+      for ((ehNameAndPartition, (offset, seqId)) <- fetchedHighestOffsetsAndSeqNums.offsets) {
+        if (currentOffsetsAndSeqNums.offsets(ehNameAndPartition)._2 >=
+          fetchedHighestOffsetsAndSeqNums.offsets(ehNameAndPartition)._2) {
+          partitionList += ehNameAndPartition
+        }
+      }
+    } else {
+      partitionList ++= eventhubNameAndPartitions
+    }
+    partitionList.toList
+  }
+
   private def fetchLatestOffset(validTime: Time, retryIfFail: Boolean):
       Option[Map[EventHubNameAndPartition, (Long, Long)]] = {
-    val r =
-      eventHubClient.endPointOfPartition(retryIfFail)
+    // check if there is any eventhubs partition which potentially has newly arrived messages
+    // (i.e. whose fetched highest sequence number falls within the next batch's processing range)
+    val demandingEhNameAndPartitions = collectPartitionsNeedingLargerProcessingRange()
+    val r = eventHubClient.endPointOfPartition(retryIfFail, demandingEhNameAndPartitions)
     if (r.isDefined) {
-      fetchedHighestOffsetsAndSeqNums = OffsetRecord(validTime, r.get)
+      // merge the newly fetched endpoints into the cached ones
+      val mergedOffsets = if (fetchedHighestOffsetsAndSeqNums != null) {
+        fetchedHighestOffsetsAndSeqNums.offsets ++ r.get
+      } else {
+        r.get
+      }
+      fetchedHighestOffsetsAndSeqNums = OffsetRecord(validTime, mergedOffsets)
+      Some(fetchedHighestOffsetsAndSeqNums.offsets)
+    } else {
+      r
     }
-    r
   }
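Why merging with `++` is safe here: entries for the re-queried partitions overwrite the cache, while partitions that were skipped keep their previously fetched endpoints. A tiny self-contained illustration, with plain string keys standing in for `EventHubNameAndPartition`:

```scala
object MergeSemanticsSketch extends App {
  // cached highest (offset, seqNum) per partition, as in fetchedHighestOffsetsAndSeqNums
  val cached = Map("eh1-0" -> (100L, 100L), "eh1-1" -> (200L, 200L))
  // only the caught-up partition eh1-1 was re-queried this batch
  val fresh = Map("eh1-1" -> (250L, 250L))
  val merged = cached ++ fresh
  assert(merged == Map("eh1-0" -> (100L, 100L), "eh1-1" -> (250L, 250L)))
  // right-hand entries win; skipped partitions keep their cached endpoint, which is why
  // fetchLatestOffset can query only the partitions returned by
  // collectPartitionsNeedingLargerProcessingRange()
}
```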
diff --git a/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubsClientWrapper.scala b/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubsClientWrapper.scala
index 507d9db63..dd5385459 100644
--- a/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubsClientWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubsClientWrapper.scala
@@ -176,8 +176,10 @@ class EventHubsClientWrapper extends Serializable with EventHubClient {
   private var MAXIMUM_EVENT_RATE: Int = 0
   private val DEFAULT_RECEIVER_EPOCH = -1L
 
-  override def endPointOfPartition(retryIfFail: Boolean):
-    Option[Predef.Map[EventHubNameAndPartition, (Long, Long)]] = {
+  override def endPointOfPartition(
+      retryIfFail: Boolean,
+      targetEventHubNameAndPartitions: List[EventHubNameAndPartition]):
+    Option[Predef.Map[EventHubNameAndPartition, (Long, Long)]] = {
     throw new UnsupportedOperationException("endPointOfPartition is not supported by this client" +
       " yet, please use RestfulEventHubClient")
   }
diff --git a/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubsUtils.scala b/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubsUtils.scala
index c67e9aa77..150b76d6e 100644
--- a/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubsUtils.scala
+++ b/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubsUtils.scala
@@ -114,10 +114,6 @@ object EventHubsUtils {
       eventHubNamespace: String,
       progressDir: String,
       eventParams: Predef.Map[String, Predef.Map[String, String]]): EventHubDirectDStream = {
-    if (!ssc.sparkContext.isLocal) {
-      require(progressDir.startsWith("hdfs://") || progressDir.startsWith("adl://"),
-        "we only support HDFS/ADLS based progress file storage")
-    }
     val newStream = new EventHubDirectDStream(ssc, eventHubNamespace, progressDir, eventParams)
     newStream
   }
diff --git a/core/src/main/scala/org/apache/spark/streaming/eventhubs/RestfulEventHubClient.scala b/core/src/main/scala/org/apache/spark/streaming/eventhubs/RestfulEventHubClient.scala
index 2abe702ee..374d295ae 100644
--- a/core/src/main/scala/org/apache/spark/streaming/eventhubs/RestfulEventHubClient.scala
+++ b/core/src/main/scala/org/apache/spark/streaming/eventhubs/RestfulEventHubClient.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.streaming.eventhubs
 
+import java.net.SocketTimeoutException
 import java.time.Duration
 import java.util.concurrent.Executors
 
@@ -47,7 +48,7 @@ private[eventhubs] class RestfulEventHubClient(
     policyKeys: Map[String, Tuple2[String, String]],
     threadNum: Int) extends EventHubClient with Logging {
 
-  private val RETRY_INTERVAL_SECONDS = Array(2, 4, 8, 16)
+  private val RETRY_INTERVAL_SECONDS = Array(8, 16, 32, 64, 128)
 
   // will be used to execute requests to EventHub
   import Implicits.exec
@@ -87,46 +88,74 @@ private[eventhubs] class RestfulEventHubClient(
 
-  private def queryPartitionRuntimeInfo[T](
-      fromResponseBodyToResult: String => T, retryIfFail: Boolean):
-    Option[Map[EventHubNameAndPartition, T]] = {
-    val futures = new ListBuffer[Future[(EventHubNameAndPartition, T)]]
-    for ((eventHubName, numPartitions) <- numPartitionsEventHubs;
-         partitionId <- 0 until numPartitions) {
-      futures += Future {
-        var retryTime = 0
-        var successfullyFetched = false
-        var response: HttpResponse[String] = null
-        val ehNameAndPartition = EventHubNameAndPartition(eventHubName, partitionId)
-        while (!successfullyFetched) {
-          logDebug(s"start fetching latest offset of $ehNameAndPartition")
-          val urlString = fromParametersToURLString(eventHubName, partitionId)
-          response = Http(urlString).
-            header("Authorization",
-              createSasToken(eventHubName,
-                policyName = policyKeys(eventHubName)._1,
-                policyKey = policyKeys(eventHubName)._2)).
+  private def composeQuery[T](
+      retryIfFail: Boolean,
+      fromResponseBodyToResult: String => T,
+      nameAndPartition: EventHubNameAndPartition):
+    Future[(EventHubNameAndPartition, T)] = {
+    Future {
+      var retryTime = 0
+      var successfullyFetched = false
+      var response: HttpResponse[String] = null
+      val ehNameAndPartition = nameAndPartition
+      val eventHubName = nameAndPartition.eventHubName
+      val partitionId = nameAndPartition.partitionId
+      while (!successfullyFetched) {
+        logDebug(s"start fetching latest offset of $ehNameAndPartition")
+        val urlString = fromParametersToURLString(eventHubName, partitionId)
+        try {
+          response = Http(urlString).header("Authorization",
+            createSasToken(eventHubName,
+              policyName = policyKeys(eventHubName)._1,
+              policyKey = policyKeys(eventHubName)._2)).
             header("Content-Type", "application/atom+xml;type=entry;charset=utf-8").
             timeout(connTimeoutMs = 3000, readTimeoutMs = 30000).asString
           if (response.code != 200) {
             if (!retryIfFail || retryTime > RETRY_INTERVAL_SECONDS.length - 1) {
               val errorInfoString = s"cannot get latest offset of" +
                 s" $ehNameAndPartition, status code: ${response.code}, ${response.headers}" +
-                s" returned error:" +
-                s" ${response.body}"
+                s" returned error: ${response.body}"
               logError(errorInfoString)
               throw new Exception(errorInfoString)
             } else {
-              Thread.sleep(1000 * RETRY_INTERVAL_SECONDS(retryTime))
+              val retryInterval = 1000 * RETRY_INTERVAL_SECONDS(retryTime)
+              logError(s"cannot connect to the Event Hubs REST endpoint for partition" +
+                s" $ehNameAndPartition, retrying after ${retryInterval / 1000} seconds")
+              Thread.sleep(retryInterval)
               retryTime += 1
             }
           } else {
             successfullyFetched = true
           }
+        } catch {
+          case e: SocketTimeoutException =>
+            // read timed out (30s threshold); retry without consuming the retry budget
+            logError("Event Hubs REST endpoint read timed out, retrying...", e)
+          case e: Exception =>
+            logError(s"failed to fetch the latest offset of $ehNameAndPartition", e)
+            throw e
         }
-        val endpointOffset = fromResponseBodyToResult(response.body)
-        logDebug(s"latest offset of $ehNameAndPartition: $endpointOffset")
-        (ehNameAndPartition, endpointOffset)
+      }
+      val endpointOffset = fromResponseBodyToResult(response.body)
+      logDebug(s"latest offset of $ehNameAndPartition: $endpointOffset")
+      (ehNameAndPartition, endpointOffset)
+    }
+  }
+
+  private def queryPartitionRuntimeInfo[T](
+      targetEventHubNameAndPartitions: List[EventHubNameAndPartition],
+      fromResponseBodyToResult: String => T, retryIfFail: Boolean):
+    Option[Map[EventHubNameAndPartition, T]] = {
+    val futures = new ListBuffer[Future[(EventHubNameAndPartition, T)]]
+    if (targetEventHubNameAndPartitions.isEmpty) {
+      for ((eventHubName, numPartitions) <- numPartitionsEventHubs;
+           partitionId <- 0 until numPartitions) {
+        futures += composeQuery(retryIfFail, fromResponseBodyToResult,
+          EventHubNameAndPartition(eventHubName, partitionId))
+      }
+    } else {
+      for (targetNameAndPartition <- targetEventHubNameAndPartitions) {
+        futures += composeQuery(retryIfFail, fromResponseBodyToResult, targetNameAndPartition)
       }
     }
     aggregateResults(futures.toList)
@@ -136,9 +165,12 @@ private[eventhubs] class RestfulEventHubClient(
     // empty
   }
 
-  override def endPointOfPartition(retryIfFail: Boolean):
-    Option[Map[EventHubNameAndPartition, (Long, Long)]] = {
-    queryPartitionRuntimeInfo(fromResponseBodyToEndpoint, retryIfFail)
+  override def endPointOfPartition(
+      retryIfFail: Boolean,
+      targetEventHubNameAndPartitions: List[EventHubNameAndPartition]):
+    Option[Map[EventHubNameAndPartition, (Long, Long)]] = {
+    queryPartitionRuntimeInfo(targetEventHubNameAndPartitions,
+      fromResponseBodyToEndpoint, retryIfFail)
   }
 }
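The new retry schedule waits 8, 16, 32, 64, and then 128 seconds between attempts (roughly four minutes in total) instead of the old 2 to 16 seconds. A self-contained sketch of the same backoff logic, with the millisecond arithmetic made explicit; the `attempt` function is a hypothetical stand-in for the HTTP call:

```scala
object BackoffSketch extends App {
  private val RETRY_INTERVAL_SECONDS = Array(8, 16, 32, 64, 128)

  @annotation.tailrec
  def fetchWithRetry[T](attempt: () => Option[T], retryIfFail: Boolean, retryTime: Int = 0): T =
    attempt() match {
      case Some(result) => result
      case None =>
        if (!retryIfFail || retryTime > RETRY_INTERVAL_SECONDS.length - 1) {
          throw new Exception(s"gave up after $retryTime retries")
        }
        // Thread.sleep takes milliseconds, hence the * 1000
        Thread.sleep(1000L * RETRY_INTERVAL_SECONDS(retryTime))
        fetchWithRetry(attempt, retryIfFail, retryTime + 1)
    }

  var calls = 0
  // succeeds on the third call, after sleeping 8s and then 16s
  val result = fetchWithRetry(() => { calls += 1; if (calls < 3) None else Some(42) },
    retryIfFail = true)
  println(s"fetched $result after $calls calls")
}
```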
diff --git a/core/src/test/scala/org/apache/spark/streaming/eventhubs/EventHubDirectDStreamSuite.scala b/core/src/test/scala/org/apache/spark/streaming/eventhubs/EventHubDirectDStreamSuite.scala
index 23f3896ad..08816f21f 100644
--- a/core/src/test/scala/org/apache/spark/streaming/eventhubs/EventHubDirectDStreamSuite.scala
+++ b/core/src/test/scala/org/apache/spark/streaming/eventhubs/EventHubDirectDStreamSuite.scala
@@ -50,10 +50,12 @@ class EventHubDirectDStreamSuite extends EventHubTestSuiteBase with MockitoSugar
 
   test("skip the batch when failed to fetch the latest offset of partitions") {
-    val eventHubClientMock = mock[EventHubClient]
-    Mockito.when(eventHubClientMock.endPointOfPartition(retryIfFail = true)).thenReturn(None)
     val ehDStream = new EventHubDirectDStream(ssc, eventhubNamespace, progressRootPath.toString,
       Map("eh1" -> eventhubParameters))
+    val eventHubClientMock = mock[EventHubClient]
+    Mockito.when(eventHubClientMock.endPointOfPartition(retryIfFail = true,
+      targetEventHubNameAndPartitions = ehDStream.eventhubNameAndPartitions.toList)).
+      thenReturn(None)
     ehDStream.setEventHubClient(eventHubClientMock)
     ssc.scheduler.start()
     intercept[IllegalStateException] {
@@ -79,7 +81,7 @@
           EventHubNameAndPartition("eh1", 2) -> (3L, 3L)))
       ),
       operation = (inputDStream: EventHubDirectDStream) =>
-        inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
+        inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
       expectedOutput)
     testProgressTracker(eventhubNamespace,
       OffsetRecord(Time(3000L), Map(EventHubNameAndPartition("eh1", 0) -> (5L, 5L),
@@ -108,7 +110,7 @@
       ),
       operation = (inputDStream: EventHubDirectDStream) =>
         inputDStream.window(Seconds(2), Seconds(1)).map(
-          eventData => eventData.getProperties.get("output").toInt + 1),
+          eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
       expectedOutput)
     testProgressTracker(eventhubNamespace,
       OffsetRecord(Time(3000L), Map(EventHubNameAndPartition("eh1", 0) -> (5L, 5L),
@@ -159,7 +161,7 @@
       operation = (inputDStream1: EventHubDirectDStream, inputDStream2: EventHubDirectDStream) =>
         inputDStream1.flatMap(eventData => eventData.getProperties.asScala).
           join(inputDStream2.flatMap(eventData => eventData.getProperties.asScala)).
-          map{case (key, (v1, v2)) => (key, v1.toInt + v2.toInt)},
+          map{case (key, (v1, v2)) => (key, v1.asInstanceOf[Int] + v2.asInstanceOf[Int])},
       expectedOutput)
     testProgressTracker("namespace1",
       OffsetRecord(Time(2000L), Map(EventHubNameAndPartition("eh11", 0) -> (5L, 5L),
@@ -188,7 +190,7 @@
           EventHubNameAndPartition("eh1", 2) -> (-1L, -1L))
       )),
       operation = (inputDStream: EventHubDirectDStream) =>
-        inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
+        inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
       expectedOutput,
       rddOperation = Some((rdd: RDD[Int], t: Time) => {
         Array(rdd.take(1).toSeq)
@@ -219,7 +221,7 @@
         EventHubNameAndPartition("eh1", 1) -> (3L, 3L),
         EventHubNameAndPartition("eh1", 2) -> (3L, 3L)))),
       operation = (inputDStream: EventHubDirectDStream) =>
-        inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
+        inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
       expectedOutput,
       messagesBeforeEmpty = 4,
       numBatchesBeforeNewData = 5)
diff --git a/core/src/test/scala/org/apache/spark/streaming/eventhubs/EventHubTestSuiteBase.scala b/core/src/test/scala/org/apache/spark/streaming/eventhubs/EventHubTestSuiteBase.scala
index 1bf393a99..9a312dcf8 100644
--- a/core/src/test/scala/org/apache/spark/streaming/eventhubs/EventHubTestSuiteBase.scala
+++ b/core/src/test/scala/org/apache/spark/streaming/eventhubs/EventHubTestSuiteBase.scala
@@ -278,9 +278,9 @@ private[eventhubs] trait EventHubTestSuiteBase extends TestSuiteBase {
       Whitebox.setInternalState(msg, "systemProperties", systemProperties.asInstanceOf[Any])
       property match {
         case p @ Tuple2(_, _) =>
-          msg.getProperties.put(p._1.toString, p._2.toString)
+          msg.getProperties.put(p._1.toString, p._2.asInstanceOf[AnyRef])
         case _ =>
-          msg.getProperties.put("output", property.toString)
+          msg.getProperties.put("output", property.asInstanceOf[AnyRef])
       }
       eventDataArray(offsetSetInQueue) = msg
       offsetSetInQueue += 1
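These test changes are the user-visible side of the EventHubs client upgrade recorded in the change log below (azure-eventhubs 0.9.0 to 0.13.0 in pom.xml): property values are now `java.lang.Object` rather than `String`, so consumers cast back to the stored type instead of parsing strings. A minimal before/after sketch, using the "output" key from the tests above:

```scala
import com.microsoft.azure.eventhubs.EventData

object PropertiesMigrationSketch {
  // before (azure-eventhubs 0.9.x): property values were strings
  //   eventData.getProperties.get("output").toInt + 1
  // after (0.13.x): values are java.lang.Object, so cast back to the stored type
  def readOutput(eventData: EventData): Int =
    eventData.getProperties.get("output").asInstanceOf[Int] + 1
}
```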
diff --git a/core/src/test/scala/org/apache/spark/streaming/eventhubs/ProgressTrackingAndCheckpointSuite.scala b/core/src/test/scala/org/apache/spark/streaming/eventhubs/ProgressTrackingAndCheckpointSuite.scala
index 57964ce85..8e62a7302 100644
--- a/core/src/test/scala/org/apache/spark/streaming/eventhubs/ProgressTrackingAndCheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/streaming/eventhubs/ProgressTrackingAndCheckpointSuite.scala
@@ -69,7 +69,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
         EventHubNameAndPartition("eh1", 1) -> (5L, 5L),
         EventHubNameAndPartition("eh1", 2) -> (5L, 5L))),
       operation = (inputDStream: EventHubDirectDStream) =>
-        inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
+        inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
       expectedOutputBeforeRestart)
     val eventHubDirectDStream = ssc.graph.getInputStreams().filter(
       _.isInstanceOf[EventHubDirectDStream]).head.asInstanceOf[EventHubDirectDStream]
@@ -110,7 +110,7 @@
         EventHubNameAndPartition("eh1", 1) -> (5L, 5L),
         EventHubNameAndPartition("eh1", 2) -> (5L, 5L))),
       operation = (inputDStream: EventHubDirectDStream) =>
-        inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
+        inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
       expectedOutputBeforeRestart,
       expectedOutputAfterRestart)
   }
@@ -193,7 +193,7 @@
         EventHubNameAndPartition("eh1", 2) -> (5L, 5L))),
       operation = (inputDStream: EventHubDirectDStream) =>
         inputDStream.window(Seconds(2), Seconds(1)).map(
-          eventData => eventData.getProperties.get("output").toInt + 1),
+          eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
       expectedOutputBeforeRestart,
       expectedOutputAfterRestart)
   }
@@ -246,7 +246,7 @@
       operation = (inputDStream1: EventHubDirectDStream, inputDStream2: EventHubDirectDStream) =>
         inputDStream1.flatMap(eventData => eventData.getProperties.asScala).
           join(inputDStream2.flatMap(eventData => eventData.getProperties.asScala)).
-          map{case (key, (v1, v2)) => (key, v1.toInt + v2.toInt)},
+          map{case (key, (v1, v2)) => (key, v1.asInstanceOf[Int] + v2.asInstanceOf[Int])},
       expectedOutputBeforeRestart,
       expectedOutputAfterRestart)
   }
@@ -276,7 +276,7 @@
         EventHubNameAndPartition("eh1", 2) -> (3L, 3L))
       )),
       operation = (inputDStream: EventHubDirectDStream) =>
-        inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
+        inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
       expectedOutputBeforeRestart)
 
     testProgressTracker(
@@ -307,7 +307,7 @@
         EventHubNameAndPartition("eh1", 2) -> (7L, 7L))
       )),
       operation = (inputDStream: EventHubDirectDStream) =>
-        inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
+        inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
       expectedOutputAfterRestart)
 
     testProgressTracker(
@@ -344,7 +344,7 @@
         EventHubNameAndPartition("eh1", 2) -> (3L, 3L))
       )),
       operation = (inputDStream: EventHubDirectDStream) =>
-        inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
+        inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
       expectedOutputBeforeRestart)
 
     testProgressTracker(
@@ -413,7 +413,7 @@
         EventHubNameAndPartition("eh1", 2) -> (3L, 3L))
       )),
       operation = (inputDStream: EventHubDirectDStream) =>
-        inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
+        inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
       expectedOutputBeforeRestart)
 
     testProgressTracker(
@@ -489,7 +489,7 @@
         EventHubNameAndPartition("eh1", 2) -> (3L, 3L))
       )),
       operation = (inputDStream: EventHubDirectDStream) =>
-        inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
+        inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
       expectedOutput = expectedOutputBeforeRestart)
 
     val currentCheckpointDirectory = ssc.checkpointDir
diff --git a/core/src/test/scala/org/apache/spark/streaming/eventhubs/utils/SimulatedEventHubs.scala b/core/src/test/scala/org/apache/spark/streaming/eventhubs/utils/SimulatedEventHubs.scala
index 69a3ea02e..da73a0f8a 100644
--- a/core/src/test/scala/org/apache/spark/streaming/eventhubs/utils/SimulatedEventHubs.scala
+++ b/core/src/test/scala/org/apache/spark/streaming/eventhubs/utils/SimulatedEventHubs.scala
@@ -63,8 +63,10 @@ private[eventhubs] class TestEventHubsReceiver(
 private[eventhubs] class TestRestEventHubClient(
     latestRecords: Map[EventHubNameAndPartition, (Long, Long)]) extends EventHubClient {
 
-  override def endPointOfPartition(retryIfFail: Boolean):
-    Option[Predef.Map[EventHubNameAndPartition, (Long, Long)]] = {
+  override def endPointOfPartition(
+      retryIfFail: Boolean,
+      targetEventHubNameAndPartitions: List[EventHubNameAndPartition] = List()):
+    Option[Predef.Map[EventHubNameAndPartition, (Long, Long)]] = {
     Some(latestRecords)
   }
 
@@ -73,8 +75,10 @@ private[eventhubs] class TestRestEventHubClient(
 private[eventhubs] class FragileEventHubClient private extends EventHubClient {
 
-  override def endPointOfPartition(retryIfFail: Boolean):
-    Option[Predef.Map[EventHubNameAndPartition, (Long, Long)]] = {
+  override def endPointOfPartition(
+      retryIfFail: Boolean,
+      targetEventHubNameAndPartitions: List[EventHubNameAndPartition] = List()):
+    Option[Predef.Map[EventHubNameAndPartition, (Long, Long)]] = {
     import FragileEventHubClient._
 
     callIndex += 1
@@ -113,8 +117,10 @@ private[eventhubs] class FluctuatedEventHubClient(
   private var callIndex = -1
 
-  override def endPointOfPartition(retryIfFail: Boolean):
-    Option[Predef.Map[EventHubNameAndPartition, (Long, Long)]] = {
+  override def endPointOfPartition(
+      retryIfFail: Boolean,
+      targetEventHubNameAndPartitions: List[EventHubNameAndPartition] = List()):
+    Option[Predef.Map[EventHubNameAndPartition, (Long, Long)]] = {
     callIndex += 1
     if (callIndex < numBatchesBeforeNewData) {
       Some(latestRecords.map{
diff --git a/docs/change_log.md b/docs/change_log.md
new file mode 100644
index 000000000..4a38062ed
--- /dev/null
+++ b/docs/change_log.md
@@ -0,0 +1,24 @@
+## Change Log
+
+### 2.0.4
+
+* Enable the user to use WASB to store progress files [52](https://github.com/hdinsight/spark-eventhubs/pull/52)
+* Optimize the RestfulClient implementation to minimize the number of requests sent [52](https://github.com/hdinsight/spark-eventhubs/pull/52)
+* Release with the scalaj jars included [52](https://github.com/hdinsight/spark-eventhubs/pull/52)
+* Upgrade the Azure EventHubs Client to 0.13 [52](https://github.com/hdinsight/spark-eventhubs/pull/52)
+* Prevent the user from using WASB as checkpoint storage when using the receiver-based stream [35](https://github.com/hdinsight/spark-eventhubs/pull/35)
+* Force SparkContext to shut down when any exception is thrown from a listener (works around the issue that Spark swallows exceptions thrown from listeners) [41](https://github.com/hdinsight/spark-eventhubs/pull/41)
+* Fix the ArrayIndexOutOfBounds bug hit when failing to get the highest offsets [48](https://github.com/hdinsight/spark-eventhubs/pull/48)
+
+#### Breaking Changes
+
+* Due to breaking changes in the Azure EventHubs client, `EventData.properties` is typed as `Map<String, Object>` instead of the original `Map<String, String>`
+
+### 2.0.3
+
+* Fix the flaky test in the receiver-based stream [21](https://github.com/hdinsight/spark-eventhubs/pull/21)
+* Release Direct DStream [25](https://github.com/hdinsight/spark-eventhubs/pull/25)
+
+### 2.0.2 and previous versions
+
+* Receiver-based connection
\ No newline at end of file
diff --git a/examples/pom.xml b/examples/pom.xml
index 224137491..4ac20a9ff 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -23,7 +23,7 @@
     <groupId>com.microsoft.azure</groupId>
     <artifactId>spark-streaming-eventhubs_connector_2.11</artifactId>
-    <version>2.0.4-SNAPSHOT</version>
+    <version>2.0.4</version>
    <relativePath>../pom.xml</relativePath>
   </parent>
   <artifactId>spark-streaming-eventhubs_examples_2.11</artifactId>
diff --git a/pom.xml b/pom.xml
index 691e45313..8b75a5d92 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.microsoft.azure</groupId>
   <artifactId>spark-streaming-eventhubs_connector_2.11</artifactId>
-  <version>2.0.4-SNAPSHOT</version>
+  <version>2.0.4</version>
   <licenses>
     <license>
       <name>The Apache License, Version 2.0</name>
@@ -126,6 +126,12 @@
           true
 
+  <repositories>
+    <repository>
+      <id>ossrh</id>
+      <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+    </repository>
+  </repositories>
@@ -185,7 +191,7 @@
     <dependency>
       <groupId>com.microsoft.azure</groupId>
      <artifactId>azure-eventhubs</artifactId>
-      <version>0.9.0</version>
+      <version>0.13.0</version>
     </dependency>
     <dependency>
       <groupId>org.scalaj</groupId>
@@ -284,12 +290,6 @@
       <version>1.0.6</version>
 
-  <repositories>
-    <repository>
-      <id>ossrh</id>
-      <url>https://oss.sonatype.org/content/repositories/snapshots</url>
-    </repository>
-  </repositories>
 
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
     <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
@@ -395,6 +395,7 @@
                   <include>com.microsoft.azure</include>
+                  <include>org.scalaj</include>
                   <include>org.apache.qpid</include>
                   <include>org.bouncycastle</include>
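For reference, consuming the 2.0.4 release with the coordinates from the poms above would look roughly like this in sbt (assuming the artifact is published under the same coordinates as the Maven module):

```scala
// build.sbt - groupId/artifactId/version taken from core/pom.xml above
libraryDependencies += "com.microsoft.azure" % "spark-streaming-eventhubs_2.11" % "2.0.4"
```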