Skip to content

Commit

Permalink
partial save
Browse files Browse the repository at this point in the history
  • Loading branch information
davidvaldez89d committed Sep 29, 2023
1 parent 3b9ccd6 commit 3122a6b
Show file tree
Hide file tree
Showing 17 changed files with 288 additions and 2 deletions.
21 changes: 21 additions & 0 deletions my_flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from prefect_dbt_flow.flow import dbt_flow
from prefect_dbt_flow.dbt import DbtProject, DbtProfile
import os

dbt_project = DbtProject(
name="project_name",
project_dir=os.getcwd()
)
dbt_profile = DbtProfile(
name="project_name",
project_dir=os.getcwd()
)

run = dbt_flow(
project=dbt_project,
profile = dbt_profile,
run_test_after_model=True,
flow_kwargs={
"target":"dev" #what if you want to change the dbt target ?
}
)
3 changes: 2 additions & 1 deletion prefect_dbt_flow/dbt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ class DbtProject:

@dataclass
class DbtProfile:
...
name: str
project_dir: str


@dataclass
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion prefect_dbt_flow/dbt/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from prefect import task

from prefect_dbt_flow.dbt import DbtNode
from prefect_dbt_flow import cmd
from prefect_dbt_flow.dbt import cmd

DBT_RUN_EMOJI = "🏃"
DBT_TEST_EMOJI = "🧪"
Expand Down
Empty file added prefect_dbt_flow_2/__init__.py
Empty file.
Binary file not shown.
25 changes: 25 additions & 0 deletions prefect_dbt_flow_2/flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from prefect_dbt_flow_2.utils.flow import dbt_flow

dev_flow = dbt_flow(
dbt_project_name="project_name",
dbt_project_dir="path/to/dbt/project",
dbt_profiles_dir="path/to/dbt_profiles",
dbt_target="dev",
dbt_run_test_after_model=True,
dbt_print_stauts=True,
flow_kwargs= {"name":"xxx1"}
)

prod_flow = dbt_flow(
dbt_project_name="project_name",
dbt_project_dir="path/to/dbt/project",
dbt_profiles_dir="path/to/dbt_profiles",
dbt_target="prod",
dbt_run_test_after_model=True,
dbt_print_stauts=True,
flow_kwargs= {"name":"xxx2"}
)

if __name__ == "__main__":
print(dev_flow)
print(prod_flow)
21 changes: 21 additions & 0 deletions prefect_dbt_flow_2/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from typing import List, Literal
from dataclasses import dataclass
import os
from shutil import which


@dataclass
class DbtConfig:
dbt_project_name: str
dbt_project_dir: str
dbt_profiles_dir: str
dbt_target: str #dev, prod, and others, better to leave it open ???
dbt_run_test_after_model: bool
dbt_print_stauts:bool

@dataclass
class DbtNode:
"""Class representing a dbt node"""
name: str
resource_type: Literal["model", "test"] #keep dbt naming convention
depends_on: List["DbtNode"] #what if the node depends on a macro?
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
23 changes: 23 additions & 0 deletions prefect_dbt_flow_2/utils/cmd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import subprocess
from typing import List

def _run_cmd(cmd: List[str]) -> str:
"""
Function to execute a command and return its output.
Raises an exception if the command fails.
"""
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

try:
stdout = result.stdout.decode("utf-8") if result.stdout else "No Output"
stderr = result.stderr.decode("utf-8") if result.stderr else "No Output"
except (UnicodeDecodeError, AttributeError):
stdout = "No Output"
stderr = "No Output"

if result.returncode != 0:
raise Exception(
f"Error running cmd '{' '.join(cmd)}':\n\terr: {stderr}\n\tout: {stdout}"
)

return stdout
53 changes: 53 additions & 0 deletions prefect_dbt_flow_2/utils/flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from prefect import flow, Flow
from typing import Optional, Any
from prefect_dbt_flow_2.utils import graph, tasks, DbtConfig


def dbt_flow( #What do we want as default values?
dbt_project_name: str,
dbt_project_dir: str,
dbt_profiles_dir: str,
dbt_target: str = "dev",
dbt_run_test_after_model: bool = True,
dbt_print_stauts:bool = False,
flow_kwargs: Optional[dict] = None,
)-> Flow[[], Any]:
"""
Create a Prefect flow for running dbt tasks.
Args:
dbt_project_name (str): The name of the dbt project.
dbt_project_dir (str): The directory where the dbt project is located.
dbt_profiles_dir (str): The directory containing dbt profiles.
flow_kwargs (dict, optional): Additional kwargs for the Prefect flow.
...
Returns:
prefect.Flow: The Prefect flow for running dbt tasks.
"""
all_flow_kwargs = {
"name": dbt_project_name,
**(flow_kwargs or {}),
}



@flow(**all_flow_kwargs)
def run_dbt_flow(): #Why not inside of the flow?
dbt_config=DbtConfig(
dbt_project_name=dbt_project_name,
dbt_project_dir=dbt_project_dir,
dbt_profiles_dir=dbt_profiles_dir,
dbt_target=dbt_target,
dbt_run_test_after_model=dbt_run_test_after_model,
dbt_print_stauts=dbt_print_stauts,
)

# dbt_graph = graph.parse_dbt_nodes_info(dbt_config) #Why not inside of the flow?

# tasks.geneate_tasks_dag(
# dbt_graph=dbt_graph,
# dbt_config=dbt_config,
# )

return run_dbt_flow
46 changes: 46 additions & 0 deletions prefect_dbt_flow_2/utils/graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import json
from prefect import get_run_logger
from typing import Dict

from prefect_dbt_flow_2.utils.cmd import _run_cmd
from prefect_dbt_flow_2.utils import DbtNode

def parse_dbt_nodes_info() -> Dict[str, DbtNode]:
"""
Function to parse dbt nodes from the output of the `dbt ls` command.
Returns a dictionary mapping unique IDs to DbtNode instances.
"""
dbt_ls_command = [
DBT_EXE,
"ls",
"--project-dir",
str(DBT_PROJECT_DIR.absolute()),
"--output",
"json",
"--profiles-dir",
str(DBT_PROJECT_DIR.absolute()),
]
# print(f"\n---debug_parse_dbt_nodes_info__dbt_ls_command:\n{dbt_ls_command}")
cmd_out = _run_cmd(dbt_ls_command)
# print(f"\n---debug_parse_dbt_nodes_info:\n{cmd_out}")
dbt_nodes_info = {}
for raw_dbt_node_data in cmd_out.split("\n"):
if "{" in raw_dbt_node_data:
try:
node_dict = json.loads(raw_dbt_node_data.strip())
if node_dict["resource_type"] == "model" or node_dict["resource_type"] == "test":
dbt_node = DbtNode(
name=node_dict["name"],
unique_id=node_dict["unique_id"],
resource_type=node_dict["resource_type"],
depends_on=node_dict["depends_on"].get("nodes", []),
file_path=DBT_PROJECT_DIR / node_dict["original_file_path"],
tags=node_dict["tags"],
config=node_dict["config"],
)
dbt_nodes_info[dbt_node.unique_id] = dbt_node
except json.decoder.JSONDecodeError:
get_run_logger().debug(f"Skipping line: {raw_dbt_node_data}")
print("error")

return dbt_nodes_info
96 changes: 96 additions & 0 deletions prefect_dbt_flow_2/utils/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from typing import List, Dict, Optional

from prefect import task
from prefect_dbt_flow_2.utils import DbtConfig, DbtNode

# from prefect_dbt_flow.dbt import DbtNode, _run_cmd
# from prefect_dbt_flow import cmd

DBT_RUN_EMOJI = "🏃"
DBT_TEST_EMOJI = "🧪"


def _task_dbt_run(dbt_node: DbtNode, task_kwargs: Optional[Dict] = None):
all_task_kwargs = {
**(task_kwargs or {}),
"name": f"{DBT_RUN_EMOJI} {dbt_node.name}",
}

@task(**all_task_kwargs)
def dbt_run():
dbt_run_command = [ #what about the other options of dbt run ?
DBT_EXE, #need to provide this
"run",
"-t",
"dev", #this should be and option [dev | prod]
"--project-dir",
str(DBT_PROJECT_DIR.absolute()), #need to provide this env?
"--profiles-dir",
str(DBT_PROJECT_DIR.absolute()), #neet to provide this env?
"-m",
dbt_node.name,
]
_run_cmd(dbt_run_command)

return dbt_run


def _task_dbt_test(dbt_node: DbtNode, task_kwargs: Optional[Dict] = None):
all_task_kwargs = {
**(task_kwargs or {}),
"name": f"{DBT_TEST_EMOJI} {dbt_node.name}",
}

@task(**all_task_kwargs)
def dbt_test():
dbt_run_command = [ #what about the other options of dbt test?
DBT_EXE, #need to provide this
"test",
"-t",
"dev", #this should be and option [dev | prod]
"--project-dir",
str(DBT_PROJECT_DIR.absolute()), #need to provide this env?
"--profiles-dir",
str(DBT_PROJECT_DIR.absolute()), #neet to provide this env?
"-m",
dbt_node.name,
]
_run_cmd(dbt_run_command)


return dbt_test


def generate_tasks_dag(
dbt_graph: List[DbtNode],
dbt_config=DbtConfig,
):
dbt_tasks_dag = list()

if dbt_config.dbt_run_test_after_model:
for node in dbt_graph:
node.name
node.resource_type
node.depends_on

dbt_config.dbt_target
dbt_config.dbt_project_dir
dbt_config.dbt_project_dir
dbt_config.dbt_run_test_after_model
_task_dbt_test()

else:
for node in dbt_graph:
node.name
node.resource_type
node.depends_on

dbt_config.dbt_target
dbt_config.dbt_project_dir
dbt_config.dbt_project_dir
dbt_config.dbt_run_test_after_model
_task_dbt_run()



return dbt_tasks_dag

0 comments on commit 3122a6b

Please sign in to comment.