
LIN-705: Return taskgraph for pipeline integrations to have correct data dependencies #854

Merged
merged 9 commits into from
Dec 16, 2022

Conversation

andycui97
Contributor

@andycui97 andycui97 commented Dec 7, 2022

Description

  • Changed from get_task_definitions to get_task_graph, which also returns a TaskGraph object specifying the dependencies needed between tasks.
  • Added create_inter_artifact_taskgraph and create_inter_session_taskgraph to return the TaskGraphs associated with their corresponding per-artifact and per-session breakdowns.
  • Moved dependencies to be an attribute of ArtifactCollection instead of being passed in through the writer.
    • This better matches dependencies being an attribute of Pipeline and PipelineORM saving dependencies. These are clearly not meant to be write-time (called during "export") attributes and should live in ArtifactCollection.
  • Added helper functions to TaskGraph, as it is now used not just internally to represent dependencies for NodeCollection but also by integration contributors.
  • Added and updated tests
  • Updated docs

Fixes LIN-705

Type of change

  • Bug fix (non-breaking change which fixes an issue)
  • This change requires a documentation update

How Has This Been Tested?

Added a new diamond test whose snapshot should cover dependencies specified between artifacts in different sessions pointing to the upstream root deps of the target artifact.
Specifically, a >> linear_first_for_artifact_linear_second_and_downstream, even though the dependency is defined as {"linear_third": {"a"}}.
For more details, refer to the discussion in this thread.

Updated snapshots and manually verified a few of them to confirm they are correct.
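For readers unfamiliar with the dependency format, the mapping in the test above can be sketched as follows. This is a toy illustration, not LineaPy's actual implementation; it only shows how a TaskGraphEdge-style mapping of `{downstream: {upstreams}}` expands into explicit graph edges.

```python
# Toy sketch (not LineaPy's actual code): a TaskGraphEdge-style mapping
# records, for each artifact, the set of artifacts it must run after.
# {"linear_third": {"a"}} means artifact "a" runs before "linear_third".
dependencies = {"linear_third": {"a"}}

# Expand the mapping into explicit (upstream, downstream) edges,
# as a task graph would consume them.
edges = [
    (upstream, downstream)
    for downstream, upstreams in dependencies.items()
    for upstream in sorted(upstreams)
]

print(edges)  # [('a', 'linear_third')]
```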

Comment on lines 221 to 247
# add edges between each pair of sink/sources

inter_artifact_taskgraph.graph.add_edges_from(
    itertools.product(
        from_session_sink_nodes, to_session_source_nodes
    )
)
Contributor

This "stitching" of artifacts based on session dependency may interpolate more dependency information than what the user intended/provided. For instance, if a session results in two sink nodes of A and B and the user provides dependency info from A to C in a different session, then this current logic will interpolate dependency relation between B and C as well. Is this something we desire?

In contrast, existing implementation of combined_taskgraph directly uses user-specified dependency information to stitch artifacts between sessions. Hence, for the example above, had the user specified dependency between A and C only, they will not get the interpolated dependency between B and C.
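As a toy illustration of this concern (node names assumed, not taken from the codebase): if session 1 ends in sink nodes A and B and session 2 starts at source node C, itertools.product connects every sink to every source, so B -> C is interpolated even though the user only declared A -> C.

```python
import itertools

# Session 1 ends in two sink nodes; session 2 starts at one source node.
from_session_sink_nodes = ["A", "B"]
to_session_source_nodes = ["C"]

# The user only declared A -> C, yet product() connects every sink to
# every source, so the edge B -> C is interpolated as well.
stitched_edges = list(
    itertools.product(from_session_sink_nodes, to_session_source_nodes)
)

print(stitched_edges)  # [('A', 'C'), ('B', 'C')]
```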

Contributor

On a related note, is the plan to replace combined_taskgraph with inter_artifact_taskgraph?

Contributor Author

@andycui97 andycui97 Dec 13, 2022

This is a great point which is why I wanted a review of this first.

I wanted to do this stitching because of the following case:

Session 1 will run Task A and Task B which creates Artifact B.
Session 2 will run Task C and Task D which creates Artifact D.

In the case that Task C reads from the output of Task B, the user cannot specify this dependency easily.

Users can only specify dependencies based on Artifacts; that is, they can only say to run B >> D (B before D).

If we don't enforce dependencies on a session level, Task C may run before Task B.

If you do, you get the problem described above, which is that some of the dependencies will be interpolated. C may not depend on B if a user specifies B >> D.

I personally lean towards over-specifying dependencies (bad for performance since there is less parallelization, and bad for readability) instead of potentially having the case above (where the user literally cannot specify the dependency they want unless they hack our graph).
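A minimal sketch of the ordering gap being described (task names from the example above; must_run_before is a hypothetical helper for illustration, not LineaPy code):

```python
# Edges mean "must run before". With only the user-declared artifact
# dependency B -> D (plus the intra-session edges A -> B and C -> D),
# nothing orders Task B relative to Task C.
edges = {("A", "B"), ("C", "D"), ("B", "D")}

def must_run_before(edges, src, dst):
    """True if src is ordered before dst via a path of edges (simple DFS)."""
    stack, seen = [src], set()
    while stack:
        node = stack.pop()
        if node == dst:
            return True
        if node in seen:
            continue
        seen.add(node)
        stack.extend(b for a, b in edges if a == node)
    return False

print(must_run_before(edges, "B", "C"))  # False: C may be scheduled before B

# Session-level stitching adds the sink -> source edge B -> C,
# which enforces the ordering the user could not express:
edges.add(("B", "C"))
print(must_run_before(edges, "B", "C"))  # True
```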

Contributor Author

Second point is a good one. Yeah, I think combined_taskgraph is only used for validating dependencies, which should be done on the inter_artifact_taskgraph or inter_session_taskgraph instead, so combined_taskgraph may not be needed.

Contributor

Interpreting

  • Task C as common code factored out by LineaPy (e.g., task_common_var_for_artifact_D_and_downstream)
  • Task D as a task responsible for generating Artifact D (e.g., task_D)

I think the point you made above is valid. That is, Task B should be set as a dependency for Task C, but relying solely on the user-provided information will miss this, which may result in erroneous parallelization. In fact, the way the existing combined_taskgraph constructs inter-session dependencies seems subject to this error.

As we discussed on a call, the challenge would lie in whether/how we can identify source node(s) related to the target artifact only. That is, had Task C above been NOT related to generation of Artifact D, then we would ideally NOT create dependency between Task B and Task C even if Task C is a source node of Session 2.

Contributor

@mingjerli mingjerli Dec 14, 2022

I can give a counterexample of why we should not create a dependency between Task B and Task C. Let's say someone is working on the two notebooks (the first notebook has Task A and Task B, and the second notebook has Task C and Task D, the same setup as in this thread) at the same time (maybe she/he likes two screens/computers).

  • Task A writes to a side effect (let's say a.csv) and saves artifact a.
  • Task C reads from the side effect created by Task A (i.e., reads from a.csv), writes to another side effect (let's say c.csv), and saves artifact c.
  • Task B reads from the side effect created by Task C (i.e., reads from c.csv) and saves artifact b.

In this case, we should not create a dependency from Task B to Task C; it should be the other way around, and we can only rely on the user providing us this information. I'm not saying this is a common scenario, but without tracking input and output side effects and their versions, I don't see how it is possible to add any extra dependencies that users do not specify.
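This counterexample can be sketched with the standard library's graphlib (task names assumed from the discussion): the real data flow here is C before B, so a blanket session-level edge forcing B before C yields a cycle no scheduler can run.

```python
from graphlib import TopologicalSorter, CycleError

# Node -> set of tasks it depends on. The real data flow makes B depend
# on C (Task B reads c.csv produced by Task C), while blanket
# session-level stitching would also make C depend on B.
predecessors = {"B": {"C"}, "C": {"B"}}

try:
    print(list(TopologicalSorter(predecessors).static_order()))
except CycleError:
    # B and C depend on each other, so no valid execution order exists.
    print("unschedulable: circular dependency between sessions")
```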

Contributor

@mingjerli The scenario you outlined above seems to be a case of circular dependency between two sessions, which we are not supporting (an error message is raised)?

@andycui97 andycui97 changed the title Taskgraph Return taskgraph for pipeline integrations to have correct data dependencies Dec 14, 2022
@andycui97 andycui97 changed the title Return taskgraph for pipeline integrations to have correct data dependencies LIN-705: Return taskgraph for pipeline integrations to have correct data dependencies Dec 14, 2022
@andycui97 andycui97 marked this pull request as ready for review December 14, 2022 22:17
@andycui97 andycui97 force-pushed the taskgraph branch 2 times, most recently from 3465154 to 368896e Compare December 15, 2022 16:58
@@ -42,6 +41,7 @@ def __init__(
target_artifacts: List[LineaArtifactDef],
input_parameters: List[str] = [],
reuse_pre_computed_artifacts: List[LineaArtifactDef] = [],
dependencies: TaskGraphEdge = {},
Contributor

Migration of dependencies from BasePipelineWriter to ArtifactCollection makes the implementation neater but is conceptually a bit awkward. Perhaps we may need to rethink/redefine what ArtifactCollection is.

Contributor Author

Ok, I think we conceptually have different ideas of what ArtifactCollection is. I've personally felt it's our internal representation of Pipeline, so any pipeline attributes that shouldn't change should also be reflected in its corresponding ArtifactCollection.

If you feel strongly, let's discuss internally as a team.

Contributor

The initial intention, if I remember correctly, was to use it as a general collection of artifacts that can mutate into different components (one of which is the pipeline). Given that it has not found use outside pipeline-related applications, I am good with this scoping as long as we are clear about it (e.g., we need to update docstring for ArtifactCollection to clarify that it is an internal data structure to generate pipelines).

artifact_collection: ArtifactCollection, pipeline_name: str
) -> Dict[str, TaskDefinition]:
) -> Tuple[Dict[str, TaskDefinition], TaskGraph]:
Contributor

@yoonspark yoonspark Dec 15, 2022

We can simplify this by returning an "augmented" TaskGraph where each task node has the corresponding TaskDefinition "attached" as an attribute. Specifically, I am thinking we can do:

def get_artifact_task_definition_graph(
    artifact_collection: ArtifactCollection, pipeline_name: str
) -> TaskGraph:

    [...]

    # "Attach" each task definition to the corresponding node in task graph
    for taskname in task_graph.nodes:
        task_graph.nodes[taskname]["task_definition"] = task_definitions[taskname]

    return task_graph

And the same for get_session_task_definition_graph() and get_allsessions_task_definition_graph() as well.

Contributor Author

I couldn't actually do this; the nodes in TaskGraph need to be hashable, and TaskDefinitions are not.

Contributor

I see, that's too bad! Then, let's proceed with the current solution; we may find a better alternative later.

Comment on lines +383 to +387
linear_first_for_artifact_linear_second_and_downstream >> linear_second

linear_first_for_artifact_linear_second_and_downstream >> linear_third

linear_second >> linear_third
Contributor

This over-specified dependency looks a bit awkward but should be fine in execution, right?

cc @mingjerli

Contributor

This is fine in execution.

The reason this looks awkward is that, within a session, the algorithm gives us a minimal edge representation of the DAG, but that is no longer the case once we handle user-specified inter-session dependencies.

The benefit of a minimal edge representation is that if you break any edge, the DAG must break into two subgraphs; with any extra edges, that is no longer the case. In practice, if I want to manually break one pipeline into two pipelines, with a minimal edge representation I only need to find the exact edge I want to remove; this will make debugging dependency-related issues much easier than with other representations. I also believe the minimal edge representation is unique, which is more elegant in my opinion.
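For reference, the minimal edge representation of a DAG is its transitive reduction, which is indeed unique for DAGs. A small self-contained sketch (task names shortened from the snippet above; the helper is illustrative, not LineaPy code):

```python
# The edge first -> third is redundant: it is implied by
# first -> second -> third, so a transitive reduction drops it.
edges = {
    ("linear_first", "linear_second"),
    ("linear_first", "linear_third"),
    ("linear_second", "linear_third"),
}

def transitive_reduction(edges):
    """Drop any edge (a, b) where b is reachable from a via another path."""
    def reachable(src, dst, edge_set):
        stack, seen = [src], set()
        while stack:
            node = stack.pop()
            if node == dst:
                return True
            if node in seen:
                continue
            seen.add(node)
            stack.extend(b for a, b in edge_set if a == node)
        return False

    return {
        (a, b)
        for a, b in edges
        if not reachable(a, b, edges - {(a, b)})
    }

print(sorted(transitive_reduction(edges)))
# [('linear_first', 'linear_second'), ('linear_second', 'linear_third')]
```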

Contributor

Okay, so this is another instance of correct but less optimal, which we decided to be okay as per @andycui97's discussion with DMX.

Contributor

@yoonspark yoonspark left a comment

All the issues I raised have been addressed. The rest looks good to me.

There is a merge conflict due to a recent docs update, but it should be pretty straightforward to resolve. Approving the PR assuming this resolution.
