[auto-materialize][refactor] Remove bfs_filter_asset_partitions from auto-materialize logic #15498
@@ -190,18 +190,6 @@ def with_auto_materialize_policy(
PartitionKeyRange(start="2013-01-05-00:00", end="2013-01-05-03:00")
)
},
("daily", "2013-01-05"): {
Previously, the fact that we planned to materialize the hourly parent asset would cause us to examine the downstream daily-partitioned asset. We'd then notice that the daily asset was missing and check whether it made sense to materialize it. We would conclude that the hourly parent was outdated (as we hadn't materialized it yet, we were just planning to), and we'd get a ParentOutdatedAutoMaterializeCondition, causing us to not materialize the asset. Then, once the hourly asset actually was materialized, we'd go through this process again, this time realizing that it was ok to materialize the downstream.
This is fairly confusing for the user (in my opinion), as the timing of us discovering that the daily asset is missing is basically random. Now, we'll only discover that the asset is missing once the hourly parent is actually materialized.
"asset2": {MissingAutoMaterializeCondition()},
"asset2": {
MissingAutoMaterializeCondition(),
ParentMaterializedAutoMaterializeCondition(
we now properly report two conditions at once in these scenarios (both conditions are accurate). this applies to all updated tests in this file.
@@ -336,7 +338,22 @@
unevaluated_runs=[run([f"asset{i}" for i in range(1, 6)])],
evaluation_delta=datetime.timedelta(minutes=35),
# need to run assets 1, 2 and 3 as they're all part of the same non-subsettable multi asset
expected_run_requests=[run_request(asset_keys=["asset1", "asset2", "asset3", "asset5"])],
# need to run asset 4 as it eagerly updates after asset 1
expected_run_requests=[
the change here is in how we handle non-subsettable multi-assets (we now materialize the eager asset that is downstream of asset 1 in the same tick).
@johannkm we briefly discussed this scenario, and I still feel like either behavior is fine -- this feels slightly more consistent (asset 4 would be executed after asset 1 was fully materialized anyway, so might as well do it in the same run)
This is lovely
Summary & Motivation
This replaces the auto-materialize logic with a single toposorted loop through the graph. Each asset is processed by a single call to get_auto_materialize_conditions_for_asset, where we evaluate each condition in order (first materialize conditions, then skip conditions, then discard conditions), building up a set of conditions that apply to each asset, as well as a set of asset partitions we'd like to materialize for each asset.
One nice benefit here is that we don't end up needing to explicitly visit the children of assets that we plan to materialize (which can incur a fair amount of expense in the partition mapping layer). Instead, the children inspect each of their parents to see if it would even be possible to materialize at the same time as them, and if not, we completely ignore that asset.
Beyond that, this makes it significantly simpler to add new conditions, as there is a single place to put this information.
Finally, we (at long last!!!) accomplish the dream of unifying freshness-based logic and non-freshness based logic. The freshness calculation gets evaluated as a simple additional condition that happens in the same toposorted loop as everything else.
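The single toposorted loop described above can be sketched roughly as follows. This is a toy illustration, not the code in this PR: evaluate_tick, the string condition names, the missing/updated sets, and max_planned are all hypothetical stand-ins, and partitions and freshness are left out entirely. It only shows the ordering (materialize, then skip, then discard) and how a child looks at its parents rather than a parent visiting its children.

```python
from graphlib import TopologicalSorter
from typing import Dict, Set, Tuple


def evaluate_tick(
    parents: Dict[str, Set[str]],  # asset -> its direct parents
    missing: Set[str],             # assets that have never been materialized
    updated: Set[str],             # assets materialized since the last tick
    max_planned: int = 10,         # hypothetical per-tick limit (discard condition)
) -> Tuple[Dict[str, Set[str]], Set[str]]:
    """Return (conditions per asset, assets planned for this tick)."""
    conditions: Dict[str, Set[str]] = {}
    planned: Set[str] = set()

    # static_order() yields every parent before any of its children.
    for asset in TopologicalSorter(parents).static_order():
        asset_parents = parents.get(asset, set())
        found: Set[str] = set()

        # 1. Materialize conditions: reasons we would like to run this asset.
        if asset in missing:
            found.add("missing")
        if asset_parents & updated:
            found.add("parent_updated")
        should_materialize = bool(found)

        # 2. Skip conditions: the child inspects its parents. A missing parent
        #    is only acceptable if it is already planned for this same tick.
        blocked = any(p in missing and p not in planned for p in asset_parents)
        if should_materialize and blocked:
            found.add("parent_outdated")
            should_materialize = False

        # 3. Discard conditions: drop work that exceeds the per-tick limit.
        if should_materialize and len(planned) >= max_planned:
            found.add("max_materializations_exceeded")
            should_materialize = False

        if should_materialize:
            planned.add(asset)
        if found:
            conditions[asset] = found

    return conditions, planned
```

Because parents are always handled first, each asset only needs to check the planned set to know whether its parents will run in the same tick, so no separate downstream traversal is required in this sketch.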
Right now, there's a fair bit of weirdness in the shape of the AutoMaterializeCondition <> AssetKeyPartitionKey mapping that we pass around. The shape we really want is AssetKey -> [AutoMaterializeCondition -> AssetKeyPartitionKey], as this is the end shape that we end up using inside the AutoMaterializeAssetEvaluation object, so that's what I have going on in the new functions. Realistically, get_auto_materialize_conditions_for_asset could also just return an AutoMaterializeAssetEvaluation directly. A future PR can trim out a bunch of code that requires us to convert to and from different formats; I just wanted to minimize the surface area of this already-large PR.
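To make the target shape concrete, here is a small sketch of regrouping a flat condition-keyed mapping into the nested AssetKey -> [condition -> asset partitions] form. The flat input format is an assumption for illustration (the actual intermediate formats are not spelled out here), and AssetPartition and group_by_asset are hypothetical names, with plain strings standing in for the real key and condition types.

```python
from collections import defaultdict
from typing import Dict, NamedTuple, Optional, Set


class AssetPartition(NamedTuple):
    """Hypothetical stand-in for an (asset key, partition key) pair."""
    asset_key: str
    partition_key: Optional[str] = None


def group_by_asset(
    flat: Dict[str, Set[AssetPartition]],  # assumed input: condition -> asset partitions
) -> Dict[str, Dict[str, Set[AssetPartition]]]:
    """Regroup into asset key -> condition -> asset partitions."""
    nested: Dict[str, Dict[str, Set[AssetPartition]]] = defaultdict(lambda: defaultdict(set))
    for condition, asset_partitions in flat.items():
        for asset_partition in asset_partitions:
            nested[asset_partition.asset_key][condition].add(asset_partition)
    # Convert the defaultdicts back to plain dicts for the caller.
    return {asset: dict(by_condition) for asset, by_condition in nested.items()}
```

Keying by asset first means everything needed to build one per-asset evaluation record sits under a single key, which is the motivation given above for preferring this shape.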
Some tests were changed; I will leave comments on all such changes.
How I Tested These Changes