release of 2.0.4 #52

Merged 17 commits on Mar 28, 2017
6 changes: 5 additions & 1 deletion README.md
@@ -1,10 +1,14 @@
# spark-eventhubs [![Build Status](https://travis-ci.org/hdinsight/spark-eventhubs.svg?branch=master)](https://travis-ci.org/hdinsight/spark-eventhubs)
This is the source code of EventHubsReceiver for Spark Streaming.

[Here](https://github.com/hdinsight/spark-streaming-data-persistence-examples) is an example project that uses EventHubsReceiver to count and persist messages from Azure Eventhubs.
[Here](https://github.com/hdinsight/spark-eventhubs/tree/master/examples) are examples that use this library to process streaming data from Azure Eventhubs.

The documentation for the latest integration of EventHubs and Spark Streaming can be found [here](docs/direct_stream.md).

## Latest Release: 2.0.4

[Change Log](doc/change_log.md)

## Usage

### Getting Officially Released Version
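A minimal sketch of pulling the released artifact with SBT, assuming the artifact is published under the Maven coordinates shown in this PR's core/pom.xml (the consumer project itself is hypothetical):

```scala
// build.sbt of a hypothetical consumer project;
// groupId, artifactId, and version are taken from core/pom.xml in this PR.
libraryDependencies += "com.microsoft.azure" % "spark-streaming-eventhubs_2.11" % "2.0.4"
```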
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -23,7 +23,7 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>spark-streaming-eventhubs_connector_2.11</artifactId>
<version>2.0.4-SNAPSHOT</version>
<version>2.0.4</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>spark-streaming-eventhubs_2.11</artifactId>
@@ -23,7 +23,10 @@ private[eventhubs] trait EventHubClient extends Serializable {
* return the end point of each partition
* @return a map from eventhubName-partition to (offset, seq)
*/
def endPointOfPartition(retryIfFail: Boolean): Option[Map[EventHubNameAndPartition, (Long, Long)]]
def endPointOfPartition(
retryIfFail: Boolean,
targetEventHubNameAndPartitions: List[EventHubNameAndPartition] = List()):
Option[Map[EventHubNameAndPartition, (Long, Long)]]

/**
* close this client
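To illustrate the extended trait method above, a minimal caller sketch; `client` and the partition values are hypothetical stand-ins, while `endPointOfPartition` and `EventHubNameAndPartition` come from the diff:

```scala
// Sketch: query endpoints for a restricted set of partitions via the new parameter.
val targets = List(EventHubNameAndPartition("eh1", 0), EventHubNameAndPartition("eh1", 1))
val some: Option[Map[EventHubNameAndPartition, (Long, Long)]] =
  client.endPointOfPartition(retryIfFail = true, targetEventHubNameAndPartitions = targets)

// The default empty list keeps the old behavior: query every partition.
val all = client.endPointOfPartition(retryIfFail = false)
```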
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.eventhubs
import java.io.{IOException, ObjectInputStream}

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

import com.microsoft.azure.eventhubs.{EventData, PartitionReceiver}

@@ -122,13 +123,39 @@ private[eventhubs] class EventHubDirectDStream private[eventhubs] (
eventHubClient.close()
}

private def collectPartitionsNeedingLargerProcessingRange(): List[EventHubNameAndPartition] = {
val partitionList = new ListBuffer[EventHubNameAndPartition]
if (fetchedHighestOffsetsAndSeqNums != null) {
for ((ehNameAndPartition, (offset, seqId)) <- fetchedHighestOffsetsAndSeqNums.offsets) {
if (currentOffsetsAndSeqNums.offsets(ehNameAndPartition)._2 >=
fetchedHighestOffsetsAndSeqNums.offsets(ehNameAndPartition)._2) {
partitionList += ehNameAndPartition
}
}
} else {
partitionList ++= eventhubNameAndPartitions
}
partitionList.toList
}

private def fetchLatestOffset(validTime: Time, retryIfFail: Boolean):
Option[Map[EventHubNameAndPartition, (Long, Long)]] = {
val r = eventHubClient.endPointOfPartition(retryIfFail)
// check if there is any eventhubs partition which potentially has newly arrived messages
// (i.e. the fetched highest message id falls within the next batch's processing range)
val demandingEhNameAndPartitions = collectPartitionsNeedingLargerProcessingRange()
val r = eventHubClient.endPointOfPartition(retryIfFail, demandingEhNameAndPartitions)
if (r.isDefined) {
fetchedHighestOffsetsAndSeqNums = OffsetRecord(validTime, r.get)
// merge results
val mergedOffsets = if (fetchedHighestOffsetsAndSeqNums != null) {
fetchedHighestOffsetsAndSeqNums.offsets ++ r.get
} else {
r.get
}
fetchedHighestOffsetsAndSeqNums = OffsetRecord(validTime, mergedOffsets)
Some(fetchedHighestOffsetsAndSeqNums.offsets)
} else {
r
}
r
}

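A toy sketch of the logic added above: `collectPartitionsNeedingLargerProcessingRange` re-queries only caught-up partitions, and `fetchLatestOffset` merges the fresh results over the previously fetched map. Values are (offset, sequenceNumber) pairs as in the code; the sample data is hypothetical:

```scala
// A partition is re-queried only when processing has caught up with the last
// fetched highest sequence number (current seq >= fetched highest seq).
val current        = Map(EventHubNameAndPartition("eh1", 0) -> (5L, 5L),
                         EventHubNameAndPartition("eh1", 1) -> (2L, 2L))
val fetchedHighest = Map(EventHubNameAndPartition("eh1", 0) -> (5L, 5L),
                         EventHubNameAndPartition("eh1", 1) -> (4L, 4L))
val needsRefresh = fetchedHighest.keys.filter(p => current(p)._2 >= fetchedHighest(p)._2)
// needsRefresh contains only ("eh1", 0); partition 1 still has unprocessed messages.

// Fresh results then overwrite the stale entries, keyed by partition:
val merged = fetchedHighest ++ Map(EventHubNameAndPartition("eh1", 0) -> (7L, 7L))
// merged: ("eh1", 0) -> (7L, 7L), ("eh1", 1) -> (4L, 4L)
```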
@@ -176,8 +176,10 @@ class EventHubsClientWrapper extends Serializable with EventHubClient {
private var MAXIMUM_EVENT_RATE: Int = 0
private val DEFAULT_RECEIVER_EPOCH = -1L

override def endPointOfPartition(retryIfFail: Boolean):
Option[Predef.Map[EventHubNameAndPartition, (Long, Long)]] = {
override def endPointOfPartition(
retryIfFail: Boolean,
targetEventHubsNameAndPartitions: List[EventHubNameAndPartition]):
Option[Predef.Map[EventHubNameAndPartition, (Long, Long)]] = {
throw new UnsupportedOperationException("endPointOfPartition is not supported by this client" +
" yet, please use RestfulEventHubClient")
}
@@ -114,10 +114,6 @@ object EventHubsUtils {
eventHubNamespace: String,
progressDir: String,
eventParams: Predef.Map[String, Predef.Map[String, String]]): EventHubDirectDStream = {
if (!ssc.sparkContext.isLocal) {
require(progressDir.startsWith("hdfs://") || progressDir.startsWith("adl://"),
"we only support HDFS/ADLS based progress file storage")
}
val newStream = new EventHubDirectDStream(ssc, eventHubNamespace, progressDir, eventParams)
newStream
}
@@ -17,6 +17,7 @@

package org.apache.spark.streaming.eventhubs

import java.net.SocketTimeoutException
import java.time.Duration
import java.util.concurrent.Executors

@@ -47,7 +48,7 @@ private[eventhubs] class RestfulEventHubClient(
policyKeys: Map[String, Tuple2[String, String]],
threadNum: Int) extends EventHubClient with Logging {

private val RETRY_INTERVAL_SECONDS = Array(2, 4, 8, 16)
private val RETRY_INTERVAL_SECONDS = Array(8, 16, 32, 64, 128)

// will be used to execute requests to EventHub
import Implicits.exec
@@ -87,46 +88,74 @@
}
}

private def queryPartitionRuntimeInfo[T](
fromResponseBodyToResult: String => T, retryIfFail: Boolean):
Option[Map[EventHubNameAndPartition, T]] = {
val futures = new ListBuffer[Future[(EventHubNameAndPartition, T)]]
for ((eventHubName, numPartitions) <- numPartitionsEventHubs;
partitionId <- 0 until numPartitions) {
futures += Future {
var retryTime = 0
var successfullyFetched = false
var response: HttpResponse[String] = null
val ehNameAndPartition = EventHubNameAndPartition(eventHubName, partitionId)
while (!successfullyFetched) {
logDebug(s"start fetching latest offset of $ehNameAndPartition")
val urlString = fromParametersToURLString(eventHubName, partitionId)
response = Http(urlString).
header("Authorization",
createSasToken(eventHubName,
policyName = policyKeys(eventHubName)._1,
policyKey = policyKeys(eventHubName)._2)).
private def composeQuery[T](
retryIfFail: Boolean,
fromResponseBodyToResult: String => T,
nameAndPartition: EventHubNameAndPartition):
Future[(EventHubNameAndPartition, T)] = {
Future {
var retryTime = 0
var successfullyFetched = false
var response: HttpResponse[String] = null
val ehNameAndPartition = nameAndPartition
val eventHubName = nameAndPartition.eventHubName
val partitionId = nameAndPartition.partitionId
while (!successfullyFetched) {
logDebug(s"start fetching latest offset of $ehNameAndPartition")
val urlString = fromParametersToURLString(eventHubName, partitionId)
try {
response = Http(urlString).header("Authorization",
createSasToken(eventHubName,
policyName = policyKeys(eventHubName)._1,
policyKey = policyKeys(eventHubName)._2)).
header("Content-Type", "application/atom+xml;type=entry;charset=utf-8").
timeout(connTimeoutMs = 3000, readTimeoutMs = 30000).asString
if (response.code != 200) {
if (!retryIfFail || retryTime > RETRY_INTERVAL_SECONDS.length - 1) {
val errorInfoString = s"cannot get latest offset of" +
s" $ehNameAndPartition, status code: ${response.code}, ${response.headers}" +
s" returned error:" +
s" ${response.body}"
s" returned error: ${response.body}"
logError(errorInfoString)
throw new Exception(errorInfoString)
} else {
Thread.sleep(1000 * RETRY_INTERVAL_SECONDS(retryTime))
val retryInterval = 1000 * RETRY_INTERVAL_SECONDS(retryTime)
logError(s"cannot connect to the Event Hubs REST endpoint for partition" +
s" $ehNameAndPartition, retrying after ${RETRY_INTERVAL_SECONDS(retryTime)} seconds")
Thread.sleep(retryInterval)
retryTime += 1
}
} else {
successfullyFetched = true
}
} catch {
case e: SocketTimeoutException =>
e.printStackTrace()
logError("Event Hubs return ReadTimeout with 30s as threshold, retrying...")
case e: Exception =>
e.printStackTrace()
throw e
}
val endpointOffset = fromResponseBodyToResult(response.body)
logDebug(s"latest offset of $ehNameAndPartition: $endpointOffset")
(ehNameAndPartition, endpointOffset)
}
val endpointOffset = fromResponseBodyToResult(response.body)
logDebug(s"latest offset of $ehNameAndPartition: $endpointOffset")
(ehNameAndPartition, endpointOffset)
}
}

private def queryPartitionRuntimeInfo[T](
targetEventHubsNameAndPartitions: List[EventHubNameAndPartition],
fromResponseBodyToResult: String => T, retryIfFail: Boolean):
Option[Map[EventHubNameAndPartition, T]] = {
val futures = new ListBuffer[Future[(EventHubNameAndPartition, T)]]
if (targetEventHubsNameAndPartitions.isEmpty) {
for ((eventHubName, numPartitions) <- numPartitionsEventHubs;
partitionId <- 0 until numPartitions) {
futures += composeQuery(retryIfFail, fromResponseBodyToResult,
EventHubNameAndPartition(eventHubName, partitionId))
}
} else {
for (targetNameAndPartition <- targetEventHubsNameAndPartitions) {
futures += composeQuery(retryIfFail, fromResponseBodyToResult, targetNameAndPartition)
}
}
aggregateResults(futures.toList)
@@ -136,9 +165,12 @@
// empty
}

override def endPointOfPartition(retryIfFail: Boolean):
Option[Map[EventHubNameAndPartition, (Long, Long)]] = {
queryPartitionRuntimeInfo(fromResponseBodyToEndpoint, retryIfFail)
override def endPointOfPartition(
retryIfFail: Boolean,
targetEventHubsNameAndPartitions: List[EventHubNameAndPartition]):
Option[Map[EventHubNameAndPartition, (Long, Long)]] = {
queryPartitionRuntimeInfo(targetEventHubsNameAndPartitions,
fromResponseBodyToEndpoint, retryIfFail)
}
}

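A condensed sketch of the retry policy implemented in the loop above: retry on failure, sleep RETRY_INTERVAL_SECONDS(retryTime) between attempts, and give up once the schedule is exhausted. `attempt` is a hypothetical stand-in for the HTTP call:

```scala
// Mirrors the loop above: up to 5 retries at 8, 16, 32, 64, and 128 seconds,
// i.e. roughly 248 seconds of backoff in the worst case before failing.
def withBackoff[T](attempt: () => Option[T]): T = {
  val intervals = Array(8, 16, 32, 64, 128)
  var retryTime = 0
  var result: Option[T] = attempt()
  while (result.isEmpty) {
    if (retryTime > intervals.length - 1) {
      throw new Exception("cannot fetch endpoint: retry schedule exhausted")
    }
    Thread.sleep(1000L * intervals(retryTime))
    retryTime += 1
    result = attempt()
  }
  result.get
}
```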
@@ -50,10 +50,12 @@ class EventHubDirectDStreamSuite extends EventHubTestSuiteBase with MockitoSugar


test("skip the batch when failed to fetch the latest offset of partitions") {
val eventHubClientMock = mock[EventHubClient]
Mockito.when(eventHubClientMock.endPointOfPartition(retryIfFail = true)).thenReturn(None)
val ehDStream = new EventHubDirectDStream(ssc, eventhubNamespace, progressRootPath.toString,
Map("eh1" -> eventhubParameters))
val eventHubClientMock = mock[EventHubClient]
Mockito.when(eventHubClientMock.endPointOfPartition(retryIfFail = true,
targetEventHubNameAndPartitions = ehDStream.eventhubNameAndPartitions.toList)).
thenReturn(None)
ehDStream.setEventHubClient(eventHubClientMock)
ssc.scheduler.start()
intercept[IllegalStateException] {
@@ -79,7 +81,7 @@ class EventHubDirectDStreamSuite extends EventHubTestSuiteBase with MockitoSugar
EventHubNameAndPartition("eh1", 2) -> (3L, 3L)))
),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutput)
testProgressTracker(eventhubNamespace,
OffsetRecord(Time(3000L), Map(EventHubNameAndPartition("eh1", 0) -> (5L, 5L),
@@ -108,7 +110,7 @@ class EventHubDirectDStreamSuite extends EventHubTestSuiteBase with MockitoSugar
),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.window(Seconds(2), Seconds(1)).map(
eventData => eventData.getProperties.get("output").toInt + 1),
eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutput)
testProgressTracker(eventhubNamespace,
OffsetRecord(Time(3000L), Map(EventHubNameAndPartition("eh1", 0) -> (5L, 5L),
@@ -159,7 +161,7 @@ class EventHubDirectDStreamSuite extends EventHubTestSuiteBase with MockitoSugar
operation = (inputDStream1: EventHubDirectDStream, inputDStream2: EventHubDirectDStream) =>
inputDStream1.flatMap(eventData => eventData.getProperties.asScala).
join(inputDStream2.flatMap(eventData => eventData.getProperties.asScala)).
map{case (key, (v1, v2)) => (key, v1.toInt + v2.toInt)},
map{case (key, (v1, v2)) => (key, v1.asInstanceOf[Int] + v2.asInstanceOf[Int])},
expectedOutput)
testProgressTracker("namespace1",
OffsetRecord(Time(2000L), Map(EventHubNameAndPartition("eh11", 0) -> (5L, 5L),
@@ -188,7 +190,7 @@ class EventHubDirectDStreamSuite extends EventHubTestSuiteBase with MockitoSugar
EventHubNameAndPartition("eh1", 2) -> (-1L, -1L))
)),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutput,
rddOperation = Some((rdd: RDD[Int], t: Time) => {
Array(rdd.take(1).toSeq)
@@ -219,7 +221,7 @@ class EventHubDirectDStreamSuite extends EventHubTestSuiteBase with MockitoSugar
EventHubNameAndPartition("eh1", 1) -> (3L, 3L),
EventHubNameAndPartition("eh1", 2) -> (3L, 3L)))),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutput,
messagesBeforeEmpty = 4,
numBatchesBeforeNewData = 5)
@@ -278,9 +278,9 @@ private[eventhubs] trait EventHubTestSuiteBase extends TestSuiteBase {
Whitebox.setInternalState(msg, "systemProperties", systemProperties.asInstanceOf[Any])
property match {
case p @ Tuple2(_, _) =>
msg.getProperties.put(p._1.toString, p._2.toString)
msg.getProperties.put(p._1.toString, p._2.asInstanceOf[AnyRef])
case _ =>
msg.getProperties.put("output", property.toString)
msg.getProperties.put("output", property.asInstanceOf[AnyRef])
}
eventDataArray(offsetSetInQueue) = msg
offsetSetInQueue += 1
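The .toInt to asInstanceOf[Int] changes throughout the test suites follow from the change above: property values are now stored as boxed objects rather than strings. A toy sketch of the difference, using a plain java.util.HashMap as a stand-in for EventData.getProperties (an assumption for illustration):

```scala
import java.util.{HashMap => JHashMap}

// Before: values were stored as strings, so reads had to parse.
val asStrings = new JHashMap[String, AnyRef]()
asStrings.put("output", 3.toString)                    // stores "3"
val parsed = asStrings.get("output").toString.toInt    // 3

// After: values are stored as boxed objects, so reads cast instead.
val asObjects = new JHashMap[String, AnyRef]()
asObjects.put("output", 3.asInstanceOf[AnyRef])        // stores java.lang.Integer(3)
val cast = asObjects.get("output").asInstanceOf[Int]   // 3
```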
@@ -69,7 +69,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
EventHubNameAndPartition("eh1", 1) -> (5L, 5L),
EventHubNameAndPartition("eh1", 2) -> (5L, 5L))),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutputBeforeRestart)
val eventHubDirectDStream = ssc.graph.getInputStreams().filter(
_.isInstanceOf[EventHubDirectDStream]).head.asInstanceOf[EventHubDirectDStream]
@@ -110,7 +110,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
EventHubNameAndPartition("eh1", 1) -> (5L, 5L),
EventHubNameAndPartition("eh1", 2) -> (5L, 5L))),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutputBeforeRestart,
expectedOutputAfterRestart)
}
@@ -193,7 +193,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
EventHubNameAndPartition("eh1", 2) -> (5L, 5L))),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.window(Seconds(2), Seconds(1)).map(
eventData => eventData.getProperties.get("output").toInt + 1),
eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutputBeforeRestart,
expectedOutputAfterRestart)
}
@@ -246,7 +246,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
operation = (inputDStream1: EventHubDirectDStream, inputDStream2: EventHubDirectDStream) =>
inputDStream1.flatMap(eventData => eventData.getProperties.asScala).
join(inputDStream2.flatMap(eventData => eventData.getProperties.asScala)).
map{case (key, (v1, v2)) => (key, v1.toInt + v2.toInt)},
map{case (key, (v1, v2)) => (key, v1.asInstanceOf[Int] + v2.asInstanceOf[Int])},
expectedOutputBeforeRestart,
expectedOutputAfterRestart)
}
@@ -276,7 +276,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
EventHubNameAndPartition("eh1", 2) -> (3L, 3L))
)),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutputBeforeRestart)

testProgressTracker(
@@ -307,7 +307,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
EventHubNameAndPartition("eh1", 2) -> (7L, 7L))
)),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutputAfterRestart)

testProgressTracker(
@@ -344,7 +344,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
EventHubNameAndPartition("eh1", 2) -> (3L, 3L))
)),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutputBeforeRestart)

testProgressTracker(
@@ -413,7 +413,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
EventHubNameAndPartition("eh1", 2) -> (3L, 3L))
)),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutputBeforeRestart)

testProgressTracker(
@@ -489,7 +489,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
EventHubNameAndPartition("eh1", 2) -> (3L, 3L))
)),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.map(eventData => eventData.getProperties.get("output").toInt + 1),
inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutput = expectedOutputBeforeRestart)

val currentCheckpointDirectory = ssc.checkpointDir