diff --git a/docs/source/images/plugins/operators/operator-execute-button.png b/docs/source/images/plugins/operators/operator-execute-button.png new file mode 100644 index 0000000000..ba4bd0d1db Binary files /dev/null and b/docs/source/images/plugins/operators/operator-execute-button.png differ diff --git a/docs/source/images/plugins/operators/operator-user-delegation.png b/docs/source/images/plugins/operators/operator-user-delegation.png new file mode 100644 index 0000000000..82a7f82c6c Binary files /dev/null and b/docs/source/images/plugins/operators/operator-user-delegation.png differ diff --git a/docs/source/plugins/developing_plugins.rst b/docs/source/plugins/developing_plugins.rst index 16cf260348..80f747436b 100644 --- a/docs/source/plugins/developing_plugins.rst +++ b/docs/source/plugins/developing_plugins.rst @@ -626,6 +626,12 @@ subsequent sections. icon="/assets/icon.svg", light_icon="/assets/icon-light.svg", # light theme only dark_icon="/assets/icon-dark.svg", # dark theme only + + # Whether the operator supports immediate and/or delegated execution + allow_immediate_execution=True/False, # default True + allow_delegated_execution=True/False, # default False + default_choice_to_delegated=True/False, # default False + resolve_execution_options_on_change=None, ) def resolve_placement(self, ctx): @@ -658,8 +664,8 @@ subsequent sections. ) def resolve_input(self, ctx): - """Implement this method if your operator can render a form to - collect user inputs. + """Implement this method to collect user inputs as parameters + that are stored in `ctx.params`. Returns: a `types.Property` defining the form's components @@ -680,13 +686,30 @@ subsequent sections. return types.Property(inputs, view=types.View(label="Example operator")) def resolve_delegation(self, ctx): - """Implement this method if you want to programmatically determine - whether to delegate execution of this operation based on `ctx`. + """Implement this method if you want to programmatically *force* + this operation to be delegated or executed immediately. + + Returns: + whether the operation should be delegated (True), run + immediately (False), or None to defer to + `resolve_execution_options()` to specify the available options + """ + return len(ctx.view) > 1000 # delegate for larger views + + def resolve_execution_options(self, ctx): + """Implement this method if you want to dynamically configure the + execution options available to this operator based on the current + `ctx`. Returns: - True/False + an `ExecutionOptions` instance """ - return len(ctx.view) > 1000 # delegated for larger views + should_delegate = len(ctx.view) > 1000 # delegate for larger views + return foo.ExecutionOptions( + allow_immediate_execution=True, + allow_delegated_execution=True, + default_choice_to_delegated=should_delegate, + ) def execute(self, ctx): """Executes the actual operation based on the hydrated `ctx`. @@ -711,6 +734,7 @@ subsequent sections. for i, sample in enumerate(current_view, 1): # do some computation yield ctx.trigger("set_progress", {"progress": i / n}) + yield ctx.trigger("reload_dataset") return {"value": value, ...} @@ -720,7 +744,7 @@ subsequent sections. to the user. Returns: - a Property defining the components of the output form + a `types.Property` defining the components of the output form """ outputs = types.Object() @@ -785,6 +809,12 @@ execution: icon="/assets/icon.svg", light_icon="/assets/icon-light.svg", # light theme only dark_icon="/assets/icon-dark.svg", # dark theme only + + # Whether the operator supports immediate and/or delegated execution + allow_immediate_execution=True/False, # default True + allow_delegated_execution=True/False, # default False + default_choice_to_delegated=True/False, # default False + resolve_execution_options_on_change=None, ) .. _operator-execution-context: @@ -809,10 +839,15 @@ contains the following properties: - `ctx.selected` - the list of currently selected samples in the App, if any - `ctx.selected_labels` - the list of currently selected labels in the App, if any +- `ctx.delegated` - whether delegated execution has been forced for the + operation +- `ctx.requesting_delegated_execution` - whether delegated execution has been + requested for the operation +- `ctx.delegation_target` - the orchestrator to which the operation should be + delegated, if applicable - `ctx.secrets` - a dict of :ref:`secrets ` for the plugin, if any -- `ctx.results` - a dict containing the outputs of the - :meth:`execute() ` method, if +- `ctx.results` - a dict containing the outputs of the `execute()` method, if it has been called - `ctx.hooks` **(JS only)** - the return value of the operator's `useHooks()` method @@ -940,26 +975,130 @@ workflow orchestrator like :ref:`Apache Airflow ` or run just :ref:`run locally ` in a separate process. -Operators can delegate any or all of its operations by implementing +.. note:: + + Even though delegated operations are run in a separate process or physical + location, they are provided with the same `ctx` that was hydrated by the + operator's :ref:`input form `. + + Refer to :ref:`this section ` for more information + about how delegated operations are executed. + +There are a variety of options available for configuring whether a given +operation should be delegated or executed immediately. + +.. _operator-delegation-configuration: + +Delegation configuration +~~~~~~~~~~~~~~~~~~~~~~~~ + +You can provide the optional properties described below in the +:ref:`operator's config ` to specify the available execution +modes for the operator: + +.. code-block:: python + :linenos: + + @property + def config(self): + return foo.OperatorConfig( + # Other parameters... + + # Whether to allow immediate execution + allow_immediate_execution=True/False, # default True + + # Whether to allow delegated execution + allow_delegated_execution=True/False, # default False + + # Whether the default execution mode should be delegated, if both + # options are available + default_choice_to_delegated=True/False, # default False + + # Whether to resolve execution options dynamically when the + # operator's inputs change. By default, this behavior will match + # the operator's ``dynamic`` setting + resolve_execution_options_on_change=True/False/None, # default None + ) + +When the operator's input form is rendered in the App, the `Execute|Schedule` +button at the bottom of the modal will contextually show whether the operation +will be executed immediately, scheduled for delegated execution, or allow the +user to choose between the supported options if there are multiple: + +.. image:: /images/plugins/operators/operator-execute-button.png + :align: center + +.. _operator-execution-options: + +Execution options +~~~~~~~~~~~~~~~~~ + +Operators can implement +:meth:`resolve_execution_options() ` +to dynamically configure the available execution options based on the current +execution context: + +.. code-block:: python + :linenos: + + # Option 1: recommend delegation for larger views + def resolve_execution_options(self, ctx): + should_delegate = len(ctx.view) > 1000 + return foo.ExecutionOptions( + allow_immediate_execution=True, + allow_delegated_execution=True, + default_choice_to_delegated=should_delegate, + ) + + # Option 2: force delegation for larger views + def resolve_execution_options(self, ctx): + delegate = len(ctx.view) > 1000 + return foo.ExecutionOptions( + allow_immediate_execution=not delegate, + allow_delegated_execution=delegate, + ) + +If implemented, this method will override any static execution parameters +included in the :ref:`operator's config ` as described in the +previous section. + +.. _operator-forced-delegation: + +Forced delegation +~~~~~~~~~~~~~~~~~ + +Operators can implement :meth:`resolve_delegation() ` -as shown below: +to force a particular operation to be delegated (by returning `True`) or +executed immediately (by returning `False`) based on the current execution +context. + +For example, you could decide whether to delegate execution based on the size +of the current view: .. code-block:: python :linenos: def resolve_delegation(self, ctx): - return len(ctx.view) > 1000 # delegate for larger views + # Force delegation for large views and immediate execution for small views + return len(ctx.view) > 1000 + +.. note:: -As demonstrated above, you can use the -:ref:`execution context ` to conditionally decide -whether a given operation should be delegated. For example, you could simply -ask the user: + If :meth:`resolve_delegation() ` + is not implemented or returns `None`, then the choice of execution mode is + deferred to + :meth:`resolve_execution_options() ` + to specify the available execution options as described in the previous + section. + +Alternatively, you could simply ask the user to decide: .. code-block:: python :linenos: def resolve_input(self, ctx): - delegate = ctx.params.get("delegate", False) + delegate = ctx.params.get("delegate", None) if delegate: description = "Uncheck this box to execute the operation immediately" @@ -968,8 +1107,6 @@ ask the user: inputs.bool( "delegate", - default=False, - required=True, label="Delegate execution?", description=description, view=types.CheckboxView(), @@ -990,16 +1127,46 @@ ask the user: ) def resolve_delegation(self, ctx): - return ctx.params.get("delegate", False) + return ctx.params.get("delegate", None) -.. note:: +.. image:: /images/plugins/operators/operator-user-delegation.png + :align: center - Even though delegated operations are run in a separate process or physical - location, they are provided with the same `ctx` that was hydrated by the - operator's :ref:`input form `. +.. _operator-reporting-progress: - Refer to :ref:`this section ` for more information - about how delegated operations are executed. +Reporting progress +~~~~~~~~~~~~~~~~~~ + +Delegated operations can report their execution progress by calling +:meth:`set_progress() ` +on their execution context from within +:meth:`execute() `: + +.. code-block:: python + :linenos: + + import fiftyone.core.storage as fos + import fiftyone.core.utils as fou + + def execute(self, ctx): + images_dir = ctx.params["images_dir"] + + filepaths = fos.list_files(images_dir, abs_paths=True, recursive=True) + + num_added = 0 + num_total = len(filepaths) + for batch in fou.iter_batches(filepaths, 100): + samples = [fo.Sample(filepath=f) for f in batch] + ctx.dataset.add_samples(samples) + + num_added += len(batch) + ctx.set_progress(progress=num_added / num_total) + +.. note:: + + :ref:`FiftyOne Teams ` users can view the current progress + of their delegated operations from the + :ref:`Runs page ` of the Teams App! .. _operator-execution: @@ -1240,9 +1407,12 @@ Operator placement ------------------ By default, operators are only accessible from the -:ref:`operator browser `. However, you can optionally expose -the operation by placing a custom button, icon, menu item, etc. in various -places of the App: +:ref:`operator browser `. However, you can place a custom +button, icon, menu item, etc. in the App that will trigger the operator when +clicked in any location supported by the +:class:`types.Places ` enum. + +For example, you can use: - `types.Places.SAMPLES_GRID_ACTIONS` diff --git a/docs/source/plugins/using_plugins.rst b/docs/source/plugins/using_plugins.rst index 2324105951..56d8c65bad 100644 --- a/docs/source/plugins/using_plugins.rst +++ b/docs/source/plugins/using_plugins.rst @@ -556,15 +556,81 @@ Some Operators perform an immediate action when executed, while other Operators Executing operators via SDK ___________________________ -You can use :func:`execute_operator() ` to -invoke operators programmatically from Python rather than filling out the -operator's input form in the App: +Many operators are intended to be executed programmatically via the SDK in +addition to executing them by filling out their input form in the App. + +.. _calling-operators: + +Calling operators +----------------- + +By convention, operators that are intended to be executed programmatically +should implement `__call__()` so that users have a well-documented interface +for invoking the operator as a function. + +For example, the +`@voxel51/utils/compute_metadata `_ +operator can be invoked like so: .. code-block:: python + :linenos: import fiftyone as fo + import fiftyone.operators as foo import fiftyone.zoo as foz + + dataset = foz.load_zoo_dataset("quickstart") + compute_metadata = foo.get_operator("@voxel51/utils/compute_metadata") + + # Schedule a delegated operation to (re)compute metadata for the dataset + compute_metadata(dataset, overwrite=True, delegate=True) + +.. note:: + + Notice that :func:`get_operator() ` is + used to retrieve the operator by its URI. + +Behind the scenes, the operator's `__call__()` method is implemented as +follows: + +.. code-block:: python + :linenos: + + class ComputeMetadata(foo.Operator): + def __call__( + self, + sample_collection, + overwrite=False, + num_workers=None, + delegate=False, + ): + ctx = dict(view=sample_collection.view()) + params = dict( + overwrite=overwrite, + num_workers=num_workers, + delegate=delegate, + ) + return foo.execute_operator(self.uri, ctx, params=params) + +which simply packages up the provided keyword arguments into the correct format +for the operator's `ctx.params` and then passes them to +:func:`execute_operator() `, which +performs the execution. + +.. _direct-operator-execution: + +Direct execution +---------------- + +You can also programmatically execute any operator by calling +:func:`execute_operator() `: + +.. code-block:: python + :linenos: + + import fiftyone as fo import fiftyone.operators as foo + import fiftyone.zoo as foz dataset = foz.load_zoo_dataset("quickstart") @@ -575,18 +641,69 @@ operator's input form in the App: dataset_type="COCO", labels_path=dict(absolute_path="/tmp/coco/labels.json"), label_field="ground_truth", - delegate=False, + delegate=False, # False: execute immediately, True: delegate ) } result = foo.execute_operator("@voxel51/io/export_samples", ctx) +In the above example, the `delegate=True/False` parameter controls whether +execution happens immediately or is +:ref:`delegated ` because the operator implements +its +:meth:`resolve_delegation() ` +as follows: + +.. code-block:: python + :linenos: + + def resolve_delegation(self, ctx): + return ctx.params.get("delegate", False) + .. note:: - In order to use programmatic execution, you must generally inspect the - operator's :meth:`execute() ` + In general, to use + :func:`execute_operator() ` you must + inspect the operator's + :meth:`execute() ` implementation to understand what parameters are required. +.. _requesting-operator-delegation: + +Requesting delegation +--------------------- + +If an operation supports both immediate and delegated execution as specified +either by its :ref:`configuration ` or +:ref:`execution options `, you can request +delegated execution by passing the `request_delegation=True` flag to +:func:`execute_operator() `: + +.. code-block:: python + :linenos: + + foo.execute_operator(operator_uri, ctx=ctx, request_delegation=True) + +This has the same effect as choosing `Schedule` from the dropdown in the +operator's input modal when executing it from within the App: + +.. image:: /images/plugins/operators/operator-execute-button.png + :align: center + +.. note:: + + :ref:`FiftyOne Teams ` users can also specify an optional + delegation target for their delegated operations: + + .. code-block:: python + + foo.execute_operator( + operator_uri, + ctx=ctx, + request_delegation=True, + delegation_target="overnight", + ) + .. _delegated-operations: Delegated operations diff --git a/fiftyone/operators/__init__.py b/fiftyone/operators/__init__.py index 59f767dd6d..d90e6d3c86 100644 --- a/fiftyone/operators/__init__.py +++ b/fiftyone/operators/__init__.py @@ -14,7 +14,6 @@ ) from .executor import ( execute_operator, - execute_or_delegate_operator, ExecutionOptions, ) diff --git a/fiftyone/operators/delegated.py b/fiftyone/operators/delegated.py index f83cfa341a..151a4ae909 100644 --- a/fiftyone/operators/delegated.py +++ b/fiftyone/operators/delegated.py @@ -16,9 +16,9 @@ do_execute_operator, ExecutionResult, ExecutionRunState, - ExecutionProgress, ) + logger = logging.getLogger(__name__) @@ -54,28 +54,30 @@ def queue_operation( context=context, ) - def set_progress(self, doc_id, progress: ExecutionProgress): + def set_progress(self, doc_id, progress): """Sets the progress of the given delegated operation. Args: doc_id: the ID of the delegated operation - progress: the progress of the operation + progress: the + :class:`fiftyone.operators.executor.ExecutionProgress` of the + operation Returns: a :class:`fiftyone.factory.repos.DelegatedOperationDocument` """ return self._repo.update_progress(_id=doc_id, progress=progress) - def set_running( - self, doc_id, progress: ExecutionProgress = None, run_link: str = None - ): + def set_running(self, doc_id, progress=None, run_link=None): """Sets the given delegated operation to running state. Args: doc_id: the ID of the delegated operation - run_link (None): an optional run link to orchestrator specific run - information - progress (None): the optional progress of the operation + progress (None): an optional + :class:`fiftyone.operators.executor.ExecutionProgress` of the + operation + run_link (None): an optional link to orchestrator-specific + information about the operation Returns: a :class:`fiftyone.factory.repos.DelegatedOperationDocument` @@ -91,8 +93,8 @@ def set_completed( self, doc_id, result=None, - progress: ExecutionProgress = None, - run_link: str = None, + progress=None, + run_link=None, ): """Sets the given delegated operation to completed state. @@ -101,9 +103,11 @@ def set_completed( result (None): the :class:`fiftyone.operators.executor.ExecutionResult` of the operation - run_link (None): the optional run link to orchestrator specific run - information - progress (None): the optional progress of the operation + progress (None): an optional + :class:`fiftyone.operators.executor.ExecutionProgress` of the + operation + run_link (None): an optional link to orchestrator-specific + information about the operation Returns: a :class:`fiftyone.factory.repos.DelegatedOperationDocument` @@ -120,8 +124,8 @@ def set_failed( self, doc_id, result=None, - progress: ExecutionProgress = None, - run_link: str = None, + progress=None, + run_link=None, ): """Sets the given delegated operation to failed state. @@ -130,9 +134,11 @@ def set_failed( result (None): the :class:`fiftyone.operators.executor.ExecutionResult` of the operation - run_link (None): the optional run link to orchestrator specific run - information - progress (None): the optional progress of the operation + progress (None): an optional + :class:`fiftyone.operators.executor.ExecutionProgress` of the + operation + run_link (None): an optional link to orchestrator-specific + information about the operation Returns: a :class:`fiftyone.factory.repos.DelegatedOperationDocument` @@ -329,8 +335,8 @@ def execute_operation(self, operation, log=False, run_link=None): :class:`fiftyone.factory.repos.DelegatedOperationDocument` log (False): the optional boolean flag to log the execution of the delegated operations - run_link (None): the optional run link to orchestrator specific run - information + run_link (None): an optional link to orchestrator-specific + information about the operation """ try: self.set_running(doc_id=operation.id, run_link=run_link) @@ -346,7 +352,7 @@ def execute_operation(self, operation, log=False, run_link=None): self.set_completed(doc_id=operation.id, result=result) if log: logger.info("Operation %s complete", operation.id) - except Exception as e: + except: result = ExecutionResult(error=traceback.format_exc()) self.set_failed(doc_id=operation.id, result=result) diff --git a/fiftyone/operators/executor.py b/fiftyone/operators/executor.py index 00cd2db7db..f0aff83a1b 100644 --- a/fiftyone/operators/executor.py +++ b/fiftyone/operators/executor.py @@ -5,15 +5,12 @@ | `voxel51.com `_ | """ -import os import asyncio import collections import inspect import logging import os import traceback -import types as python_types -import typing import fiftyone as fo import fiftyone.core.dataset as fod @@ -28,6 +25,9 @@ from .message import GeneratedMessage, MessageType +logger = logging.getLogger(__name__) + + class ExecutionRunState(object): """Enumeration of the available operator run states.""" @@ -56,16 +56,15 @@ def to_json(self): } -class ExecutionProgress: +class ExecutionProgress(object): """Represents the status of an operator execution. - at least one of progress or label must be provided Args: progress (None): an optional float between 0 and 1 (0% to 100%) label (None): an optional label to display """ - def __init__(self, progress: float = None, label: str = None): + def __init__(self, progress=None, label=None): self.progress = progress self.label = label self.updated_at = None @@ -111,44 +110,38 @@ def to_json(self): } -# TODO: add request_delegation and delegation_target -def execute_operator(operator_uri, ctx, params=None): +def execute_operator(operator_uri, ctx=None, **kwargs): """Executes the operator with the given name. Args: operator_uri: the URI of the operator - ctx: a dictionary of parameters defining the execution context. The - supported keys are: + ctx (None): a dictionary of parameters defining the execution context. + The supported keys are: - ``dataset``: a :class:`fiftyone.core.dataset.Dataset` or the name of a dataset to process. This is required unless a ``view`` is provided - - ``view``: an optional :class:`fiftyone.core.view.DatasetView` - to process - - ``selected``: an optional list of selected sample IDs - - ``selected_labels``: an optional list of selected labels in the - format returned by + - ``view`` (None): an optional + :class:`fiftyone.core.view.DatasetView` to process + - ``selected`` ([]): an optional list of selected sample IDs + - ``selected_labels`` ([]): an optional list of selected labels + in the format returned by :attr:`fiftyone.core.session.Session.selected_labels` + - ``current_sample`` (None): an optional ID of the current sample + being processed - ``params``: a dictionary of parameters for the operator. Consult the operator's documentation for details - params (None): you can optionally provide the ``ctx.params`` dict as - a separate argument + - ``request_delegation`` (False): whether to request delegated + execution, if supported by the operator + - ``delegation_target`` (None): an optional orchestrator on which + to schedule the operation, if it is delegated + **kwargs: you can optionally provide any of the supported ``ctx`` keys + as keyword arguments rather than including them in ``ctx`` Returns: an :class:`ExecutionResult` """ - dataset_name, view_stages, selected, selected_labels, params = _parse_ctx( - ctx, params=params - ) - - request_params = dict( - operator_uri=operator_uri, - dataset_name=dataset_name, - view=view_stages, - selected=selected, - selected_labels=selected_labels, - params=params, - ) + request_params = _parse_ctx(ctx=ctx, **kwargs) return asyncio.run( execute_or_delegate_operator( @@ -157,14 +150,13 @@ def execute_operator(operator_uri, ctx, params=None): ) -def _parse_ctx(ctx, params=None): - dataset = ctx.get("dataset", None) - view = ctx.get("view", None) - selected = ctx.get("selected", None) - selected_labels = ctx.get("selected_labels", None) +def _parse_ctx(ctx=None, **kwargs): + if ctx is None: + ctx = {} - if params is None: - params = ctx.get("params", {}) + ctx = {**ctx, **kwargs} # don't modify input `ctx` in-place + dataset = ctx.pop("dataset", None) + view = ctx.pop("view", None) if dataset is None and isinstance(view, fov.DatasetView): dataset = view._root_dataset @@ -175,14 +167,14 @@ def _parse_ctx(ctx, params=None): view = dataset.view() - view_stages = view._serialize() + view = view._serialize() if isinstance(dataset, fod.Dataset): dataset_name = dataset.name else: dataset_name = dataset - return dataset_name, view_stages, selected, selected_labels, params + return dict(dataset_name=dataset_name, view=view, **ctx) @coroutine_timeout(seconds=fo.config.operator_timeout) @@ -206,10 +198,36 @@ async def execute_or_delegate_operator( operator, executor, ctx = prepared execution_options = operator.resolve_execution_options(ctx) - resolved_delegation = operator.resolve_delegation(ctx) + if ( + not execution_options.allow_immediate_execution + and not execution_options.allow_delegated_execution + ): + raise RuntimeError( + "This operation does not support immediate OR delegated execution" + ) + should_delegate = ( - ctx.requesting_delegated_execution or resolved_delegation - ) and execution_options.allow_delegated_execution + operator.resolve_delegation(ctx) or ctx.requesting_delegated_execution + ) + if should_delegate: + if not execution_options.allow_delegated_execution: + logger.warning( + ( + "This operation does not support delegated execution; it " + "will be executed immediately" + ) + ) + should_delegate = False + else: + if not execution_options.allow_immediate_execution: + logger.warning( + ( + "This operation does not support immediate execution; it " + "will be delegated" + ) + ) + should_delegate = True + if should_delegate: try: from .delegated import DelegatedOperationService @@ -230,14 +248,14 @@ async def execute_or_delegate_operator( else None ) return execution - except Exception as e: + except: return ExecutionResult( executor=executor, error=traceback.format_exc() ) else: try: result = await do_execute_operator(operator, ctx, exhaust=exhaust) - except Exception as e: + except: return ExecutionResult( executor=executor, error=traceback.format_exc() ) @@ -264,8 +282,6 @@ async def prepare_operator_executor( delegated_operation_id=delegated_operation_id, operator_uri=operator_uri, required_secrets=operator._plugin_secrets, - delegation_target=request_params.get("delegation_target", None), - request_delegation=request_params.get("request_delegation", False), ) await ctx.resolve_secret_values(operator._plugin_secrets) @@ -338,8 +354,7 @@ def resolve_execution_options(registry, operator_uri, request_params): request_params: a dictionary of request parameters Returns: - the type of the inputs :class:`fiftyone.operators.executor.ExecutionOptions` of - the operator, or None + a :class:`fiftyone.operators.executor.ExecutionOptions` or None """ if registry.operator_exists(operator_uri) is False: raise ValueError("Operator '%s' does not exist" % operator_uri) @@ -352,7 +367,6 @@ def resolve_execution_options(registry, operator_uri, request_params): ) try: return operator.resolve_execution_options(ctx) - except Exception as e: return ExecutionResult(error=traceback.format_exc()) @@ -403,9 +417,7 @@ def __init__( set_progress=None, delegated_operation_id=None, operator_uri=None, - required_secrets: typing.List[str] = None, - delegation_target=None, - request_delegation=False, + required_secrets=None, ): self.request_params = request_params or {} self.params = self.request_params.get("params", {}) @@ -425,8 +437,6 @@ def __init__( operator_uri=self._operator_uri, required_secrets=self._required_secret_keys, ) - self._delegation_target = delegation_target - self._request_delegation = request_delegation @property def dataset(self): @@ -455,13 +465,6 @@ def dataset_id(self): """ return self.request_params.get("dataset_id", None) - @property - def delegation_target(self): - """The ID of the Orchestrator being used to delegate the operation.""" - return self.request_params.get( - "delegation_target", self._delegation_target - ) - @property def view(self): """The :class:`fiftyone.core.view.DatasetView` being operated on.""" @@ -516,24 +519,26 @@ def current_sample(self): @property def delegated(self): - """Whether the operation's execution was delegated to an orchestrator. - - This property is only available for methods that are invoked after an - operator is executed, e.g. :meth:`resolve_output`. - """ + """Whether delegated execution has been forced for the operation.""" return self.request_params.get("delegated", False) @property - def results(self): - """A ``dict`` of results for the current operation. + def requesting_delegated_execution(self): + """Whether delegated execution has been requested for the operation.""" + return self.request_params.get("request_delegation", False) - This property is only available for methods that are invoked after an - operator is executed, e.g. :meth:`resolve_output`. - """ + @property + def delegation_target(self): + """The orchestrator to which the operation was delegated (if any).""" + return self.request_params.get("delegation_target", None) + + @property + def results(self): + """A ``dict`` of results for the current operation.""" return self.request_params.get("results", {}) @property - def secrets(self) -> SecretsDictionary: + def secrets(self): """A read-only mapping of keys to their resolved values.""" return SecretsDictionary( self._secrets, @@ -542,11 +547,6 @@ def secrets(self) -> SecretsDictionary: required_keys=self._required_secret_keys, ) - @property - def requesting_delegated_execution(self): - """Whether the invocation requested delegated execution.""" - return self._request_delegation - def secret(self, key): """Retrieves the secret with the given key. @@ -630,7 +630,7 @@ def to_dict(self): k: v for k, v in self.__dict__.items() if not k.startswith("_") } - def set_progress(self, progress: float = None, label: str = None): + def set_progress(self, progress=None, label=None): """Sets the progress of the current operation. Args: @@ -952,36 +952,42 @@ def exists_or_non_required(self, property, value): class ExecutionOptions(object): - """Represents the execution options of an operator. + """Represents the execution options of an operation. Args: - allow_immediate_execution (True): whether the operator can be executed + allow_immediate_execution (True): whether the operation can be executed immediately - allow_delegated_execution (True): whether the operator can be delegated - to an orchestrator - default_choice_to_delegated (True): when True the default option is to delegate + allow_delegated_execution (False): whether the operation can be + delegated to an orchestrator + default_choice_to_delegated (False): whether to default to delegated + execution, if allowed """ def __init__( self, allow_immediate_execution=True, - allow_delegated_execution=True, - default_choice_to_delegated=True, + allow_delegated_execution=False, + default_choice_to_delegated=False, ): self._allow_immediate_execution = allow_immediate_execution self._allow_delegated_execution = allow_delegated_execution - self._available_orchestrators = [] self._default_choice_to_delegated = default_choice_to_delegated + self._available_orchestrators = [] + if not allow_delegated_execution and not allow_immediate_execution: self._allow_immediate_execution = True + @property + def allow_immediate_execution(self): + return self._allow_immediate_execution + @property def allow_delegated_execution(self): return self._allow_delegated_execution @property - def allow_immediate_execution(self): - return self._allow_immediate_execution + def default_choice_to_delegated(self): + return self._default_choice_to_delegated @property def available_orchestrators(self): @@ -993,10 +999,6 @@ def orchestrator_registration_enabled(self): os.environ.get("FIFTYONE_ENABLE_ORCHESTRATOR_REGISTRATION", False) ) - @property - def default_choice_to_delegated(self): - return self._default_choice_to_delegated - def update(self, available_orchestrators=None): self._available_orchestrators = available_orchestrators @@ -1004,9 +1006,9 @@ def to_dict(self): return { "allow_immediate_execution": self._allow_immediate_execution, "allow_delegated_execution": self._allow_delegated_execution, + "default_choice_to_delegated": self._default_choice_to_delegated, "orchestrator_registration_enabled": self.orchestrator_registration_enabled, "available_orchestrators": [ x.__dict__ for x in self.available_orchestrators ], - "default_choice_to_delegated": self._default_choice_to_delegated, } diff --git a/fiftyone/operators/operator.py b/fiftyone/operators/operator.py index d7ff5ead43..090550fa62 100644 --- a/fiftyone/operators/operator.py +++ b/fiftyone/operators/operator.py @@ -5,7 +5,8 @@ | `voxel51.com `_ | """ -from .types import Object, Form, Property, PromptView +from .types import Object, PromptView + BUILTIN_OPERATOR_PREFIX = "@voxel51/operators" @@ -31,9 +32,15 @@ class OperatorConfig(object): when app is in the light mode dark_icon (None): icon to show for the operator in the Operator Browser when app is in the dark mode - allow_immediate_execution (True): whether the operator should allow immediate execution - allow_delegated_execution (False): whether the operator should allow delegated execution - resolve_execution_options_on_change (False): whether the execution options are resolved on change + allow_immediate_execution (True): whether the operator should allow + immediate execution + allow_delegated_execution (False): whether the operator should allow + delegated execution + default_choice_to_delegated (False): whether to default to delegated + execution, if allowed + resolve_execution_options_on_change (None): whether to resolve + execution options dynamically when inputs change. By default, this + behavior will match the ``dynamic`` setting """ def __init__( @@ -176,43 +183,49 @@ def resolve_definition(self, resolve_dynamic, ctx): return definition - def resolve_delegation(self, ctx) -> bool: - """Returns the resolved delegation flag. + def resolve_delegation(self, ctx): + """Returns the resolved *forced* delegation flag. - Subclasses can implement this method to define the logic which decides - if the operation should be queued for delegation + Subclasses can implement this method to decide if delegated execution + should be *forced* for the given operation. Args: ctx: the :class:`fiftyone.operators.executor.ExecutionContext` Returns: - a boolean indicating whether the operation should be delegated or `None` - to allow the default logic to be used + whether the operation should be delegated (True), run immediately + (False), or None to defer to :meth:`resolve_execution_options` to + specify the available options """ return None def resolve_execution_options(self, ctx): """Returns the resolved execution options. - Subclasses can implement this method to define the execution options. This - defines the behavior of the Execute button in the FiftyOne App. + Subclasses can implement this method to define the execution options + available for the operation. + + Args: + ctx: the :class:`fiftyone.operators.executor.ExecutionContext` Returns: a :class:`fiftyone.operators.executor.ExecutionOptions` instance """ from .executor import ExecutionOptions - resolved_delegation = self.resolve_delegation(ctx) - if resolved_delegation is None: - # legacy behavior + # Defer to forced delegation, if implemented + # pylint: disable=assignment-from-none + delegate = self.resolve_delegation(ctx) + if delegate is not None: return ExecutionOptions( - allow_immediate_execution=self.config.allow_immediate_execution, - allow_delegated_execution=self.config.allow_delegated_execution, + allow_immediate_execution=not delegate, + allow_delegated_execution=delegate, ) return ExecutionOptions( - allow_immediate_execution=not resolved_delegation, - allow_delegated_execution=resolved_delegation, + allow_immediate_execution=self.config.allow_immediate_execution, + allow_delegated_execution=self.config.allow_delegated_execution, + default_choice_to_delegated=self.config.default_choice_to_delegated, ) def execute(self, ctx):