Skip to content

Commit

Permalink
Agents with dynamic work queue matching (#7099)
Browse files Browse the repository at this point in the history
  • Loading branch information
anticorrelator committed Oct 12, 2022
1 parent 9986f5e commit 39ad6f8
Show file tree
Hide file tree
Showing 12 changed files with 391 additions and 27 deletions.
51 changes: 38 additions & 13 deletions src/prefect/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
class OrionAgent:
def __init__(
self,
work_queues: List[str],
work_queues: List[str] = None,
work_queue_prefix: str = None,
prefetch_seconds: int = None,
default_infrastructure: Infrastructure = None,
default_infrastructure_document_id: UUID = None,
Expand All @@ -35,14 +36,16 @@ def __init__(
"Provide only one of 'default_infrastructure' and 'default_infrastructure_document_id'."
)

self.work_queues: Set[str] = set(work_queues)
self.work_queues: Set[str] = set(work_queues) if work_queues else set()
self.prefetch_seconds = prefetch_seconds
self.submitting_flow_run_ids = set()
self.started = False
self.logger = get_logger("agent")
self.task_group: Optional[anyio.abc.TaskGroup] = None
self.client: Optional[OrionClient] = None

self.work_queue_prefix = work_queue_prefix

self._work_queue_cache_expiration: pendulum.DateTime = None
self._work_queue_cache: List[WorkQueue] = []

Expand All @@ -58,6 +61,23 @@ def __init__(
self.default_infrastructure = Process()
self.default_infrastructure_document_id = None

async def update_matched_agent_work_queues(self):
if self.work_queue_prefix:
matched_queues = await self.client.match_work_queues(self.work_queue_prefix)
matched_queues = set(q.name for q in matched_queues)
if matched_queues != self.work_queues:
new_queues = matched_queues - self.work_queues
removed_queues = self.work_queues - matched_queues
if new_queues:
self.logger.info(
f"Matched new work queues: {', '.join(new_queues)}"
)
if removed_queues:
self.logger.info(
f"Work queues no longer matched: {', '.join(removed_queues)}"
)
self.work_queues = matched_queues

async def get_work_queues(self) -> Iterator[WorkQueue]:
"""
Loads the work queue objects corresponding to the agent's target work
Expand All @@ -76,23 +96,28 @@ async def get_work_queues(self) -> Iterator[WorkQueue]:
self._work_queue_cache.clear()
self._work_queue_cache_expiration = now.add(seconds=30)

await self.update_matched_agent_work_queues()

for name in self.work_queues:
try:
work_queue = await self.client.read_work_queue_by_name(name)
except ObjectNotFound:

# if the work queue wasn't found, create it
try:
work_queue = await self.client.create_work_queue(name=name)
self.logger.info(f"Created work queue '{name}'.")

# if creating it raises an exception, it was probably just
# created by some other agent; rather than entering a re-read
# loop with new error handling, we log the exception and
# continue.
except Exception as exc:
self.logger.exception(f"Failed to create work queue {name!r}.")
continue
if not self.work_queue_prefix:
# do not attempt to create work queues if the agent is polling for
# queues using a regex
try:
work_queue = await self.client.create_work_queue(name=name)
self.logger.info(f"Created work queue '{name}'.")

# if creating it raises an exception, it was probably just
# created by some other agent; rather than entering a re-read
# loop with new error handling, we log the exception and
# continue.
except Exception as exc:
self.logger.exception(f"Failed to create work queue {name!r}.")
continue

self._work_queue_cache.append(work_queue)
yield work_queue
Expand Down
34 changes: 26 additions & 8 deletions src/prefect/cli/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ async def start(
"--work-queue",
help="One or more work queue names for the agent to pull from.",
),
work_queue_prefix: str = typer.Option(
None,
"-m",
"--match",
help=(
"Dynamically matches work queue names with the specified prefix for the agent to pull from,"
"for example `dev-` will match all work queues with a name that starts with `dev-`"
),
),
hide_welcome: bool = typer.Option(False, "--hide-welcome"),
api: str = SettingsOption(PREFECT_API_URL),
# deprecated tags
Expand Down Expand Up @@ -76,11 +85,12 @@ async def start(
style="blue",
)

if not work_queues and not tags:
if not work_queues and not tags and not work_queue_prefix:
exit_with_error("No work queues provided!", style="red")
elif work_queues and tags:
elif bool(work_queues) + bool(tags) + bool(work_queue_prefix) > 1:
exit_with_error(
"Either `work_queues` or `tags` can be provided, but not both.", style="red"
"Only one of `work_queues`, `match`, or `tags` can be provided.",
style="red",
)

if tags:
Expand Down Expand Up @@ -115,13 +125,21 @@ async def start(
f"Starting v{prefect.__version__} agent with ephemeral API..."
)

async with OrionAgent(work_queues=work_queues) as agent:
async with OrionAgent(
work_queues=work_queues, work_queue_prefix=work_queue_prefix
) as agent:
if not hide_welcome:
app.console.print(ascii_name)
app.console.print(
"Agent started! Looking for work from "
f"queue(s): {', '.join(work_queues)}..."
)
if work_queue_prefix:
app.console.print(
"Agent started! Looking for work from "
f"queue(s) that start with the prefix: {work_queue_prefix}..."
)
else:
app.console.print(
"Agent started! Looking for work from "
f"queue(s): {', '.join(work_queues)}..."
)

await critical_service_loop(
agent.get_and_submit_flow_runs,
Expand Down
53 changes: 53 additions & 0 deletions src/prefect/cli/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from prefect.context import PrefectObjectRegistry, registry_from_script
from prefect.deployments import Deployment, load_deployments_from_yaml
from prefect.exceptions import (
ObjectAlreadyExists,
ObjectNotFound,
PrefectHTTPStatusError,
ScriptError,
Expand Down Expand Up @@ -85,6 +86,38 @@ async def get_deployment(client, name, deployment_id):
return deployment


async def create_work_queue_and_set_concurrency_limit(
work_queue_name, work_queue_concurrency
):
async with get_client() as client:
if work_queue_concurrency is not None and work_queue_name:
try:
try:
res = await client.create_work_queue(name=work_queue_name)
except ObjectAlreadyExists:
res = await client.read_work_queue_by_name(name=work_queue_name)
if res.concurrency_limit != work_queue_concurrency:
app.console.print(
f"Work queue {work_queue_name!r} already exists with a concurrency limit of {res.concurrency_limit}, this limit is being updated...",
style="red",
)
await client.update_work_queue(
res.id, concurrency_limit=work_queue_concurrency
)
app.console.print(
f"Updated concurrency limit on work queue {work_queue_name!r} to {work_queue_concurrency}",
style="green",
)
except Exception as exc:
exit_with_error(
f"Failed to set concurrency limit on work queue {work_queue_name}."
)
elif work_queue_concurrency:
app.console.print(
f"No work queue set! The concurrency limit cannot be updated."
)


class RichTextIO:
def __init__(self, console, prefix: str = None) -> None:
self.console = console
Expand Down Expand Up @@ -375,6 +408,12 @@ async def apply(
"--upload",
help="A flag that, when provided, uploads this deployment's files to remote storage.",
),
work_queue_concurrency: int = typer.Option(
None,
"--limit",
"-l",
help="Sets the concurrency limit on the work queue that handles this deployment's runs",
),
):
"""
Create or update a deployment from a YAML file.
Expand All @@ -387,6 +426,10 @@ async def apply(
except Exception as exc:
exit_with_error(f"'{path!s}' did not conform to deployment spec: {exc!r}")

await create_work_queue_and_set_concurrency_limit(
deployment.work_queue_name, work_queue_concurrency
)

if upload:
if (
deployment.storage
Expand Down Expand Up @@ -507,6 +550,12 @@ async def build(
"Note that if a work queue is not set, work will not be scheduled."
),
),
work_queue_concurrency: int = typer.Option(
None,
"--limit",
"-l",
help="Sets the concurrency limit on the work queue that handles this deployment's runs",
),
infra_type: InfrastructureSlugs = typer.Option(
None,
"--infra",
Expand Down Expand Up @@ -733,6 +782,10 @@ async def build(
style="green",
)

await create_work_queue_and_set_concurrency_limit(
deployment.work_queue_name, work_queue_concurrency
)

# we process these separately for informative output
if not skip_upload:
if (
Expand Down
13 changes: 11 additions & 2 deletions src/prefect/cli/work_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,13 @@ async def inspect(
async def ls(
verbose: bool = typer.Option(
False, "--verbose", "-v", help="Display more information."
)
),
work_queue_prefix: str = typer.Option(
None,
"--match",
"-m",
help="Will match work queues with names that start with the specified prefix string",
),
):
"""
View all work queues.
Expand All @@ -221,7 +227,10 @@ async def ls(
table.add_column("Filter (Deprecated)", style="magenta", no_wrap=True)

async with get_client() as client:
queues = await client.read_work_queues()
if work_queue_prefix is not None:
queues = await client.match_work_queues(work_queue_prefix)
else:
queues = await client.read_work_queues()

sort_by_created_key = lambda q: pendulum.now("utc") - q.created

Expand Down
32 changes: 32 additions & 0 deletions src/prefect/client/orion.py
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,38 @@ async def read_work_queues(
response = await self._client.post(f"/work_queues/filter", json=body)
return pydantic.parse_obj_as(List[schemas.core.WorkQueue], response.json())

async def match_work_queues(
self,
prefix: str,
) -> List[schemas.core.WorkQueue]:
"""
Query Orion for work queues with names with a specific prefix.
Args:
prefix: a string used to match work queue name prefixes
Returns:
a list of [WorkQueue model][prefect.orion.schemas.core.WorkQueue] representations
of the work queues
"""
page_length = 100
current_page = 0
work_queues = []

while True:
new_queues = await self.read_work_queues(
offset=current_page * page_length, limit=page_length
)
if not new_queues:
break
filtered_queues = list(
filter(lambda q: q.name.startswith(prefix), new_queues)
)
work_queues += filtered_queues
current_page += 1

return work_queues

async def delete_work_queue_by_id(
self,
id: UUID,
Expand Down
16 changes: 15 additions & 1 deletion src/prefect/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -580,12 +580,15 @@ async def upload_to_storage(
return file_count

@sync_compatible
async def apply(self, upload: bool = False) -> UUID:
async def apply(
self, upload: bool = False, work_queue_concurrency: int = None
) -> UUID:
"""
Registers this deployment with the API and returns the deployment's ID.
Args:
upload: if True, deployment files are automatically uploaded to remote storage
work_queue_concurrency: If provided, sets the concurrency limit on the deployment's work queue
"""
if not self.name or not self.flow_name:
raise ValueError("Both a deployment name and flow name must be set.")
Expand All @@ -604,6 +607,17 @@ async def apply(self, upload: bool = False) -> UUID:
if upload:
await self.upload_to_storage()

if self.work_queue_name and work_queue_concurrency is not None:
try:
res = await client.create_work_queue(name=self.work_queue_name)
except ObjectAlreadyExists:
res = await client.read_work_queue_by_name(
name=self.work_queue_name
)
await client.update_work_queue(
res.id, concurrency_limit=work_queue_concurrency
)

# we assume storage was already saved
storage_document_id = getattr(self.storage, "_block_document_id", None)

Expand Down
1 change: 0 additions & 1 deletion src/prefect/orion/api/work_queues.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""
Routes for interacting with work queue objects.
"""

from typing import List, Optional
from uuid import UUID

Expand Down
18 changes: 16 additions & 2 deletions tests/cli/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,26 @@ def test_start_agent_with_no_args():
def test_start_agent_with_work_queue_and_tags():
invoke_and_assert(
command=["agent", "start", "hello", "-t", "blue"],
expected_output_contains="Either `work_queues` or `tags` can be provided, but not both.",
expected_output_contains="Only one of `work_queues`, `match`, or `tags` can be provided.",
expected_code=1,
)

invoke_and_assert(
command=["agent", "start", "-q", "hello", "-t", "blue"],
expected_output_contains="Either `work_queues` or `tags` can be provided, but not both.",
expected_output_contains="Only one of `work_queues`, `match`, or `tags` can be provided.",
expected_code=1,
)


def test_start_agent_with_regex_and_work_queue():
invoke_and_assert(
command=["agent", "start", "hello", "-m", "blue"],
expected_output_contains="Only one of `work_queues`, `match`, or `tags` can be provided.",
expected_code=1,
)

invoke_and_assert(
command=["agent", "start", "-q", "hello", "--match", "blue"],
expected_output_contains="Only one of `work_queues`, `match`, or `tags` can be provided.",
expected_code=1,
)
Loading

0 comments on commit 39ad6f8

Please sign in to comment.