End time doesn't work #580
Got the same issue using Scala from Azure Databricks, using:
In every case, the stream does not stop and still processes new events after that date: with the date above, events from the 29th (today) are processed as well. The ending position seems to have no effect at all.
@HaimBendanan & @sgautrin
@nyaghma is there no way to fix/overcome that? For my use case, I want to have one streaming job per hour (00-01, 01-02, etc.). I guess that batch reading from Event Hubs can only be done on data that already exists there, right? Do you have a suggestion on how to achieve this? I could filter by time < endTime in my Spark job, but how can I know that everything was processed and that the job can be stopped?
@HaimBendanan, there are several configurations you can use with a streaming job that could help your use case. For instance, you can use triggers to control the timing of the data processing, and checkpoints to keep track of the state of your query (see the sketch below). If you use triggers, make sure you set "maxEventsPerTrigger" appropriately for your use case as well. Regarding batch queries, you are right: they can only read data up to the latest event available in the Event Hub at the time the batch executes.
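A minimal PySpark sketch of that combination. The connection string, paths, rate limit, and trigger interval are placeholders; the exact option key for the event rate limit may differ by connector version (the Scala API exposes `setMaxEventsPerTrigger`), so check the connector docs:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("eventhub-stream").getOrCreate()

# Placeholder connection string; newer connector versions require it encrypted.
conn = "Endpoint=sb://<namespace>.servicebus.windows.net/;...;EntityPath=<eventhub>"
ehConf = {
    "eventhubs.connectionString":
        spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(conn),
    # Cap how many events each micro-batch reads (assumed option key).
    "maxEventsPerTrigger": "10000",
}

df = spark.readStream.format("eventhubs").options(**ehConf).load()

query = (df.writeStream
    .format("parquet")
    .option("path", "/output/events")                     # placeholder sink path
    .option("checkpointLocation", "/checkpoints/events")  # persists query state
    .trigger(processingTime="1 minute")                   # controls micro-batch timing
    .start())
```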
@nyaghma the default trigger fits my use case; the other trigger types don't solve my problem. Is it possible to get the latest processed timestamp from the Spark query object? If so, I could monitor that timestamp and end the job when it reaches the target end time. Would it be possible to add support for an end event time in streaming queries? How much effort/work would that require?
@HaimBendanan there is an "enqueuedTime" column in the event schema that you can use for your monitoring purposes. You can iterate over the events received in a batch and find the one with the latest timestamp (see the snippet below). Streaming queries are designed for continuous, real-time data updates, so adding an end time wouldn't match their purpose.
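For instance, inside a `foreachBatch` handler. This is a sketch: the `enqueuedTime` column comes from the connector's event schema, everything else is illustrative:

```python
from pyspark.sql import functions as F

def track_latest(batch_df, batch_id):
    # Latest enqueued timestamp in this micro-batch (None if the batch is empty).
    latest = batch_df.agg(F.max("enqueuedTime")).first()[0]
    print(f"batch {batch_id}: latest enqueuedTime = {latest}")

# Attach it to a stream: df.writeStream.foreachBatch(track_latest).start()
```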
@nyaghma the enqueuedTime column can be used to avoid ingesting more data than needed, but I would also need a way to stop the job once the end is reached. Regarding your point about continuous, real-time data updates: we would like to implement a sort of hybrid between batch and streaming, let's call it "streaming in batches". Instead of having one long job running for weeks, we want to run hourly jobs, each handling the data of a given hour. The advantages of that approach:
For that scenario, it looks like being able to set an end time is necessary. WDYT?
I still don't think adding an end time to a streaming job is necessary, since you can achieve the mentioned goals by combining existing functionality:
@nyaghma Can you give an example of how to achieve that? How can I detect that I have reached enqueue time X and stop the Spark job?
The 'enqueuedTime' value represents the actual time the message was enqueued. So you can create the stream in a separate thread, monitor 'enqueuedTime' during the stream's execution, and set the desired filters/flags. The parent thread can then stop the stream once a flag is set, or after some amount of execution time (see the sketch below).
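A minimal PySpark sketch of that suggestion. `ehConf` is assumed to be built as in the earlier sketch, the paths and window bound are placeholders, and the stop condition simply checks whether any event past `end_time` has arrived (note the per-partition caveat raised in the next comment):

```python
import threading
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("hourly-eventhub-job").getOrCreate()

end_time = "2021-06-29 01:00:00"   # placeholder end of the hourly window
reached_end = threading.Event()    # flag shared with the batch callback

def process_batch(batch_df, batch_id):
    # Write only the events inside the window.
    in_window = batch_df.filter(F.col("enqueuedTime") <= F.lit(end_time))
    in_window.write.mode("append").parquet("/output/hour-00")  # placeholder sink
    # If any event past the window has arrived, signal the parent thread.
    past_window = batch_df.filter(F.col("enqueuedTime") > F.lit(end_time))
    if past_window.limit(1).count() > 0:
        reached_end.set()

query = (spark.readStream.format("eventhubs")
    .options(**ehConf)                                    # assumed, built as above
    .load()
    .writeStream
    .option("checkpointLocation", "/checkpoints/hour-00")
    .foreachBatch(process_batch)
    .start())

# Parent thread: stop the stream once the flag is set, or after a timeout.
reached_end.wait(timeout=3600)
query.stop()
```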
@nyaghma how does that reliably solve the problem? The two queries would have different progression statuses. For example, I could reach enqueue time X in the first query but still lag behind in the other query. Could you please elaborate or share code?
Isn't "streaming in scheduled batches" called
I am trying to set up a streaming Spark job with a start and end time. I am following this tutorial, but noticed that the endTime property doesn't work. I am using PySpark; here is the code:
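(The original snippet was not preserved in this thread; below is a representative PySpark sketch of the setup being described, with the start/end enqueued-time positions expressed as the connector's JSON event positions. All values are placeholders.)

```python
import json
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bounded-eventhub-read").getOrCreate()

conn = "Endpoint=sb://<namespace>.servicebus.windows.net/;...;EntityPath=<eventhub>"

# Event positions by enqueued time, per the connector's PySpark docs.
start_pos = {"offset": None, "seqNo": -1,
             "enqueuedTime": "2021-06-29T00:00:00.000Z", "isInclusive": True}
end_pos   = {"offset": None, "seqNo": -1,
             "enqueuedTime": "2021-06-29T01:00:00.000Z", "isInclusive": True}

ehConf = {
    "eventhubs.connectionString":
        spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(conn),
    "eventhubs.startingPosition": json.dumps(start_pos),
    "eventhubs.endingPosition": json.dumps(end_pos),
}

# As the thread discusses, the ending position applies to batch reads
# (spark.read) but has no effect on streaming reads (spark.readStream).
df = spark.readStream.format("eventhubs").options(**ehConf).load()
```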
(tested with 17 too)