
Deferrable Operators don't respect execution_timeout after being deferred #19382

Closed
2 tasks done
eskarimov opened this issue Nov 3, 2021 · 3 comments
Assignees
Labels
affected_version:2.2 Issues Reported for 2.2 area:async-operators AIP-40: Deferrable ("Async") Operators area:core kind:bug This is a clearly a bug

Comments

@eskarimov
Contributor

eskarimov commented Nov 3, 2021

Apache Airflow version

2.2.0

Operating System

Debian GNU/Linux 10 (buster)

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

./breeze start-airflow --python 3.7 --backend postgres

What happened

When a task is resumed after being deferred, its start_date is not the original start_date, but the timestamp at which the task resumed.

If a task has execution_timeout set and runs longer than that, it might never raise a timeout error, because technically a brand-new task instance starts after the deferral.
I know a brand-new task instance is expected behaviour, but the documentation describes the behaviour with execution_timeout differently (see "What you expected to happen" below).

This is especially true if an Operator needs to be deferred multiple times: every time it resumes after a deferral, the clock starts counting from zero again.

Some task instance details after an example task has completed:

Attribute          Value
-----------------  ---------------------
execution_date     2021-11-03, 14:45:29
trigger_timeout    2021-11-03, 14:46:30
start_date         2021-11-03, 14:46:32
end_date           2021-11-03, 14:47:02
execution_timeout  0:01:00
duration           30.140004
state              success

What you expected to happen

Task failure with Timeout Exception.
Documentation says:

  • Note that execution_timeout on Operators is considered over the total runtime, not individual executions in-between deferrals - this means that if execution_timeout is set, an Operator may fail while it's deferred or while it's running after a deferral, even if it's only been resumed for a few seconds.

Also, I see the following code path, which checks the timeout value when the task comes back from the deferred state:

            # If we are coming in with a next_method (i.e. from a deferral),
            # calculate the timeout from our start_date.
            if self.next_method:
                timeout_seconds = (
                    task_copy.execution_timeout - (timezone.utcnow() - self.start_date)
                ).total_seconds()

But the issue is that self.start_date isn't the original task's start_date; it has been reset to the time the task resumed after the deferral.
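To make the mismatch concrete, here is a minimal sketch using plain datetime arithmetic, with hypothetical timestamps mirroring the task-instance details above, showing what that check effectively computes versus what the documentation describes:

```python
from datetime import datetime, timedelta, timezone

execution_timeout = timedelta(seconds=60)

# Hypothetical timestamps, roughly matching the task-instance table above.
original_start = datetime(2021, 11, 3, 14, 45, 30, tzinfo=timezone.utc)
resumed_start = datetime(2021, 11, 3, 14, 46, 32, tzinfo=timezone.utc)  # after deferral
now = datetime(2021, 11, 3, 14, 47, 0, tzinfo=timezone.utc)

# What the code path above effectively computes: remaining budget measured
# from the post-deferral start_date, so the timeout resets on every deferral.
remaining_from_resume = (execution_timeout - (now - resumed_start)).total_seconds()

# What the documentation describes: remaining budget from the original start.
remaining_from_original = (execution_timeout - (now - original_start)).total_seconds()

print(remaining_from_resume)    # still positive -> no timeout is raised
print(remaining_from_original)  # negative -> the timeout should already have fired
```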

How to reproduce

DAG:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.time_delta import TimeDeltaSensorAsync

with DAG(
    dag_id='time_delta_async_bug',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    time_delta_async_sensor = TimeDeltaSensorAsync(task_id='time_delta_task_id',
                                                   delta=timedelta(seconds=60),
                                                   execution_timeout=timedelta(seconds=60),
                                                   )

Since there aren't many async Operators at the moment, I slightly modified TimeDeltaSensorAsync to simulate task work after the deferral.
Here is the full code of the TimeDeltaSensorAsync class I used to reproduce the issue; the only difference is the time.sleep(30) line, which simulates post-processing after the trigger has completed.

import time

from airflow.sensors.time_delta import TimeDeltaSensor
from airflow.triggers.temporal import DateTimeTrigger


class TimeDeltaSensorAsync(TimeDeltaSensor):
    """
    A drop-in replacement for TimeDeltaSensor that defers itself to avoid
    taking up a worker slot while it is waiting.

    :param delta: time length to wait after the data interval before succeeding.
    :type delta: datetime.timedelta
    """

    def execute(self, context):
        target_dttm = context['data_interval_end']
        target_dttm += self.delta
        self.defer(trigger=DateTimeTrigger(moment=target_dttm), method_name="execute_complete")

    def execute_complete(self, context, event=None):  # pylint: disable=unused-argument
        """Callback for when the trigger fires - returns immediately."""
        time.sleep(30)  # Simulate processing the event after the trigger completed
        return None
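Putting the numbers together, a back-of-the-envelope check (assuming the 60-second trigger wait and the 30-second sleep above) shows why the task succeeds even though its total runtime exceeds execution_timeout:

```python
from datetime import timedelta

execution_timeout = timedelta(seconds=60)

# Approximate wall-clock timeline of the modified sensor above.
trigger_wait = timedelta(seconds=60)     # time spent deferred on the trigger
post_processing = timedelta(seconds=30)  # the time.sleep(30) after resuming

total_runtime = trigger_wait + post_processing
print(total_runtime > execution_timeout)     # True: total runtime exceeds the timeout

# But only the post-deferral segment is measured against the timeout:
print(post_processing > execution_timeout)   # False: so no timeout is raised
```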

Anything else

I've ticked the "I'm willing to submit a PR" checkbox, but I'm not sure where to start; I'd be happy if someone could give me guidance on which direction to look in.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@eskarimov eskarimov added area:core kind:bug This is a clearly a bug labels Nov 3, 2021
@boring-cyborg

boring-cyborg bot commented Nov 3, 2021

Thanks for opening your first issue here! Be sure to follow the issue template!

@eladkal
Contributor

eladkal commented Nov 6, 2021

@eskarimov feel free to submit PR

@eskarimov
Contributor Author

Closing the issue since PR #20062 got merged
