Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rpc endpoint for direct stream #561

Merged
merged 11 commits into from
Nov 12, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,27 @@ import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{ JavaInputDStream, JavaStreamingContext }
import org.apache.spark.streaming.eventhubs.EventHubsDirectDStream
import org.apache.spark.{ SparkContext, TaskContext }
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.SparkEnv

/**
* Helper to create Direct DStreams which consume events from Event Hubs.
*/
object EventHubsUtils extends Logging {

// Reference to the RPC endpoint registered under PartitionPerformanceReceiver.ENDPOINT_NAME.
// Starts out null; within the code visible here it is only assigned by createRpcEndpoint(),
// which uses the null value as its "not yet registered" marker.
var partitionPerformanceReceiverRef: RpcEndpointRef = null

/**
 * Registers the [[PartitionPerformanceReceiver]] RPC endpoint (used for partition
 * performance communication in the driver) the first time it is called and stores
 * the resulting reference in `partitionPerformanceReceiverRef`. Later calls are no-ops.
 *
 * The check and the registration run under this object's monitor so that two threads
 * creating streams/RDDs concurrently cannot both observe `null` and both try to register
 * an endpoint under the same name.
 * NOTE(review): Spark's RpcEnv is expected to reject a duplicate endpoint name with an
 * exception - confirm against the Spark version in use.
 */
private def createRpcEndpoint(): Unit = synchronized {
  if (partitionPerformanceReceiverRef == null) {
    // RPC endpoint for partition performance communication in the driver
    val partitionsStatusTracker = PartitionsStatusTracker.getPartitionStatusTracker
    // Look the environment up once so the receiver and the registration share one RpcEnv.
    val rpcEnv = SparkEnv.get.rpcEnv
    val partitionPerformanceReceiver: PartitionPerformanceReceiver =
      new PartitionPerformanceReceiver(rpcEnv, partitionsStatusTracker)
    partitionPerformanceReceiverRef =
      rpcEnv.setupEndpoint(PartitionPerformanceReceiver.ENDPOINT_NAME, partitionPerformanceReceiver)
  }
}

/**
* Creates a Direct DStream which consumes from the Event Hubs instance
* specified in the [[EventHubsConf]].
Expand All @@ -56,6 +71,7 @@ object EventHubsUtils extends Logging {
* @return An [[EventHubsDirectDStream]]
*/
def createDirectStream(
    ssc: StreamingContext,
    ehConf: EventHubsConf
): EventHubsDirectDStream = {
  // Make sure the partition-performance RPC endpoint is registered before the stream exists.
  createRpcEndpoint()

  // EventHubsClient.apply is handed over as the client factory argument.
  val directStream = new EventHubsDirectDStream(ssc, ehConf, EventHubsClient.apply)
  directStream
}

Expand All @@ -69,6 +85,7 @@ object EventHubsUtils extends Logging {
*/
def createDirectStream(
    jssc: JavaStreamingContext,
    ehConf: EventHubsConf
): JavaInputDStream[EventData] = {
  // Register the RPC endpoint up front; the Scala overload called below performs the
  // same (null-guarded) call again.
  createRpcEndpoint()

  // Build the Scala stream from the wrapped StreamingContext, then expose it to Java.
  val scalaStream = createDirectStream(jssc.ssc, ehConf)
  new JavaInputDStream[EventData](scalaStream)
}

Expand All @@ -85,6 +102,7 @@ object EventHubsUtils extends Logging {
def createRDD(
    sc: SparkContext,
    ehConf: EventHubsConf,
    offsetRanges: Array[OffsetRange]
): EventHubsRDD = {
  // Make sure the partition-performance RPC endpoint is registered before the RDD exists.
  createRpcEndpoint()

  // The RDD receives the `trimmed` view of the configuration rather than ehConf itself.
  val trimmedConf = ehConf.trimmed
  new EventHubsRDD(sc, trimmedConf, offsetRanges)
}

Expand All @@ -101,6 +119,7 @@ object EventHubsUtils extends Logging {
def createRDD(
    jsc: JavaSparkContext,
    ehConf: EventHubsConf,
    offsetRanges: Array[OffsetRange]
): JavaRDD[EventData] = {
  // Register the RPC endpoint up front; the Scala overload called below performs the
  // same (null-guarded) call again.
  createRpcEndpoint()

  // Build the Scala RDD from the wrapped SparkContext, then expose it to Java.
  val scalaRdd = createRDD(jsc.sc, ehConf, offsetRanges)
  new JavaRDD[EventData](scalaRdd)
}

Expand Down