Skip to content

Commit

Permalink
Handles infrastructure errors before the agent submits a flow run (#6903)
Browse files Browse the repository at this point in the history
* Handles errors in submit_run()

* Avoiding the use of monkeypatch
  • Loading branch information
hallenmaia committed Sep 21, 2022
1 parent febf234 commit a1ef0b4
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 8 deletions.
22 changes: 14 additions & 8 deletions src/prefect/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,20 @@ async def submit_run(self, flow_run: FlowRun) -> None:
ready_to_submit = await self._propose_pending_state(flow_run)

if ready_to_submit:
infrastructure = await self.get_infrastructure(flow_run)

# Wait for submission to be completed. Note that the submission function
# may continue to run in the background after this exits.
await self.task_group.start(
self._submit_run_and_capture_errors, flow_run, infrastructure
)
self.logger.info(f"Completed submission of flow run '{flow_run.id}'")
try:
infrastructure = await self.get_infrastructure(flow_run)
except Exception as exc:
self.logger.exception(
f"Failed to get infrastructure for flow run '{flow_run.id}'."
)
await self._propose_failed_state(flow_run, exc)
else:
# Wait for submission to be completed. Note that the submission function
# may continue to run in the background after this exits.
await self.task_group.start(
self._submit_run_and_capture_errors, flow_run, infrastructure
)
self.logger.info(f"Completed submission of flow run '{flow_run.id}'")

self.submitting_flow_run_ids.remove(flow_run.id)

Expand Down
29 changes: 29 additions & 0 deletions tests/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,35 @@ async def test_agent_submit_run_aborts_without_raising_if_server_raises_abort(
"Server sent an abort signal: message"
)

async def test_agent_fails_flow_if_get_infrastructure_fails(
    self, orion_client, deployment, mock_infrastructure_run
):
    """Check that an error from ``get_infrastructure`` fails the flow run.

    The agent's ``get_infrastructure`` is replaced with a mock that raises
    ``ValueError("Bad!")``. After ``submit_run`` returns, the test expects:

    * the infrastructure ``run`` mock was never invoked,
    * the flow run id is no longer in ``submitting_flow_run_ids``,
    * exactly one ``logger.exception`` call with the expected message,
    * the flow run's stored state is failed and its data resolves to the
      original ``ValueError``.
    """
    # Create a flow run that is scheduled for "now" so it is eligible for pickup.
    scheduled_state = Scheduled(scheduled_time=pendulum.now("utc"))
    flow_run = await orion_client.create_flow_run_from_deployment(
        deployment.id,
        state=scheduled_state,
    )

    queue_names = [deployment.work_queue_name]
    async with OrionAgent(work_queues=queue_names, prefetch_seconds=10) as agent:
        # submit_run removes the id when it finishes, so it must be present first.
        agent.submitting_flow_run_ids.add(flow_run.id)
        # Swap in mocks so the log call can be inspected and the lookup fails.
        agent.logger = MagicMock()
        agent.get_infrastructure = AsyncMock(side_effect=ValueError("Bad!"))

        await agent.submit_run(flow_run)

    # NOTE(review): the original indentation was lost in extraction; the checks
    # below are assumed to run after the agent context exits -- confirm upstream.
    mock_infrastructure_run.assert_not_called()
    assert flow_run.id not in agent.submitting_flow_run_ids

    expected_log = f"Failed to get infrastructure for flow run '{flow_run.id}'."
    agent.logger.exception.assert_called_once_with(expected_log)

    # Re-read the run from the API to inspect the state the agent proposed.
    refreshed_run = await orion_client.read_flow_run(flow_run.id)
    final_state = refreshed_run.state
    assert final_state.is_failed()

    # The failed state's data should resolve back to the original exception.
    stored_exception = await orion_client.resolve_datadoc(final_state.data)
    with pytest.raises(ValueError, match="Bad!"):
        raise stored_exception

async def test_agent_fails_flow_if_infrastructure_submission_fails(
self, orion_client, deployment, mock_infrastructure_run
):
Expand Down

0 comments on commit a1ef0b4

Please sign in to comment.