From 3122a6b69a38c49590c4c2e017657a6c5b00c186 Mon Sep 17 00:00:00 2001 From: David Date: Fri, 29 Sep 2023 17:16:25 +0200 Subject: [PATCH] partial save --- my_flow.py | 21 ++++ prefect_dbt_flow/dbt/__init__.py | 3 +- prefect_dbt_flow/{ => dbt}/cmd.py | 0 prefect_dbt_flow/dbt/tasks.py | 2 +- prefect_dbt_flow_2/__init__.py | 0 .../__pycache__/__init__.cpython-311.pyc | Bin 0 -> 188 bytes prefect_dbt_flow_2/flow.py | 25 +++++ prefect_dbt_flow_2/utils/__init__.py | 21 ++++ .../__pycache__/__init__.cpython-311.pyc | Bin 0 -> 2046 bytes .../utils/__pycache__/cmd.cpython-311.pyc | Bin 0 -> 1398 bytes .../utils/__pycache__/flow.cpython-311.pyc | Bin 0 -> 1931 bytes .../utils/__pycache__/graph.cpython-311.pyc | Bin 0 -> 2416 bytes .../utils/__pycache__/tasks.cpython-311.pyc | Bin 0 -> 1546 bytes prefect_dbt_flow_2/utils/cmd.py | 23 +++++ prefect_dbt_flow_2/utils/flow.py | 53 ++++++++++ prefect_dbt_flow_2/utils/graph.py | 46 +++++++++ prefect_dbt_flow_2/utils/tasks.py | 96 ++++++++++++++++++ 17 files changed, 288 insertions(+), 2 deletions(-) create mode 100644 my_flow.py rename prefect_dbt_flow/{ => dbt}/cmd.py (100%) create mode 100644 prefect_dbt_flow_2/__init__.py create mode 100644 prefect_dbt_flow_2/__pycache__/__init__.cpython-311.pyc create mode 100644 prefect_dbt_flow_2/flow.py create mode 100644 prefect_dbt_flow_2/utils/__init__.py create mode 100644 prefect_dbt_flow_2/utils/__pycache__/__init__.cpython-311.pyc create mode 100644 prefect_dbt_flow_2/utils/__pycache__/cmd.cpython-311.pyc create mode 100644 prefect_dbt_flow_2/utils/__pycache__/flow.cpython-311.pyc create mode 100644 prefect_dbt_flow_2/utils/__pycache__/graph.cpython-311.pyc create mode 100644 prefect_dbt_flow_2/utils/__pycache__/tasks.cpython-311.pyc create mode 100644 prefect_dbt_flow_2/utils/cmd.py create mode 100644 prefect_dbt_flow_2/utils/flow.py create mode 100644 prefect_dbt_flow_2/utils/graph.py create mode 100644 prefect_dbt_flow_2/utils/tasks.py diff --git a/my_flow.py b/my_flow.py new file mode 100644 index 0000000..d75d4c4 --- /dev/null +++ b/my_flow.py @@ -0,0 +1,21 @@ +from prefect_dbt_flow.flow import dbt_flow +from prefect_dbt_flow.dbt import DbtProject, DbtProfile +import os + +dbt_project = DbtProject( + name="project_name", + project_dir=os.getcwd() +) +dbt_profile = DbtProfile( + name="project_name", + project_dir=os.getcwd() +) + +run = dbt_flow( + project=dbt_project, + profile = dbt_profile, + run_test_after_model=True, + flow_kwargs={ + "target":"dev" #what if you want to change the dbt target ? + } +) \ No newline at end of file diff --git a/prefect_dbt_flow/dbt/__init__.py b/prefect_dbt_flow/dbt/__init__.py index 538da7b..52e435f 100644 --- a/prefect_dbt_flow/dbt/__init__.py +++ b/prefect_dbt_flow/dbt/__init__.py @@ -10,7 +10,8 @@ class DbtProject: @dataclass class DbtProfile: - ... + name: str + project_dir: str @dataclass diff --git a/prefect_dbt_flow/cmd.py b/prefect_dbt_flow/dbt/cmd.py similarity index 100% rename from prefect_dbt_flow/cmd.py rename to prefect_dbt_flow/dbt/cmd.py diff --git a/prefect_dbt_flow/dbt/tasks.py b/prefect_dbt_flow/dbt/tasks.py index bfcab86..beb3ac1 100644 --- a/prefect_dbt_flow/dbt/tasks.py +++ b/prefect_dbt_flow/dbt/tasks.py @@ -3,7 +3,7 @@ from prefect import task from prefect_dbt_flow.dbt import DbtNode -from prefect_dbt_flow import cmd +from prefect_dbt_flow.dbt import cmd DBT_RUN_EMOJI = "๐Ÿƒ" DBT_TEST_EMOJI = "๐Ÿงช" diff --git a/prefect_dbt_flow_2/__init__.py b/prefect_dbt_flow_2/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/prefect_dbt_flow_2/__pycache__/__init__.cpython-311.pyc b/prefect_dbt_flow_2/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..f3998b2ce97c0e00bc4aa541f62a999d0181e6ff GIT binary patch literal 188 zcmZ3^%ge<81OX?-QbF`%5CH>>P{wCAAY(d13PUi1CZpdemP7a9w-nG5{Ng_ xkB`sH%PfhH*DI*}#bJ}1pHiBWYFESxG#})GVtyd;ftit!@dE>lC}IYR0RT#ZFRcIo literal 0 HcmV?d00001 diff --git a/prefect_dbt_flow_2/flow.py b/prefect_dbt_flow_2/flow.py new file mode 100644 index 0000000..20e0ae6 --- /dev/null +++ b/prefect_dbt_flow_2/flow.py @@ -0,0 +1,25 @@ +from prefect_dbt_flow_2.utils.flow import dbt_flow + +dev_flow = dbt_flow( + dbt_project_name="project_name", + dbt_project_dir="path/to/dbt/project", + dbt_profiles_dir="path/to/dbt_profiles", + dbt_target="dev", + dbt_run_test_after_model=True, + dbt_print_stauts=True, + flow_kwargs= {"name":"xxx1"} +) + +prod_flow = dbt_flow( + dbt_project_name="project_name", + dbt_project_dir="path/to/dbt/project", + dbt_profiles_dir="path/to/dbt_profiles", + dbt_target="prod", + dbt_run_test_after_model=True, + dbt_print_stauts=True, + flow_kwargs= {"name":"xxx2"} +) + +if __name__ == "__main__": + print(dev_flow) + print(prod_flow) diff --git a/prefect_dbt_flow_2/utils/__init__.py b/prefect_dbt_flow_2/utils/__init__.py new file mode 100644 index 0000000..de8584e --- /dev/null +++ b/prefect_dbt_flow_2/utils/__init__.py @@ -0,0 +1,21 @@ +from typing import List, Literal +from dataclasses import dataclass +import os +from shutil import which + + +@dataclass +class DbtConfig: + dbt_project_name: str + dbt_project_dir: str + dbt_profiles_dir: str + dbt_target: str #dev, prod, and others, better to leave it open ??? + dbt_run_test_after_model: bool + dbt_print_stauts:bool + +@dataclass +class DbtNode: + """Class representing a dbt node""" + name: str + resource_type: Literal["model", "test"] #keep dbt naming convention + depends_on: List["DbtNode"] #what if the node depends on a macro? diff --git a/prefect_dbt_flow_2/utils/__pycache__/__init__.cpython-311.pyc b/prefect_dbt_flow_2/utils/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..22f1b25522f9085f17bda2939a656d074d79321b GIT binary patch literal 2046 zcmaJ?&ubhv6xPgWX7-0&n{4c^18p`YC0){Vq0pOYp*93J#x4z}bT4AOlI;nzGp;mS z*Y#xsIrQj&A^8hjO8%KO^f2g90wssu*4U?<`krQY?X?|iG~YfweUhHur_ug7Iawl5 z_TSBlcQzsaVq;j`fpPi|7_SH?+=@usYFcf(XQSHe`tAgWus$b&R!MS)VQ&~kUT!`60d6g(KtpmF*=7_W#RO^cJJy$DfqTdv)^brL9XfsN7hYkh~h7R^sqmzb_YA_bYVanK-JA1N# z>mc~>s_y{07oNBDL61JjN|W8??(*~Hta9h|w|`e2_9)h4`Y@Zmeem<)RFB?;dTbC3 zIWwH}dMQ_+`=XQrgyfbH46R&Hw49MIm(vBGkx-!J88Lkjv!IJ0=3)|WhFf|X_q!vL zXOIjdgPywR;h`K+Hp56LhMp2q(m-yBR9DfKotUB0EZBsziXwvcgQ>$fWhxCiDI8i) zeuSHyLYPLlg-`*|bR$V3c^lg%A+u;zhk(P>PXJ^2o}KGatjBaNo2m9F)?-@DF8Ze{ z-L>x8^R?{L`5wi3Oy{$i^MUyc`I@-g^&)IQ&-EhM@clgVpxcy7*NJb$G7wT~GWzlqg)>BH%?6Qw=Sr?t!F zJj^38M0p?KQjYJT4*_?to&b#HcyyXSt{uq3y_frk``sr8mGfLyss~@co_V=&xX^w4 z$J1*MH_kqm*I@=uQP=alatm@`4|9O$ZME)P-s7e4KZhA4!d-Y3o~-lm-%`K3m`jU{ z`h@o20)iXM+!f~5(B;vcDwM>(8TlE2_S80h{(0K)i_g!ZNtRi1JdF2+h5Y9FMt%Wg z+@HD+*mo?;%1EtuUCqe7-gwPOv1^`;RD0vK?-9G)x2aX@ldEuozB7aq2Y4^$a^{2? z<5_q5O{E+CG&T=cD|%U`a3Qck><2WWHKs3<~zzb4j_%XhI9K<^xguoXKiJycR}n4 zS-}q;(LZ}1D0dO@zC?{_P02u4SsB~x$X=_Ll$&giB~kBU_WR>6@D-x1sTgM@+hUZl z^*>NF|D~fKvI!1=$nWY+^%$^}Z19?jQ@aelz6#e+t}2*ab}J^Le-PKhPKoKn2S5yB z_Tq!x^^LSH?62nzkW^Pgp6;r5;4^do>PJdf#Y4<=Jj+I!4}C|tfp3ezrq`$w3TiiO zCup@AKC$H-9jxsNZU>=ghoTY(UTCJo4sFL6ZqpW5={UOCa6LZST~Z^H3xWW# zq>OcpP&CVDJDKx={a%dA`vyyjFrK-8M~9RG6*zd zmnI^Mhf8f1IF$29!gq&c(hsN0grxg1Jjr^2oNH z-(lPqW#*EqB7G%r{gTe)Ws>)thn|Qt#K>H_WhbB2stcSlUL_wkJVHCwa}CkB5Cno( zmqXVhyxL~8NgYupOQPKLg4M)PM@L;c>Zhxrz@AmH>Pq{1l&)h*bu`!kOd=EUr^wdf zFAMUAKe=-ItxfB}Ymll@%nVXn57tF6xub;BqQPM`g@+%MEN4&E&OS~&iwaJ;wt zuu$15RQiR=_wU~@|8v6)bLVmPm9O&siRutkqqxfkg_kj2GWi=CxVW>kGfYD|zm+sLQ7e_8Hl7jS$xKa#&(8p%I2bu#}aAj1rqJi=tB z*5Bn*Sm)-fIeP|vICJ35toGxCg1lrf`2-?UTyNv;VDgQucXrL_@U(Cku*;%^wT5q2drAnk6 zB6)q`1Zw{PC_?2g;6LEz0|#rxp%Ma#+g71c)e|$jwqrqxc-}WV^WMzt%x~sBf1k~! z2#lZqSYdlQA-|z88yBX`sSnH(LJ5@|(v=!gS8m7>%Ce($)rQ*D8d@YPj^0f)5?!NV zbd!ywM3U1VU7`j{(Im_i%rwjlOVcdO98+0lb@oj{^Gxm#rnTj>wRV9PPP9grCLWOL z@}H<%l>@cyxch;67br-^c3;?T&vF7~t9MXU0% z@8R8hL4odwW}mxXvX*G}tS$@ki!0RTbCtI3Ft3R#DO3>_?=TURkmrM5Q!r07t+rsi z*>x#%W_ET@G(BMrgcqbS*yf{su=E0*Gcn-3R$`ttA;>E*Pa8mnWGD^gp)yp58UQbi zh%FuJeaR-@Nk<7Go!L>vCWnb{rLQDk^JQQ1iLd%fTc`4g@eI_S&sC3%Lj&Mbt)IX= zWx}}4nI)KMnV)jj1`te)(QLcigrs_QuVX@jW`tiY61RU@7KwDJ~*-=B-wn;&wl(_X2eN81Wi5 zZw%O?hqu)LK@U`fwlyy+Fa}IKtEU@4zR{nM#ReIvBV{Cy^pQ4_ew3cU8GjBl9!KPm zKugH?sya`@63-yXBFQ1iBPjr>N<4|LMBM)XBOIh?C)SSaA`g)X8hhB#=5aLMd{+P5 zW8ACLJ&QBgMP17SN}9fex|Ajsgs zo9#Ycn{l;GYz@b|Ra@M@iUEBD7FC=~{ zG)bX6Hdar;i2Q&6;Nh$${~IU11WG~Pa-2BmxJ!5mRy_ppi)cUA%1nj5vK zSR8wRV|zyJ6DV*$1nQ3KI)RFZD@vP}aUZ`Nf%3-KggbQLuv;9fhF6{UE09o;Bxyp{ z#^*f~Vvf&yCS-H`_9x7dVr6o@I;mWqTzN0de<+ng0%WqX8Y=L9@lGk!a8W14a+pBQ zAWIj+ByuV8?$t1jT!yTa!z^++vRr<8?d!GJRe@+pf7yTF7f(J3mvA4QpGTtX{|4Oz B4)_27 literal 0 HcmV?d00001 diff --git a/prefect_dbt_flow_2/utils/__pycache__/graph.cpython-311.pyc b/prefect_dbt_flow_2/utils/__pycache__/graph.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..8b6c6718ddb63e2e6908e9fed6427c60e63a91a2 GIT binary patch literal 2416 zcmb^yTWH%>^vafOIq@rT9&Je)Ee`h5HgR7oSqY)-oNaxjBk9I!RusjOU8S<*U0pY8 z=8Sywvx5u@8|%jWm1VRa{p{x$8|-Hw2SJ1&P}tZ0JkmneuN}#;9T&Q# z^SJsk6zV`Q=s)_T!+wPRbcn{F_mG_oAYUU9iIjpe&ex+*1y9C9Ik;Ex7U&G^;NmMGm5{!W>~;|5-kNJHs?9mu#9pZ(SMK5bcoa(8fE`-_%oxF9nMRH$0e0lw0vH| zz@gJ}mcW;F+1Y{!xIeu>W;9W*LY-zQ}vl3}Z zT9_Csdmp0jJ?(h}J8$gM>3LOp1KoeIgg*1$Kugs9mt_yMQ}6Ox2M6QblA0y5rgB8% ziUQUpPFx_I3XP+4Ijj{pa!ca05-FAlr{!E~-qBF>c`mCJ3W6$*yM|`nF4H-I6QLQw zqJZym1)*4!)jU^H9wlCG6-R^rLsqGU4#a9^=$BI(4YRiPl+fw~90 zEZQBSRFqUv=QY)4+=JTP8kY02Dk!{TnJ)_D7OWQXy6wwqYEI6N`f(HjjJq7*q!4!l zumeqX-0Lv=04zu5Zp-ykE3zt`=f&1*AyQIQFDO4?==O%Qcp5iA$6&tZAt7Igiw{i2kG1B1m&bJu3l zE-~$FiJj}pW<+VBl((58mQ^z9v%`=+UeWoz?Ad;}FL?-ZUEl_Um(`qRcVS_Pca$9- zFA7Ak19cPZu}@a&B%H z$XHI%mKqFqABVx8OqGbNzyd53Z;co4+C6SrHXYGj=kPJu)VY=Phj7=fq8C9Fh+1sW zV*9t*&M&WR1xGf6Bh_HS3?`~<(qxks+ikI7i#_lfd95qZ=Ubh5(T5`aYhP5u2{W8n zeeYRs|LTl2IP$~IZ*Oj#t_~hI2anfK7c=AngFR?ZClVUk9^k&aWF#lH zl2e<>scQ0^nLJk=IByP|Ukg~Vfp2&-Huj`{Gd5wwCZ1mWSu-a;GBWd*p{Ps}7 z8cbB~n1kci@X?LS=J14-NIi*|iE~!`*v2Pj{OneIYBN6dbm@0bHGbKQU)~-*@#La0 zHEW)ps}9dKR}a0paljlpS&Oh--`@zpa|f)wU+YF^FHmbUl?gNcR<-BM?(TN1|G}gg zOVyAk1gC0%?daiZlry8;Ry460O&G~5#@leLD=@3kt7i175xu${?On@%6@Cz2bDtUm zi+6W-YlA2_QX4_RLk4|N?|~%!!dw_hAMyNpx{eZ?{E7RNZ0Y`sYiyiUb!5_|~ExTW$F+5r^dqX1sAlW;+l6ln^d0@C?c z>-PZEyc9)QsB_hQEHrGiPYd-M?eiHrVYE*R9W&ZzjZRR9z7Z>@HvE-I^DtBo(60sf HBslUfxc6A3 literal 0 HcmV?d00001 diff --git a/prefect_dbt_flow_2/utils/__pycache__/tasks.cpython-311.pyc b/prefect_dbt_flow_2/utils/__pycache__/tasks.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..2805bc7d069ec81dcc01d85a66004d83d0d5d118 GIT binary patch literal 1546 zcmb_c&1)M+6rbH4t=9T-NYYq^q}_vwD~K#odTD8TaO{>OB@U`v2OYXudq#?q{ZMCS zV=8QFN}!h(bSUId0zQ=@g|o|)<<-y{oOvMKT%A{NNjKk`F7U4e$gsZ@*5 zwmz-h!N8R*lM7QvCW5J zi`(KhwWFZ@Sw!tC%XZj|0cB&m0f~h$5}in-s$0qjc)Mk9hQ1Fri7#mY*43**8id~K z7OT|`3-3S%RnlhNQY<#x9_6vDSN-nEdK3ihpmmhgJ9c*#@*=cD1H$bv03Z{%KE2dE zb-|80TrleTPBRh_qx@1O2Vd#ZnzM3qMH--!vr4*AU%k3=dCghAQkRyy!9y<+>MI5} zOwC^6cF+vvOj5GbCNeh;=!5~wfJG(4)Oe+NcgtlhE@z?OII$N4NPWNn literal 0 HcmV?d00001 diff --git a/prefect_dbt_flow_2/utils/cmd.py b/prefect_dbt_flow_2/utils/cmd.py new file mode 100644 index 0000000..0cb898d --- /dev/null +++ b/prefect_dbt_flow_2/utils/cmd.py @@ -0,0 +1,23 @@ +import subprocess +from typing import List + +def _run_cmd(cmd: List[str]) -> str: + """ + Function to execute a command and return its output. + Raises an exception if the command fails. + """ + result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + try: + stdout = result.stdout.decode("utf-8") if result.stdout else "No Output" + stderr = result.stderr.decode("utf-8") if result.stderr else "No Output" + except (UnicodeDecodeError, AttributeError): + stdout = "No Output" + stderr = "No Output" + + if result.returncode != 0: + raise Exception( + f"Error running cmd '{' '.join(cmd)}':\n\terr: {stderr}\n\tout: {stdout}" + ) + + return stdout \ No newline at end of file diff --git a/prefect_dbt_flow_2/utils/flow.py b/prefect_dbt_flow_2/utils/flow.py new file mode 100644 index 0000000..11adf0a --- /dev/null +++ b/prefect_dbt_flow_2/utils/flow.py @@ -0,0 +1,53 @@ +from prefect import flow, Flow +from typing import Optional, Any +from prefect_dbt_flow_2.utils import graph, tasks, DbtConfig + + +def dbt_flow( #What do we want as default values? + dbt_project_name: str, + dbt_project_dir: str, + dbt_profiles_dir: str, + dbt_target: str = "dev", + dbt_run_test_after_model: bool = True, + dbt_print_stauts:bool = False, + flow_kwargs: Optional[dict] = None, +)-> Flow[[], Any]: + """ + Create a Prefect flow for running dbt tasks. + + Args: + dbt_project_name (str): The name of the dbt project. + dbt_project_dir (str): The directory where the dbt project is located. + dbt_profiles_dir (str): The directory containing dbt profiles. + flow_kwargs (dict, optional): Additional kwargs for the Prefect flow. + ... + + Returns: + prefect.Flow: The Prefect flow for running dbt tasks. + """ + all_flow_kwargs = { + "name": dbt_project_name, + **(flow_kwargs or {}), + } + + + + @flow(**all_flow_kwargs) + def run_dbt_flow(): #Why not inside of the flow? + dbt_config=DbtConfig( + dbt_project_name=dbt_project_name, + dbt_project_dir=dbt_project_dir, + dbt_profiles_dir=dbt_profiles_dir, + dbt_target=dbt_target, + dbt_run_test_after_model=dbt_run_test_after_model, + dbt_print_stauts=dbt_print_stauts, + ) + + # dbt_graph = graph.parse_dbt_nodes_info(dbt_config) #Why not inside of the flow? + + # tasks.geneate_tasks_dag( + # dbt_graph=dbt_graph, + # dbt_config=dbt_config, + # ) + + return run_dbt_flow \ No newline at end of file diff --git a/prefect_dbt_flow_2/utils/graph.py b/prefect_dbt_flow_2/utils/graph.py new file mode 100644 index 0000000..8a960ea --- /dev/null +++ b/prefect_dbt_flow_2/utils/graph.py @@ -0,0 +1,46 @@ +import json +from prefect import get_run_logger +from typing import Dict + +from prefect_dbt_flow_2.utils.cmd import _run_cmd +from prefect_dbt_flow_2.utils import DbtNode + +def parse_dbt_nodes_info() -> Dict[str, DbtNode]: + """ + Function to parse dbt nodes from the output of the `dbt ls` command. + Returns a dictionary mapping unique IDs to DbtNode instances. + """ + dbt_ls_command = [ + DBT_EXE, + "ls", + "--project-dir", + str(DBT_PROJECT_DIR.absolute()), + "--output", + "json", + "--profiles-dir", + str(DBT_PROJECT_DIR.absolute()), + ] + # print(f"\n---debug_parse_dbt_nodes_info__dbt_ls_command:\n{dbt_ls_command}") + cmd_out = _run_cmd(dbt_ls_command) + # print(f"\n---debug_parse_dbt_nodes_info:\n{cmd_out}") + dbt_nodes_info = {} + for raw_dbt_node_data in cmd_out.split("\n"): + if "{" in raw_dbt_node_data: + try: + node_dict = json.loads(raw_dbt_node_data.strip()) + if node_dict["resource_type"] == "model" or node_dict["resource_type"] == "test": + dbt_node = DbtNode( + name=node_dict["name"], + unique_id=node_dict["unique_id"], + resource_type=node_dict["resource_type"], + depends_on=node_dict["depends_on"].get("nodes", []), + file_path=DBT_PROJECT_DIR / node_dict["original_file_path"], + tags=node_dict["tags"], + config=node_dict["config"], + ) + dbt_nodes_info[dbt_node.unique_id] = dbt_node + except json.decoder.JSONDecodeError: + get_run_logger().debug(f"Skipping line: {raw_dbt_node_data}") + print("error") + + return dbt_nodes_info \ No newline at end of file diff --git a/prefect_dbt_flow_2/utils/tasks.py b/prefect_dbt_flow_2/utils/tasks.py new file mode 100644 index 0000000..edb0eb7 --- /dev/null +++ b/prefect_dbt_flow_2/utils/tasks.py @@ -0,0 +1,96 @@ +from typing import List, Dict, Optional + +from prefect import task +from prefect_dbt_flow_2.utils import DbtConfig, DbtNode + +# from prefect_dbt_flow.dbt import DbtNode, _run_cmd +# from prefect_dbt_flow import cmd + +DBT_RUN_EMOJI = "๐Ÿƒ" +DBT_TEST_EMOJI = "๐Ÿงช" + + +def _task_dbt_run(dbt_node: DbtNode, task_kwargs: Optional[Dict] = None): + all_task_kwargs = { + **(task_kwargs or {}), + "name": f"{DBT_RUN_EMOJI} {dbt_node.name}", + } + + @task(**all_task_kwargs) + def dbt_run(): + dbt_run_command = [ #what about the other options of dbt run ? + DBT_EXE, #need to provide this + "run", + "-t", + "dev", #this should be and option [dev | prod] + "--project-dir", + str(DBT_PROJECT_DIR.absolute()), #need to provide this env? + "--profiles-dir", + str(DBT_PROJECT_DIR.absolute()), #neet to provide this env? + "-m", + dbt_node.name, + ] + _run_cmd(dbt_run_command) + + return dbt_run + + +def _task_dbt_test(dbt_node: DbtNode, task_kwargs: Optional[Dict] = None): + all_task_kwargs = { + **(task_kwargs or {}), + "name": f"{DBT_TEST_EMOJI} {dbt_node.name}", + } + + @task(**all_task_kwargs) + def dbt_test(): + dbt_run_command = [ #what about the other options of dbt test? + DBT_EXE, #need to provide this + "test", + "-t", + "dev", #this should be and option [dev | prod] + "--project-dir", + str(DBT_PROJECT_DIR.absolute()), #need to provide this env? + "--profiles-dir", + str(DBT_PROJECT_DIR.absolute()), #neet to provide this env? + "-m", + dbt_node.name, + ] + _run_cmd(dbt_run_command) + + + return dbt_test + + +def generate_tasks_dag( + dbt_graph: List[DbtNode], + dbt_config=DbtConfig, +): + dbt_tasks_dag = list() + + if dbt_config.dbt_run_test_after_model: + for node in dbt_graph: + node.name + node.resource_type + node.depends_on + + dbt_config.dbt_target + dbt_config.dbt_project_dir + dbt_config.dbt_project_dir + dbt_config.dbt_run_test_after_model + _task_dbt_test() + + else: + for node in dbt_graph: + node.name + node.resource_type + node.depends_on + + dbt_config.dbt_target + dbt_config.dbt_project_dir + dbt_config.dbt_project_dir + dbt_config.dbt_run_test_after_model + _task_dbt_run() + + + + return dbt_tasks_dag \ No newline at end of file