diff --git a/examples/jaffle_shop_duckdb/snapshots/orders_snapshot.sql b/examples/jaffle_shop_duckdb/snapshots/orders_snapshot.sql new file mode 100644 index 0000000..a494feb --- /dev/null +++ b/examples/jaffle_shop_duckdb/snapshots/orders_snapshot.sql @@ -0,0 +1,15 @@ +{% snapshot orders_snapshot %} + +{{ + config( + target_schema='snapshots', + unique_key='order_id', + + strategy='timestamp', + updated_at='order_date', + ) +}} + +select * from {{ ref('orders') }} + +{% endsnapshot %} \ No newline at end of file diff --git a/prefect_dbt_flow/dbt/cli.py b/prefect_dbt_flow/dbt/cli.py index d7409f7..fa6cb73 100644 --- a/prefect_dbt_flow/dbt/cli.py +++ b/prefect_dbt_flow/dbt/cli.py @@ -85,6 +85,8 @@ def dbt_seed(project: DbtProject, profile: DbtProfile, seed: str) -> str: Function that executes `dbt seed` command Args: + project: A class that represents a dbt project configuration. + profile: A class that represents a dbt profile configuration. seed: Name of the seed to run. Returns: @@ -97,3 +99,24 @@ def dbt_seed(project: DbtProject, profile: DbtProfile, seed: str) -> str: dbt_seed_cmd.extend(["--select", seed]) return cmd.run(" ".join(dbt_seed_cmd)) + + +def dbt_snapshot(project: DbtProject, profile: DbtProfile, snapshot: str) -> str: + """ + Function that executes `dbt snapshot` command + + Args: + project: A class that represents a dbt project configuration. + profile: A class that represents a dbt profile configuration. + snapshot: Name of the snapshot to run. + + Returns: + A string representing the output of the `dbt snapshot` command + """ + dbt_seed_cmd = [DBT_EXE, "snapshot"] + dbt_seed_cmd.extend(["--project-dir", str(project.project_dir)]) + dbt_seed_cmd.extend(["--profiles-dir", str(project.profiles_dir)]) + dbt_seed_cmd.extend(["-t", profile.target]) + dbt_seed_cmd.extend(["--select", snapshot]) + + return cmd.run(" ".join(dbt_seed_cmd)) diff --git a/prefect_dbt_flow/dbt/graph.py b/prefect_dbt_flow/dbt/graph.py index 9773e58..053f5fc 100644 --- a/prefect_dbt_flow/dbt/graph.py +++ b/prefect_dbt_flow/dbt/graph.py @@ -42,6 +42,7 @@ def parse_dbt_project( depends_on=node_dict["depends_on"].get("nodes", []), ) ) + if node_dict["resource_type"] == "test": models_with_tests.extend(node_dict["depends_on"]["nodes"]) @@ -55,6 +56,16 @@ def parse_dbt_project( ) ) + if node_dict["resource_type"] == "snapshot": + dbt_graph.append( + DbtNode( + name=node_dict["name"], + unique_id=node_dict["unique_id"], + resource_type=DbtResourceType.SNAPSHOT, + depends_on=node_dict["depends_on"].get("nodes", []), + ) + ) + except json.decoder.JSONDecodeError: pass diff --git a/prefect_dbt_flow/dbt/tasks.py b/prefect_dbt_flow/dbt/tasks.py index fdf5839..666d0bb 100644 --- a/prefect_dbt_flow/dbt/tasks.py +++ b/prefect_dbt_flow/dbt/tasks.py @@ -6,9 +6,47 @@ from prefect_dbt_flow.dbt import DbtNode, DbtProfile, DbtProject, DbtResourceType, cli -DBT_SEED_EMOJI = "๐ŸŒฑ" DBT_RUN_EMOJI = "๐Ÿƒ" DBT_TEST_EMOJI = "๐Ÿงช" +DBT_SEED_EMOJI = "๐ŸŒฑ" +DBT_SNAPSHOT_EMOJI = "๐Ÿ“ธ" + + +def _task_dbt_snapshot( + project: DbtProject, + profile: DbtProfile, + dbt_node: DbtNode, + task_kwargs: Optional[Dict] = None, +) -> Task: + """ + Create a Prefect task for running a dbt snapshot. Uses dbt_snapshot from cli module + + Args: + project: A class that represents a dbt project configuration. + profile: A class that represents a dbt profile configuration. + dbt_node: A class that represents the dbt node (model) to run. + task_kwargs: Additional task configuration. + + Returns: + dbt_snapshot: Prefect task. + """ + all_task_kwargs = { + **(task_kwargs or {}), + "name": f"{DBT_SNAPSHOT_EMOJI} snapshot_{dbt_node.name}", + } + + @task(**all_task_kwargs) + def dbt_snapshot(): + """ + Snapshots a dbt snapshot + + Returns: + None + """ + dbt_snapshot_output = cli.dbt_snapshot(project, profile, dbt_node.name) + get_run_logger().info(dbt_snapshot_output) + + return dbt_snapshot def _task_dbt_seed( @@ -125,7 +163,7 @@ def dbt_test(): RESOURCE_TYPE_TO_TASK = { DbtResourceType.SEED: _task_dbt_seed, DbtResourceType.MODEL: _task_dbt_run, - DbtResourceType.SNAPSHOT: _task_dbt_run, + DbtResourceType.SNAPSHOT: _task_dbt_snapshot, } diff --git a/tests/test_flow.py b/tests/test_flow.py index 8b60093..53c8360 100644 --- a/tests/test_flow.py +++ b/tests/test_flow.py @@ -162,4 +162,4 @@ def test_flow_jaffle_shop(duckdb_db_file: Path): my_dbt_flow() with duckdb.connect(str(duckdb_db_file)) as ddb: - assert len(ddb.sql("SHOW ALL TABLES").fetchall()) == 8 + assert len(ddb.sql("SHOW ALL TABLES").fetchall()) == 9