Skip to content

Commit

Permalink
Exclude missing tasks from the gantt view (#23627)
Browse files Browse the repository at this point in the history
* Exclude missing tasks from the gantt view

Stops the gantt view from crashing if a task no longer exists
in a DAG but there are TaskInstances for that task.

* Fix tests
  • Loading branch information
joelossher authored May 20, 2022
1 parent e09e463 commit 4b731f4
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 3 deletions.
4 changes: 4 additions & 0 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3323,6 +3323,8 @@ def gantt(self, dag_id, session=None):

tasks = []
for ti in tis:
if not dag.has_task(ti.task_id):
continue
# prev_attempted_tries will reflect the currently running try_number
# or the try_number of the last complete run
# https://issues.apache.org/jira/browse/AIRFLOW-2143
Expand All @@ -3339,6 +3341,8 @@ def gantt(self, dag_id, session=None):
try_count = 1
prev_task_id = ""
for failed_task_instance in ti_fails:
if not dag.has_task(failed_task_instance.task_id):
continue
if tf_count != 0 and failed_task_instance.task_id == prev_task_id:
try_count += 1
else:
Expand Down
54 changes: 51 additions & 3 deletions tests/www/views/test_views_graph_gantt.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
import pytest

from airflow.configuration import conf
from airflow.models import DAG
from airflow.models import DAG, DagRun
from airflow.models.baseoperator import BaseOperator
from airflow.utils import timezone
from airflow.utils.session import provide_session
from airflow.utils.state import State
from airflow.utils.session import create_session, provide_session
from airflow.utils.state import State, TaskInstanceState

DAG_ID = "dag_for_testing_dt_nr_dr_form"
DEFAULT_DATE = timezone.datetime(2017, 9, 1)
Expand Down Expand Up @@ -297,3 +298,50 @@ def test_uses_base_date_if_changed_away_from_execution_date(admin_client, very_c
_assert_run_is_not_in_dropdown(very_close_dagruns[1], data)
_assert_run_is_in_dropdown_not_selected(very_close_dagruns[2], data)
_assert_run_is_selected(very_close_dagruns[3], data)


@pytest.mark.parametrize("endpoint", ENDPOINTS)
def test_view_works_with_deleted_tasks(request, admin_client, app, endpoint):
task_to_state = {
"existing-task": TaskInstanceState.SUCCESS,
"deleted-task-success": TaskInstanceState.SUCCESS,
"deleted-task-failed": TaskInstanceState.FAILED,
}
dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
for task_id in task_to_state.keys():
BaseOperator(task_id=task_id, dag=dag)

execution_date = timezone.datetime(2022, 3, 14)
dag_run_id = "test-deleted-tasks-dag-run"
with create_session() as session:
dag_run = dag.create_dagrun(
run_id=dag_run_id,
execution_date=execution_date,
data_interval=(execution_date, execution_date + timedelta(minutes=5)),
state=State.SUCCESS,
external_trigger=True,
session=session,
)
for ti in dag_run.task_instances:
ti.refresh_from_task(dag.get_task(ti.task_id))
ti.state = task_to_state[ti.task_id]
ti.start_date = execution_date
ti.end_date = execution_date + timedelta(minutes=5)
session.merge(ti)

def cleanup_database():
with create_session() as session:
session.query(DagRun).filter_by(run_id=dag_run_id).delete()

request.addfinalizer(cleanup_database)

dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
BaseOperator(task_id="existing-task", dag=dag)
app.dag_bag.bag_dag(dag=dag, root_dag=dag)

response = admin_client.get(
f'{endpoint}&execution_date={execution_date.isoformat()}',
data={"username": "test", "password": "test"},
follow_redirects=True,
)
assert response.status_code == 200

0 comments on commit 4b731f4

Please sign in to comment.