# Reimplement replicate() using the Active Memory Manager #6578
Two separate users raised the need for this: https://dask.discourse.group/t/how-do-i-get-workers-to-retain-a-large-object-needed-over-several-task-graphs/2138/5

> Is there still an appetite to fix this? I just ran into the race condition of #6713 and was wondering if …

> @seydar there is some appetite but we're currently lacking visibility on the benefit it would provide to users, so it's not on the roadmap at the moment.

> @crusaderky I was getting the same error as #6713, but I've been able to trace down the error a little more precisely to #8375. Ultimately, I guess I'd like better support for replication when a worker is restarted in the middle of a task.

> @seydar sorry, I may not have been clear.

> @crusaderky Dask workers are crashing (mentioned in #8377 and #8375), which is then leading to the same error as in #6713. This comment talks about replicate being the root cause of the worker count unexpectedly dropping while a worker is being restarted, so I figured I would follow up. Looking more closely at the scheduler.py code that is mentioned in #6713, it looks like maybe a solution that doesn't involve …

> @seydar, what I'm asking is: why do you call …

> Sorry, I'm not calling …

> If you're not calling …
This issue is very tightly related to #4906.

`replicate()`, as well as its wrapper `scatter(..., broadcast=True)`, have several issues.

## Proposed design
Reimplement `replicate()` on top of the Active Memory Manager. The command will just start an AMM policy for the involved keys. The policy will track the keys and, every two seconds, create new replicas if there aren't enough. Once the keys cease to exist, or the client calls `replicate(n=1)` on the same keys, the policy detaches itself from the AMM.

The number of desired replicas will be tracked through a `replicate: <n>` annotation on the involved keys.
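The per-cycle decision described above can be sketched as a small helper. This is a pure-Python illustration, not the real `ActiveMemoryManagerPolicy` API: the function name and the set-based bookkeeping are hypothetical.

```python
def pick_new_replica_targets(desired, holders, workers):
    """Return the workers that should receive a new replica of a key
    on this policy cycle.

    desired -- target number of replicas for the key
    holders -- set of workers already holding the key in memory
    workers -- set of all workers currently in the cluster
    """
    shortfall = desired - len(holders)
    if shortfall <= 0:
        return set()  # enough replicas; nothing to suggest this cycle
    candidates = workers - holders
    # Pick any `shortfall` candidates; the real AMM would also weigh
    # each worker's memory usage when choosing.
    return set(sorted(candidates)[:shortfall])
```

Running this every two seconds converges on `desired` replicas without blocking anyone, which is exactly what makes the policy safe to detach once the keys disappear or `n` drops back to 1.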
## Side effects

- `replicate()` becomes non-blocking. It won't wait for replication to complete and won't even wait for keys to become in-memory.
- `replicate(n=inf)` will become the default option; it means that if at any point in the future a new worker joins the cluster, the key will be immediately replicated onto it.
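The `n=inf` behaviour can be illustrated with a short simulation. The function name and the data model are hypothetical stand-ins for the scheduler's state, not actual distributed internals:

```python
import math

def replication_targets(desired, holders, workers):
    """Workers that should receive a replica of a key; with n = inf every
    worker lacking the key is a target, so a newly joined worker is
    picked up automatically on the next policy cycle."""
    candidates = workers - holders
    if math.isinf(desired):
        return candidates
    return set(sorted(candidates)[: max(0, int(desired) - len(holders))])

# Simulate two policy cycles around a worker joining the cluster.
holders = {"w1"}
workers = {"w1", "w2"}
holders |= replication_targets(math.inf, holders, workers)  # cycle 1
workers.add("w3")                                           # new worker joins
holders |= replication_targets(math.inf, holders, workers)  # cycle 2
# holders now covers all three workers, with no client involvement.
```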
## Optional additional feature

Alternatively, the client may also annotate the keys directly when building the graph. For this to work, when a new key lands on the scheduler, there must be machinery that parses the annotations, detects the `replicate` tag, and invokes Scheduler.replicate() under the hood. This can be neatly implemented by a Scheduler plugin.
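A minimal sketch of that plugin machinery, with hypothetical names (`ReplicateAnnotationPlugin`, the `update_graph` argument shape) and a plain dict standing in for the scheduler's annotation state:

```python
def keys_to_replicate(annotations):
    """Extract {key: desired_replica_count} from per-key annotations.

    `annotations` is assumed to be shaped like dask's annotation dicts,
    e.g. {"replicate": {"x": 3}, "priority": {...}}.
    """
    return dict(annotations.get("replicate", {}))


class ReplicateAnnotationPlugin:
    """Stand-in for the proposed Scheduler plugin: on each graph update
    it detects the `replicate` tag and triggers replication for the
    tagged keys under the hood."""

    def __init__(self, scheduler):
        self.scheduler = scheduler

    def update_graph(self, annotations=None, **kwargs):
        for key, n in keys_to_replicate(annotations or {}).items():
            # Hypothetical call shape; the real hook would go through
            # Scheduler.replicate() / the AMM policy described above.
            self.scheduler.replicate([key], n=n)
```

On the client side, the tagging the author describes would presumably look like dask's existing annotation mechanism, e.g. `with dask.annotate(replicate=3): ...` around graph construction, though the exact annotation name here is the proposal's, not a shipped feature.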
## Proposed contentious breaking changes

The `replicate()` command will fail if the AMM is not enabled.