release of 2.0.4 (#52)
* ignore scalastyle output

* test eventhubs 0.12

* release of 2.0.4

* fix compilation error

* fix NPE

* further fix NPE

* fix classcastexception

* fix failed test cases

* include scalaj to jar file

* do not limit to use WASB

* upgrade to 0.13

* release note of 2.0.4

* longer waiting interval

* restendpoint (#18)
CodingCat committed Mar 28, 2017
1 parent f687de7 commit b79cf97
Showing 14 changed files with 171 additions and 74 deletions.
6 changes: 5 additions & 1 deletion README.md
@@ -1,10 +1,14 @@
# spark-eventhubs [![Build Status](https://travis-ci.org/hdinsight/spark-eventhubs.svg?branch=master)](https://travis-ci.org/hdinsight/spark-eventhubs)
This is the source code of EventHubsReceiver for Spark Streaming.

[Here](https://github.com/hdinsight/spark-streaming-data-persistence-examples) is an example project that uses EventHubsReceiver to count and persist messages from Azure Eventhubs.
[Here](https://github.com/hdinsight/spark-eventhubs/tree/master/examples) are examples that use this library to process streaming data from Azure Eventhubs.

For the latest integration of EventHubs and Spark Streaming, the documentation can be found [here](docs/direct_stream.md).

## Latest Release: 2.0.4

[Change Log](doc/change_log.md)

## Usage

### Getting Officially Released Version
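As a quick reference for the release being cut here (a sketch: the coordinates are read off the core/pom.xml change below, and publication of 2.0.4 to Maven Central is assumed rather than shown in this diff), an sbt build would consume it with:

```scala
// Coordinates taken from core/pom.xml in this commit; availability on
// Maven Central is an assumption, not something this diff demonstrates.
libraryDependencies += "com.microsoft.azure" % "spark-streaming-eventhubs_2.11" % "2.0.4"
```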
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -23,7 +23,7 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>spark-streaming-eventhubs_connector_2.11</artifactId>
<version>2.0.4-SNAPSHOT</version>
<version>2.0.4</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>spark-streaming-eventhubs_2.11</artifactId>
EventHubClient.scala
@@ -23,7 +23,10 @@ private[eventhubs] trait EventHubClient extends Serializable {
* return the end point of each partition
* @return a map from eventhubName-partition to (offset, seq)
*/
def endPointOfPartition(retryIfFail: Boolean): Option[Map[EventHubNameAndPartition, (Long, Long)]]
def endPointOfPartition(
retryIfFail: Boolean,
targetEventHubNameAndPartitions: List[EventHubNameAndPartition] = List()):
Option[Map[EventHubNameAndPartition, (Long, Long)]]

/**
* close this client
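For context, a minimal sketch of how a caller would use the widened signature (`queryEndpointsDemo` is a hypothetical helper; the partition constructor mirrors its use in the test suites below):

```scala
def queryEndpointsDemo(client: EventHubClient): Unit = {
  // Default empty target list: query every partition (the pre-2.0.4 behavior).
  val all: Option[Map[EventHubNameAndPartition, (Long, Long)]] =
    client.endPointOfPartition(retryIfFail = true)

  // Targeted form: only query partitions that may hold new messages, as
  // EventHubDirectDStream does below to cut down on REST calls.
  val targeted = client.endPointOfPartition(
    retryIfFail = true,
    targetEventHubNameAndPartitions = List(EventHubNameAndPartition("eh1", 0)))

  println(s"all: $all, targeted: $targeted")
}
```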
EventHubDirectDStream.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.eventhubs
import java.io.{IOException, ObjectInputStream}

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

import com.microsoft.azure.eventhubs.{EventData, PartitionReceiver}

@@ -122,13 +123,39 @@ private[eventhubs] class EventHubDirectDStream private[eventhubs] (
eventHubClient.close()
}

private def collectPartitionsNeedingLargerProcessingRange(): List[EventHubNameAndPartition] = {
val partitionList = new ListBuffer[EventHubNameAndPartition]
if (fetchedHighestOffsetsAndSeqNums != null) {
for ((ehNameAndPartition, (offset, seqId)) <- fetchedHighestOffsetsAndSeqNums.offsets) {
if (currentOffsetsAndSeqNums.offsets(ehNameAndPartition)._2 >=
fetchedHighestOffsetsAndSeqNums.offsets(ehNameAndPartition)._2) {
partitionList += ehNameAndPartition
}
}
} else {
partitionList ++= eventhubNameAndPartitions
}
partitionList.toList
}

private def fetchLatestOffset(validTime: Time, retryIfFail: Boolean):
Option[Map[EventHubNameAndPartition, (Long, Long)]] = {
val r = eventHubClient.endPointOfPartition(retryIfFail)
// check if there are any EventHubs partitions which potentially have newly arrived messages
// (i.e. the fetched highest message id falls within the next batch's processing range)
val demandingEhNameAndPartitions = collectPartitionsNeedingLargerProcessingRange()
val r = eventHubClient.endPointOfPartition(retryIfFail, demandingEhNameAndPartitions)
if (r.isDefined) {
fetchedHighestOffsetsAndSeqNums = OffsetRecord(validTime, r.get)
// merge results
val mergedOffsets = if (fetchedHighestOffsetsAndSeqNums != null) {
fetchedHighestOffsetsAndSeqNums.offsets ++ r.get
} else {
r.get
}
fetchedHighestOffsetsAndSeqNums = OffsetRecord(validTime, mergedOffsets)
Some(fetchedHighestOffsetsAndSeqNums.offsets)
} else {
r
}
r
}

/**
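The merge above leans on Scala's `Map ++` semantics: on a key collision the right-hand entry wins, so re-queried partitions get fresh endpoints while skipped partitions keep the values cached in `fetchedHighestOffsetsAndSeqNums`. A standalone illustration:

```scala
// Right-hand entries override left-hand ones on key collision.
val previous = Map("eh1-p0" -> (10L, 10L), "eh1-p1" -> (20L, 20L))
val fresh    = Map("eh1-p1" -> (25L, 25L)) // only eh1-p1 was re-queried
previous ++ fresh // Map("eh1-p0" -> (10L, 10L), "eh1-p1" -> (25L, 25L))
```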
EventHubsClientWrapper.scala
@@ -176,8 +176,10 @@ class EventHubsClientWrapper extends Serializable with EventHubClient {
private var MAXIMUM_EVENT_RATE: Int = 0
private val DEFAULT_RECEIVER_EPOCH = -1L

override def endPointOfPartition(retryIfFail: Boolean):
Option[Predef.Map[EventHubNameAndPartition, (Long, Long)]] = {
override def endPointOfPartition(
retryIfFail: Boolean,
targetEventHubsNameAndPartitions: List[EventHubNameAndPartition]):
Option[Predef.Map[EventHubNameAndPartition, (Long, Long)]] = {
throw new UnsupportedOperationException("endPointOfPartition is not supported by this client" +
" yet, please use RestfulEventHubClient")
}
EventHubsUtils.scala
@@ -114,10 +114,6 @@ object EventHubsUtils {
eventHubNamespace: String,
progressDir: String,
eventParams: Predef.Map[String, Predef.Map[String, String]]): EventHubDirectDStream = {
if (!ssc.sparkContext.isLocal) {
require(progressDir.startsWith("hdfs://") || progressDir.startsWith("adl://"),
"we only support HDFS/ADLS based progress file storage")
}
val newStream = new EventHubDirectDStream(ssc, eventHubNamespace, progressDir, eventParams)
newStream
}
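This deletion implements the "do not limit to use WASB" item from the commit message: the progress directory's scheme is no longer validated up front, so it is not restricted to HDFS (hdfs://) or ADLS (adl://) paths when running on a cluster.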
RestfulEventHubClient.scala
@@ -17,6 +17,7 @@

package org.apache.spark.streaming.eventhubs

import java.net.SocketTimeoutException
import java.time.Duration
import java.util.concurrent.Executors

@@ -47,7 +48,7 @@ private[eventhubs] class RestfulEventHubClient(
policyKeys: Map[String, Tuple2[String, String]],
threadNum: Int) extends EventHubClient with Logging {

private val RETRY_INTERVAL_SECONDS = Array(2, 4, 8, 16)
private val RETRY_INTERVAL_SECONDS = Array(8, 16, 32, 64, 128)
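A quick arithmetic check on the new schedule: with sleeps of 8, 16, 32, 64 and 128 seconds, a failing partition query is retried up to five times and can block for as long as 8 + 16 + 32 + 64 + 128 = 248 seconds before the error surfaces, versus 2 + 4 + 8 + 16 = 30 seconds under the old array. This is the "longer waiting interval" item from the commit message.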

// will be used to execute requests to EventHub
import Implicits.exec
@@ -87,46 +88,74 @@
}
}

private def queryPartitionRuntimeInfo[T](
fromResponseBodyToResult: String => T, retryIfFail: Boolean):
Option[Map[EventHubNameAndPartition, T]] = {
val futures = new ListBuffer[Future[(EventHubNameAndPartition, T)]]
for ((eventHubName, numPartitions) <- numPartitionsEventHubs;
partitionId <- 0 until numPartitions) {
futures += Future {
var retryTime = 0
var successfullyFetched = false
var response: HttpResponse[String] = null
val ehNameAndPartition = EventHubNameAndPartition(eventHubName, partitionId)
while (!successfullyFetched) {
logDebug(s"start fetching latest offset of $ehNameAndPartition")
val urlString = fromParametersToURLString(eventHubName, partitionId)
response = Http(urlString).
header("Authorization",
createSasToken(eventHubName,
policyName = policyKeys(eventHubName)._1,
policyKey = policyKeys(eventHubName)._2)).
private def composeQuery[T](
retryIfFail: Boolean,
fromResponseBodyToResult: String => T,
nameAndPartition: EventHubNameAndPartition):
Future[(EventHubNameAndPartition, T)] = {
Future {
var retryTime = 0
var successfullyFetched = false
var response: HttpResponse[String] = null
val ehNameAndPartition = nameAndPartition
val eventHubName = nameAndPartition.eventHubName
val partitionId = nameAndPartition.partitionId
while (!successfullyFetched) {
logDebug(s"start fetching latest offset of $ehNameAndPartition")
val urlString = fromParametersToURLString(eventHubName, partitionId)
try {
response = Http(urlString).header("Authorization",
createSasToken(eventHubName,
policyName = policyKeys(eventHubName)._1,
policyKey = policyKeys(eventHubName)._2)).
header("Content-Type", "application/atom+xml;type=entry;charset=utf-8").
timeout(connTimeoutMs = 3000, readTimeoutMs = 30000).asString
if (response.code != 200) {
if (!retryIfFail || retryTime > RETRY_INTERVAL_SECONDS.length - 1) {
val errorInfoString = s"cannot get latest offset of" +
s" $ehNameAndPartition, status code: ${response.code}, ${response.headers}" +
s" returned error:" +
s" ${response.body}"
s" returned error: ${response.body}"
logError(errorInfoString)
throw new Exception(errorInfoString)
} else {
Thread.sleep(1000 * RETRY_INTERVAL_SECONDS(retryTime))
val retryInterval = 1000 * RETRY_INTERVAL_SECONDS(retryTime)
logError(s"cannot connect with the Event Hubs REST endpoint for partition" +
s" $ehNameAndPartition, retrying after ${retryInterval / 1000} seconds")
Thread.sleep(retryInterval)
retryTime += 1
}
} else {
successfullyFetched = true
}
} catch {
case e: SocketTimeoutException =>
e.printStackTrace()
logError("Event Hubs return ReadTimeout with 30s as threshold, retrying...")
case e: Exception =>
e.printStackTrace()
throw e
}
val endpointOffset = fromResponseBodyToResult(response.body)
logDebug(s"latest offset of $ehNameAndPartition: $endpointOffset")
(ehNameAndPartition, endpointOffset)
}
val endpointOffset = fromResponseBodyToResult(response.body)
logDebug(s"latest offset of $ehNameAndPartition: $endpointOffset")
(ehNameAndPartition, endpointOffset)
}
}

private def queryPartitionRuntimeInfo[T](
targetEventHubsNameAndPartitions: List[EventHubNameAndPartition],
fromResponseBodyToResult: String => T, retryIfFail: Boolean):
Option[Map[EventHubNameAndPartition, T]] = {
val futures = new ListBuffer[Future[(EventHubNameAndPartition, T)]]
if (targetEventHubsNameAndPartitions.isEmpty) {
for ((eventHubName, numPartitions) <- numPartitionsEventHubs;
partitionId <- 0 until numPartitions) {
futures += composeQuery(retryIfFail, fromResponseBodyToResult,
EventHubNameAndPartition(eventHubName, partitionId))
}
} else {
for (targetNameAndPartition <- targetEventHubsNameAndPartitions) {
futures += composeQuery(retryIfFail, fromResponseBodyToResult, targetNameAndPartition)
}
}
aggregateResults(futures.toList)
@@ -136,9 +165,12 @@
// empty
}

override def endPointOfPartition(retryIfFail: Boolean):
Option[Map[EventHubNameAndPartition, (Long, Long)]] = {
queryPartitionRuntimeInfo(fromResponseBodyToEndpoint, retryIfFail)
override def endPointOfPartition(
retryIfFail: Boolean,
targetEventHubsNameAndPartitions: List[EventHubNameAndPartition]):
Option[Map[EventHubNameAndPartition, (Long, Long)]] = {
queryPartitionRuntimeInfo(targetEventHubsNameAndPartitions,
fromResponseBodyToEndpoint, retryIfFail)
}
}

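Note how the two pieces fit together: `EventHubClient.endPointOfPartition` defaults its target list to `List()`, and `queryPartitionRuntimeInfo` treats an empty list as "query every partition of every hub", so existing callers keep the old behavior while `EventHubDirectDStream` can narrow the fan-out to just the partitions it needs.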
EventHubDirectDStreamSuite.scala
@@ -50,10 +50,12 @@ class EventHubDirectDStreamSuite extends EventHubTestSuiteBase with MockitoSugar


test("skip the batch when failed to fetch the latest offset of partitions") {
val eventHubClientMock = mock[EventHubClient]
Mockito.when(eventHubClientMock.endPointOfPartition(retryIfFail = true)).thenReturn(None)
val ehDStream = new EventHubDirectDStream(ssc, eventhubNamespace, progressRootPath.toString,
Map("eh1" -> eventhubParameters))
val eventHubClientMock = mock[EventHubClient]
Mockito.when(eventHubClientMock.endPointOfPartition(retryIfFail = true,
targetEventHubNameAndPartitions = ehDStream.eventhubNameAndPartitions.toList)).
thenReturn(None)
ehDStream.setEventHubClient(eventHubClientMock)
ssc.scheduler.start()
intercept[IllegalStateException] {
@@ -79,7 +81,7 @@
EventHubNameAndPartition("eh1", 2) -> (3L, 3L)))
),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutput)
testProgressTracker(eventhubNamespace,
OffsetRecord(Time(3000L), Map(EventHubNameAndPartition("eh1", 0) -> (5L, 5L),
@@ -108,7 +110,7 @@
),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.window(Seconds(2), Seconds(1)).map(
eventData => eventData.getProperties.get("output").toInt + 1),
eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutput)
testProgressTracker(eventhubNamespace,
OffsetRecord(Time(3000L), Map(EventHubNameAndPartition("eh1", 0) -> (5L, 5L),
@@ -159,7 +161,7 @@ class EventHubDirectDStreamSuite extends EventHubTestSuiteBase with MockitoSugar
operation = (inputDStream1: EventHubDirectDStream, inputDStream2: EventHubDirectDStream) =>
inputDStream1.flatMap(eventData => eventData.getProperties.asScala).
join(inputDStream2.flatMap(eventData => eventData.getProperties.asScala)).
map{case (key, (v1, v2)) => (key, v1.toInt + v2.toInt)},
map{case (key, (v1, v2)) => (key, v1.asInstanceOf[Int] + v2.asInstanceOf[Int])},
expectedOutput)
testProgressTracker("namespace1",
OffsetRecord(Time(2000L), Map(EventHubNameAndPartition("eh11", 0) -> (5L, 5L),
@@ -188,7 +190,7 @@ class EventHubDirectDStreamSuite extends EventHubTestSuiteBase with MockitoSugar
EventHubNameAndPartition("eh1", 2) -> (-1L, -1L))
)),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutput,
rddOperation = Some((rdd: RDD[Int], t: Time) => {
Array(rdd.take(1).toSeq)
@@ -219,7 +221,7 @@ class EventHubDirectDStreamSuite extends EventHubTestSuiteBase with MockitoSugar
EventHubNameAndPartition("eh1", 1) -> (3L, 3L),
EventHubNameAndPartition("eh1", 2) -> (3L, 3L)))),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutput,
messagesBeforeEmpty = 4,
numBatchesBeforeNewData = 5)
EventHubTestSuiteBase.scala
@@ -278,9 +278,9 @@ private[eventhubs] trait EventHubTestSuiteBase extends TestSuiteBase {
Whitebox.setInternalState(msg, "systemProperties", systemProperties.asInstanceOf[Any])
property match {
case p @ Tuple2(_, _) =>
msg.getProperties.put(p._1.toString, p._2.toString)
msg.getProperties.put(p._1.toString, p._2.asInstanceOf[AnyRef])
case _ =>
msg.getProperties.put("output", property.toString)
msg.getProperties.put("output", property.asInstanceOf[AnyRef])
}
eventDataArray(offsetSetInQueue) = msg
offsetSetInQueue += 1
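These two lines track the "fix classcastexception" item from the commit message: test properties are now stored as boxed `AnyRef` values, matching the `asInstanceOf[Int]` reads in the surrounding suites. A minimal sketch of the round trip (assuming the properties map is a `java.util.Map[String, AnyRef]`, as the `put` calls above suggest):

```scala
val props = new java.util.HashMap[String, AnyRef]()
props.put("output", Int.box(3))                    // store a boxed java.lang.Integer
val n = props.get("output").asInstanceOf[Int] + 1  // the cast unboxes on read; n == 4
```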
ProgressTrackingAndCheckpointSuite.scala
@@ -69,7 +69,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
EventHubNameAndPartition("eh1", 1) -> (5L, 5L),
EventHubNameAndPartition("eh1", 2) -> (5L, 5L))),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutputBeforeRestart)
val eventHubDirectDStream = ssc.graph.getInputStreams().filter(
_.isInstanceOf[EventHubDirectDStream]).head.asInstanceOf[EventHubDirectDStream]
@@ -110,7 +110,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
EventHubNameAndPartition("eh1", 1) -> (5L, 5L),
EventHubNameAndPartition("eh1", 2) -> (5L, 5L))),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutputBeforeRestart,
expectedOutputAfterRestart)
}
@@ -193,7 +193,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
EventHubNameAndPartition("eh1", 2) -> (5L, 5L))),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.window(Seconds(2), Seconds(1)).map(
eventData => eventData.getProperties.get("output").toInt + 1),
eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutputBeforeRestart,
expectedOutputAfterRestart)
}
@@ -246,7 +246,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
operation = (inputDStream1: EventHubDirectDStream, inputDStream2: EventHubDirectDStream) =>
inputDStream1.flatMap(eventData => eventData.getProperties.asScala).
join(inputDStream2.flatMap(eventData => eventData.getProperties.asScala)).
map{case (key, (v1, v2)) => (key, v1.toInt + v2.toInt)},
map{case (key, (v1, v2)) => (key, v1.asInstanceOf[Int] + v2.asInstanceOf[Int])},
expectedOutputBeforeRestart,
expectedOutputAfterRestart)
}
@@ -276,7 +276,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
EventHubNameAndPartition("eh1", 2) -> (3L, 3L))
)),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutputBeforeRestart)

testProgressTracker(
@@ -307,7 +307,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
EventHubNameAndPartition("eh1", 2) -> (7L, 7L))
)),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutputAfterRestart)

testProgressTracker(
@@ -344,7 +344,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
EventHubNameAndPartition("eh1", 2) -> (3L, 3L))
)),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutputBeforeRestart)

testProgressTracker(
@@ -413,7 +413,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
EventHubNameAndPartition("eh1", 2) -> (3L, 3L))
)),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutputBeforeRestart)

testProgressTracker(
@@ -489,7 +489,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
EventHubNameAndPartition("eh1", 2) -> (3L, 3L))
)),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutput = expectedOutputBeforeRestart)

val currentCheckpointDirectory = ssc.checkpointDir