LIN-705: Return taskgraph for pipeline integrations to have correct data dependencies #854
Conversation
# add edges between each pair of sink/sources
inter_artifact_taskgraph.graph.add_edges_from(
    itertools.product(
        from_session_sink_nodes, to_session_source_nodes
    )
)
This "stitching" of artifacts based on session dependency may interpolate more dependency information than what the user intended/provided. For instance, if a session results in two sink nodes, A and B, and the user provides dependency info from A to C in a different session, then the current logic will interpolate a dependency relation between B and C as well. Is this something we desire?
In contrast, the existing implementation of combined_taskgraph directly uses user-specified dependency information to stitch artifacts between sessions. Hence, for the example above, had the user specified a dependency between A and C only, they would not get the interpolated dependency between B and C.
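The interpolation concern can be sketched with plain itertools (the variable names below mirror the snippet under review; the node labels are hypothetical):

```python
import itertools

# Sinks of session 1 and sources of session 2 (hypothetical labels).
from_session_sink_nodes = ["A", "B"]
to_session_source_nodes = ["C"]

# Product-based stitching adds an edge from EVERY sink to EVERY source,
# even though the user only declared a dependency from A to C.
stitched_edges = list(
    itertools.product(from_session_sink_nodes, to_session_source_nodes)
)
print(stitched_edges)  # [('A', 'C'), ('B', 'C')] -- B -> C was never user-specified
```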
On a related note, is the plan to replace combined_taskgraph with inter_artifact_taskgraph?
This is a great point, which is why I wanted a review of this first.
I wanted to do this stitching because of the following case:
- Session 1 will run Task A and Task B, which creates Artifact B.
- Session 2 will run Task C and Task D, which creates Artifact D.
In the case that Task C reads from the output of Task B, users cannot specify this dependency easily. Users can only specify dependencies based on artifacts; say, they can only say to run B >> D (B before D).
If we don't enforce dependencies on a session level, Task C may run before Task B.
If we do, we get the problem described above, which is that some of the dependencies will be interpolated: C may not depend on B if a user specifies B >> D.
I personally lean towards over-specifying dependencies (bad for performance since there is less parallelization, and bad for readability) instead of potentially having the case above (where the user literally cannot specify the dependency they want unless they hack our graph).
Second point is good. Yeah, I think combined_taskgraph is only used for validating dependencies, which should be done on the inter_artifact_taskgraph or inter_session_taskgraph. combined_taskgraph may not be needed.
Interpreting:
- Task C as common code factored out by LineaPy (e.g., task_common_var_for_artifact_D_and_downstream)
- Task D as a task responsible for generating Artifact D (e.g., task_D)
I think the point you made above is valid. That is, Task B should be set as a dependency for Task C, but relying solely on the user-provided information will miss this, which may result in erroneous parallelization. In fact, the way the existing combined_taskgraph constructs inter-session dependencies seems subject to this error.
As we discussed on a call, the challenge would lie in whether/how we can identify source node(s) related to the target artifact only. That is, had Task C above NOT been related to the generation of Artifact D, then we would ideally NOT create a dependency between Task B and Task C even if Task C is a source node of Session 2.
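The "sources related to the target artifact only" idea could be sketched with networkx, which the task graph is built on (the session-2 node names here are made up for illustration):

```python
import networkx as nx

# Hypothetical session-2 graph: task_D_prep feeds task_D (which generates
# Artifact D), while task_C is an unrelated source node.
session2 = nx.DiGraph()
session2.add_edge("task_D_prep", "task_D")
session2.add_node("task_C")

# Keep only source nodes (in-degree 0) that the target artifact's task
# actually depends on, i.e., its ancestors in the session graph.
related = nx.ancestors(session2, "task_D") | {"task_D"}
relevant_sources = {n for n in related if session2.in_degree(n) == 0}

print(relevant_sources)  # {'task_D_prep'} -- task_C is excluded from stitching
```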
I can give a counter example of why we should not create a dependency between Task B and Task C. Let's say someone is working on the two notebooks (the first notebook has Task A and Task B, and the second notebook has Task C and Task D, the same setup as in this thread) at the same time (maybe she/he likes two screens/computers).
- Task A writes to a side effect (let's say a.csv) and saves artifact a.
- Task C reads from the side effect created by Task A (i.e., reads from a.csv), writes to another side effect (let's say c.csv), and saves artifact c.
- Task B reads from the side effect created by Task C (i.e., reads from c.csv) and saves artifact b.
In this case, we should not create a dependency between Task B and Task C; it should be vice versa, and we can only rely on the user to provide us this information. I'm not saying this is a common scenario, but without tracking input and output side effects and their versions, I don't see how it is possible to add any extra new dependencies that users do not specify.
@mingjerli The scenario you outlined above seems to be a case of circular dependency between two sessions, which we are not supporting (an error message is raised)?
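At the session level, the two-notebook scenario does indeed form a cycle, which is easy to see in a small networkx sketch (node names are hypothetical):

```python
import networkx as nx

# a.csv makes Session 2 depend on Session 1 (Task A -> Task C), while
# c.csv makes Session 1 depend on Session 2 (Task C -> Task B): a cycle.
session_graph = nx.DiGraph()
session_graph.add_edge("session_1", "session_2")  # via a.csv
session_graph.add_edge("session_2", "session_1")  # via c.csv

# A cyclic session graph cannot be scheduled, so it must be rejected.
print(nx.is_directed_acyclic_graph(session_graph))  # False
```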
@@ -42,6 +41,7 @@ def __init__(
     target_artifacts: List[LineaArtifactDef],
     input_parameters: List[str] = [],
     reuse_pre_computed_artifacts: List[LineaArtifactDef] = [],
+    dependencies: TaskGraphEdge = {},
Migration of dependencies from BasePipelineWriter to ArtifactCollection makes the implementation neater but is conceptually a bit awkward. Perhaps we may need to rethink/redefine what ArtifactCollection is.
Ok, I think conceptually we have different ideas on what ArtifactCollection is. I've personally felt it's our internal representation of a Pipeline, so any pipeline attributes that shouldn't change should also be reflected in its corresponding ArtifactCollection.
If you feel strongly, let's discuss internally as a team.
The initial intention, if I remember correctly, was to use it as a general collection of artifacts that can mutate into different components (one of which is the pipeline). Given that it has not found use outside pipeline-related applications, I am good with this scoping as long as we are clear about it (e.g., we need to update the docstring for ArtifactCollection to clarify that it is an internal data structure used to generate pipelines).
     artifact_collection: ArtifactCollection, pipeline_name: str
-) -> Dict[str, TaskDefinition]:
+) -> Tuple[Dict[str, TaskDefinition], TaskGraph]:
We can simplify this by returning an "augmented" TaskGraph where each task node has the corresponding TaskDefinition "attached" as an attribute. Specifically, I am thinking we can do:
def get_artifact_task_definition_graph(
    artifact_collection: ArtifactCollection, pipeline_name: str
) -> TaskGraph:
    [...]
    # "Attach" each task definition to the corresponding node in task graph
    for taskname in task_graph.nodes:
        task_graph.nodes[taskname]["task_definition"] = task_definitions[taskname]
    return task_graph
And the same for get_session_task_definition_graph() and get_allsessions_task_definition_graph() as well.
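As a minimal runnable sketch of the suggested attachment (using plain dicts as hypothetical stand-ins for TaskDefinition objects): in networkx, only the node keys themselves must be hashable; node attribute values may be arbitrary objects.

```python
import networkx as nx

# Toy task graph with two nodes; the dicts below are hypothetical
# stand-ins for TaskDefinition objects.
task_graph = nx.DiGraph([("task_a", "task_b")])
task_definitions = {
    "task_a": {"code": "do_a()"},
    "task_b": {"code": "do_b()"},
}

# Attach each task definition as a node attribute (attribute values,
# unlike node keys, are not required to be hashable).
for taskname in task_graph.nodes:
    task_graph.nodes[taskname]["task_definition"] = task_definitions[taskname]

print(task_graph.nodes["task_b"]["task_definition"])  # {'code': 'do_b()'}
```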
I couldn't actually do this: the nodes in TaskGraph need to be hashable, and TaskDefinitions are not.
I see, that's too bad! Then, let's proceed with the current solution; we may find a better alternative later.
linear_first_for_artifact_linear_second_and_downstream >> linear_second
linear_first_for_artifact_linear_second_and_downstream >> linear_third
linear_second >> linear_third
This over-specified dependency looks a bit awkward but should be fine in execution, right?
cc @mingjerli
This is fine in execution.
The reason this looks awkward is that within a session, the algorithm gives us a minimal edge representation of the DAG, but that is no longer the case when we handle user-specified inter-session dependencies.
The benefit of a minimal edge representation is that if you break any edge, the DAG must break into two graphs; with any extra edges, that is no longer the case. In practice, if I want to manually break one pipeline into two pipelines, with a minimal edge representation I only need to find the exact edge I want to remove; this will make debugging dependency-related issues much easier than other representations. I also believe the minimal edge representation is unique, which is more elegant in my opinion.
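The minimal edge representation being described is the transitive reduction of the DAG, which networkx can compute directly (a sketch using the identifiers from the snapshot above; LineaPy's own algorithm may differ):

```python
import networkx as nx

# The three snapshot edges, including the redundant first -> third edge.
dag = nx.DiGraph(
    [
        ("linear_first_for_artifact_linear_second_and_downstream", "linear_second"),
        ("linear_first_for_artifact_linear_second_and_downstream", "linear_third"),
        ("linear_second", "linear_third"),
    ]
)

# Transitive reduction drops edges implied by longer paths, leaving the
# minimal edge representation: breaking any remaining edge splits the DAG.
minimal = nx.transitive_reduction(dag)
print(sorted(minimal.edges()))
```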
Okay, so this is another instance of correct but less optimal, which we decided to be okay as per @andycui97's discussion with DMX.
The issues I raised have all been addressed. The rest looks good to me.
There is a merge conflict due to a recent docs update, but it should be pretty straightforward to resolve. Approving the PR assuming this resolution.
Description
- Changed get_task_definitions to get_task_graph, which also returns a TaskGraph object specifying the dependencies needed between tasks.
- Added create_inter_artifact_taskgraph and create_inter_session_taskgraph to return TaskGraphs associated with their corresponding breakdowns of per artifact and per session.
Fixes LIN-705
Type of change
How Has This Been Tested?
Added a new diamond test whose snapshot should cover dependencies specified between artifacts in different sessions pointing to the upstream root deps of the target artifact.
Specifically, a >> linear_first_for_artifact_linear_second_and_downstream even though the dependency is defined as {"linear_third": {"a"}}. For more details, refer to the discussion in this thread.
Updated snapshots and manually verified a few of them to be correct for the changes.