Skip to content

Commit

Permalink
Load flows for deployments in a worker thread (#6340)
Browse files Browse the repository at this point in the history
If not run in a worker thread, it is executed in an "async" context which means that any methods decorated by `sync_compatible` will return a coroutine. However, top-level code in a script is always intended to be run in a "sync" context, so the user can never `await` the coroutines and code that works without a deployment will break while using a deployment.
  • Loading branch information
zanieb committed Aug 11, 2022
1 parent 89e8374 commit fe06be9
Showing 1 changed file with 2 additions and 1 deletion.
3 changes: 2 additions & 1 deletion src/prefect/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from prefect.infrastructure import DockerContainer, KubernetesJob, Process
from prefect.logging.loggers import flow_run_logger
from prefect.orion import schemas
from prefect.utilities.asyncutils import run_sync_in_worker_thread
from prefect.utilities.callables import ParameterSchema
from prefect.utilities.dispatch import lookup_type
from prefect.utilities.filesystem import tmpchdir
Expand Down Expand Up @@ -54,7 +55,7 @@ async def load_flow_from_flow_run(
if deployment.manifest_path:
with open(deployment.manifest_path, "r") as f:
import_path = json.load(f)["import_path"]
flow = import_object(import_path)
flow = await run_sync_in_worker_thread(import_object, import_path)
return flow


Expand Down

0 comments on commit fe06be9

Please sign in to comment.