-
Notifications
You must be signed in to change notification settings - Fork 5.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[core][experimental] Catch errors for DAG API #46264
Conversation
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
36e3605
to
cf2457a
Compare
upstream_node = self.idx_to_task[self.input_task_idx] | ||
|
||
elif isinstance(upstream_node.dag_node, InputNode): | ||
if direct_input is not None and not direct_input: | ||
raise ValueError( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean that the following example is invalid?
with InputNode() as dag_input:
a = dag_input[0]
ray_dag = add.bind(a, dag_input)
In my understanding, a
is an InputAttributeNode
. Is my understanding correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's right!
The problem was that it's now unclear what dag_input
should resolve to. Actually I'm not sure how the non-compiled DAG handles this, let me check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm seems that this is not handled well in non-compiled DAG either:
In [7]: @ray.remote
...: class Actor:
...: def foo(self, *args):
...: return args
...: def bar(self):
...: return self.foo()
...:
In [8]: a = Actor.remote()
In [9]: with InputNode() as inp:
...: dag = a.foo.bind(inp.foo, inp[0], inp)
...:
In [10]: dag.execute(1, foo='hi')
Out[10]: ObjectRef(c2668a65bda616c10d375aa2d30b6f75e1f3eed60100000001000000)
In [11]: ray.get(dag.execute(1, foo="hi"))
Out[11]: ('hi', 1, <ray.dag.input_node.DAGInputData at 0x7f272264d340>)
@@ -1175,13 +1249,33 @@ def execute( | |||
|
|||
self._get_or_compile() | |||
|
|||
self._check_inputs(args, kwargs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Put this to the end of _get_or_compile()
since it is always called?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prefer to do it separately because get_or_compile
shouldn't need to depend on the inputs to dag.execute()
(which can change each time dag.execute() is called).
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
Why are these changes needed?
Catch unhandled errors in DAG API:
Related issue number
Closes #46222.