Skip to content

Commit

Permalink
Update deployment flow run retry settings with runtime values (Prefec…
Browse files Browse the repository at this point in the history
…tHQ#6489)

* Update deployment flow run retry settings with runtime values

Retries are configured on ad-hoc flow runs during flow run creation. When flow runs are created for deployments, settings are copied from the deployment to the flow run. However, deployments do not have any concept of flow run orchestration policies yet. Instead of introducing retries to deployments now, we will update the flow run with any discovered orchestration settings at runtime. In the future, the deployment will have the option of overriding these discovered settings.

* Add test
  • Loading branch information
zanieb authored and darrida committed Aug 25, 2022
1 parent 03be16b commit 0ac1257
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 7 deletions.
11 changes: 11 additions & 0 deletions src/prefect/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,21 @@ async def retrieve_flow_then_begin_flow_run(
)
return state

# Update the flow run policy defaults to match settings on the flow
# Note: Mutating the flow run object prevents us from performing another read
# operation if these properties are used by the client downstream
if flow_run.empirical_policy.retry_delay_seconds is None:
flow_run.empirical_policy.retry_delay_seconds = flow.retry_delay_seconds

if flow_run.empirical_policy.max_retries is None:
flow_run.empirical_policy.max_retries = flow.retries

await client.update_flow_run(
flow_run_id=flow_run_id,
flow_version=flow.version,
empirical_policy=flow_run.empirical_policy,
)

if flow.should_validate_parameters:
failed_state = None
try:
Expand Down
21 changes: 14 additions & 7 deletions src/prefect/testing/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,20 @@ def latest_now(*args):
monkeypatch.setattr("anyio.sleep", sleep)

@contextmanager
def assert_sleeps_for(seconds: Union[int, float]):
def assert_sleeps_for(
seconds: Union[int, float], extra_tolerance: Union[int, float] = 0
):
"""
Assert that sleep was called for N seconds during the duration of the context.
The runtime of the code during the context of the duration is used as a
tolerance to account for sleeps that start based on a time. This is less
The runtime of the code during the context of the duration is used as an
upper tolerance to account for sleeps that start based on a time. This is less
brittle than attempting to freeze the current time.
If an integer is provided, the tolerance will be rounded up to the nearest
integer. If a float is provided, the tolerance will be a float.
If an integer is provided, the upper tolerance will be rounded up to the nearest
integer. If a float is provided, the upper tolerance will be a float.
An optional extra tolerance may be provided to account for any other issues.
This will be applied symmetrically.
"""
run_t0 = original_now().timestamp()
sleep_t0 = time_shift
Expand All @@ -168,8 +173,10 @@ def assert_sleeps_for(seconds: Union[int, float]):
runtime = int(runtime) + 1
sleeptime = sleep_t1 - sleep_t0
assert (
sleeptime <= seconds <= sleeptime + runtime
), f"Sleep was called for {sleeptime}; expected {seconds} with tolerance of +{runtime}."
sleeptime - float(extra_tolerance)
<= seconds
<= sleeptime + runtime + extra_tolerance
), f"Sleep was called for {sleeptime}; expected {seconds} with tolerance of +{runtime + extra_tolerance}, -{extra_tolerance}"

sleep.assert_sleeps_for = assert_sleeps_for

Expand Down
31 changes: 31 additions & 0 deletions tests/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,37 @@ def my_flow(x: int):
)
assert state.result() == 1

async def test_retries_loaded_from_flow_definition(
    self, orion_client, patch_manifest_load, mock_anyio_sleep
):
    """
    Check that retry settings declared on the flow are written to a deployment's
    flow run when the run is executed.

    The flow run created from the deployment is asserted to have no retry
    settings. After `retrieve_flow_then_begin_flow_run`, the flow run read back
    from the API is asserted to carry the values from the `@flow` decorator, to
    have failed, and to have a run count of three.
    """

    @flow(retries=2, retry_delay_seconds=3)
    def my_flow(x: int):
        raise ValueError()

    await patch_manifest_load(my_flow)
    deployment_id = await self.create_deployment(orion_client, my_flow)

    flow_run = await orion_client.create_flow_run_from_deployment(
        deployment_id, parameters={"x": 1}
    )
    # Use identity checks for the `None` singleton rather than `==`
    assert flow_run.empirical_policy.max_retries is None
    assert flow_run.empirical_policy.retry_delay_seconds is None

    with mock_anyio_sleep.assert_sleeps_for(
        my_flow.retries * my_flow.retry_delay_seconds,
        # Allow an extra second tolerance per retry to account for rounding
        extra_tolerance=my_flow.retries,
    ):
        state = await retrieve_flow_then_begin_flow_run(
            flow_run.id, client=orion_client
        )

    # Read the flow run again so the assertions see the values stored by the API
    flow_run = await orion_client.read_flow_run(flow_run.id)
    assert flow_run.empirical_policy.max_retries == 2
    assert flow_run.empirical_policy.retry_delay_seconds == 3
    assert state.is_failed()
    assert flow_run.run_count == 3

async def test_failed_run(self, orion_client, patch_manifest_load):
@flow
def my_flow(x: int):
Expand Down

0 comments on commit 0ac1257

Please sign in to comment.