# ---------------------------------------------------------------------------
# my_flow.py (new file)
#
# Example script: builds a Prefect flow for the dbt project found in the
# current working directory.
# ---------------------------------------------------------------------------
import os

from prefect_dbt_flow.dbt import DbtProfile, DbtProject
from prefect_dbt_flow.flow import dbt_flow

# Project and profile are both looked up in the directory the script is
# launched from.
dbt_project = DbtProject(
    name="project_name",
    project_dir=os.getcwd(),
)
dbt_profile = DbtProfile(
    name="project_name",
    project_dir=os.getcwd(),
)

run = dbt_flow(
    project=dbt_project,
    profile=dbt_profile,
    run_test_after_model=True,
    flow_kwargs={
        # Open question from the author: how should the dbt target be changed?
        # NOTE(review): presumably flow_kwargs is forwarded to Prefect's
        # ``flow`` decorator, which has no ``target`` option -- confirm
        # against prefect_dbt_flow.flow.dbt_flow before relying on this.
        "target": "dev",
    },
)


# ---------------------------------------------------------------------------
# prefect_dbt_flow/dbt/__init__.py (excerpt: DbtProfile)
#
# The ``dataclass`` import lies outside the visible hunk.
# ---------------------------------------------------------------------------
@dataclass
class DbtProfile:
    """Identifies the dbt profile a flow should use."""

    # Profile name.
    name: str
    # Directory given for the profile; the example script passes the same
    # directory as for the project.
    project_dir: str


# ---------------------------------------------------------------------------
# prefect_dbt_flow/cmd.py -> prefect_dbt_flow/dbt/cmd.py
# Pure rename (similarity index 100%); no content change.
# ---------------------------------------------------------------------------

# ---------------------------------------------------------------------------
# prefect_dbt_flow/dbt/tasks.py (excerpt: imports follow the cmd.py rename)
# ---------------------------------------------------------------------------
from prefect import task

from prefect_dbt_flow.dbt import DbtNode
from prefect_dbt_flow.dbt import cmd

# Prefixes for Prefect task names (restored from mis-decoded UTF-8).
DBT_RUN_EMOJI = "🏃"
DBT_TEST_EMOJI = "🧪"

# ---------------------------------------------------------------------------
# prefect_dbt_flow_2/__init__.py (new, empty file)
#
# NOTE(review): this change also adds the compiled file
# prefect_dbt_flow_2/__pycache__/__init__.cpython-311.pyc; bytecode caches
# are build artefacts and presumably should be git-ignored.
# ---------------------------------------------------------------------------

# ---------------------------------------------------------------------------
# prefect_dbt_flow_2/flow.py (new file) -- contents follow
# ---------------------------------------------------------------------------
# Example deployment script: one Prefect flow per dbt target. The two flows
# differ only in ``dbt_target`` and in the Prefect flow name.
from prefect_dbt_flow_2.utils.flow import dbt_flow

dev_flow = dbt_flow(
    dbt_project_name="project_name",
    dbt_project_dir="path/to/dbt/project",
    dbt_profiles_dir="path/to/dbt_profiles",
    dbt_target="dev",
    dbt_run_test_after_model=True,
    dbt_print_stauts=True,
    flow_kwargs={"name": "xxx1"},
)

prod_flow = dbt_flow(
    dbt_project_name="project_name",
    dbt_project_dir="path/to/dbt/project",
    dbt_profiles_dir="path/to/dbt_profiles",
    dbt_target="prod",
    dbt_run_test_after_model=True,
    dbt_print_stauts=True,
    flow_kwargs={"name": "xxx2"},
)

if __name__ == "__main__":
    print(dev_flow)
    print(prod_flow)


# ---------------------------------------------------------------------------
# prefect_dbt_flow_2/utils/__init__.py (new file)
# ---------------------------------------------------------------------------
import os
from dataclasses import dataclass, field
from pathlib import Path
from shutil import which
from typing import Any, Dict, List, Literal, Optional


@dataclass
class DbtConfig:
    """Settings describing which dbt project to run and how to run it."""

    dbt_project_name: str
    # Directory of the dbt project.
    dbt_project_dir: str
    # Directory containing the dbt profiles.
    dbt_profiles_dir: str
    # Name of the dbt target, e.g. "dev" or "prod". Deliberately a free-form
    # string so that other targets can be used.
    dbt_target: str
    dbt_run_test_after_model: bool
    # NOTE: "stauts" is a misspelling of "status"; the name is kept because
    # callers pass it by keyword.
    dbt_print_stauts: bool


@dataclass
class DbtNode:
    """A single dbt node (model or test).

    Only ``name``, ``resource_type`` and ``depends_on`` are required. The
    remaining attributes default to empty values so that both the three-field
    form and the richer form built by ``graph.parse_dbt_nodes_info`` (which
    passes ``unique_id``, ``file_path``, ``tags`` and ``config`` by keyword)
    construct without error.
    """

    name: str
    # Uses dbt's own resource-type names.
    resource_type: Literal["model", "test"]
    # Ids of the upstream nodes. The graph parser fills this from the "nodes"
    # list of dbt's ``depends_on`` entry, so macro dependencies are not
    # represented here.
    depends_on: List[str]
    unique_id: str = ""
    file_path: Optional[Path] = None
    tags: List[str] = field(default_factory=list)
    config: Dict[str, Any] = field(default_factory=dict)
# NOTE(review): this change also adds compiled bytecode under
# prefect_dbt_flow_2/utils/__pycache__/ (__init__, cmd, flow, graph and tasks
# ``.cpython-311.pyc``). Those are build artefacts and presumably should be
# git-ignored rather than committed.

# ---------------------------------------------------------------------------
# prefect_dbt_flow_2/utils/cmd.py (new file)
# ---------------------------------------------------------------------------
import subprocess
from pathlib import Path
from shutil import which
from typing import List, Optional, Sequence

from prefect_dbt_flow_2.utils import DbtConfig

# Resolved once at import time. Falls back to the bare name so that a missing
# dbt installation surfaces as a clear "file not found" error when a command
# is actually run, not as ``None`` inside an argument list.
DBT_EXE = which("dbt") or "dbt"

_NO_OUTPUT = "No Output"


def _decode_stream(raw: Optional[bytes]) -> str:
    """Decode captured process output, or return the placeholder when empty."""
    if not raw:
        return _NO_OUTPUT
    # ``errors="replace"`` keeps the readable part of the output; dropping
    # both streams on one bad byte would hide the actual error message.
    return raw.decode("utf-8", errors="replace")


def _run_cmd(cmd: List[str]) -> str:
    """Run *cmd* and return its standard output.

    Args:
        cmd: Executable followed by its arguments. Executed without a shell.

    Returns:
        Standard output decoded as UTF-8 (undecodable bytes are replaced), or
        the literal ``"No Output"`` when the command printed nothing.

    Raises:
        RuntimeError: If the command exits with a non-zero return code. The
            message contains the command line, stderr and stdout.
    """
    result = subprocess.run(cmd, capture_output=True)

    stdout = _decode_stream(result.stdout)
    stderr = _decode_stream(result.stderr)

    if result.returncode != 0:
        raise RuntimeError(
            f"Error running cmd '{' '.join(cmd)}':\n\terr: {stderr}\n\tout: {stdout}"
        )

    return stdout


def _dbt_cmd(
    subcommand: str,
    dbt_config: DbtConfig,
    extra_args: Optional[Sequence[str]] = None,
) -> List[str]:
    """Build the argument list for ``dbt <subcommand>`` from *dbt_config*.

    Args:
        subcommand: dbt subcommand, e.g. ``"ls"``, ``"run"`` or ``"test"``.
        dbt_config: Supplies the target, project directory and profiles
            directory.
        extra_args: Arguments appended after the common options.

    Returns:
        The full command line as a list suitable for ``_run_cmd``.
    """
    return [
        DBT_EXE,
        subcommand,
        "-t",
        dbt_config.dbt_target,
        "--project-dir",
        str(Path(dbt_config.dbt_project_dir).absolute()),
        "--profiles-dir",
        str(Path(dbt_config.dbt_profiles_dir).absolute()),
        *(extra_args or ()),
    ]


# ---------------------------------------------------------------------------
# prefect_dbt_flow_2/utils/flow.py (new file)
# ---------------------------------------------------------------------------
from typing import Any, Optional

from prefect import Flow, flow

from prefect_dbt_flow_2.utils import DbtConfig, graph, tasks


def dbt_flow(
    dbt_project_name: str,
    dbt_project_dir: str,
    dbt_profiles_dir: str,
    dbt_target: str = "dev",
    dbt_run_test_after_model: bool = True,
    dbt_print_stauts: bool = False,
    flow_kwargs: Optional[dict] = None,
) -> Flow[[], Any]:
    """Create a Prefect flow that runs a dbt project node by node.

    Args:
        dbt_project_name: Name of the dbt project; also the default name of
            the Prefect flow.
        dbt_project_dir: Directory where the dbt project is located.
        dbt_profiles_dir: Directory containing the dbt profiles.
        dbt_target: dbt target to run against.
        dbt_run_test_after_model: Forwarded to ``tasks.generate_tasks_dag``;
            see there for how it changes the scheduling of tests.
        dbt_print_stauts: Stored on the ``DbtConfig``. Nothing in the code
            visible here consumes it yet.
        flow_kwargs: Extra keyword arguments for Prefect's ``flow``
            decorator. A ``"name"`` entry overrides the default flow name.

    Returns:
        The Prefect flow. Calling it lists the project's nodes with dbt and
        submits one task per node.
    """
    # TODO: the default values above are still an open design question.
    all_flow_kwargs = {
        "name": dbt_project_name,
        **(flow_kwargs or {}),
    }

    dbt_config = DbtConfig(
        dbt_project_name=dbt_project_name,
        dbt_project_dir=dbt_project_dir,
        dbt_profiles_dir=dbt_profiles_dir,
        dbt_target=dbt_target,
        dbt_run_test_after_model=dbt_run_test_after_model,
        dbt_print_stauts=dbt_print_stauts,
    )

    @flow(**all_flow_kwargs)
    def run_dbt_flow():
        # The graph is parsed inside the flow so that every run sees the
        # current state of the project and a Prefect run logger is available.
        dbt_graph = graph.parse_dbt_nodes_info(dbt_config)

        tasks.generate_tasks_dag(
            dbt_graph=dbt_graph,
            dbt_config=dbt_config,
        )

    return run_dbt_flow


# ---------------------------------------------------------------------------
# prefect_dbt_flow_2/utils/graph.py (new file)
# ---------------------------------------------------------------------------
import json
from typing import Dict

from prefect import get_run_logger

from prefect_dbt_flow_2.utils import DbtConfig, DbtNode
from prefect_dbt_flow_2.utils.cmd import _dbt_cmd, _run_cmd

# Resource types that are turned into DbtNode instances.
_PARSED_RESOURCE_TYPES = ("model", "test")


def parse_dbt_nodes_info(dbt_config: DbtConfig) -> Dict[str, DbtNode]:
    """Parse the project's nodes from the output of ``dbt ls --output json``.

    Args:
        dbt_config: Project directory, profiles directory and target used to
            build the ``dbt ls`` command.

    Returns:
        A dictionary mapping each node's ``unique_id`` to its ``DbtNode``.
        Only models and tests are included. ``depends_on`` holds the ids
        listed under ``depends_on.nodes`` in dbt's output.

    Raises:
        RuntimeError: If ``dbt ls`` exits with a non-zero return code.
    """
    cmd_out = _run_cmd(_dbt_cmd("ls", dbt_config, ["--output", "json"]))
    logger = get_run_logger()

    dbt_nodes_info: Dict[str, DbtNode] = {}
    for raw_dbt_node_data in cmd_out.split("\n"):
        # dbt may mix plain log lines into the output; only lines containing
        # a JSON object are candidates.
        if "{" not in raw_dbt_node_data:
            continue

        try:
            node_dict = json.loads(raw_dbt_node_data.strip())
        except json.JSONDecodeError:
            logger.debug(f"Skipping line: {raw_dbt_node_data}")
            continue

        if node_dict.get("resource_type") not in _PARSED_RESOURCE_TYPES:
            continue

        # Only the attributes DbtNode requires are passed, so this works
        # regardless of which optional attributes the dataclass declares.
        dbt_nodes_info[node_dict["unique_id"]] = DbtNode(
            name=node_dict["name"],
            resource_type=node_dict["resource_type"],
            depends_on=node_dict.get("depends_on", {}).get("nodes", []),
        )

    return dbt_nodes_info


# ---------------------------------------------------------------------------
# prefect_dbt_flow_2/utils/tasks.py (new file)
# ---------------------------------------------------------------------------
from graphlib import TopologicalSorter
from typing import Any, Dict, Iterable, List, Mapping, Optional, Union

from prefect import task

from prefect_dbt_flow_2.utils import DbtConfig, DbtNode
from prefect_dbt_flow_2.utils.cmd import _dbt_cmd, _run_cmd

# Prefixes for Prefect task names (restored from mis-decoded UTF-8).
DBT_RUN_EMOJI = "🏃"
DBT_TEST_EMOJI = "🧪"


def _task_dbt_run(
    dbt_node: DbtNode,
    task_kwargs: Optional[Dict] = None,
    *,
    dbt_config: DbtConfig,
):
    """Create a Prefect task that runs ``dbt run -m <node name>``.

    Args:
        dbt_node: Node whose ``name`` is used as the dbt selector.
        task_kwargs: Extra keyword arguments for Prefect's ``task``
            decorator. A ``"name"`` entry is overridden by the generated
            task name.
        dbt_config: Supplies the target, project and profiles directories.

    Returns:
        The Prefect task (not yet submitted).
    """
    all_task_kwargs = {
        **(task_kwargs or {}),
        "name": f"{DBT_RUN_EMOJI} {dbt_node.name}",
    }

    @task(**all_task_kwargs)
    def dbt_run():
        # TODO: other ``dbt run`` options are not exposed yet.
        _run_cmd(_dbt_cmd("run", dbt_config, ["-m", dbt_node.name]))

    return dbt_run


def _task_dbt_test(
    dbt_node: DbtNode,
    task_kwargs: Optional[Dict] = None,
    *,
    dbt_config: DbtConfig,
):
    """Create a Prefect task that runs ``dbt test -m <node name>``.

    Args:
        dbt_node: Node whose ``name`` is used as the dbt selector.
        task_kwargs: Extra keyword arguments for Prefect's ``task``
            decorator. A ``"name"`` entry is overridden by the generated
            task name.
        dbt_config: Supplies the target, project and profiles directories.

    Returns:
        The Prefect task (not yet submitted).
    """
    all_task_kwargs = {
        **(task_kwargs or {}),
        "name": f"{DBT_TEST_EMOJI} {dbt_node.name}",
    }

    @task(**all_task_kwargs)
    def dbt_test():
        # TODO: other ``dbt test`` options are not exposed yet.
        _run_cmd(_dbt_cmd("test", dbt_config, ["-m", dbt_node.name]))

    return dbt_test


def _node_key(node_or_id: Union[str, DbtNode]) -> str:
    """Return the graph key for a node given as an id string or a DbtNode."""
    if isinstance(node_or_id, str):
        return node_or_id
    # ``unique_id`` is optional on DbtNode; fall back to the node name.
    return getattr(node_or_id, "unique_id", None) or node_or_id.name


def generate_tasks_dag(
    dbt_graph: Union[Mapping[str, DbtNode], Iterable[DbtNode]],
    dbt_config: DbtConfig,
) -> List[Any]:
    """Submit one Prefect task per dbt node, honouring node dependencies.

    Must be called from inside a running Prefect flow, because tasks are
    submitted immediately.

    Scheduling:
        * Every model becomes a ``dbt run`` task that waits for the tasks of
          the nodes it depends on.
        * With ``dbt_config.dbt_run_test_after_model`` set, each model's run
          is followed by a ``dbt test -m <model>`` task, and downstream
          models wait for that test task. Stand-alone test nodes in the
          graph are skipped, since the per-model test command selects them.
        * Otherwise every test node becomes its own ``dbt test`` task that
          waits for the nodes it depends on.

    Dependencies that are not part of *dbt_graph* are ignored.

    Args:
        dbt_graph: Either the mapping returned by ``parse_dbt_nodes_info``
            (key -> node) or a plain iterable of nodes. Entries of
            ``depends_on`` may be key strings or ``DbtNode`` instances.
        dbt_config: Settings forwarded to every task.

    Returns:
        The futures of all submitted tasks, in submission order.

    Raises:
        graphlib.CycleError: If the node dependencies contain a cycle. It is
            a subclass of ``ValueError``.
    """
    if isinstance(dbt_graph, Mapping):
        nodes = dict(dbt_graph)
    else:
        nodes = {_node_key(node): node for node in dbt_graph}

    test_after_model = dbt_config.dbt_run_test_after_model
    if test_after_model:
        nodes = {
            key: node for key, node in nodes.items() if node.resource_type == "model"
        }

    upstream = {
        key: [dep for dep in map(_node_key, node.depends_on) if dep in nodes]
        for key, node in nodes.items()
    }

    # Maps a node key to the future that downstream nodes must wait for.
    gate_for = {}
    dbt_tasks_dag = []

    # static_order() yields every node after all of its predecessors.
    for key in TopologicalSorter(upstream).static_order():
        node = nodes[key]
        wait_for = [gate_for[dep] for dep in upstream[key]]

        if node.resource_type == "model":
            future = _task_dbt_run(node, dbt_config=dbt_config).submit(
                wait_for=wait_for
            )
            dbt_tasks_dag.append(future)
            if test_after_model:
                # Downstream nodes are gated on the test, not just the run.
                future = _task_dbt_test(node, dbt_config=dbt_config).submit(
                    wait_for=[future]
                )
                dbt_tasks_dag.append(future)
        else:
            future = _task_dbt_test(node, dbt_config=dbt_config).submit(
                wait_for=wait_for
            )
            dbt_tasks_dag.append(future)

        gate_for[key] = future

    return dbt_tasks_dag