Skip to content

Commit

Permalink
[docs][dagster-dbt] Update schedule examples (dagster-io#15496)
Browse files Browse the repository at this point in the history
## Summary & Motivation

We've updated these APIs, so let's update the docs. I decided to be explicit
here about how you would deal with the simple (dbt-only) case, as well
as the case where you want to combine dbt and non-dbt assets, as I think
both are valuable.

## How I Tested These Changes
  • Loading branch information
OwenKephart committed Jul 27, 2023
1 parent 0c8ac7b commit 8d20865
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 59 deletions.
55 changes: 37 additions & 18 deletions docs/content/integrations/dbt/reference.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -104,28 +104,47 @@ defs = Definitions(

## Scheduling dbt jobs

Once you have your dbt assets, you can define a job that runs some or all of these assets on a schedule:
Once you have your dbt assets, you can define a job to materialize a selection of these assets on a schedule.

```python startafter=start_schedule_assets endbefore=end_schedule_assets file=/integrations/dbt/dbt.py dedent=4
from dagster import ScheduleDefinition, define_asset_job, Definitions
### Scheduling jobs that contain only dbt assets

run_everything_job = define_asset_job("run_everything", selection="*")
In this example, we use the <PyObject module="dagster_dbt" object="build_schedule_from_dbt_selection" /> function to create a job, `daily_dbt_models`, along with a schedule that executes this job once a day. We define the set of models we'd like to execute using [dbt's selection syntax](https://docs.getdbt.com/reference/node-selection/syntax#how-does-selection-work), in this case selecting only the models with the tag `daily`.

# only `order_stats` and its children
run_something_job = define_asset_job("run_something", selection="order_stats*")
```python startafter=start_schedule_assets_dbt_only endbefore=end_schedule_assets_dbt_only file=/integrations/dbt/dbt.py dedent=4
from dagster_dbt import build_schedule_from_dbt_selection, dbt_assets

defs = Definitions(
assets=dbt_assets,
schedules=[
ScheduleDefinition(
job=run_something_job,
cron_schedule="@daily",
),
ScheduleDefinition(
job=run_everything_job,
cron_schedule="@weekly",
),
],
@dbt_assets(manifest=manifest)
def my_dbt_assets():
...

daily_dbt_assets_schedule = build_schedule_from_dbt_selection(
[my_dbt_assets],
job_name="daily_dbt_models",
cron_schedule="@daily",
dbt_select="tag:daily",
)
```

### Scheduling jobs that contain dbt assets and non-dbt assets

In many cases, it's useful to be able to schedule dbt assets alongside non-dbt assets. In this example, we build an <PyObject module="dagster" object="AssetSelection" /> of dbt assets using <PyObject module="dagster_dbt" object="build_dbt_asset_selection" />, then select all assets (dbt-related or not) that are downstream of these dbt models. From there, we create a job that targets that selection of assets and schedule it to run daily.

```python startafter=start_schedule_assets_dbt_downstream endbefore=end_schedule_assets_dbt_downstream file=/integrations/dbt/dbt.py dedent=4
from dagster import define_asset_job, ScheduleDefinition
from dagster_dbt import build_dbt_asset_selection, dbt_assets

@dbt_assets(manifest=manifest)
def my_dbt_assets():
...

# selects all models tagged with "daily", and all their downstream asset dependencies
daily_selection = build_dbt_asset_selection(
[my_dbt_assets], dbt_select="tag:daily"
).downstream()

daily_dbt_assets_and_downstream_schedule = ScheduleDefinition(
job=define_asset_job("daily_assets", selection=daily_selection),
cron_schedule="@daily",
)
```

Expand Down
51 changes: 32 additions & 19 deletions examples/docs_snippets/docs_snippets/integrations/dbt/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,30 +44,43 @@ def scope_dbt_cli_resource_config():
# end_dbt_cli_resource


def scope_schedule_assets(dbt_assets):
# start_schedule_assets
from dagster import ScheduleDefinition, define_asset_job, Definitions
def scope_schedule_assets_dbt_only(manifest):
# start_schedule_assets_dbt_only
from dagster_dbt import build_schedule_from_dbt_selection, dbt_assets

run_everything_job = define_asset_job("run_everything", selection="*")
@dbt_assets(manifest=manifest)
def my_dbt_assets():
...

daily_dbt_assets_schedule = build_schedule_from_dbt_selection(
[my_dbt_assets],
job_name="daily_dbt_models",
cron_schedule="@daily",
dbt_select="tag:daily",
)
# end_schedule_assets_dbt_only

# only `order_stats` and its children
run_something_job = define_asset_job("run_something", selection="order_stats*")

defs = Definitions(
assets=dbt_assets,
schedules=[
ScheduleDefinition(
job=run_something_job,
cron_schedule="@daily",
),
ScheduleDefinition(
job=run_everything_job,
cron_schedule="@weekly",
),
],
def scope_schedule_assets_dbt_and_downstream(manifest):
# start_schedule_assets_dbt_downstream
from dagster import define_asset_job, ScheduleDefinition
from dagster_dbt import build_dbt_asset_selection, dbt_assets

@dbt_assets(manifest=manifest)
def my_dbt_assets():
...

# selects all models tagged with "daily", and all their downstream asset dependencies
daily_selection = build_dbt_asset_selection(
[my_dbt_assets], dbt_select="tag:daily"
).downstream()

daily_dbt_assets_and_downstream_schedule = ScheduleDefinition(
job=define_asset_job("daily_assets", selection=daily_selection),
cron_schedule="@daily",
)

# end_schedule_assets
# end_schedule_assets_dbt_downstream


def scope_downstream_asset():
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,39 +1,31 @@
import json
from pathlib import Path

from dagster_dbt import (
DbtCliClientResource,
DbtCloudClientResource,
dbt_assets,
load_assets_from_dbt_cloud_job,
load_assets_from_dbt_project,
)

from dagster import Definitions, asset, file_relative_path, with_resources
from dagster import asset
from dagster._core.definitions import materialize
from dagster._core.instance_for_test import environ
from docs_snippets.integrations.dbt.dbt import scope_schedule_assets
from docs_snippets.integrations.dbt.dbt import (
scope_schedule_assets_dbt_and_downstream,
scope_schedule_assets_dbt_only,
)
from docs_snippets.integrations.dbt.dbt_cloud import (
scope_define_instance,
scope_schedule_dbt_cloud_assets,
)


def test_scope_schedule_assets_can_load():
DBT_PROJECT_PATH = file_relative_path(
__file__, "../../../assets_dbt_python/dbt_project"
)
DBT_PROFILES_DIR = file_relative_path(
__file__,
"../../../assets_dbt_python/dbt_project/config",
)
dbt_assets = with_resources(
load_assets_from_dbt_project(DBT_PROJECT_PATH),
{
"dbt": DbtCliClientResource(
project_dir=DBT_PROJECT_PATH,
profiles_dir=DBT_PROFILES_DIR,
)
},
)
MANIFEST_PATH = Path(__file__).parent / "manifest.json"
manifest = json.loads(MANIFEST_PATH.read_bytes())

scope_schedule_assets(dbt_assets)
scope_schedule_assets_dbt_only(manifest)
scope_schedule_assets_dbt_and_downstream(manifest)


def test_scope_schedule_dbt_cloud_assets_can_load():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,12 @@ def all_dbt_assets():
...
# Select the dbt assets that have the tag "foo".
my_selection = build_dbt_asset_selection([dbt_assets], dbt_select="tag:foo")
foo_selection = build_dbt_asset_selection([dbt_assets], dbt_select="tag:foo")
# Select the dbt assets that have the tag "foo" and all Dagster assets downstream
# of them (dbt-related or otherwise)
foo_and_downstream_selection = foo_selection.downstream()
"""
manifest, dagster_dbt_translator = get_manifest_and_translator_from_dbt_assets(dbt_assets)
from .dbt_manifest_asset_selection import DbtManifestAssetSelection
Expand Down

0 comments on commit 8d20865

Please sign in to comment.