diff --git a/.dev/DEV.md b/.dev/DEV.md index d4d53a2..0e1c884 100644 --- a/.dev/DEV.md +++ b/.dev/DEV.md @@ -82,4 +82,4 @@ Settings "client_kwargs": { "endpoint_url": "http://minio:9000" } -} \ No newline at end of file +} diff --git a/.dev/requirements.txt b/.dev/requirements.txt index fa79a4c..a0d2db5 100644 --- a/.dev/requirements.txt +++ b/.dev/requirements.txt @@ -3,4 +3,4 @@ dbt-duckdb==1.5.2 prefect prefect-aws s3fs -poetry \ No newline at end of file +poetry diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 698cbaa..f393423 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -1,4 +1,4 @@ -name: 'publish' +name: "publish" on: push: branches: @@ -13,8 +13,8 @@ jobs: - name: Checkout Code uses: actions/checkout@v2 with: - ref: ${{ github.head_ref }} # checkout the correct branch name - fetch-depth: 0 # fetch the whole repo history + ref: ${{ github.head_ref }} # checkout the correct branch name + fetch-depth: 0 # fetch the whole repo history - name: Git Version uses: codacy/git-version@2.2.0 id: git-version diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 830efc5..16e4f6f 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -1,5 +1,5 @@ -name: 'tests' -on: [push, pull_request] +name: "tests" +on: [push] jobs: linting: @@ -17,7 +17,7 @@ jobs: python-version: ${{ matrix.python-version }} - name: setup poetry run: | - pip install poetry==${{ env.POETRY_VERSION }} + pip install poetry==${{ env.POETRY_VERSION }} poetry config virtualenvs.create false poetry install --no-interaction --no-ansi - name: run precommit hooks @@ -37,8 +37,8 @@ jobs: python-version: ${{ matrix.python-version }} - name: setup poetry run: | - pip install poetry==${{ env.POETRY_VERSION }} + pip install poetry==${{ env.POETRY_VERSION }} poetry config virtualenvs.create false poetry install --no-interaction --no-ansi - name: run pytest - run: poetry run pytest \ No newline at end of file + run: poetry run pytest diff --git a/.gitignore b/.gitignore index 112c446..77b5929 100644 --- a/.gitignore +++ b/.gitignore @@ -170,4 +170,4 @@ logs/ venv/ env/ test.env -__pycache__ \ No newline at end of file +__pycache__ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 9ac0502..2eb13df 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,28 +1,26 @@ repos: - - repo: local + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.0.292 hooks: - - id: isort - name: isort - stages: [commit] - types: [python] - entry: poetry run isort . - language: system - pass_filenames: false - always_run: true - - id: black - name: black - stages: [commit] - types: [python] - entry: poetry run black . - language: system - pass_filenames: false - always_run: true - id: ruff - name: ruff - stages: [commit] - types: [python] - entry: poetry run ruff . - language: system - pass_filenames: false - always_run: true - fail_fast: true \ No newline at end of file + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.2.0 + hooks: + - id: end-of-file-fixer + - id: trailing-whitespace + - id: check-yaml + - repo: https://github.com/psf/black + rev: 22.3.0 + hooks: + - id: black + files: (prefect_dbt_flow/|tests/) + - repo: https://github.com/pycqa/isort + rev: 5.11.5 + hooks: + - id: isort + files: (prefect_dbt_flow/|tests/) + - repo: https://github.com/pre-commit/mirrors-mypy + rev: v0.942 + hooks: + - id: mypy + files: prefect_dbt_flow/ diff --git a/README.md b/README.md index b134d11..32a52ba 100644 --- a/README.md +++ b/README.md @@ -11,8 +11,8 @@

# prefect-dbt-flow -prefect-dbt-flow is a Python library that enables Prefect to convert dbt workflows into independent tasks within a Prefect flow. This integration simplifies the orchestration and execution of dbt models and tests using Prefect, allowing you to build robust data pipelines and monitor your dbt projects efficiently. - +prefect-dbt-flow is a Python library that enables Prefect to convert dbt workflows into independent tasks within a Prefect flow. This integration simplifies the orchestration and execution of dbt models and tests using Prefect, allowing you to build robust data pipelines and monitor your dbt projects efficiently. + dbt is an immensely popular tool for building and testing data transformation models, and Prefect is a versatile workflow management system. This integration brings together the best of both worlds, empowering data engineers and analysts to create robust data pipelines. The key features include: @@ -55,12 +55,12 @@ my_flow = dbt_flow( if __name__ == "__main__": my_flow() ``` -![jaffle_shop_dag](./docs/images/jaffle_shop_dag.png) +![jaffle_shop_dag](./docs/images/jaffle_shop_dag.png) For more information consult the [docs](https://datarootsio.github.io/prefect-dbt-flow/) ## Inspiration -prefect-dbt-flow draws inspiration from various projects in the data engineering and workflow orchestration space, including: +prefect-dbt-flow draws inspiration from various projects in the data engineering and workflow orchestration space, including: - [astronomer-cosmos](https://github.com/astronomer/astronomer-cosmos) - [dbt + Dagster](https://docs.dagster.io/integrations/dbt) - [Anna Geller](https://github.com/anna-geller/prefect-dataplatform) diff --git a/docs/api/cli.md b/docs/api/cli.md index 2fbf1b7..887ee24 100644 --- a/docs/api/cli.md +++ b/docs/api/cli.md @@ -1 +1 @@ -::: prefect_dbt_flow.dbt.cli \ No newline at end of file +::: prefect_dbt_flow.dbt.cli diff --git a/docs/api/flow.md b/docs/api/flow.md index 0e9dfaf..870a66c 100644 --- a/docs/api/flow.md +++ b/docs/api/flow.md @@ -1 +1 @@ -::: prefect_dbt_flow.flow \ No newline at end of file +::: prefect_dbt_flow.flow diff --git a/docs/api/graph.md b/docs/api/graph.md index a4f223d..0ddb3ea 100644 --- a/docs/api/graph.md +++ b/docs/api/graph.md @@ -1 +1 @@ -::: prefect_dbt_flow.dbt.graph \ No newline at end of file +::: prefect_dbt_flow.dbt.graph diff --git a/docs/api/models.md b/docs/api/models.md index 8211e80..f671b6e 100644 --- a/docs/api/models.md +++ b/docs/api/models.md @@ -1 +1 @@ -::: prefect_dbt_flow.dbt \ No newline at end of file +::: prefect_dbt_flow.dbt diff --git a/docs/api/tasks.md b/docs/api/tasks.md index 25c946d..36a8b7f 100644 --- a/docs/api/tasks.md +++ b/docs/api/tasks.md @@ -1 +1 @@ -::: prefect_dbt_flow.dbt.tasks \ No newline at end of file +::: prefect_dbt_flow.dbt.tasks diff --git a/docs/getting_started.md b/docs/getting_started.md index 1bad23a..315518a 100644 --- a/docs/getting_started.md +++ b/docs/getting_started.md @@ -20,7 +20,7 @@ cd prefect-dbt-flow/example/jaffle_shop Ensure that you have Docker Compose installed on your system. If you haven't already installed it, refer to the [Docker Compose Installation Guide](https://docs.docker.com/compose/install/) for instructions. ### 3. Start the Docker Containers -The following command will launch three services defined in the docker-compose file: +The following command will launch three services defined in the docker-compose file: - A PostgreSQL database, - A Prefect server accessible at: `http://0.0.0.0:4200/` or `http://localhost:4200/` @@ -87,4 +87,4 @@ prefect-dbt-flow integrates with Prefect's monitoring and error handling capabil For more information on these features, consult the [Prefect documentation.](https://docs.prefect.io/2.10.12/api-ref/prefect/flows/#prefect.flows.flow) ## Conclusion -prefect-dbt-flow simplifies the orchestration and management of dbt workflows within a Prefect flow. By following the steps in this guide, you can easily create and execute data pipelines that incorporate dbt projects. Be aware of breaking changes as this library is actively developed, and consult the changelog for updates. Happy data engineering! :rocket: \ No newline at end of file +prefect-dbt-flow simplifies the orchestration and management of dbt workflows within a Prefect flow. By following the steps in this guide, you can easily create and execute data pipelines that incorporate dbt projects. Be aware of breaking changes as this library is actively developed, and consult the changelog for updates. Happy data engineering! :rocket: diff --git a/docs/how_it_works.md b/docs/how_it_works.md index 9d2a19c..2c12c5d 100644 --- a/docs/how_it_works.md +++ b/docs/how_it_works.md @@ -6,4 +6,4 @@ Here, we briefly explain how prefect-dbt-flow works under the hood. 3. Based on the dbt node dependencies we set the dependencies between the Prefect tasks. 4. Lastly we wrap all the Prefect tasks in a Prefect flow and return it to the user. -If you want a more detailed explanation we would recommend you to read the source code of the source code. \ No newline at end of file +If you want a more detailed explanation we would recommend you to read the source code of the source code. diff --git a/docs/index.md b/docs/index.md index 4ab0748..612c7a5 100644 --- a/docs/index.md +++ b/docs/index.md @@ -1 +1 @@ ---8<-- "README.md" \ No newline at end of file +--8<-- "README.md" diff --git a/docs/license.md b/docs/license.md index 64ad9b4..f409d45 100644 --- a/docs/license.md +++ b/docs/license.md @@ -1 +1 @@ ---8<-- "LICENSE" \ No newline at end of file +--8<-- "LICENSE" diff --git a/docs/stylesheets/extra.css b/docs/stylesheets/extra.css index 72b248b..767f8aa 100644 --- a/docs/stylesheets/extra.css +++ b/docs/stylesheets/extra.css @@ -1,3 +1,3 @@ :root { --md-primary-fg-color: #00b189; - } \ No newline at end of file + } diff --git a/examples/jaffle_shop/Dockerfile b/examples/jaffle_shop/Dockerfile index beecbc7..162a901 100644 --- a/examples/jaffle_shop/Dockerfile +++ b/examples/jaffle_shop/Dockerfile @@ -1,4 +1,4 @@ FROM prefecthq/prefect:2.10.17-python3.11 COPY ./requirements.txt ./requirements.txt -RUN pip install -r requirements.txt --no-cache-dir \ No newline at end of file +RUN pip install -r requirements.txt --no-cache-dir diff --git a/examples/jaffle_shop/models/staging/stg_payments.sql b/examples/jaffle_shop/models/staging/stg_payments.sql index 700cf7f..f718596 100644 --- a/examples/jaffle_shop/models/staging/stg_payments.sql +++ b/examples/jaffle_shop/models/staging/stg_payments.sql @@ -1,5 +1,5 @@ with source as ( - + {#- Normally we would select from the table here, but we are using seeds to load our data in this project diff --git a/examples/jaffle_shop/profiles.yml b/examples/jaffle_shop/profiles.yml index d51a335..457b9b7 100644 --- a/examples/jaffle_shop/profiles.yml +++ b/examples/jaffle_shop/profiles.yml @@ -9,4 +9,4 @@ example_jaffle_shop: port: 5432 dbname: data schema: example - connect_timeout: 30 \ No newline at end of file + connect_timeout: 30 diff --git a/examples/jaffle_shop/requirements.txt b/examples/jaffle_shop/requirements.txt index a7797aa..9f37316 100644 --- a/examples/jaffle_shop/requirements.txt +++ b/examples/jaffle_shop/requirements.txt @@ -1,2 +1,2 @@ dbt-postgres==1.6.5 -prefect-dbt-flow \ No newline at end of file +prefect-dbt-flow diff --git a/examples/jaffle_shop/seeds/raw_customers.csv b/examples/jaffle_shop/seeds/raw_customers.csv index e386bb3..b3e6747 100644 --- a/examples/jaffle_shop/seeds/raw_customers.csv +++ b/examples/jaffle_shop/seeds/raw_customers.csv @@ -98,4 +98,4 @@ id,first_name,last_name 97,Shirley,D. 98,Nicole,M. 99,Mary,G. -100,Jean,M. \ No newline at end of file +100,Jean,M. diff --git a/examples/jaffle_shop/seeds/raw_orders.csv b/examples/jaffle_shop/seeds/raw_orders.csv index 45160e1..c487062 100644 --- a/examples/jaffle_shop/seeds/raw_orders.csv +++ b/examples/jaffle_shop/seeds/raw_orders.csv @@ -97,4 +97,4 @@ id,user_id,order_date,status 96,90,2018-04-06,placed 97,89,2018-04-07,placed 98,41,2018-04-07,placed -99,85,2018-04-09,placed \ No newline at end of file +99,85,2018-04-09,placed diff --git a/examples/jaffle_shop/seeds/raw_payments.csv b/examples/jaffle_shop/seeds/raw_payments.csv index 3989cb2..a587baa 100644 --- a/examples/jaffle_shop/seeds/raw_payments.csv +++ b/examples/jaffle_shop/seeds/raw_payments.csv @@ -111,4 +111,4 @@ id,order_id,payment_method,amount 110,96,gift_card,1700 111,97,bank_transfer,1400 112,98,bank_transfer,1000 -113,99,credit_card,2400 \ No newline at end of file +113,99,credit_card,2400 diff --git a/examples/jaffle_shop_duckdb/models/staging/stg_payments.sql b/examples/jaffle_shop_duckdb/models/staging/stg_payments.sql index 700cf7f..f718596 100644 --- a/examples/jaffle_shop_duckdb/models/staging/stg_payments.sql +++ b/examples/jaffle_shop_duckdb/models/staging/stg_payments.sql @@ -1,5 +1,5 @@ with source as ( - + {#- Normally we would select from the table here, but we are using seeds to load our data in this project diff --git a/examples/jaffle_shop_duckdb/seeds/raw_customers.csv b/examples/jaffle_shop_duckdb/seeds/raw_customers.csv index e386bb3..b3e6747 100644 --- a/examples/jaffle_shop_duckdb/seeds/raw_customers.csv +++ b/examples/jaffle_shop_duckdb/seeds/raw_customers.csv @@ -98,4 +98,4 @@ id,first_name,last_name 97,Shirley,D. 98,Nicole,M. 99,Mary,G. -100,Jean,M. \ No newline at end of file +100,Jean,M. diff --git a/examples/jaffle_shop_duckdb/seeds/raw_orders.csv b/examples/jaffle_shop_duckdb/seeds/raw_orders.csv index 45160e1..c487062 100644 --- a/examples/jaffle_shop_duckdb/seeds/raw_orders.csv +++ b/examples/jaffle_shop_duckdb/seeds/raw_orders.csv @@ -97,4 +97,4 @@ id,user_id,order_date,status 96,90,2018-04-06,placed 97,89,2018-04-07,placed 98,41,2018-04-07,placed -99,85,2018-04-09,placed \ No newline at end of file +99,85,2018-04-09,placed diff --git a/examples/jaffle_shop_duckdb/seeds/raw_payments.csv b/examples/jaffle_shop_duckdb/seeds/raw_payments.csv index 3989cb2..a587baa 100644 --- a/examples/jaffle_shop_duckdb/seeds/raw_payments.csv +++ b/examples/jaffle_shop_duckdb/seeds/raw_payments.csv @@ -111,4 +111,4 @@ id,order_id,payment_method,amount 110,96,gift_card,1700 111,97,bank_transfer,1400 112,98,bank_transfer,1000 -113,99,credit_card,2400 \ No newline at end of file +113,99,credit_card,2400 diff --git a/examples/sample_project/models/my_model_c.sql b/examples/sample_project/models/my_model_c.sql index 4abf3de..63c6061 100644 --- a/examples/sample_project/models/my_model_c.sql +++ b/examples/sample_project/models/my_model_c.sql @@ -1,3 +1,3 @@ select id from {{ ref('my_model_a') }} union -select id from {{ ref('my_model_b') }} \ No newline at end of file +select id from {{ ref('my_model_b') }} diff --git a/prefect_dbt_flow/__init__.py b/prefect_dbt_flow/__init__.py index 8d63712..45b07a1 100644 --- a/prefect_dbt_flow/__init__.py +++ b/prefect_dbt_flow/__init__.py @@ -3,9 +3,9 @@ # ruff: noqa: F401 # fmt: off from prefect_dbt_flow.dbt import ( - DbtDagOptions, - DbtNode, - DbtProfile, + DbtDagOptions, + DbtNode, + DbtProfile, DbtProject ) from prefect_dbt_flow.flow import dbt_flow diff --git a/prefect_dbt_flow/dbt/tasks.py b/prefect_dbt_flow/dbt/tasks.py index f616dda..fdf5839 100644 --- a/prefect_dbt_flow/dbt/tasks.py +++ b/prefect_dbt_flow/dbt/tasks.py @@ -1,7 +1,8 @@ """Code for generate prefect DAG, includes dbt run and test functions""" -from typing import Any, Dict, List, Optional +from typing import Dict, List, Optional -from prefect import get_run_logger, task +from prefect import Task, get_run_logger, task +from prefect.futures import PrefectFuture from prefect_dbt_flow.dbt import DbtNode, DbtProfile, DbtProject, DbtResourceType, cli @@ -15,7 +16,7 @@ def _task_dbt_seed( profile: DbtProfile, dbt_node: DbtNode, task_kwargs: Optional[Dict] = None, -): +) -> Task: """ Create a Prefect task for running a dbt seed. Uses dbt_seed from cli module @@ -52,7 +53,7 @@ def _task_dbt_run( profile: DbtProfile, dbt_node: DbtNode, task_kwargs: Optional[Dict] = None, -): +) -> Task: """ Create a Prefect task for running a dbt model. Uses dbt_run from cli module @@ -89,7 +90,7 @@ def _task_dbt_test( profile: DbtProfile, dbt_node: DbtNode, task_kwargs: Optional[Dict] = None, -): +) -> Task: """ Create a Prefect task for testing a dbt model. Uses dbt_test from cli module @@ -157,7 +158,7 @@ def generate_tasks_dag( for dbt_node in dbt_graph } - submitted_tasks: Dict[str, Any] = {} + submitted_tasks: Dict[str, PrefectFuture] = {} while node := _get_next_node(dbt_graph, list(submitted_tasks.keys())): run_task = all_tasks[node.unique_id] task_dependencies = [ diff --git a/prefect_dbt_flow/flow.py b/prefect_dbt_flow/flow.py index fbfcf9d..63e81aa 100644 --- a/prefect_dbt_flow/flow.py +++ b/prefect_dbt_flow/flow.py @@ -1,7 +1,7 @@ """Functions to create a prefect flow for a dbt project.""" -from typing import Any, Optional +from typing import Optional -from prefect import flow +from prefect import Flow, flow from prefect_dbt_flow.dbt import DbtDagOptions, DbtProfile, DbtProject, graph, tasks @@ -11,7 +11,7 @@ def dbt_flow( profile: Optional[DbtProfile] = None, dag_options: Optional[DbtDagOptions] = None, flow_kwargs: Optional[dict] = None, -) -> Any: +) -> Flow: """ Create a PrefectFlow for executing a dbt project.