End time doesn't work #580

Closed
HaimBendanan opened this issue Jan 28, 2021 · 12 comments

@HaimBendanan

HaimBendanan commented Jan 28, 2021

I am trying to set up a streaming spark job with a start and end time. I am following this tutorial, but noticed that the endTime property doesn't work. I am using PySpark, here is the code:

import json
from datetime import datetime, timedelta

connectionString = "***********"

ehConf = {}

ehConf['eventhubs.connectionString'] = self.spark.sparkContext._gateway.jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)

ehConf['eventhubs.consumerGroup'] = "myConsumerGroup"


startTime = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
endTime = (datetime.now()+timedelta(hours=1)).strftime("%Y-%m-%dT%H:%M:%S.%fZ")

startingEventPosition = {
    "offset": None,  # not in use
    "seqNo": -1,  # not in use
    "enqueuedTime": startTime,
    "isInclusive": True
}

endingEventPosition = {
    "offset": None,  # not in use
    "seqNo": -1,  # not in use
    "enqueuedTime": endTime,
    "isInclusive": True
}

# Put the positions into the Event Hub config dictionary
ehConf["eventhubs.startingPosition"] = json.dumps(startingEventPosition)
ehConf["eventhubs.endingPosition"] = json.dumps(endingEventPosition)

df = (self.spark
      .readStream
      .format("eventhubs")
      .options(**ehConf)
      .load())
  • Actual behavior: The job continues to process data that arrives after endTime
  • Expected behavior: The job should not process data with enqueued time > endTime
  • Spark version: Tested on 2.4 and 3, on HDInsight and Databricks
  • spark-eventhubs artifactId and version: com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.18
    (tested with 2.3.17 too)
@sgautrin

sgautrin commented Jan 29, 2021

Got the same issue using Scala from Azure Databricks, using setEndingPosition on EventHubsConf (together with setStartingPosition); I tried

  • .setEndingPosition(EventPosition.fromEndOfStream)
  • .setEndingPosition(EventPosition.fromEnqueuedTime(java.time.Instant.now))
  • .setEndingPosition(EventPosition.fromEnqueuedTime(java.time.Instant.parse("2021-01-26T12:00:00Z"))) to specify an exact point in time

In every case the stream does not stop and keeps processing new events after that date: with the date above, events from the 29th (today) are processed as well. The ending position seems to have absolutely no effect.

  • Databricks runtimes: 6.6.x-scala2.11 (Spark 2.4.5, Scala 2.11), 6.4.x-scala2.11 (Spark 2.4.5, Scala 2.11); 6.6.x-scala2.11 seems to have been replaced on Databricks by 6.4.x-scala2.11, but with the same Spark/Scala versions
  • spark-eventhubs: tested with com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17 and com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.18

nyaghma self-assigned this Jan 29, 2021
@nyaghma
Contributor

nyaghma commented Jan 29, 2021

@HaimBendanan & @sgautrin
The endingPosition / endingPositions options are used only in batch queries. Since a streaming query has no end position, this value is simply ignored even if it is set in the client code. If you want to read events up to a certain point, use a batch query instead of a streaming query.
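
For illustration, a minimal PySpark sketch of such a batch query, assuming the ehConf dictionary (with eventhubs.startingPosition and eventhubs.endingPosition already set) from the original report; the only change from the reported code is using read instead of readStream:

df = (spark
      .read                    # batch query: the ending position is honored here
      .format("eventhubs")
      .options(**ehConf)
      .load())

# df contains only events enqueued between startTime and endTime
# that already exist in the Event Hub at execution time.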

@HaimBendanan
Author

HaimBendanan commented Jan 29, 2021

@nyaghma is there no way to fix/overcome that? For my use case, I want to have one streaming job per hour (00-01, 01-02, etc.).
The job would stream in real time (most of the time). Having hourly jobs allows for easier monitoring and easier reruns.

I guess that when batch reading from an Event Hub, it can only be done on data that already exists there, right?

Do you have a suggestion on how to achieve this? I could filter by time < endTime in my Spark job, but how can I know that everything was processed and that the job can be stopped?

@nyaghma
Contributor

nyaghma commented Jan 29, 2021

@HaimBendanan, there are several configurations you can use with a streaming job that could help your use case. For instance, you can use triggers to control the timing of the data processing and checkpoints to keep track of the state of your query. If you use triggers, make sure you also set "maxEventsPerTrigger" appropriately for your use case (see the sketch below).

Regarding batch queries, you are right: a batch query can only read data up to the latest event available in the Event Hub at the time of execution.
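
A minimal PySpark sketch of that combination, assuming the ehConf dictionary from the original report and the "maxEventsPerTrigger" option key as named above; the sink format, output path, checkpoint path, and trigger interval are placeholders:

ehConf["maxEventsPerTrigger"] = 10000   # cap on events pulled per micro-batch

query = (spark
         .readStream
         .format("eventhubs")
         .options(**ehConf)
         .load()
         .writeStream
         .format("parquet")
         .option("path", "/mnt/output/events")                     # placeholder sink path
         .option("checkpointLocation", "/mnt/checkpoints/events")  # query state, enables recovery
         .trigger(processingTime="1 minute")                       # controls micro-batch timing
         .start())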

@HaimBendanan
Author

@nyaghma the default trigger fits my use case; other trigger types don't solve my problem. Is it possible to get the latest processed timestamp from the Spark query object? If so, I could monitor that timestamp and end the job when reaching the target end time.

Would it be possible to add support for an end event time in streaming queries? How much effort/work would that require?

@nyaghma
Contributor

nyaghma commented Jan 29, 2021

@HaimBendanan There is an "enqueuedTime" column in the event schema that you can use for monitoring. You can iterate over the events received in each micro-batch and find the one with the latest timestamp (see the sketch below).

Streaming queries are designed for continuous, real-time data updates, so adding an end time would not match their purpose.
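
As an illustration, a minimal PySpark sketch of that idea, assuming df is the streaming DataFrame from the original report and the Event Hubs source schema with its "enqueuedTime" column; the callback name is made up:

from pyspark.sql import functions as F

def track_latest_enqueued_time(batch_df, batch_id):
    # Latest enqueuedTime observed in this micro-batch (runs on the driver).
    latest = batch_df.agg(F.max("enqueuedTime")).collect()[0][0]
    print(f"batch {batch_id}: latest enqueuedTime = {latest}")

query = (df.writeStream
           .foreachBatch(track_latest_enqueued_time)
           .start())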

@HaimBendanan
Author

HaimBendanan commented Jan 29, 2021

@nyaghma the enqueuedTime column can be used to avoid ingesting more data than needed, but I would also need a way to stop the job once the end time is reached.

Regarding your point about continuous and real-time data updates: we would like to implement a sort of hybrid between batch and streaming, let's call it "streaming in batches". Instead of having one long job running for weeks, we want to run hourly jobs, each handling the data of a given hour.

The advantages of that approach:

  • Easy monitoring (same as for our existing batch infra)
  • Ability to re-run specific hours, in parallel with real-time processing
  • Ability to write the results to a custom folder hierarchy (year/month/day/hour) without relying on "partitionBy"
  • Avoids long-running jobs (better for cluster management, clean job-internal state every hour)
  • Seamless deployment: the old job stops after an hour while the new job starts

For that scenario, it looks like being able to set an end time is necessary. WDYT?

@nyaghma
Contributor

nyaghma commented Feb 1, 2021

I still don't think adding an end time to a streaming job is necessary, since you can achieve the mentioned goals by combining existing functionality:

  • A streaming query's status could be monitored and the query stopped if specific criteria are met.
  • Batch queries could be used to re-run specific hours.

@HaimBendanan
Author

HaimBendanan commented Feb 1, 2021

  • A streaming query's status could be monitored and the query stopped if specific criteria are met.

@nyaghma Can you give an example of how to achieve that? How can I detect that I have reached an enqueued time X and stop the Spark job?
The lastProgress field of the query object contains a timestamp field, but it represents the batch run time, not the Event Hub enqueued time.

@nyaghma
Contributor

nyaghma commented Feb 1, 2021

The 'enqueuedTime' value represents the actual time the message was enqueued. So you can run the stream in a separate thread, monitor 'enqueuedTime' during stream execution, and set the desired filters/flags. The parent thread can then stop the stream once a flag is set or after some execution time (see the sketch below).
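
A minimal PySpark sketch of that suggestion, assuming df is the streaming DataFrame from the original report; END_TIME, the callback name, and the timeout are placeholders, and the session timezone is assumed to be UTC:

import threading
from datetime import datetime
from pyspark.sql import functions as F

END_TIME = datetime(2021, 1, 28, 13, 0, 0)   # placeholder cut-off (UTC assumed)
reached_end = threading.Event()

def check_enqueued_time(batch_df, batch_id):
    # foreachBatch runs on the driver, so setting the flag here is visible
    # to the parent thread below.
    if batch_df.filter(F.col("enqueuedTime") >= F.lit(END_TIME)).count() > 0:
        reached_end.set()

query = (df.writeStream
           .foreachBatch(check_enqueued_time)
           .start())

# Parent thread: block until the flag is set, then stop the stream.
while not reached_end.wait(timeout=30):
    pass
query.stop()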

@HaimBendanan
Author

@nyaghma how does that reliably solve the problem? The two queries would have different progression statuses. For example, I could reach enqueued time X in the first query but still lag behind in the other query. Could you please elaborate or share some code?

@ganeshchand

Isn't "streaming in scheduled batches" called Trigger.Once?
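
For reference, a minimal PySpark sketch of the Trigger.Once pattern, assuming the ehConf dictionary from the original report; the sink format, output path, and checkpoint path are placeholders:

query = (spark
         .readStream
         .format("eventhubs")
         .options(**ehConf)
         .load()
         .writeStream
         .format("parquet")
         .option("path", "/mnt/output/hourly")                     # placeholder sink path
         .option("checkpointLocation", "/mnt/checkpoints/hourly")  # resumes from the last run
         .trigger(once=True)   # process all currently available events, then stop
         .start())
query.awaitTermination()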

nyaghma closed this as completed Jun 23, 2021