Skip to content

Commit

Permalink
add scheduled state for delegated_ops collection (#4810)
Browse files Browse the repository at this point in the history
* add pending and pending_at states for being used in scheduler

* fix update_run_state to properly handle pending state

* Add missing quote

* Refactor DO doc for clarity

* more refactoring in DO doc

* add unit tests for new methods

* update terminology from pending to scheduled

* update missed pending to scheduled
  • Loading branch information
CamronStaley authored Sep 20, 2024
1 parent dacedbc commit 12aec6e
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 34 deletions.
4 changes: 2 additions & 2 deletions docs/source/cli/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -989,9 +989,9 @@ List delegated operations.
only list operations for this dataset
-s STATE, --state STATE
only list operations with this state. Supported
values are ('QUEUED', 'RUNNING', 'COMPLETED', 'FAILED')
values are ('SCHEDULED', 'QUEUED', 'RUNNING', 'COMPLETED', 'FAILED')
--sort-by SORT_BY how to sort the operations. Supported values are
('QUEUED_AT', 'STARTED_AT', COMPLETED_AT', 'FAILED_AT', 'OPERATOR')
('SCHEDULED_AT', 'QUEUED_AT', 'STARTED_AT', 'COMPLETED_AT', 'FAILED_AT', 'OPERATOR')
--reverse whether to sort in reverse order
-l LIMIT, --limit LIMIT
a maximum number of operations to show
Expand Down
6 changes: 3 additions & 3 deletions fiftyone/core/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3150,15 +3150,15 @@ def setup(parser):
default=None,
help=(
"only list operations with this state. Supported values are "
"('QUEUED', 'RUNNING', 'COMPLETED', 'FAILED')"
"('SCHEDULED', 'QUEUED', 'RUNNING', 'COMPLETED', 'FAILED')"
),
)
parser.add_argument(
"--sort-by",
default="QUEUED_AT",
help=(
"how to sort the operations. Supported values are "
"('QUEUED_AT', 'STARTED_AT', COMPLETED_AT', 'FAILED_AT', 'OPERATOR')"
"('SCHEDULED_AT', 'QUEUED_AT', 'STARTED_AT', COMPLETED_AT', 'FAILED_AT', 'OPERATOR')"
),
)
parser.add_argument(
Expand Down Expand Up @@ -3390,7 +3390,7 @@ def setup(parser):
default=None,
help=(
"delete operations in this state. Supported values are "
"('QUEUED', 'COMPLETED', 'FAILED')"
"('SCHEDULED', 'QUEUED', 'COMPLETED', 'FAILED')"
),
)
parser.add_argument(
Expand Down
1 change: 1 addition & 0 deletions fiftyone/factory/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class SortByField(object):
"""Sort by enum for delegated operations."""

UPDATED_AT = "updated_at"
SCHEDULED_AT = "scheduled_at"
QUEUED_AT = "queued_at"
COMPLETED_AT = "completed_at"
STARTED_AT = "started_at"
Expand Down
46 changes: 46 additions & 0 deletions fiftyone/factory/repos/delegated_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,22 @@ def get_queued_operations(
"subclass must implement get_queued_operations()"
)

def get_scheduled_operations(
    self, operator: str = None, dataset_name=None
) -> List[DelegatedOperationDocument]:
    """Get all scheduled operations.

    This is the abstract hook on the repository interface; it has no
    default behavior and must be overridden.

    Args:
        operator (None): an optional operator name to restrict the results to
        dataset_name (None): an optional dataset name to restrict the
            results to

    Returns:
        a list of :class:`DelegatedOperationDocument`

    Raises:
        NotImplementedError: always, until a subclass overrides this method
    """
    raise NotImplementedError(
        "subclass must implement get_scheduled_operations()"
    )

def get_running_operations(
    self, operator: str = None, dataset_name=None
) -> List[DelegatedOperationDocument]:
    """Get all running operations.

    This is the abstract hook on the repository interface; it has no
    default behavior and must be overridden.

    Args:
        operator (None): an optional operator name to restrict the results to
        dataset_name (None): an optional dataset name to restrict the
            results to

    Returns:
        a list of :class:`DelegatedOperationDocument`

    Raises:
        NotImplementedError: always, until a subclass overrides this method
    """
    raise NotImplementedError(
        "subclass must implement get_running_operations()"
    )

def list_operations(
self,
operator: str = None,
Expand Down Expand Up @@ -275,6 +291,14 @@ def update_run_state(
"updated_at": datetime.utcnow(),
}
}
elif run_state == ExecutionRunState.SCHEDULED:
update = {
"$set": {
"run_state": run_state,
"scheduled_at": datetime.utcnow(),
"updated_at": datetime.utcnow(),
}
}

if run_link is not None:
update["$set"]["run_link"] = run_link
Expand Down Expand Up @@ -341,6 +365,28 @@ def get_queued_operations(
run_state=ExecutionRunState.QUEUED,
)

def get_scheduled_operations(
    self,
    operator: str = None,
    dataset_name: str = None,
) -> List[DelegatedOperationDocument]:
    """Get all operations whose run state is ``SCHEDULED``.

    Thin wrapper around :meth:`list_operations` that pins ``run_state`` to
    ``ExecutionRunState.SCHEDULED``.

    Args:
        operator (None): an optional operator name, forwarded to
            :meth:`list_operations`
        dataset_name (None): an optional dataset name, forwarded to
            :meth:`list_operations`

    Returns:
        a list of :class:`DelegatedOperationDocument`
    """
    # NOTE(review): this parameter was previously annotated ``ObjectId``,
    # but it is forwarded as ``dataset_name`` and callers pass a name
    # string, so it is annotated ``str`` here. Runtime behavior is unchanged.
    return self.list_operations(
        operator=operator,
        dataset_name=dataset_name,
        run_state=ExecutionRunState.SCHEDULED,
    )

def get_running_operations(
    self,
    operator: str = None,
    dataset_name: str = None,
) -> List[DelegatedOperationDocument]:
    """Get all operations whose run state is ``RUNNING``.

    Thin wrapper around :meth:`list_operations` that pins ``run_state`` to
    ``ExecutionRunState.RUNNING``.

    Args:
        operator (None): an optional operator name, forwarded to
            :meth:`list_operations`
        dataset_name (None): an optional dataset name, forwarded to
            :meth:`list_operations`

    Returns:
        a list of :class:`DelegatedOperationDocument`
    """
    # NOTE(review): this parameter was previously annotated ``ObjectId``,
    # but it is forwarded as ``dataset_name`` and callers pass a name
    # string, so it is annotated ``str`` here. Runtime behavior is unchanged.
    return self.list_operations(
        operator=operator,
        dataset_name=dataset_name,
        run_state=ExecutionRunState.RUNNING,
    )

def list_operations(
self,
operator: str = None,
Expand Down
42 changes: 20 additions & 22 deletions fiftyone/factory/repos/delegated_operation_doc.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,32 +46,36 @@ def __init__(
self.pinned = False
self.completed_at = None
self.failed_at = None
self.scheduled_at = None
self.result = None
self.id = None
self._doc = None
self.metadata = None

def from_pymongo(self, doc: dict):
# required fields
self.operator = doc["operator"]
self.queued_at = doc["queued_at"]
self.run_state = doc["run_state"]
self.label = doc["label"] if "label" in doc else None
self.updated_at = doc["updated_at"] if "updated_at" in doc else None
self.operator = doc.get("operator")
self.queued_at = doc.get("queued_at")
self.run_state = doc.get("run_state")

# optional fields
self.delegation_target = (
doc["delegation_target"] if "delegation_target" in doc else None
)
self.started_at = doc["started_at"] if "started_at" in doc else None
self.completed_at = (
doc["completed_at"] if "completed_at" in doc else None
)
self.failed_at = doc["failed_at"] if "failed_at" in doc else None
self.pinned = doc["pinned"] if "pinned" in doc else None
self.dataset_id = doc["dataset_id"] if "dataset_id" in doc else None
self.run_link = doc["run_link"] if "run_link" in doc else None
self.delegation_target = doc.get("delegation_target", None)
self.started_at = doc.get("started_at", None)
self.completed_at = doc.get("completed_at", None)
self.failed_at = doc.get("failed_at", None)
self.scheduled_at = doc.get("scheduled_at", None)
self.pinned = doc.get("pinned", None)
self.dataset_id = doc.get("dataset_id", None)
self.run_link = doc.get("run_link", None)
self.metadata = doc.get("metadata", None)
self.label = doc.get("label", None)
self.updated_at = doc.get("updated_at", None)

# internal fields
self.id = doc["_id"]
self._doc = doc

# nested fields
if (
"context" in doc
and doc["context"] is not None
Expand Down Expand Up @@ -100,12 +104,6 @@ def from_pymongo(self, doc: dict):
if "updated_at" in doc["status"]:
self.status.updated_at = doc["status"]["updated_at"]

# internal fields
self.id = doc["_id"]
self._doc = doc

self.metadata = doc["metadata"] if "metadata" in doc else None

return self

def to_pymongo(self) -> dict:
Expand Down
39 changes: 39 additions & 0 deletions fiftyone/operators/delegated.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,17 @@ def set_running(self, doc_id, progress=None, run_link=None):
progress=progress,
)

def set_scheduled(self, doc_id):
    """Sets the given delegated operation to scheduled state.

    Args:
        doc_id: the ID of the delegated operation

    Returns:
        a :class:`fiftyone.factory.repos.DelegatedOperationDocument`
    """
    # The repository performs the state change; this method only forwards
    # the ID together with the SCHEDULED run state.
    return self._repo.update_run_state(
        _id=doc_id, run_state=ExecutionRunState.SCHEDULED
    )

def set_completed(
self,
doc_id,
Expand Down Expand Up @@ -235,6 +246,34 @@ def get_queued_operations(self, operator=None, dataset_name=None):
operator=operator, dataset_name=dataset_name
)

def get_scheduled_operations(self, operator=None, dataset_name=None):
    """Returns all scheduled delegated operations.

    Args:
        operator (None): the optional name of the operator to return all
            the scheduled delegated operations for
        dataset_name (None): the optional name of the dataset to return all
            the scheduled delegated operations for

    Returns:
        a list of :class:`fiftyone.factory.repos.DelegatedOperationDocument`
    """
    # Pure pass-through: filtering is done by the repository.
    return self._repo.get_scheduled_operations(
        operator=operator, dataset_name=dataset_name
    )

def get_running_operations(self, operator=None, dataset_name=None):
    """Returns all running delegated operations.

    Args:
        operator (None): the optional name of the operator to return all
            the running delegated operations for
        dataset_name (None): the optional name of the dataset to return all
            the running delegated operations for

    Returns:
        a list of :class:`fiftyone.factory.repos.DelegatedOperationDocument`
    """
    # Pure pass-through: filtering is done by the repository.
    return self._repo.get_running_operations(
        operator=operator, dataset_name=dataset_name
    )

def get(self, doc_id):
"""Returns the delegated operation with the given ID.
Expand Down
1 change: 1 addition & 0 deletions fiftyone/operators/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
class ExecutionRunState(object):
"""Enumeration of the available operator run states."""

SCHEDULED = "scheduled"
QUEUED = "queued"
RUNNING = "running"
COMPLETED = "completed"
Expand Down
20 changes: 13 additions & 7 deletions tests/unittests/delegated_operators_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,7 @@ def test_delegate_operation(
self.assertIsNotNone(doc2.metadata)
self.assertEqual(doc2.metadata, doc2_metadata)

def test_list_queued_operations(
self, mock_get_operator, mock_operator_exists
):
def test_list_operations(self, mock_get_operator, mock_operator_exists):
dataset_name = f"test_dataset_{ObjectId()}"
dataset = Dataset(dataset_name, _create=True, persistent=True)
dataset.save()
Expand All @@ -248,9 +246,8 @@ def test_list_queued_operations(

# get all the existing counts of queued operations
initial_queued = len(self.svc.get_queued_operations())
initial_running = len(
self.svc.list_operations(run_state=ExecutionRunState.RUNNING)
)
initial_running = len(self.svc.get_running_operations())
initial_scheduled = len(self.svc.get_scheduled_operations())
initial_dataset_queued = len(
self.svc.get_queued_operations(dataset_name=dataset_name)
)
Expand Down Expand Up @@ -306,9 +303,18 @@ def test_list_queued_operations(
queued = self.svc.get_queued_operations()
self.assertEqual(len(queued), 10 + initial_queued)

running = self.svc.list_operations(run_state=ExecutionRunState.RUNNING)
running = self.svc.get_running_operations()
self.assertEqual(len(running), 10 + initial_running)

for doc in docs_to_run:
self.svc.set_scheduled(doc)

queued = self.svc.get_queued_operations()
self.assertEqual(len(queued), 10 + initial_queued)

scheduled = self.svc.get_scheduled_operations()
self.assertEqual(len(scheduled), 10 + initial_scheduled)

dataset.delete()
dataset2.delete()

Expand Down

0 comments on commit 12aec6e

Please sign in to comment.