Skip to content

Commit

Permalink
implemented snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicolas Gelders authored and Nicolas Gelders committed Oct 19, 2023
1 parent f0ffaef commit 4edcff5
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 3 deletions.
15 changes: 15 additions & 0 deletions examples/jaffle_shop_duckdb/snapshots/orders_snapshot.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{% snapshot orders_snapshot %}

{{
config(
target_schema='snapshots',
unique_key='order_id',

strategy='timestamp',
updated_at='order_date',
)
}}

select * from {{ ref('orders') }}

{% endsnapshot %}
23 changes: 23 additions & 0 deletions prefect_dbt_flow/dbt/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ def dbt_seed(project: DbtProject, profile: DbtProfile, seed: str) -> str:
Function that executes `dbt seed` command
Args:
project: A class that represents a dbt project configuration.
profile: A class that represents a dbt profile configuration.
seed: Name of the seed to run.
Returns:
Expand All @@ -97,3 +99,24 @@ def dbt_seed(project: DbtProject, profile: DbtProfile, seed: str) -> str:
dbt_seed_cmd.extend(["--select", seed])

return cmd.run(" ".join(dbt_seed_cmd))


def dbt_snapshot(project: DbtProject, profile: DbtProfile, snapshot: str) -> str:
"""
Function that executes `dbt snapshot` command
Args:
project: A class that represents a dbt project configuration.
profile: A class that represents a dbt profile configuration.
snapshot: Name of the snapshot to run.
Returns:
A string representing the output of the `dbt snapshot` command
"""
dbt_seed_cmd = [DBT_EXE, "snapshot"]
dbt_seed_cmd.extend(["--project-dir", str(project.project_dir)])
dbt_seed_cmd.extend(["--profiles-dir", str(project.profiles_dir)])
dbt_seed_cmd.extend(["-t", profile.target])
dbt_seed_cmd.extend(["--select", snapshot])

return cmd.run(" ".join(dbt_seed_cmd))
11 changes: 11 additions & 0 deletions prefect_dbt_flow/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def parse_dbt_project(
depends_on=node_dict["depends_on"].get("nodes", []),
)
)

if node_dict["resource_type"] == "test":
models_with_tests.extend(node_dict["depends_on"]["nodes"])

Expand All @@ -55,6 +56,16 @@ def parse_dbt_project(
)
)

if node_dict["resource_type"] == "snapshot":
dbt_graph.append(
DbtNode(
name=node_dict["name"],
unique_id=node_dict["unique_id"],
resource_type=DbtResourceType.SNAPSHOT,
depends_on=node_dict["depends_on"].get("nodes", []),
)
)

except json.decoder.JSONDecodeError:
pass

Expand Down
42 changes: 40 additions & 2 deletions prefect_dbt_flow/dbt/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,47 @@

from prefect_dbt_flow.dbt import DbtNode, DbtProfile, DbtProject, DbtResourceType, cli

DBT_SEED_EMOJI = "🌱"
DBT_RUN_EMOJI = "🏃"
DBT_TEST_EMOJI = "🧪"
DBT_SEED_EMOJI = "🌱"
DBT_SNAPSHOT_EMOJI = "📸"


def _task_dbt_snapshot(
project: DbtProject,
profile: DbtProfile,
dbt_node: DbtNode,
task_kwargs: Optional[Dict] = None,
) -> Task:
"""
Create a Prefect task for running a dbt snapshot. Uses dbt_snapshot from cli module
Args:
project: A class that represents a dbt project configuration.
profile: A class that represents a dbt profile configuration.
dbt_node: A class that represents the dbt node (model) to run.
task_kwargs: Additional task configuration.
Returns:
dbt_snapshot: Prefect task.
"""
all_task_kwargs = {
**(task_kwargs or {}),
"name": f"{DBT_SNAPSHOT_EMOJI} snapshot_{dbt_node.name}",
}

@task(**all_task_kwargs)
def dbt_snapshot():
"""
Snapshots a dbt snapshot
Returns:
None
"""
dbt_snapshot_output = cli.dbt_snapshot(project, profile, dbt_node.name)
get_run_logger().info(dbt_snapshot_output)

return dbt_snapshot


def _task_dbt_seed(
Expand Down Expand Up @@ -125,7 +163,7 @@ def dbt_test():
RESOURCE_TYPE_TO_TASK = {
DbtResourceType.SEED: _task_dbt_seed,
DbtResourceType.MODEL: _task_dbt_run,
DbtResourceType.SNAPSHOT: _task_dbt_run,
DbtResourceType.SNAPSHOT: _task_dbt_snapshot,
}


Expand Down
2 changes: 1 addition & 1 deletion tests/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,4 @@ def test_flow_jaffle_shop(duckdb_db_file: Path):
my_dbt_flow()

with duckdb.connect(str(duckdb_db_file)) as ddb:
assert len(ddb.sql("SHOW ALL TABLES").fetchall()) == 8
assert len(ddb.sql("SHOW ALL TABLES").fetchall()) == 9

0 comments on commit 4edcff5

Please sign in to comment.