diff --git a/python/ray/workflow/api.py b/python/ray/workflow/api.py index 1d4cf54fb9f9d..95716f3b4b503 100644 --- a/python/ray/workflow/api.py +++ b/python/ray/workflow/api.py @@ -14,11 +14,10 @@ from ray.workflow.common import ( WorkflowStatus, Event, - WorkflowRunningError, - WorkflowNotFoundError, asyncio_run, validate_user_metadata, ) +from ray.workflow.exceptions import WorkflowRunningError, WorkflowNotFoundError from ray.workflow import serialization from ray.workflow.event_listener import EventListener, EventListenerType, TimerListener from ray.workflow import workflow_access @@ -459,7 +458,11 @@ def get_metadata(workflow_id: str, name: Optional[str] = None) -> Dict[str, Any] ValueError: if given workflow or workflow step does not exist. """ _ensure_workflow_initialized() - return execution.get_metadata(workflow_id, name) + store = get_workflow_storage(workflow_id) + if name is None: + return store.load_workflow_metadata() + else: + return store.load_step_metadata(name) @PublicAPI(stability="beta") diff --git a/python/ray/workflow/common.py b/python/ray/workflow/common.py index a29610e902eab..4d419864fd84c 100644 --- a/python/ray/workflow/common.py +++ b/python/ray/workflow/common.py @@ -201,20 +201,3 @@ class WorkflowExecutionMetadata: class WorkflowMetaData: # The current status of the workflow status: WorkflowStatus - - -@PublicAPI(stability="beta") -class WorkflowNotFoundError(Exception): - def __init__(self, workflow_id: str): - self.message = f"Workflow[id={workflow_id}] was referenced but doesn't exist." - super().__init__(self.message) - - -@PublicAPI(stability="beta") -class WorkflowRunningError(Exception): - def __init__(self, operation: str, workflow_id: str): - self.message = ( - f"{operation} couldn't be completed becasue " - f"Workflow[id={workflow_id}] is still running." - ) - super().__init__(self.message) diff --git a/python/ray/workflow/exceptions.py b/python/ray/workflow/exceptions.py index 4fbebd31a622c..e7d3b12dcb1c1 100644 --- a/python/ray/workflow/exceptions.py +++ b/python/ray/workflow/exceptions.py @@ -35,3 +35,20 @@ class WorkflowNotResumableError(WorkflowError): def __init__(self, workflow_id: str): self.message = f"Workflow[id={workflow_id}] is not resumable." super().__init__(self.message) + + +@PublicAPI(stability="beta") +class WorkflowNotFoundError(WorkflowError): + def __init__(self, workflow_id: str): + self.message = f"Workflow[id={workflow_id}] was referenced but doesn't exist." + super().__init__(self.message) + + +@PublicAPI(stability="beta") +class WorkflowRunningError(WorkflowError): + def __init__(self, operation: str, workflow_id: str): + self.message = ( + f"{operation} couldn't be completed becasue " + f"Workflow[id={workflow_id}] is still running." + ) + super().__init__(self.message) diff --git a/python/ray/workflow/execution.py b/python/ray/workflow/execution.py index 6a70b364a3639..410a48981644f 100644 --- a/python/ray/workflow/execution.py +++ b/python/ray/workflow/execution.py @@ -1,6 +1,6 @@ import logging import time -from typing import Set, List, Tuple, Optional, Dict, Any +from typing import Set, List, Tuple, Optional, Dict import uuid import ray @@ -135,17 +135,6 @@ def get_status(workflow_id: str) -> Optional[WorkflowStatus]: return ray.get(workflow_manager.get_workflow_status.remote(workflow_id)) -def get_metadata(workflow_id: str, name: Optional[str]) -> Dict[str, Any]: - """Get the metadata of the workflow. - See "api.get_metadata()" for details. - """ - store = workflow_storage.get_workflow_storage(workflow_id) - if name is None: - return store.load_workflow_metadata() - else: - return store.load_step_metadata(name) - - def list_all(status_filter: Set[WorkflowStatus]) -> List[Tuple[str, WorkflowStatus]]: try: workflow_manager = get_management_actor() diff --git a/python/ray/workflow/tests/test_storage.py b/python/ray/workflow/tests/test_storage.py index e1b0242405995..430652da3410c 100644 --- a/python/ray/workflow/tests/test_storage.py +++ b/python/ray/workflow/tests/test_storage.py @@ -10,9 +10,9 @@ from ray.workflow import workflow_storage from ray.workflow.common import ( StepType, - WorkflowNotFoundError, WorkflowStepRuntimeOptions, ) +from ray.workflow.exceptions import WorkflowNotFoundError from ray.workflow import serialization_context from ray.workflow.tests import utils diff --git a/python/ray/workflow/tests/test_workflow_manager.py b/python/ray/workflow/tests/test_workflow_manager.py index e5500b1979e1a..7cedcdba199ed 100644 --- a/python/ray/workflow/tests/test_workflow_manager.py +++ b/python/ray/workflow/tests/test_workflow_manager.py @@ -5,8 +5,10 @@ def test_workflow_manager_simple(workflow_start_regular): + from ray.workflow.exceptions import WorkflowNotFoundError + assert [] == workflow.list_all() - with pytest.raises(workflow.common.WorkflowNotFoundError): + with pytest.raises(WorkflowNotFoundError): workflow.get_status("X") diff --git a/python/ray/workflow/workflow_access.py b/python/ray/workflow/workflow_access.py index 184868d224af9..98ead63253a84 100644 --- a/python/ray/workflow/workflow_access.py +++ b/python/ray/workflow/workflow_access.py @@ -6,12 +6,13 @@ import ray from ray.workflow import common -from ray.workflow.common import WorkflowStatus, TaskID, WorkflowNotFoundError +from ray.workflow.common import WorkflowStatus, TaskID from ray.workflow import workflow_state_from_storage from ray.workflow import workflow_context from ray.workflow import workflow_storage from ray.workflow.exceptions import ( WorkflowCancellationError, + WorkflowNotFoundError, WorkflowNotResumableError, ) from ray.workflow.workflow_executor import WorkflowExecutor diff --git a/python/ray/workflow/workflow_storage.py b/python/ray/workflow/workflow_storage.py index c66018ed8fb57..5514fa975df15 100644 --- a/python/ray/workflow/workflow_storage.py +++ b/python/ray/workflow/workflow_storage.py @@ -17,9 +17,9 @@ from ray.workflow.common import ( TaskID, WorkflowStatus, - WorkflowNotFoundError, WorkflowStepRuntimeOptions, ) +from ray.workflow.exceptions import WorkflowNotFoundError from ray.workflow import workflow_context from ray.workflow import serialization from ray.workflow import serialization_context