Skip to content

Commit

Permalink
add support for transitioning delegated operators to queued run_state
Browse files Browse the repository at this point in the history
  • Loading branch information
tom-vx51 committed Oct 4, 2024
1 parent c1de2fd commit 76feea9
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 13 deletions.
8 changes: 8 additions & 0 deletions fiftyone/factory/repos/delegated_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,14 @@ def update_run_state(
"updated_at": datetime.utcnow(),
}
}
elif run_state == ExecutionRunState.QUEUED:
update = {
"$set": {
"run_state": run_state,
"queued_at": datetime.utcnow(),
"updated_at": datetime.utcnow(),
}
}

if run_link is not None:
update["$set"]["run_link"] = run_link
Expand Down
15 changes: 15 additions & 0 deletions fiftyone/operators/delegated.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,21 @@ def set_scheduled(self, doc_id, required_state: ExecutionRunState = None):
_id=doc_id, run_state=ExecutionRunState.SCHEDULED, required_state=required_state
)

def set_queued(self, doc_id, required_state: ExecutionRunState = None):
    """Transitions the given delegated operation to the queued run state.

    Args:
        doc_id: the ID of the delegated operation
        required_state (ExecutionRunState): an optional state that the
            operation must currently be in. If provided, the update is
            only performed when the referenced operation matches this
            state

    Returns:
        a :class:`fiftyone.factory.repos.DelegatedOperationDocument`, or
        ``None`` if ``required_state`` was provided and no transition
        occurred
    """
    # Delegate to the repository, which applies the state change (and the
    # optional precondition on the current state) in a single update call.
    updated_doc = self._repo.update_run_state(
        _id=doc_id,
        run_state=ExecutionRunState.QUEUED,
        required_state=required_state,
    )
    return updated_doc

def set_completed(
self,
doc_id,
Expand Down
95 changes: 82 additions & 13 deletions tests/unittests/delegated_operators_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ def test_list_operations(self, mock_get_operator, mock_operator_exists):
operator = "@voxelfiftyone/operator/foo"
operator2 = "@voxelfiftyone/operator/bar"

docs_to_run = []
dynamic_docs = []
static_docs = []

# get all the existing counts of queued operations
initial_queued = len(self.svc.get_queued_operations())
Expand Down Expand Up @@ -271,7 +272,7 @@ def test_list_operations(self, mock_get_operator, mock_operator_exists):
)
self.docs_to_delete.append(doc)
# pylint: disable=no-member
docs_to_run.append(doc.id)
dynamic_docs.append(doc.id)

for i in range(10):
doc = self.svc.queue_operation(
Expand All @@ -287,33 +288,101 @@ def test_list_operations(self, mock_get_operator, mock_operator_exists):
),
)
self.docs_to_delete.append(doc)
static_docs.append(doc.id)

queued = self.svc.get_queued_operations()
self.assertEqual(len(queued), 20 + initial_queued)
# dynamic + static docs should be queued
self.assertEqual(len(queued), len(dynamic_docs) + len(static_docs) + initial_queued)

queued = self.svc.get_queued_operations(dataset_name=dataset_name)
self.assertEqual(len(queued), 10 + initial_dataset_queued)
# dataset_name corresponds to dynamic docs
self.assertEqual(len(queued), len(dynamic_docs) + initial_dataset_queued)

queued = self.svc.get_queued_operations(operator=operator)
self.assertEqual(len(queued), 10 + initial_operator_queued)
# operator corresponds to dynamic docs
self.assertEqual(len(queued), len(dynamic_docs) + initial_operator_queued)

for doc in docs_to_run:
self.svc.set_running(doc)
# test set_running behavior
for doc_id in dynamic_docs:
self.svc.set_running(doc_id)

queued = self.svc.get_queued_operations()
self.assertEqual(len(queued), 10 + initial_queued)
# static docs should be `queued`
self.assertEqual(len(queued), len(static_docs) + initial_queued)

running = self.svc.get_running_operations()
self.assertEqual(len(running), 10 + initial_running)
# dynamic docs should be `running`
self.assertEqual(len(running), len(dynamic_docs) + initial_running)

for doc in docs_to_run:
self.svc.set_scheduled(doc)
# test set_scheduled behavior
for doc_id in dynamic_docs:
self.svc.set_scheduled(doc_id)

queued = self.svc.get_queued_operations()
self.assertEqual(len(queued), 10 + initial_queued)
# static docs should be `queued`
self.assertEqual(len(queued), len(static_docs) + initial_queued)

scheduled = self.svc.get_scheduled_operations()
self.assertEqual(len(scheduled), 10 + initial_scheduled)
# dynamic docs should be `scheduled`
self.assertEqual(len(scheduled), len(dynamic_docs) + initial_scheduled)

# test set_queued(id) behavior
for doc_id in dynamic_docs:
self.svc.set_queued(doc_id)

queued = self.svc.get_queued_operations()
# dynamic + static docs should be `queued`
self.assertEqual(len(queued), len(dynamic_docs) + len(static_docs) + initial_queued)

# test set_queued(id, required_state=...) behavior
# set_queued(id, required_state=...) should only transition elements matching `required_state`

subset_size = 4
non_subset_size = len(dynamic_docs) - subset_size
# transition a subset of docs to `scheduled`
for doc_id in dynamic_docs[:subset_size]:
self.svc.set_scheduled(doc_id)
# transition the other dynamic docs to `running` (just not `scheduled` or `queued`)
for doc_id in dynamic_docs[subset_size:]:
self.svc.set_running(doc_id)

scheduled = self.svc.get_scheduled_operations()
# subset should be `scheduled`
self.assertEqual(len(scheduled), subset_size + initial_scheduled)

running = self.svc.get_running_operations()
# non-subset should be `running`
self.assertEqual(len(running), non_subset_size + initial_running)

queued = self.svc.get_queued_operations()
# static docs should be `queued`
self.assertEqual(len(queued), len(static_docs) + initial_queued)

return_values = []
for doc_id in dynamic_docs:
# attempt to transition from scheduled to queued
return_values.append(
self.svc.set_queued(doc_id, required_state=ExecutionRunState.SCHEDULED)
)

# set_queued should return the updated doc if a transition occurred
for result in return_values[:subset_size]:
self.assertIsNotNone(result)
# set_queued should return `None` if no transition occurred
for result in return_values[subset_size:]:
self.assertIsNone(result)

queued = self.svc.get_queued_operations()
# subset + static docs should be `queued`
self.assertEqual(len(queued), subset_size + len(static_docs) + initial_queued)

scheduled = self.svc.get_scheduled_operations()
# only initial docs should still be `scheduled`
self.assertEqual(len(scheduled), initial_scheduled)

running = self.svc.get_running_operations()
# non-subset should still be `running`
self.assertEqual(len(running), non_subset_size + initial_running)

dataset.delete()
dataset2.delete()
Expand Down

0 comments on commit 76feea9

Please sign in to comment.