- Linking
- User Configurations
- Reading Data from Event Hubs
- Writing Data to Event Hubs
- Recovering from Failures with Checkpointing
- Managing Throughput
- Serialization of Event Data Properties
- Deploying
Structured Streaming integration for Azure Event Hubs ultimately runs on the JVM, so you'll need to import the library from one of the Maven coordinates below:
groupId = com.microsoft.azure
artifactId = azure-eventhubs-spark_2.11
version = 2.3.22
or
groupId = com.microsoft.azure
artifactId = azure-eventhubs-spark_2.12
version = 2.3.22
For Python applications, you need to add the above library and its dependencies when deploying your application. See the Deploying subsection below.
An Event Hubs connection string is required to connect to the Event Hubs service. You can get the connection string for your Event Hubs instance from the Azure Portal.
Connection strings must contain an Endpoint, EntityPath (the Event Hub name), SharedAccessKeyName, and SharedAccessKey:
Endpoint=sb://{NAMESPACE}.servicebus.windows.net/{EVENT_HUB_NAME};EntityPath={EVENT_HUB_NAME};SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}
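As a quick sanity check before handing a connection string to the connector, the required parts can be verified in plain Python. This is a minimal sketch, not part of the connector: the helper names `parse_connection_string` and `missing_keys` and the sample values are made up for illustration.

```python
REQUIRED_KEYS = ("Endpoint", "EntityPath", "SharedAccessKeyName", "SharedAccessKey")


def parse_connection_string(conn_str):
    """Split a connection string into a dict of its key/value parts."""
    parts = {}
    for segment in conn_str.strip().split(";"):
        if not segment:
            continue  # tolerate a trailing semicolon
        # Split on the first '=' only: key values may themselves end in '='.
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError("Malformed segment: %r" % segment)
        parts[key] = value
    return parts


def missing_keys(conn_str):
    """Return the required keys that are absent or empty, in a fixed order."""
    parts = parse_connection_string(conn_str)
    return [k for k in REQUIRED_KEYS if not parts.get(k)]


# Hypothetical values, for illustration only.
example = (
    "Endpoint=sb://mynamespace.servicebus.windows.net/;"
    "EntityPath=myhub;SharedAccessKeyName=mypolicy;SharedAccessKey=abc123="
)
print(missing_keys(example))                               # []
print(missing_keys("Endpoint=sb://x/;EntityPath=myhub"))   # ['SharedAccessKeyName', 'SharedAccessKey']
```

An empty list means all four parts are present; it does not prove that the key itself is valid.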
All configuration relating to Event Hubs happens in your Event Hubs configuration dictionary. The configuration dictionary must contain an Event Hubs connection string:
connectionString = "YOUR.CONNECTION.STRING"
ehConf = {}
# For versions before 2.3.15, set the connection string without encryption
# ehConf['eventhubs.connectionString'] = connectionString
# For version 2.3.15 and above, the connection string must be encrypted (sc is the active SparkContext).
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
Please read the Connection String subsection for more information on obtaining a valid connection string.
Additionally, the following configurations are optional:
Option | value | default | query type | meaning |
---|---|---|---|---|
eventhubs.consumerGroup | string | "$Default" | streaming and batch | A consumer group is a view of an entire event hub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and with their own offsets. More info is available here |
eventhubs.startingPositions | JSON string | end of stream | streaming and batch | Sets starting positions for specific partitions. If any positions are set in this option, they take priority over any other option. If nothing is configured within this option, then the setting in startingPosition is used. If no position has been set in either option, we will start consuming from the end of the partition. |
eventhubs.startingPosition | JSON string | end of stream | streaming and batch | The starting position for your Structured Streaming job. If a specific EventPosition is not set for a partition using startingPositions, then we use the EventPosition set in startingPosition. If nothing is set in either option, we will begin consuming from the end of the partition. |
eventhubs.endingPositions | JSON string | end of stream | batch query | The ending position of a batch query on a per partition basis. This works the same as startingPositions. |
eventhubs.endingPosition | JSON string | end of stream | batch query | The ending position of a batch query. This works the same as startingPosition. |
maxEventsPerTrigger | long | partitionCount * 1000 | streaming query | Rate limit on maximum number of events processed per trigger interval. The specified total number of events will be proportionally split across partitions of different volume. |
eventhubs.receiverTimeout | string | 60 seconds | streaming and batch | The amount of time Event Hub receive calls will be retried before throwing an exception. The time should be a string in PT%HH%MM%SS format (Please see this example). |
eventhubs.operationTimeout | string | 60 seconds | streaming and batch | The amount of time Event Hub API calls will be retried before throwing an exception. The time should be a string in PT%HH%MM%SS format (Please see this example). |
eventhubs.prefetchCount | int | 500 | streaming and batch | Sets the prefetch count for the underlying receiver and controls how many events are received in advance. The acceptable range for prefetch count is [1, 4000] |
eventhubs.threadPoolSize | int | 16 | streaming and batch | Sets the size of thread pool. |
eventhubs.useExclusiveReceiver | boolean | true | streaming and batch | Determines if the connector uses epoch (i.e. exclusive) or non-epoch receiver. The connector uses epoch receivers by default. If you decide to use non-epoch receivers please be aware of its limitation of allowing up to 5 concurrent receivers. More info is available here. |
eventhubs.maxSilentTime | string | com.microsoft.azure.eventhubs.EventHubClientOptions.SILENT_OFF | streaming and batch | Sets the maximum silent time (or SILENT_OFF) for a receiver. In case maxSilentTime is set to a value other than SILENT_OFF, we will try to recreate the receiver if there is no activity for the length of this duration. Note that maxSilentTime should be at least EventHubClientOptions.SILENT_MINIMUM = 30 seconds. |
eventhubs.DynamicPartitionDiscovery | boolean | false | streaming query | Determines if the number of partitions in the event hubs instance may be updated during the execution. If set to true, the number of partitions would be updated every 300 seconds. |
eventhubs.slowPartitionAdjustment | boolean | false | streaming query | Determines if the number of events to be read from each partition should be adjusted based on its performance or not. More info is available here. |
eventhubs.maxAcceptableBatchReceiveTime | string | 30 seconds | streaming | Sets the max time that is acceptable for a partition to receive events in a single batch. The time should be a string in PT%HH%MM%SS format (Please see this example). This value is being used to identify slow partitions when SlowPartitionAdjustment is enabled. Only partitions that take more than this time to receive their portion of events in batch are considered as potential slow partitions. More info is available here. |
eventhubs.throttlingStatusPlugin | string | None | streaming | The name of a class extending the org.apache.spark.eventhubs.utils.ThrottlingStatusPlugin trait to monitor the performance of partitions when SlowPartitionAdjustment is enabled. More info is available here. |
eventhubs.useAadAuth | boolean | false | streaming and batch | Determines if AAD authentication should be used instead of the connection string to access Event Hubs. This flag should be set in PySpark to use AAD authentication. More info is available here. |
eventhubs.aadAuthCallback | string | None | streaming and batch | The name of a callback class extending the org.apache.spark.eventhubs.utils.AadAuthenticationCallback trait to use AAD authentication to access Event Hubs. More info is available here. |
eventhubs.AadAuthCallbackParams | JSON string | None | streaming and batch | Sets the parameters passed to the AAD authentication callback class. More info is available here. |
eventhubs.metricPlugin | string | None | streaming and batch | The name of a class extending the org.apache.spark.eventhubs.utils.MetricPlugin trait to monitor send and receive operations performance. org.apache.spark.eventhubs.utils.SimpleLogMetricPlugin implements a simple example that just logs the operation performance. |
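A few of the optional settings in the table can be assembled and range-checked in plain Python before they reach Spark. The sketch below is illustrative only: `optional_conf` is a hypothetical helper, the values are passed as strings on the assumption that string option values are always accepted, and only the prefetch range stated in the table is validated.

```python
def optional_conf(consumer_group="$Default", prefetch_count=500, max_events_per_trigger=None):
    """Return a dict of optional connector settings (hypothetical helper)."""
    # The table above gives [1, 4000] as the acceptable prefetch range.
    if not 1 <= prefetch_count <= 4000:
        raise ValueError("eventhubs.prefetchCount must be in [1, 4000]")
    conf = {
        "eventhubs.consumerGroup": consumer_group,
        "eventhubs.prefetchCount": str(prefetch_count),
    }
    # Leave maxEventsPerTrigger unset to keep the connector's default.
    if max_events_per_trigger is not None:
        conf["maxEventsPerTrigger"] = str(max_events_per_trigger)
    return conf


ehConf = {}
ehConf.update(optional_conf(consumer_group="analytics", prefetch_count=1000,
                            max_events_per_trigger=8000))
print(ehConf)
```

The consumer group name "analytics" is a placeholder; use a consumer group that exists on your event hub.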
Constructing an Event Hubs configuration dictionary can be involved, so the examples below walk through the common cases. They all assume the same configuration dictionary, initialized here:
ehConf['eventhubs.consumerGroup'] = "DESIRED.CONSUMER.GROUP"
The position can be an enqueued time, offset, sequence number, the start of the stream, or the end of the stream. These options are selected by setting the corresponding variable in the table below. Only one of offset, seqNo, and enqueuedTime may be in use at any given time.
Option | valid values when in use | valid value when not in use |
---|---|---|
offset | string containing an integer offset, "-1" for start of stream, "@latest" for end of stream | None |
seqNo | long | -1 |
enqueuedTime | string of format "YYYY-MM-DDTHH:MM:SS.ssssZ" | None |
isInclusive | True, False | N/A |
from datetime import datetime as dt, timezone
import json
# Start from beginning of stream
startOffset = "-1"
# End at the current time. The trailing "Z" denotes UTC, so the current time is taken in UTC.
# This datetime formatting creates the correct string format from a Python datetime object.
endTime = dt.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
# Create the positions
startingEventPosition = {
    "offset": startOffset,
    "seqNo": -1,            # not in use
    "enqueuedTime": None,   # not in use
    "isInclusive": True
}
endingEventPosition = {
    "offset": None,         # not in use
    "seqNo": -1,            # not in use
    "enqueuedTime": endTime,
    "isInclusive": True
}
# Put the positions into the Event Hub config dictionary
ehConf["eventhubs.startingPosition"] = json.dumps(startingEventPosition)
ehConf["eventhubs.endingPosition"] = json.dumps(endingEventPosition)
This configuration allows for the batch processing of events occurring from the start of the stream to the current time.
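The rule that only one of offset, seqNo, and enqueuedTime may be in use can be enforced with a small builder. This is a sketch under that rule only; `event_position` is a hypothetical helper, not a connector API, and it fills the unused fields with the "not in use" values from the table above.

```python
import json


def event_position(offset=None, seq_no=None, enqueued_time=None, is_inclusive=True):
    """Build an event position dict with exactly one selector in use."""
    in_use = [v for v in (offset, seq_no, enqueued_time) if v is not None]
    if len(in_use) != 1:
        raise ValueError("Set exactly one of offset, seq_no, enqueued_time")
    return {
        "offset": offset,                           # None when not in use
        "seqNo": -1 if seq_no is None else seq_no,  # -1 when not in use
        "enqueuedTime": enqueued_time,              # None when not in use
        "isInclusive": is_inclusive,
    }


start_of_stream = event_position(offset="-1")
end_of_stream = event_position(offset="@latest")
from_sequence = event_position(seq_no=100)

print(json.dumps(start_of_stream))
# {"offset": "-1", "seqNo": -1, "enqueuedTime": null, "isInclusive": true}
```

The resulting dictionaries can be passed through json.dumps and stored under eventhubs.startingPosition or eventhubs.endingPosition in the same way as the hand-written ones.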
For advanced users, we have provided the option to configure starting and ending positions on a per partition basis. Consider:
ehName = "YOUR.EVENT.HUB.NAME"
# Create event position for partition 0
positionKey1 = {
    "ehName": ehName,
    "partitionId": 0
}
eventPosition1 = {
    "offset": "@latest",
    "seqNo": -1,
    "enqueuedTime": None,
    "isInclusive": True
}
# Create event position for partition 2
positionKey2 = {
    "ehName": ehName,
    "partitionId": 2
}
eventPosition2 = {
    "offset": None,
    "seqNo": 100,
    "enqueuedTime": None,
    "isInclusive": True
}
# Create more positions for other partitions here if desired
# Put the rules into a map. The position key dictionaries must be made into JSON strings to act as the key.
positionMap = {
    json.dumps(positionKey1): eventPosition1,
    json.dumps(positionKey2): eventPosition2
}
# Place the map into the main Event Hub config dictionary
ehConf["eventhubs.startingPositions"] = json.dumps(positionMap)
In this case, partition 0 starts reading all new events (i.e. from the end of the partition) while partition 2 starts reading from sequence number 100. Ending positions are set up in the same way.
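When many partitions need their own position, the map can be generated rather than written out by hand. The sketch below follows the structure of the example above (JSON-encoded keys holding ehName and partitionId); `partition_positions` is a hypothetical helper name.

```python
import json


def partition_positions(eh_name, positions):
    """Serialize {partition_id: event_position_dict} into one JSON string."""
    position_map = {}
    for partition_id, position in positions.items():
        # Each key is itself a JSON string naming the event hub and the partition.
        key = json.dumps({"ehName": eh_name, "partitionId": partition_id})
        position_map[key] = position
    return json.dumps(position_map)


latest = {"offset": "@latest", "seqNo": -1, "enqueuedTime": None, "isInclusive": True}
from_seq = {"offset": None, "seqNo": 100, "enqueuedTime": None, "isInclusive": True}

serialized = partition_positions("YOUR.EVENT.HUB.NAME", {0: latest, 2: from_seq})
print(serialized)
```

The returned string is what would be stored under eventhubs.startingPositions or eventhubs.endingPositions.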
Users wanting to customize the receiver or operation timeout settings can do so using the following example:
import datetime
# Using formatting methods from Python's time object to create string durations
receiverTimeoutDuration = datetime.time(0, 3, 20).strftime("PT%HH%MM%SS")  # 200 seconds
operationTimeoutDuration = datetime.time(0, 1, 0).strftime("PT%HH%MM%SS")  # 60 seconds
# Setting the receiver timeout to 200 seconds
ehConf["eventhubs.receiverTimeout"] = receiverTimeoutDuration
# Setting the operation timeout to 60 seconds
ehConf["eventhubs.operationTimeout"] = operationTimeoutDuration
Note that the timeout duration format is different from the enqueuedTime format in the Event Position section. The timeout format must follow the ISO-8601 representation for time (more specifically, it must be a format accepted by java.time.Duration).
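A duration string can also be built from a timedelta, which avoids the 24-hour ceiling of a time-of-day object. This is a minimal sketch; `to_iso_duration` is a hypothetical helper and it truncates to whole seconds.

```python
from datetime import timedelta


def to_iso_duration(delta):
    """Format a timedelta as an ISO-8601 duration string such as 'PT0H3M20S'."""
    total_seconds = int(delta.total_seconds())  # fractional seconds are dropped
    if total_seconds < 0:
        raise ValueError("duration must not be negative")
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return "PT%dH%dM%dS" % (hours, minutes, seconds)


print(to_iso_duration(timedelta(seconds=200)))  # PT0H3M20S
print(to_iso_duration(timedelta(minutes=1)))    # PT0H1M0S
```

The result can be assigned to eventhubs.receiverTimeout or eventhubs.operationTimeout in the configuration dictionary.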
If using IoT Hub, getting your connection string is the only part of the process that is different - all other documentation still applies. Follow these instructions to get your EventHubs-compatible connection string:
- Go to the Azure Portal and find your IoT Hub instance
- Click on Endpoints under Messaging. Then click on Events.
- Find your EventHub-compatible name and EventHub-compatible endpoint.
# Note: Remove '{' and '}' when you add your endpoint and eventhub compatible name.
connectionString = "Endpoint={YOUR.EVENTHUB.COMPATIBLE.ENDPOINT};EntityPath={YOUR.EVENTHUB.COMPATIBLE.NAME}"
ehConf['eventhubs.connectionString'] = connectionString
# Source with default settings
connectionString = "YOUR.CONNECTION.STRING"
ehConf = {
'eventhubs.connectionString' : connectionString
}
df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
# Source with default settings
connectionString = "YOUR.CONNECTION.STRING"
ehConf = {
'eventhubs.connectionString' : connectionString
}
# Simple batch query
df = spark \
.read \
.format("eventhubs") \
.options(**ehConf) \
.load()
df = df.withColumn("body", df["body"].cast("string"))
Each row in the source has the following schema:
Column | Type |
---|---|
body | binary |
offset | string |
sequenceNumber | long |
enqueuedTime | timestamp |
publisher | string |
partitionKey | string |
properties | map[string, json] |
connectionDeviceID | string |
systemProperties | map[string, json] |
Here, we describe the support for writing Streaming Queries and Batch Queries to Azure EventHubs. Take note that, today, Azure EventHubs only supports at-least-once semantics. Consequently, when writing either Streaming Queries or Batch Queries to EventHubs, some records may be duplicated; this can happen, for example, if EventHubs needs to retry an event that was not acknowledged by the EventHubs service, even if the service received and stored the event. Structured Streaming cannot prevent such duplicates from occurring due to these EventHubs write semantics. However, if writing the query is successful, then you can assume that the query output was written at least once. A possible solution to remove duplicates when reading the written data could be to introduce a primary (unique) key that can be used to perform de-duplication when reading.
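The de-duplication idea can be illustrated in plain Python. In this sketch each event carries a unique key under a hypothetical field name, eventId, that the writer would have to add; the connector does not add such a field itself.

```python
def deduplicate(events, key="eventId"):
    """Keep the first event seen for each unique key, preserving order."""
    seen = set()
    unique = []
    for event in events:
        if event[key] in seen:
            continue  # a retried write produced this duplicate
        seen.add(event[key])
        unique.append(event)
    return unique


received = [
    {"eventId": "a1", "body": "first"},
    {"eventId": "b2", "body": "second"},
    {"eventId": "a1", "body": "first"},  # duplicate caused by a retry
]
print(deduplicate(received))
```

In a Spark job, calling dropDuplicates on the key column of the DataFrame read back from Event Hubs plays the same role.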
The DataFrame being written to EventHubs should have the following columns in the schema:
Column | Type |
---|---|
body (required) | string or binary |
partitionId (*optional) | string |
partitionKey (*optional) | string |
* Only one of partitionId or partitionKey can be set at a time. If both are set, your Structured Streaming job will be stopped.
The body column is the only required column. If neither a partitionId nor a partitionKey is provided, then events will be distributed to partitions using a round-robin model. Alternatively, if a partitionId is provided, the query output will be sent to that specific partition exclusively. Sending to a single partition is not a recommended pattern. Finally, if a partitionKey is provided, each event will be sent with the provided partitionKey. For more information on how a partitionKey works, click here.
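The column rules above can be checked before a write is started, so that a conflicting schema is caught early. This is a sketch of those rules only; `check_write_columns` is a hypothetical helper that works on a plain list of column names.

```python
def check_write_columns(columns):
    """Validate output column names and report how events would be routed."""
    cols = set(columns)
    if "body" not in cols:
        raise ValueError("the body column is required")
    if "partitionId" in cols and "partitionKey" in cols:
        raise ValueError("set only one of partitionId or partitionKey")
    if "partitionId" in cols:
        return "single partition"
    if "partitionKey" in cols:
        return "partition key"
    return "round-robin"


print(check_write_columns(["body"]))                  # round-robin
print(check_write_columns(["partitionKey", "body"]))  # partition key
```

With a Spark DataFrame, the list of names would typically come from its columns attribute.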
# Set up the Event Hub config dictionary with default settings
writeConnectionString = "YOUR.CONNECTION.STRING"
ehWriteConf = {
'eventhubs.connectionString' : writeConnectionString
}
# Write body data from a DataFrame to EventHubs. Events are distributed across partitions using round-robin model.
ds = df \
.select("body") \
.writeStream \
.format("eventhubs") \
.options(**ehWriteConf) \
.option("checkpointLocation", "///output.txt") \
.start()
# Write body data from a DataFrame to EventHubs with a partitionKey
ds = df \
.selectExpr("partitionKey", "body") \
.writeStream \
.format("eventhubs") \
.options(**ehWriteConf) \
.option("checkpointLocation", "///output.txt") \
.start()
# Set up the Event Hub config dictionary with default settings
writeConnectionString = "YOUR.CONNECTION.STRING"
ehWriteConf = {
'eventhubs.connectionString' : writeConnectionString
}
# Write body data from a DataFrame to EventHubs. Events are distributed across partitions using round-robin model.
df \
.select("body") \
.write \
.format("eventhubs") \
.options(**ehWriteConf) \
.save()
# Write body data from a DataFrame to EventHubs with a partitionKey
df \
.selectExpr("partitionKey", "body") \
.write \
.format("eventhubs") \
.options(**ehWriteConf) \
.save()
The connector fully integrates with the Structured Streaming checkpointing mechanism. You can recover the progress and state of your query on failures by setting a checkpoint location in your query. This checkpoint location has to be a path in an HDFS-compatible file system, and can be set as an option in the DataStreamWriter when starting a query.
aggDF \
    .writeStream \
    .outputMode("complete") \
    .option("checkpointLocation", "path/to/HDFS/dir") \
    .format("memory") \
    .start()
When you create an Event Hubs namespace, you are prompted to choose how many throughput units you want for your namespace. A single throughput unit (or TU) entitles you to:
- Up to 1 MB per second of ingress events (events sent into an event hub), but no more than 1000 ingress events or API calls per second.
- Up to 2 MB per second of egress events (events consumed from an event hub).
With that said, your TUs set an upper bound for the throughput in your streaming application, and this upper bound needs to be set in Spark as well. In Structured Streaming, this is done with the maxEventsPerTrigger option.
Let's say you have 1 TU for a single 4-partition Event Hub instance. This means that Spark is able to consume 2 MB per second from your Event Hub without being throttled. If maxEventsPerTrigger is set such that Spark consumes less than 2 MB, then consumption will happen within a second. You're free to leave it as such or you can increase your maxEventsPerTrigger up to 2 MB per second.
If maxEventsPerTrigger is set such that Spark consumes greater than 2 MB, your micro-batch will always take more than one second to be created because consuming from Event Hubs will always take at least one second. You're free to leave it as is or you can increase your TUs to increase throughput.
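Because maxEventsPerTrigger counts events while the TU limit is expressed in bytes, translating one into the other needs an estimate of the average event size. The arithmetic can be sketched as follows; the function name, the decimal definition of a megabyte, and the sample event sizes are assumptions made for illustration.

```python
BYTES_PER_MB = 1000 * 1000   # assumption: decimal megabytes
EGRESS_MB_PER_TU = 2         # egress allowance per throughput unit, per second


def max_events_for_trigger(throughput_units, avg_event_bytes, trigger_seconds=1.0):
    """Estimate how many events fit within the egress limit for one trigger."""
    egress_bytes = throughput_units * EGRESS_MB_PER_TU * BYTES_PER_MB * trigger_seconds
    return int(egress_bytes // avg_event_bytes)


# 1 TU, events of about 1000 bytes, one-second trigger
print(max_events_for_trigger(1, 1000))       # 2000
# 1 TU, events of about 1024 bytes, ten-second trigger
print(max_events_for_trigger(1, 1024, 10))   # 19531
```

Setting maxEventsPerTrigger at or below such an estimate keeps each micro-batch within the egress allowance; the estimate is only as good as the assumed average event size.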
Users can pass custom key-value properties in their EventData. These properties are exposed in the Spark SQL schema. Keys are exposed as strings, and values are exposed as JSON-serialized strings. Native types are supported out of the box. Custom AMQP types need to be handled explicitly by the connector. Below we list the AMQP types we support and how they are handled:
- Binary - the underlying byte array is serialized.
- Decimal128 - the underlying byte array is serialized.
- Decimal32 - the underlying integer representation is serialized.
- Decimal64 - the underlying long representation is serialized.
- Symbol - the underlying string is serialized.
- UnsignedByte - the underlying string is serialized.
- UnsignedInteger - the underlying string is serialized.
- UnsignedLong - the underlying string is serialized.
- UnsignedShort - the underlying string is serialized.
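Since property values arrive as JSON-serialized strings, they need to be decoded before use. A minimal sketch, assuming the properties map of one row has already been collected into a Python dict; `decode_properties` and the sample keys are hypothetical.

```python
import json


def decode_properties(properties):
    """Decode each JSON-serialized property value back into a Python object."""
    return {key: json.loads(value) for key, value in properties.items()}


# Hypothetical properties as they might appear after collecting one row.
raw = {"deviceType": '"sensor"', "reading": "21.5", "retries": "3"}
print(decode_properties(raw))
# {'deviceType': 'sensor', 'reading': 21.5, 'retries': 3}
```

Note that a string value keeps its JSON quotes until decoded, which is why "deviceType" is stored as '"sensor"' in the raw map.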
As with any Spark application, spark-submit is used to launch your application. azure-eventhubs-spark_2.11 and its dependencies can be added directly to spark-submit using --packages, for example:
./bin/spark-submit --packages com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.22 ...
For experimenting in spark-shell, you can also use --packages to add azure-eventhubs-spark_2.11 and its dependencies directly:
./bin/spark-shell --packages com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.22 ...
See Application Submission Guide for more details about submitting applications with external dependencies.