Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rpc endpoint for direct stream #561

Merged
merged 11 commits into from
Nov 12, 2020
Prev Previous commit
create RPC endpoint for direct stream
  • Loading branch information
Navid Yaghmazadeh committed Nov 12, 2020
commit b25972c8fa5624c394b58b4c0fe18ea439eecc5a
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,27 @@ import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{ JavaInputDStream, JavaStreamingContext }
import org.apache.spark.streaming.eventhubs.EventHubsDirectDStream
import org.apache.spark.{ SparkContext, TaskContext }
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.SparkEnv

/**
* Helper to create Direct DStreams which consume events from Event Hubs.
*/
object EventHubsUtils extends Logging {

var partitionPerformanceReceiverRef: RpcEndpointRef = null

/**
 * Registers the partition performance RPC endpoint once and caches its reference in
 * `partitionPerformanceReceiverRef`.
 *
 * The whole check-and-register sequence runs under this object's monitor so that
 * concurrent callers cannot both observe a `null` reference and both try to register
 * an endpoint under `PartitionPerformanceReceiver.ENDPOINT_NAME`.
 */
private def createRpcEndpoint(): Unit = synchronized {
  if (partitionPerformanceReceiverRef == null) {
    // RPC endpoint for partition performance communication in the driver
    val partitionsStatusTracker = PartitionsStatusTracker.getPartitionStatusTracker
    // Resolve the RpcEnv once and reuse it for both construction and registration.
    val rpcEnv = SparkEnv.get.rpcEnv
    val partitionPerformanceReceiver: PartitionPerformanceReceiver =
      new PartitionPerformanceReceiver(rpcEnv, partitionsStatusTracker)
    partitionPerformanceReceiverRef =
      rpcEnv.setupEndpoint(PartitionPerformanceReceiver.ENDPOINT_NAME, partitionPerformanceReceiver)
  }
}

/**
* Creates a Direct DStream which consumes from the Event Hubs instance
* specified in the [[EventHubsConf]].
Expand All @@ -56,6 +71,7 @@ object EventHubsUtils extends Logging {
* @return An [[EventHubsDirectDStream]]
*/
def createDirectStream(ssc: StreamingContext, ehConf: EventHubsConf): EventHubsDirectDStream = {
  // Set up the partition performance RPC endpoint (no-op if already created) before building the stream.
  createRpcEndpoint()
  val directStream = new EventHubsDirectDStream(ssc, ehConf, EventHubsClient.apply)
  directStream
}

Expand All @@ -69,6 +85,7 @@ object EventHubsUtils extends Logging {
*/
def createDirectStream(jssc: JavaStreamingContext,
                       ehConf: EventHubsConf): JavaInputDStream[EventData] = {
  // Set up the partition performance RPC endpoint (no-op if already created).
  createRpcEndpoint()
  // Build the Scala stream from the wrapped StreamingContext, then expose it through the Java API type.
  val scalaStream = createDirectStream(jssc.ssc, ehConf)
  new JavaInputDStream(scalaStream)
}

Expand All @@ -85,6 +102,7 @@ object EventHubsUtils extends Logging {
def createRDD(sc: SparkContext,
              ehConf: EventHubsConf,
              offsetRanges: Array[OffsetRange]): EventHubsRDD = {
  // Set up the partition performance RPC endpoint (no-op if already created) before building the RDD.
  createRpcEndpoint()
  // The RDD is constructed from the trimmed form of the configuration.
  val trimmedConf = ehConf.trimmed
  new EventHubsRDD(sc, trimmedConf, offsetRanges)
}

Expand All @@ -101,6 +119,7 @@ object EventHubsUtils extends Logging {
def createRDD(jsc: JavaSparkContext,
              ehConf: EventHubsConf,
              offsetRanges: Array[OffsetRange]): JavaRDD[EventData] = {
  // Set up the partition performance RPC endpoint (no-op if already created).
  createRpcEndpoint()
  // Build the Scala RDD from the wrapped SparkContext, then expose it through the Java API type.
  val scalaRdd = createRDD(jsc.sc, ehConf, offsetRanges)
  new JavaRDD(scalaRdd)
}

Expand Down