Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[auto-materialize][refactor] Remove bfs_filter_asset_partitions from auto-materialize logic #15498

Merged
merged 12 commits into from
Jul 27, 2023

Conversation

OwenKephart
Copy link
Contributor

@OwenKephart OwenKephart commented Jul 24, 2023

Summary & Motivation

This replaces the auto-materialize logic with a single toposorted loop through the graph. Each asset is processed one by a single call to get_auto_materialize_conditions_for_asset, where we evaluate each condition in order (first materialize conditions, then skip conditions, then discard conditions), building up a set of conditions that apply to each asset, as well as a set of asset partitions we'd like to materialize for each asset.

One nice benefit here is that we don't end up needing to explicitly visit the children of assets that we plan to materialize (which can incur a fair amount of expense in the partition mapping layer). Instead, the children inspect each of their parents to see if it would even be possible to materialize at the same time as them, and if not, we completely ignore that asset.

Beyond that, this makes it significantly simpler to add new conditions, as there is a single place to place this information.

Finally, we (at long last!!!) accomplish the dream of unifying freshness-based logic and non-freshness based logic. The freshness calculation gets evaluated as a simple additional condition that happens in the same toposorted loop as everything else.

Right now, there's a fair bit of weirdness in the shape of the AutoMaterializeCondition <> AssetKeyPartitionKey mapping that we pass around. The shape we really want is AssetKey -> [AutoMaterializeCondition -> AssetKeyPartitionKey], as this is the end shape that we end up using inside the AutoMaterializeAssetEvaluation object, so that's what I have going on in the new functions. Realistically, get_auto_materialize_conditions_for_asset could also just return an AutoMaterializeAssetEvaluation directly. A future PR can trim out a bunch of code that requires us to convert to and from different formats, just wanted to minimize the surface area of this already-large PR.

Some tests were changed, I will leave comments on all such changes.

How I Tested These Changes

@@ -190,18 +190,6 @@ def with_auto_materialize_policy(
PartitionKeyRange(start="2013-01-05-00:00", end="2013-01-05-03:00")
)
},
("daily", "2013-01-05"): {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

previously, the fact that we planned to materialize the hourly parent asset would cause us to examine the downstream daily partitioned asset. we'd then notice that the daily asset was missing, and see if it'd make sense to materialize it. we would then conclude that the hourly parent was outdated (as we haven't materialized it yet, we're just planning to), and we'd get a ParentOutdatedAutoMaterializeCondition, causing us to not materialize the asset. Then, once the hourly asset actually was materialized, we'd go through this process again, this time realizing that it was ok to materialize the downstream.

This is fairly confusing for the user (in my opinion) as the timing of us discovering that the daily asset is missing is basically random. Now, we'll only discover that the asset is missing once the hourly parent is actually materialized.

"asset2": {MissingAutoMaterializeCondition()},
"asset2": {
MissingAutoMaterializeCondition(),
ParentMaterializedAutoMaterializeCondition(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we now properly report two conditions at once in these scenarios (both conditions are accurate). this applies to all updated tests in this file.

@@ -336,7 +338,22 @@
unevaluated_runs=[run([f"asset{i}" for i in range(1, 6)])],
evaluation_delta=datetime.timedelta(minutes=35),
# need to run assets 1, 2 and 3 as they're all part of the same non-subsettable multi asset
expected_run_requests=[run_request(asset_keys=["asset1", "asset2", "asset3", "asset5"])],
# need to run asset 4 as it eagerly updates after asset 1
expected_run_requests=[
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the change here is in how we handle non-subsettable multi-assets (we now materialize the eager asset that is downstream of asset 1 in the same tick).

@johannkm we briefly discussed this scenario, and I still feel like either behavior is fine -- this feels slightly more consistent (asset 4 would be executed after asset 1 was fully materialized anyway, so might as well do it in the same run)

Base automatically changed from owen/dsc1.4 to master July 26, 2023 22:30
Copy link
Contributor

@johannkm johannkm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is lovely

@OwenKephart OwenKephart merged commit 36df542 into master Jul 27, 2023
@OwenKephart OwenKephart deleted the owen/dsc1.5 branch July 27, 2023 23:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants