This repository has been archived by the owner on Jan 9, 2023. It is now read-only.
forked from tomasfarias/airflow-dbt-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
use_dbt_artifacts_dag.py
75 lines (64 loc) · 2.38 KB
/
use_dbt_artifacts_dag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
"""Sample DAG to showcase pulling dbt artifacts from XCOM."""
import datetime as dt
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow_dbt_python.dbt.operators import DbtRunOperator
def _timing_duration(timing):
    """Return the elapsed seconds of one dbt ``timing`` entry.

    Both ``started_at`` and ``completed_at`` are parsed with the format
    ``%Y-%m-%dT%H:%M:%S.%fZ``.
    """
    timestamp_format = "%Y-%m-%dT%H:%M:%S.%fZ"
    # datetime.strptime only accepts positional arguments; passing
    # ``format=`` as a keyword raises TypeError.
    started = dt.datetime.strptime(timing["started_at"], timestamp_format)
    completed = dt.datetime.strptime(timing["completed_at"], timestamp_format)
    # completed - started, so longer steps produce larger positive durations.
    return (completed - started).total_seconds()


def process_dbt_artifacts(**context):
    """Report which model took the longest to compile and which took the longest to execute.

    Pulls the ``run_results.json`` artifact pushed to XCom by the
    ``dbt_run_daily`` task. Only results whose ``status`` is ``"success"`` are
    considered. For the ``execute`` and ``compile`` timing steps it prints the
    model with the largest duration. On ties, the first model seen wins.

    Args:
        **context: Airflow task context; only ``context["ti"]`` is used.
    """
    run_results = context["ti"].xcom_pull(
        key="run_results.json", task_ids="dbt_run_daily"
    )
    if not run_results:
        print("No run_results.json artifact found in XCom; nothing to report.")
        return

    # Timing step name -> (model_id, duration_seconds) of the slowest model so far.
    longest = {"execute": None, "compile": None}
    for result in run_results.get("results", []):
        if result["status"] != "success":
            continue
        model_id = result["unique_id"]
        for timing in result["timing"]:
            step = timing["name"]
            if step not in longest:
                # Other timing steps are not reported, so skip parsing them.
                continue
            duration = _timing_duration(timing)
            if longest[step] is None or duration > longest[step][1]:
                longest[step] = (model_id, duration)

    # Report execute first, then compile, matching the original output order.
    for step in ("execute", "compile"):
        if longest[step] is None:
            print(f"No successful model reported a {step} timing.")
            continue
        model_id, duration = longest[step]
        print(
            f"{model_id} took the longest to {step} with a time of "
            f"{duration} seconds!"
        )
# Daily (00:00) DAG: run the selected dbt models, then summarize their timings
# from the run_results.json artifact of the dbt task.
with DAG(
    dag_id="example_dbt_artifacts",
    start_date=days_ago(1),
    schedule_interval="0 0 * * *",
    dagrun_timeout=dt.timedelta(minutes=60),
    catchup=False,
) as dag:
    # do_xcom_push_artifacts lists the dbt artifacts this task pushes to XCom;
    # process_dbt_artifacts pulls "run_results.json" from this task_id.
    dbt_run = DbtRunOperator(
        task_id="dbt_run_daily",
        profile="my-project",
        target="production",
        profiles_dir="~/.dbt/",
        project_dir="/path/to/my/dbt/project/",
        select=["+tag:daily"],
        exclude=["tag:deprecated"],
        full_refresh=True,
        do_xcom_push_artifacts=["manifest.json", "run_results.json"],
    )

    process_artifacts = PythonOperator(
        task_id="process_artifacts",
        provide_context=True,
        python_callable=process_dbt_artifacts,
    )

    # Summarize only after the dbt run task has completed.
    dbt_run.set_downstream(process_artifacts)