Skip to content

Commit

Permalink
Replaced all days_ago functions with datetime functions (#23237)
Browse files Browse the repository at this point in the history
Co-authored-by: Dev232001 <thedevhooda@gmail.com>
  • Loading branch information
uranusjr and Dev232001 authored May 23, 2022
1 parent 35620ed commit f352ee6
Show file tree
Hide file tree
Showing 26 changed files with 105 additions and 113 deletions.
5 changes: 3 additions & 2 deletions airflow/example_dags/example_subdag_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,19 @@
"""Example DAG demonstrating the usage of the SubDagOperator."""

# [START example_subdag_operator]
import datetime

from airflow import DAG
from airflow.example_dags.subdags.subdag import subdag
from airflow.operators.empty import EmptyOperator
from airflow.operators.subdag import SubDagOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'example_subdag_operator'

with DAG(
dag_id=DAG_NAME,
default_args={"retries": 2},
start_date=days_ago(2),
start_date=datetime.datetime(2022, 1, 1),
schedule_interval="@once",
tags=['example'],
) as dag:
Expand Down
7 changes: 3 additions & 4 deletions tests/api/common/test_delete_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@
# specific language governing permissions and limitations
# under the License.


import pytest

from airflow import models
from airflow.api.common.delete_dag import delete_dag
from airflow.exceptions import AirflowException, DagNotFound
from airflow.operators.empty import EmptyOperator
from airflow.utils.dates import days_ago
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import State
from airflow.utils.types import DagRunType
Expand Down Expand Up @@ -73,11 +72,11 @@ def setup_dag_models(self, for_sub_dag=False):

task = EmptyOperator(
task_id='dummy',
dag=models.DAG(dag_id=self.key, default_args={'start_date': days_ago(2)}),
dag=models.DAG(dag_id=self.key, default_args={'start_date': timezone.datetime(2022, 1, 1)}),
owner='airflow',
)

test_date = days_ago(1)
test_date = timezone.datetime(2022, 1, 1)
with create_session() as session:
session.add(DM(dag_id=self.key, fileloc=self.dag_file_path, is_subdag=for_sub_dag))
dr = DR(dag_id=self.key, run_type=DagRunType.MANUAL, run_id="test", execution_date=test_date)
Expand Down
21 changes: 12 additions & 9 deletions tests/api/common/test_mark_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.

from datetime import timedelta
import datetime
from typing import Callable

import pytest
Expand All @@ -34,7 +34,6 @@
)
from airflow.models import DagRun
from airflow.utils import timezone
from airflow.utils.dates import days_ago
from airflow.utils.session import create_session, provide_session
from airflow.utils.state import State
from airflow.utils.types import DagRunType
Expand Down Expand Up @@ -62,12 +61,12 @@ def create_dags(cls, dagbag):
cls.dag2 = dagbag.get_dag('example_subdag_operator')
cls.dag3 = dagbag.get_dag('example_trigger_target_dag')
cls.dag4 = dagbag.get_dag('test_mapped_classic')
cls.execution_dates = [days_ago(2), days_ago(1)]
cls.execution_dates = [timezone.datetime(2022, 1, 1), timezone.datetime(2022, 1, 2)]
start_date3 = cls.dag3.start_date
cls.dag3_execution_dates = [
start_date3,
start_date3 + timedelta(days=1),
start_date3 + timedelta(days=2),
start_date3 + datetime.timedelta(days=1),
start_date3 + datetime.timedelta(days=2),
]

@pytest.fixture(autouse=True)
Expand All @@ -76,7 +75,7 @@ def setup(self):
clear_db_runs()
drs = _create_dagruns(
self.dag1,
[_DagRunInfo(d, (d, d + timedelta(days=1))) for d in self.execution_dates],
[_DagRunInfo(d, (d, d + datetime.timedelta(days=1))) for d in self.execution_dates],
state=State.RUNNING,
run_type=DagRunType.SCHEDULED,
)
Expand All @@ -88,7 +87,7 @@ def setup(self):
[
_DagRunInfo(
self.dag2.start_date,
(self.dag2.start_date, self.dag2.start_date + timedelta(days=1)),
(self.dag2.start_date, self.dag2.start_date + datetime.timedelta(days=1)),
),
],
state=State.RUNNING,
Expand All @@ -112,7 +111,7 @@ def setup(self):
[
_DagRunInfo(
self.dag4.start_date,
(self.dag4.start_date, self.dag4.start_date + timedelta(days=1)),
(self.dag4.start_date, self.dag4.start_date + datetime.timedelta(days=1)),
)
],
state=State.SUCCESS,
Expand Down Expand Up @@ -482,7 +481,11 @@ def setup_class(cls):
cls.dag1.sync_to_db()
cls.dag2 = dagbag.dags['example_subdag_operator']
cls.dag2.sync_to_db()
cls.execution_dates = [days_ago(2), days_ago(1), days_ago(0)]
cls.execution_dates = [
timezone.datetime(2022, 1, 1),
timezone.datetime(2022, 1, 2),
timezone.datetime(2022, 1, 3),
]

def setup_method(self):
clear_db_runs()
Expand Down
33 changes: 14 additions & 19 deletions tests/api_connexion/endpoints/test_extra_link_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import os
from urllib.parse import quote_plus

import pytest
from parameterized import parameterized

from airflow import DAG
from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
Expand All @@ -28,8 +28,8 @@
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
from airflow.security import permissions
from airflow.utils import timezone
from airflow.utils.state import DagRunState
from airflow.utils.timezone import datetime
from airflow.utils.types import DagRunType
from tests.test_utils.api_connexion_utils import create_user, delete_user
from tests.test_utils.db import clear_db_runs, clear_db_xcom
Expand Down Expand Up @@ -61,7 +61,7 @@ def configured_app(minimal_app_for_api):
class TestGetExtraLinks:
@pytest.fixture(autouse=True)
def setup_attrs(self, configured_app, session) -> None:
self.default_time = datetime(2020, 1, 1)
self.default_time = timezone.datetime(2020, 1, 1)

clear_db_runs()
clear_db_xcom()
Expand Down Expand Up @@ -90,40 +90,35 @@ def teardown_method(self) -> None:
clear_db_xcom()

def _create_dag(self):
with DAG(
dag_id="TEST_DAG_ID",
default_args=dict(
start_date=self.default_time,
),
) as dag:
with DAG(dag_id="TEST_DAG_ID", default_args={"start_date": self.default_time}) as dag:
BigQueryExecuteQueryOperator(task_id="TEST_SINGLE_QUERY", sql="SELECT 1")
BigQueryExecuteQueryOperator(task_id="TEST_MULTIPLE_QUERY", sql=["SELECT 1", "SELECT 2"])
return dag

@parameterized.expand(
@pytest.mark.parametrize(
"url, expected_title, expected_detail",
[
(
"missing_dag",
pytest.param(
"/api/v1/dags/INVALID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_SINGLE_QUERY/links",
"DAG not found",
'DAG with ID = "INVALID" not found',
id="missing_dag",
),
(
"missing_dag_run",
pytest.param(
"/api/v1/dags/TEST_DAG_ID/dagRuns/INVALID/taskInstances/TEST_SINGLE_QUERY/links",
"DAG Run not found",
'DAG Run with ID = "INVALID" not found',
id="missing_dag_run",
),
(
"missing_task",
pytest.param(
"/api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/INVALID/links",
"Task not found",
'Task with ID = "INVALID" not found',
id="missing_task",
),
]
],
)
def test_should_respond_404(self, name, url, expected_title, expected_detail):
del name
def test_should_respond_404(self, url, expected_title, expected_detail):
response = self.client.get(url, environ_overrides={'REMOTE_USER': "test"})

assert 404 == response.status_code
Expand Down
12 changes: 5 additions & 7 deletions tests/cli/commands/test_task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import io
import json
import logging
Expand All @@ -24,7 +24,6 @@
import unittest
from argparse import ArgumentParser
from contextlib import redirect_stdout
from datetime import datetime
from unittest import mock

import pytest
Expand All @@ -38,14 +37,13 @@
from airflow.models import DagBag, DagRun, Pool, TaskInstance
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils import timezone
from airflow.utils.dates import days_ago
from airflow.utils.session import create_session
from airflow.utils.state import State
from airflow.utils.types import DagRunType
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_pools, clear_db_runs

DEFAULT_DATE = days_ago(1)
DEFAULT_DATE = timezone.datetime(2022, 1, 1)
ROOT_FOLDER = os.path.realpath(
os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir, os.pardir)
)
Expand Down Expand Up @@ -374,7 +372,7 @@ def test_task_states_for_dag_run(self):

dag2 = DagBag().dags['example_python_operator']
task2 = dag2.get_task(task_id='print_the_context')
default_date2 = timezone.make_aware(datetime(2016, 1, 9))
default_date2 = timezone.datetime(2016, 1, 9)
dag2.clear()
dagrun = dag2.create_dagrun(
state=State.RUNNING,
Expand Down Expand Up @@ -417,7 +415,7 @@ def test_task_states_for_dag_run_when_dag_run_not_exists(self):
task_states_for_dag_run should return an AirflowException when invalid dag id is passed
"""
with pytest.raises(DagRunNotFound):
default_date2 = timezone.make_aware(datetime(2016, 1, 9))
default_date2 = timezone.datetime(2016, 1, 9)
task_command.task_states_for_dag_run(
self.parser.parse_args(
[
Expand Down Expand Up @@ -455,7 +453,7 @@ def setUp(self) -> None:
self.run_id = "test_run"
self.dag_path = os.path.join(ROOT_FOLDER, "dags", "test_logging_in_dag.py")
reset(self.dag_id)
self.execution_date = timezone.make_aware(datetime(2017, 1, 1))
self.execution_date = timezone.datetime(2017, 1, 1)
self.execution_date_str = self.execution_date.isoformat()
self.task_args = ['tasks', 'run', self.dag_id, self.task_id, '--local', self.execution_date_str]
self.log_dir = conf.get_mandatory_value('logging', 'base_log_folder')
Expand Down
18 changes: 8 additions & 10 deletions tests/dag_processing/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
# specific language governing permissions and limitations
# under the License.
#

import datetime
import os
from unittest import mock
Expand All @@ -34,7 +33,6 @@
from airflow.models.taskinstance import SimpleTaskInstance
from airflow.operators.empty import EmptyOperator
from airflow.utils import timezone
from airflow.utils.dates import days_ago
from airflow.utils.session import create_session
from airflow.utils.state import State
from airflow.utils.types import DagRunType
Expand Down Expand Up @@ -113,7 +111,7 @@ def test_dag_file_processor_sla_miss_callback(self, create_dummy_dag):

# Create dag with a start of 1 day ago, but an sla of 0
# so we'll already have an sla_miss on the books.
test_start_date = days_ago(1)
test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
dag, task = create_dummy_dag(
dag_id='test_sla_miss',
task_id='dummy',
Expand Down Expand Up @@ -142,7 +140,7 @@ def test_dag_file_processor_sla_miss_callback_invalid_sla(self, create_dummy_dag
# Create dag with a start of 1 day ago, but an sla of 0
# so we'll already have an sla_miss on the books.
# Pass anything besides a timedelta object to the sla argument.
test_start_date = days_ago(1)
test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
dag, task = create_dummy_dag(
dag_id='test_sla_miss',
task_id='dummy',
Expand Down Expand Up @@ -170,7 +168,7 @@ def test_dag_file_processor_sla_miss_callback_sent_notification(self, create_dum

# Create dag with a start of 2 days ago, but an sla of 1 day
# ago so we'll already have an sla_miss on the books
test_start_date = days_ago(2)
test_start_date = timezone.utcnow() - datetime.timedelta(days=2)
dag, task = create_dummy_dag(
dag_id='test_sla_miss',
task_id='dummy',
Expand Down Expand Up @@ -206,7 +204,7 @@ def test_dag_file_processor_sla_miss_doesnot_raise_integrity_error(self, dag_mak

# Create dag with a start of 2 days ago, but an sla of 1 day
# ago so we'll already have an sla_miss on the books
test_start_date = days_ago(2)
test_start_date = timezone.utcnow() - datetime.timedelta(days=2)
with dag_maker(
dag_id='test_sla_miss',
default_args={'start_date': test_start_date, 'sla': datetime.timedelta(days=1)},
Expand Down Expand Up @@ -247,7 +245,7 @@ def test_dag_file_processor_sla_miss_callback_exception(self, mock_stats_incr, c

sla_callback = MagicMock(side_effect=RuntimeError('Could not call function'))

test_start_date = days_ago(2)
test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
dag, task = create_dummy_dag(
dag_id='test_sla_miss',
task_id='dummy',
Expand Down Expand Up @@ -277,7 +275,7 @@ def test_dag_file_processor_only_collect_emails_from_sla_missed_tasks(
):
session = settings.Session()

test_start_date = days_ago(2)
test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
email1 = 'test1@test.com'
dag, task = create_dummy_dag(
dag_id='test_sla_miss',
Expand Down Expand Up @@ -317,7 +315,7 @@ def test_dag_file_processor_sla_miss_email_exception(
# Mock the callback function so we can verify that it was not called
mock_send_email.side_effect = RuntimeError('Could not send an email')

test_start_date = days_ago(2)
test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
dag, task = create_dummy_dag(
dag_id='test_sla_miss',
task_id='dummy',
Expand Down Expand Up @@ -347,7 +345,7 @@ def test_dag_file_processor_sla_miss_deleted_task(self, create_dummy_dag):
"""
session = settings.Session()

test_start_date = days_ago(2)
test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
dag, task = create_dummy_dag(
dag_id='test_sla_miss',
task_id='dummy',
Expand Down
6 changes: 4 additions & 2 deletions tests/dags/test_default_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import pendulum

from airflow.models import DAG
from airflow.utils.dates import days_ago

args = {'owner': 'airflow', 'retries': 3, 'start_date': days_ago(2)}
args = {'owner': 'airflow', 'retries': 3, 'start_date': pendulum.datetime(2022, 1, 1)}

tree_dag = DAG(
dag_id='test_tree_view',
Expand Down
9 changes: 3 additions & 6 deletions tests/dags/test_example_bash_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,17 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import timedelta
import datetime

from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.dates import days_ago

args = {'owner': 'airflow', 'retries': 3, 'start_date': days_ago(2)}

dag = DAG(
dag_id='test_example_bash_operator',
default_args=args,
default_args={'owner': 'airflow', 'retries': 3, 'start_date': datetime.datetime(2022, 1, 1)},
schedule_interval='0 0 * * *',
dagrun_timeout=timedelta(minutes=60),
dagrun_timeout=datetime.timedelta(minutes=60),
)

cmd = 'ls -l'
Expand Down
Loading

0 comments on commit f352ee6

Please sign in to comment.