Structured Streaming + Event Hubs Integration Guide for PySpark

Table of Contents

  • Linking
  • User Configuration
  • Reading Data from Event Hubs
  • Writing Data to Event Hubs
  • Recovering from Failures with Checkpointing
  • Managing Throughput
  • Serialization of Event Data Properties
  • Deploying

Linking

Structured streaming integration for Azure Event Hubs is ultimately run on the JVM, so you'll need to import the libraries from the Maven coordinate below:

  groupId = com.microsoft.azure
  artifactId = azure-eventhubs-spark_2.11
  version = 2.3.22

or

  groupId = com.microsoft.azure
  artifactId = azure-eventhubs-spark_2.12
  version = 2.3.22

For Python applications, you need to add the library above and its dependencies when deploying your application. See the Deploying subsection below.

User Configuration

Connection String

An Event Hubs connection string is required to connect to the Event Hubs service. You can get the connection string for your Event Hubs instance from the Azure Portal.

Connection strings must contain an Endpoint, EntityPath (the Event Hub name), SharedAccessKeyName, and SharedAccessKey:

Endpoint=sb://{NAMESPACE}.servicebus.windows.net/{EVENT_HUB_NAME};EntityPath={EVENT_HUB_NAME};SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}
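For example, a minimal sketch of assembling the connection string in Python; the namespace, Event Hub name, and key values below are placeholders you would replace with your own:

namespace = "MY-NAMESPACE"        # placeholder
eventHubName = "MY-EVENT-HUB"     # placeholder
sasKeyName = "MY-SAS-KEY-NAME"    # placeholder
sasKey = "MY-SAS-KEY"             # placeholder

connectionString = (
    f"Endpoint=sb://{namespace}.servicebus.windows.net/{eventHubName};"
    f"EntityPath={eventHubName};"
    f"SharedAccessKeyName={sasKeyName};"
    f"SharedAccessKey={sasKey}"
)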

Event Hubs Configuration

All configuration relating to Event Hubs happens in your Event Hubs configuration dictionary. The configuration dictionary must contain an Event Hubs connection string:

connectionString = "YOUR.CONNECTION.STRING"

ehConf = {}

# For versions before 2.3.15, set the connection string without encryption
# ehConf['eventhubs.connectionString'] = connectionString

# For 2.3.15 version and above, the configuration dictionary requires that connection string be encrypted.
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)

Please read the Connection String subsection for more information on obtaining a valid connection string.

Additionally, the following configurations are optional:

| Option | value | default | query type | meaning |
|--------|-------|---------|------------|---------|
| eventhubs.consumerGroup | string | "$Default" | streaming and batch | A consumer group is a view of an entire event hub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and with their own offsets. More info is available here. |
| eventhubs.startingPositions | JSON string | end of stream | streaming and batch | Sets starting positions for specific partitions. If any positions are set in this option, they take priority over any other option. If nothing is configured within this option, then the setting in startingPosition is used. If no position has been set in either option, we will start consuming from the end of the partition. |
| eventhubs.startingPosition | JSON string | end of stream | streaming and batch | The starting position for your Structured Streaming job. If a specific EventPosition is not set for a partition using startingPositions, then we use the EventPosition set in startingPosition. If nothing is set in either option, we will begin consuming from the end of the partition. |
| eventhubs.endingPositions | JSON string | end of stream | batch query | The ending position of a batch query on a per-partition basis. This works the same as startingPositions. |
| eventhubs.endingPosition | JSON string | end of stream | batch query | The ending position of a batch query. This works the same as startingPosition. |
| maxEventsPerTrigger | long | partitionCount * 1000 | streaming query | Rate limit on the maximum number of events processed per trigger interval. The specified total number of events will be proportionally split across partitions of different volume. |
| eventhubs.receiverTimeout | string | 60 seconds | streaming and batch | The amount of time Event Hub receive calls will be retried before throwing an exception. The time should be a string in PT%HH%MM%SS format (please see this example). |
| eventhubs.operationTimeout | string | 60 seconds | streaming and batch | The amount of time Event Hub API calls will be retried before throwing an exception. The time should be a string in PT%HH%MM%SS format (please see this example). |
| eventhubs.prefetchCount | int | 500 | streaming and batch | Sets the prefetch count for the underlying receiver and controls how many events are received in advance. The acceptable range for the prefetch count is [1, 4000]. |
| eventhubs.threadPoolSize | int | 16 | streaming and batch | Sets the size of the thread pool. |
| eventhubs.useExclusiveReceiver | boolean | true | streaming and batch | Determines if the connector uses an epoch (i.e. exclusive) or a non-epoch receiver. The connector uses epoch receivers by default. If you decide to use non-epoch receivers, please be aware of the limit of up to 5 concurrent receivers. More info is available here. |
| eventhubs.maxSilentTime | string | com.microsoft.azure.eventhubs.EventHubClientOptions.SILENT_OFF | streaming and batch | Sets the maximum silent time (or SILENT_OFF) for a receiver. If maxSilentTime is set to a value other than SILENT_OFF, we will try to recreate the receiver if there is no activity for the length of this duration. Note that maxSilentTime should be at least EventHubClientOptions.SILENT_MINIMUM = 30 seconds. |
| eventhubs.DynamicPartitionDiscovery | boolean | false | streaming query | Determines if the number of partitions in the Event Hubs instance may be updated during execution. If set to true, the number of partitions is re-checked every 300 seconds. |
| eventhubs.slowPartitionAdjustment | boolean | false | streaming query | Determines whether the number of events to be read from each partition should be adjusted based on its performance. More info is available here. |
| eventhubs.maxAcceptableBatchReceiveTime | string | 30 seconds | streaming | Sets the maximum acceptable time for a partition to receive events in a single batch. The time should be a string in PT%HH%MM%SS format (please see this example). This value is used to identify slow partitions when slowPartitionAdjustment is enabled: only partitions that take more than this time to receive their portion of events in a batch are considered potential slow partitions. More info is available here. |
| eventhubs.throttlingStatusPlugin | string | None | streaming | The name of a class extending the org.apache.spark.eventhubs.utils.ThrottlingStatusPlugin trait to monitor the performance of partitions when slowPartitionAdjustment is enabled. More info is available here. |
| eventhubs.useAadAuth | boolean | false | streaming and batch | Determines if AAD authentication should be used instead of the connection string to access Event Hubs. This flag should be set in PySpark to use AAD authentication. More info is available here. |
| eventhubs.aadAuthCallback | string | None | streaming and batch | The name of a callback class extending the org.apache.spark.eventhubs.utils.AadAuthenticationCallback trait to use AAD authentication to access Event Hubs. More info is available here. |
| eventhubs.AadAuthCallbackParams | JSON string | None | streaming and batch | Sets the parameters passed to the AAD authentication callback class. More info is available here. |
| eventhubs.metricPlugin | string | None | streaming and batch | The name of a class extending the org.apache.spark.eventhubs.utils.MetricPlugin trait to monitor the performance of send and receive operations. org.apache.spark.eventhubs.utils.SimpleLogMetricPlugin implements a simple example that just logs the operation performance. |
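For illustration, a short sketch of setting a few of these optional values on the ehConf dictionary initialized above (the specific values are arbitrary examples, not recommendations):

# Read from a dedicated consumer group instead of "$Default" (example name)
ehConf['eventhubs.consumerGroup'] = "spark-consumer"

# Cap the number of events processed per trigger interval
ehConf['maxEventsPerTrigger'] = 10000

# Increase the receiver prefetch count (allowed range is [1, 4000])
ehConf['eventhubs.prefetchCount'] = 1000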

Constructing an Event Hubs configuration dictionary is involved, but the examples below should (hopefully) provide a thorough explanation. The following examples all assume the same ehConf configuration dictionary initialized above.

Consumer Group

ehConf['eventhubs.consumerGroup'] = "DESIRED.CONSUMER.GROUP"

Event Position

The position can be an enqueued time, offset, sequence number, the start of the stream, or the end of the stream. These options are selected by setting the corresponding variable in the table below. Only one of offset, seqNo, and enqueuedTime may be in use at any given time.

| Option | valid values when in use | valid value when not in use |
|--------|--------------------------|------------------------------|
| offset | string containing an integer offset, "-1" for start of stream, "@latest" for end of stream | None |
| seqNo | long | -1 |
| enqueuedTime | string of format "YYYY-MM-DDTHH:MM:SS.ssssZ" | None |
| isInclusive | True, False | N/A |
from datetime import datetime as dt
import json

# Start from beginning of stream
startOffset = "-1"

# End at the current time. This datetime formatting creates the correct string format from a python datetime object
endTime = dt.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")


# Create the positions
startingEventPosition = {
  "offset": startOffset,  
  "seqNo": -1,            #not in use
  "enqueuedTime": None,   #not in use
  "isInclusive": True
}

endingEventPosition = {
  "offset": None,           #not in use
  "seqNo": -1,              #not in use
  "enqueuedTime": endTime,
  "isInclusive": True
}


# Put the positions into the Event Hub config dictionary
ehConf["eventhubs.startingPosition"] = json.dumps(startingEventPosition)
ehConf["eventhubs.endingPosition"] = json.dumps(endingEventPosition)

This configuration allows for the batch processing of events occurring from the start of the stream to the current time.

Per Partition Configuration

For advanced users, we have provided the option to configure starting and ending positions on a per partition basis. Consider:

ehName = "YOUR.EVENT.HUB.NAME"

# Create event position for partition 0
positionKey1 = {
  "ehName": ehName,
  "partitionId": 0
}

eventPosition1 = {
  "offset": "@latest",    
  "seqNo": -1,            
  "enqueuedTime": None,   
  "isInclusive": True
}

# Create event position for partition 2
positionKey2 = {
  "ehName": ehName,
  "partitionId": 2
}

eventPosition2 = {
  "offset": None,         # not in use
  "seqNo": 100,
  "enqueuedTime": None,   # not in use
  "isInclusive": True
}

# Create more positions for other partitions here if desired

# Put the rules into a map. The position key dictionaries must be made into JSON strings to act as the key.
positionMap = {
  json.dumps(positionKey1) : eventPosition1,
  json.dumps(positionKey2) : eventPosition2
}

# Place the map into the main Event Hub config dictionary
ehConf["eventhubs.startingPositions"] = json.dumps(positionMap)

In this case, partition 0 starts reading only new events (i.e. from the end of the partition), while partition 2 starts reading from sequence number 100. Setting up ending positions follows the same pattern; see the sketch below.
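A minimal sketch of per-partition ending positions, reusing ehName and json from the example above (the partition and sequence number are arbitrary illustrations):

# Hypothetical ending position: stop partition 0 at sequence number 1000
endPositionKey = {
  "ehName": ehName,
  "partitionId": 0
}

endPosition = {
  "offset": None,        # not in use
  "seqNo": 1000,         # arbitrary example sequence number
  "enqueuedTime": None,  # not in use
  "isInclusive": True
}

ehConf["eventhubs.endingPositions"] = json.dumps({json.dumps(endPositionKey): endPosition})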

Receiver Timeout and Operation Timeout

Users wanting to customize the receiver or operation timeout settings can do so using the following example:

import datetime

# Using formatting methods from Python's time object to create string durations
receiverTimeoutDuration = datetime.time(0,3,20).strftime("PT%HH%MM%SS")   # 200 seconds
operationTimeoutDuration = datetime.time(0,1,0).strftime("PT%HH%MM%SS")   # 60 seconds

# Setting the receiver timeout to 200 seconds
ehConf["eventhubs.receiverTimeout"] = receiverTimeoutDuration

# Setting the operation timeout to 60 seconds
ehConf["eventhubs.operationTimeout"] = operationTimeoutDuration

Note that the timeout duration format is different than the enqueuedTime format in the Event Position section. The timeout format must follow the ISO-8601 representation for time (more specifically, it must be a format accepted by java.time.Duration).
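Because any string accepted by java.time.Duration works, you can also skip the strftime step and pass ISO-8601 duration literals directly, for example:

# Equivalent literal ISO-8601 durations: 200 seconds and 60 seconds
ehConf["eventhubs.receiverTimeout"] = "PT200S"
ehConf["eventhubs.operationTimeout"] = "PT60S"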

IoT Hub

If using IoT Hub, getting your connection string is the only part of the process that is different - all other documentation still applies. Follow these instructions to get your EventHubs-compatible connection string:

  1. Go to the Azure Portal and find your IoT Hub instance
  2. Click on Endpoints under Messaging. Then click on Events.
  3. Find your EventHub-compatible name and EventHub-compatible endpoint.
# Note: Remove '{' and '}' when you add your endpoint and eventhub compatible name. 
connectionString = "Endpoint={YOUR.EVENTHUB.COMPATIBLE.ENDPOINT};EntityPath={YOUR.EVENTHUB.COMPATIBLE.NAME}"
ehConf['eventhubs.connectionString'] = connectionString
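As noted in the Event Hubs Configuration section, connector versions 2.3.15 and above expect the connection string in the configuration dictionary to be encrypted, so the same applies to the IoT Hub connection string:

# For version 2.3.15 and above, encrypt the connection string before adding it to the dictionary
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)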

Reading Data from Event Hubs

Creating an Event Hubs Source for Streaming Queries

# Source with default settings
connectionString = "YOUR.CONNECTION.STRING"
ehConf = {
  'eventhubs.connectionString' : connectionString
}

df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

Creating an Event Hubs Source for Batch Queries

# Source with default settings
connectionString = "YOUR.CONNECTION.STRING"
ehConf = {
  'eventhubs.connectionString' : connectionString
}

# Simple batch query
df = spark \
  .read \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()
df = df.withColumn("body", df["body"].cast("string"))

Each row in the source has the following schema:

| Column | Type |
|--------|------|
| body | binary |
| offset | string |
| sequenceNumber | long |
| enqueuedTime | timestamp |
| publisher | string |
| partitionKey | string |
| properties | map[string, json] |
| connectionDeviceID | string |
| systemProperties | map[string, json] |
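For example, a small sketch of projecting a few of these columns from the source DataFrame created above:

from pyspark.sql.functions import col

# Select a few columns from the streaming DataFrame created above and
# cast the binary body to a human-readable string
events = df.select(
    col("body").cast("string").alias("body"),
    col("partitionKey"),
    col("sequenceNumber"),
    col("enqueuedTime")
)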

Writing Data to Event Hubs

Here, we describe the support for writing Streaming Queries and Batch Queries to Azure EventHubs. Take note that, today, Azure EventHubs only supports at-least-once semantics. Consequently, when writing either Streaming Queries or Batch Queries to EventHubs, some records may be duplicated; this can happen, for example, if EventHubs needs to retry an event that was not acknowledged by the EventHubs service, even if the service received and stored the event. Structured Streaming cannot prevent such duplicates from occurring due to these EventHubs write semantics. However, if writing the query is successful, then you can assume that the query output was written at least once. A possible solution to remove duplicates when reading the written data is to introduce a primary (unique) key that can be used to perform de-duplication when reading.

The Dataframe being written to EventHubs should have the following columns in the schema:

| Column | Type |
|--------|------|
| body (required) | string or binary |
| partitionId (*optional) | string |
| partitionKey (*optional) | string |

  • Only one (partitionId or partitionKey) can be set at a time. If both are set, your Structured Streaming job will be stopped.

The body column is the only required column. If neither a partitionId nor a partitionKey is provided, then events will be distributed to partitions using a round-robin model. Alternatively, if a partitionId is provided, the query output will be sent exclusively to that specific partition. Sending to a single partition is not a recommended pattern. Finally, if a partitionKey is provided, each event will be sent with the provided partitionKey. For more information on how a partitionKey works, click here.
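As an illustration (using standard Spark SQL functions, not connector-specific API), one common way to produce the required body column is to serialize the input columns to JSON before writing; inputDF below is a hypothetical source DataFrame:

from pyspark.sql.functions import to_json, struct

# Pack every column of a hypothetical input DataFrame into a single JSON
# string column named "body", which is the only column Event Hubs requires
dfWithBody = inputDF.select(to_json(struct("*")).alias("body"))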

Creating an EventHubs Sink for Streaming Queries

# Set up the Event Hub config dictionary with default settings
writeConnectionString = "YOUR.EVENTHUBS.CONNECTION.STRING"
ehWriteConf = {
  'eventhubs.connectionString' : writeConnectionString
}

# Write body data from a DataFrame to EventHubs. Events are distributed across partitions using round-robin model.
ds = df \
  .select("body") \
  .writeStream \
  .format("eventhubs") \
  .options(**ehWriteConf) \
  .option("checkpointLocation", "///output.txt") \
  .start()

# Write body data from a DataFrame to EventHubs with a partitionKey
ds = df \
  .selectExpr("partitionKey", "body") \
  .writeStream \
  .format("eventhubs") \
  .options(**ehWriteConf) \
  .option("checkpointLocation", "///output.txt") \
  .start()

Writing the output of Batch Queries to EventHubs

# Set up the Event Hub config dictionary with default settings
writeConnectionString = "YOUR.EVENTHUBS.CONNECTION.STRING"
ehWriteConf = {
  'eventhubs.connectionString' : writeConnectionString
}

# Write body data from a DataFrame to EventHubs. Events are distributed across partitions using round-robin model.
ds = df \
  .select("body") \
  .write \
  .format("eventhubs") \
  .options(**ehWriteConf) \
  .save()

# Write body data from a DataFrame to EventHubs with a partitionKey
ds = df \
  .selectExpr("partitionKey", "body") \
  .write \
  .format("eventhubs") \
  .options(**ehWriteConf) \
  .save()

Recovering from Failures with Checkpointing

The connector fully integrates with the Structured Streaming checkpointing mechanism. You can recover the progress and state of your query on failures by setting a checkpoint location in your query. This checkpoint location has to be a path in an HDFS-compatible file system, and can be set as an option in the DataStreamWriter when starting a query.

aggDF \
  .writeStream \
  .outputMode("complete") \
  .option("checkpointLocation", "path/to/HDFS/dir") \
  .format("memory") \
  .start()

Managing Throughput

When you create an Event Hubs namespace, you are prompted to choose how many throughput units you want for your namespace. A single throughput unit (or TU) entitles you to:

  • Up to 1 MB per second of ingress events (events sent into an event hub), but no more than 1000 ingress events or API calls per second.
  • Up to 2 MB per second of egress events (events consumed from an event hub).

With that said, your TUs set an upper bound for the throughput in your streaming application, and this upper bound needs to be set in Spark as well. In Structured Streaming, this is done with the maxEventsPerTrigger option.

Let's say you have 1 TU for a single 4-partition Event Hub instance. This means that Spark can consume up to 2 MB per second from your Event Hub without being throttled. If maxEventsPerTrigger is set such that Spark consumes less than 2 MB per trigger, consumption completes within a second; you can leave it as is, or increase maxEventsPerTrigger until you approach the 2 MB per second limit. If maxEventsPerTrigger is set such that Spark consumes more than 2 MB per trigger, each micro-batch will always take more than one second to be created, because consuming that much data from Event Hubs takes more than one second. You can leave it as is, or increase your TUs to increase throughput.
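As a rough illustration only (the average event size and trigger interval below are assumptions, not measurements), you could derive a maxEventsPerTrigger value from your TU budget like this:

# Assumed numbers, for illustration only
egressBytesPerSecond = 2 * 1024 * 1024   # 1 TU allows up to 2 MB/s of egress
avgEventSizeBytes = 1024                 # assumed average event size
triggerIntervalSeconds = 10              # assumed trigger interval

# Stay under the egress limit across the whole trigger interval
maxEvents = (egressBytesPerSecond // avgEventSizeBytes) * triggerIntervalSeconds
ehConf["maxEventsPerTrigger"] = maxEvents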

Serialization of Event Data Properties

Users can pass custom key-value properties in their EventData. These properties are exposed in the Spark SQL schema. Keys are exposed as strings, and values are exposed as json-serialized strings. Native types are supported out of the box. Custom AMQP types need to be handled explicitly by the connector. Below we list the AMQP types we support and how they are handled:

  • Binary - the underlying byte array is serialized.
  • Decimal128 - the underlying byte array is serialized.
  • Decimal32 - the underlying integer representation is serialized.
  • Decimal64 - the underlying long representation is serialized.
  • Symbol - the underlying string is serialized.
  • UnsignedByte - the underlying string is serialized.
  • UnsignedInteger - the underlying string is serialized.
  • UnsignedLong - the underlying string is serialized.
  • UnsignedShort - the underlying string is serialized.
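Since property values are exposed as JSON-serialized strings, you typically cast or parse them on the Spark side; a minimal sketch, assuming a numeric application property with the hypothetical key "temperature":

from pyspark.sql.functions import col

# "properties" is exposed as a map of strings in the source schema; a JSON-serialized
# numeric value such as "23.5" can simply be cast to a double after lookup
decoded = df.withColumn(
    "temperature",
    col("properties").getItem("temperature").cast("double")  # hypothetical key
)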

Deploying

As with any Spark application, spark-submit is used to launch your application. azure-eventhubs-spark_2.11 and its dependencies can be added directly to spark-submit using --packages, for example:

./bin/spark-submit --packages com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.22 ...

For experimenting on spark-shell, you can also use --packages to add azure-eventhubs-spark_2.11 and its dependencies directly,

./bin/spark-shell --packages com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.22 ...

See Application Submission Guide for more details about submitting applications with external dependencies.
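For a PySpark application specifically, the same --packages flag is passed along with your Python script (the script name below is just a placeholder):

./bin/spark-submit --packages com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22 your_streaming_app.py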