Skip to content

Commit

Permalink
Add PREFECT_RESULTS_PERSIST_BY_DEFAULT to globally toggle the resul…
Browse files Browse the repository at this point in the history
…t persistence default (#7228)
  • Loading branch information
zanieb committed Oct 19, 2022
1 parent 7aea3e8 commit 50bbdcc
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 5 deletions.
12 changes: 11 additions & 1 deletion docs/concepts/results.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ Persistence of results requires a [**serializer**](#result-serializers) and a [*

#### Toggling persistence

Persistence of the result of a task or flow can be configured with the `persist_result` option. The `persist_result` option defaults to a null value, which will automatically enable persistence if it is needed for a Prefect feature used by the flow or task.
Persistence of the result of a task or flow can be configured with the `persist_result` option. The `persist_result` option defaults to a null value, which will automatically enable persistence if it is needed for a Prefect feature used by the flow or task. Otherwise, persistence is disabled by default.

For example, the following flow has retries enabled. Flow retries require that all task results are persisted, so the task's result will be persisted:

Expand All @@ -311,6 +311,8 @@ def my_flow():
my_task()
```

Flow retries do not require the flow's result to be persisted, so it will not be.

In this next example, one task has caching enabled. Task caching requires that the given task's result is persisted:

```python
Expand Down Expand Up @@ -357,6 +359,14 @@ def my_task():

Toggling persistence manually will always override any behavior that Prefect would infer.

You may also change Prefect's default persistence behavior with the `PREFECT_RESULTS_PERSIST_BY_DEFAULT` setting. To persist results by default, even if they are not needed for a feature change the value to a truthy value:

```
$ prefect config set PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
```

Task and flows with `persist_result=False` will not persist their results even if `PREFECT_RESULTS_PERSIST_BY_DEFAULT` is `true`.

#### Result storage location

[The result storage location](#result-storage-types) can be configured with the `result_storage` option. The `result_storage` option defaults to a null value, which infers storage from the context.
Expand Down
17 changes: 15 additions & 2 deletions src/prefect/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from prefect.settings import (
PREFECT_LOCAL_STORAGE_PATH,
PREFECT_RESULTS_DEFAULT_SERIALIZER,
PREFECT_RESULTS_PERSIST_BY_DEFAULT,
)
from prefect.utilities.annotations import NotSet
from prefect.utilities.asyncutils import sync_compatible
Expand Down Expand Up @@ -49,6 +50,13 @@ def get_default_result_serializer() -> ResultSerializer:
return PREFECT_RESULTS_DEFAULT_SERIALIZER.value()


def get_default_persist_setting() -> bool:
"""
Return the default option for result persistence (False).
"""
return PREFECT_RESULTS_PERSIST_BY_DEFAULT.value()


def flow_features_require_result_persistence(flow: "Flow") -> bool:
"""
Returns `True` if the given flow uses features that require its result to be
Expand Down Expand Up @@ -109,7 +117,7 @@ async def default_factory(cls, client: "OrionClient" = None, **kwargs):
# Apply defaults
kwargs.setdefault("result_storage", get_default_result_storage())
kwargs.setdefault("result_serializer", get_default_result_serializer())
kwargs.setdefault("persist_result", False)
kwargs.setdefault("persist_result", get_default_persist_setting())
kwargs.setdefault("cache_result_in_memory", True)

return await cls.from_settings(**kwargs, client=client)
Expand Down Expand Up @@ -140,6 +148,7 @@ async def from_flow(
(
flow_features_require_result_persistence(flow)
or flow_features_require_child_result_persistence(ctx.flow)
or get_default_persist_setting()
)
),
cache_result_in_memory=flow.cache_result_in_memory,
Expand All @@ -159,7 +168,10 @@ async def from_flow(
else
# !! Flows persist their result by default if uses a feature that
# requires it
flow_features_require_result_persistence(flow)
(
flow_features_require_result_persistence(flow)
or get_default_persist_setting()
)
),
cache_result_in_memory=flow.cache_result_in_memory,
)
Expand Down Expand Up @@ -191,6 +203,7 @@ async def from_task(
(
flow_features_require_child_result_persistence(ctx.flow)
or task_features_require_result_persistence(task)
or get_default_persist_setting()
)
)
cache_result_in_memory = task.cache_result_in_memory
Expand Down
11 changes: 11 additions & 0 deletions src/prefect/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,17 @@ def warn_on_database_password_value_without_usage(values):
)
"""The default serializer to use when not otherwise specified."""


PREFECT_RESULTS_PERSIST_BY_DEFAULT = Setting(
bool,
default=False,
)
"""
The default setting for persisting results when not otherwise specified. If enabled,
flow and task results will be persisted unless they opt out.
"""


PREFECT_LOCAL_STORAGE_PATH = Setting(
Path,
default=Path("${PREFECT_HOME}") / "storage",
Expand Down
130 changes: 128 additions & 2 deletions tests/results/test_result_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
from prefect.filesystems import LocalFileSystem
from prefect.results import LiteralResult, PersistedResult, ResultFactory
from prefect.serializers import JSONSerializer, PickleSerializer
from prefect.settings import PREFECT_LOCAL_STORAGE_PATH
from prefect.settings import (
PREFECT_LOCAL_STORAGE_PATH,
PREFECT_RESULTS_DEFAULT_SERIALIZER,
PREFECT_RESULTS_PERSIST_BY_DEFAULT,
temporary_settings,
)
from prefect.testing.utilities import assert_blocks_equal

DEFAULT_SERIALIZER = PickleSerializer
Expand Down Expand Up @@ -39,7 +44,7 @@ async def test_create_result_reference_has_cached_object(factory):
assert result.has_cached_object()


def test_root_flow_default_result_settings():
def test_root_flow_default_result_factory():
@flow
def foo():
return get_run_context().result_factory
Expand All @@ -52,6 +57,37 @@ def foo():
assert isinstance(result_factory.storage_block_id, uuid.UUID)


def test_root_flow_default_result_serializer_can_be_overriden_by_setting():
@flow
def foo():
return get_run_context().result_factory

with temporary_settings({PREFECT_RESULTS_DEFAULT_SERIALIZER: "json"}):
result_factory = foo()
assert result_factory.serializer == JSONSerializer()


def test_root_flow_default_persist_result_can_be_overriden_by_setting():
@flow
def foo():
return get_run_context().result_factory

with temporary_settings({PREFECT_RESULTS_PERSIST_BY_DEFAULT: True}):
result_factory = foo()
assert result_factory.persist_result is True


def test_roto_flow_can_opt_out_when_persist_result_default_is_overriden_by_setting():
@flow(persist_result=False)
def foo():
return get_run_context().result_factory

with temporary_settings({PREFECT_RESULTS_PERSIST_BY_DEFAULT: True}):
result_factory = foo()

assert result_factory.persist_result is False


@pytest.mark.parametrize("toggle", [True, False])
def test_root_flow_custom_persist_setting(toggle):
@flow(persist_result=toggle)
Expand Down Expand Up @@ -203,6 +239,51 @@ def bar():
assert isinstance(child_factory.storage_block_id, uuid.UUID)


def test_child_flow_default_result_serializer_can_be_overriden_by_setting():
@flow
def foo():
return get_run_context().result_factory, bar()

@flow
def bar():
return get_run_context().result_factory

with temporary_settings({PREFECT_RESULTS_DEFAULT_SERIALIZER: "json"}):
_, child_factory = foo()

assert child_factory.serializer == JSONSerializer()


def test_child_flow_default_persist_result_can_be_overriden_by_setting():
@flow
def foo():
return get_run_context().result_factory, bar()

@flow
def bar():
return get_run_context().result_factory

with temporary_settings({PREFECT_RESULTS_PERSIST_BY_DEFAULT: True}):
_, child_factory = foo()

assert child_factory.persist_result is True


def test_child_flow_can_opt_out_when_persist_result_default_is_overriden_by_setting():
@flow
def foo():
return get_run_context().result_factory, bar()

@flow(persist_result=False)
def bar():
return get_run_context().result_factory

with temporary_settings({PREFECT_RESULTS_PERSIST_BY_DEFAULT: True}):
_, child_factory = foo()

assert child_factory.persist_result is False


def test_child_flow_custom_persist_setting():
@flow
def foo():
Expand Down Expand Up @@ -439,6 +520,36 @@ def bar():
assert isinstance(task_factory.storage_block_id, uuid.UUID)


def test_task_default_result_serializer_can_be_overriden_by_setting():
@flow
def foo():
return get_run_context().result_factory, bar()

@task
def bar():
return get_run_context().result_factory

with temporary_settings({PREFECT_RESULTS_DEFAULT_SERIALIZER: "json"}):
_, task_factory = foo()

assert task_factory.serializer == JSONSerializer()


def test_task_default_persist_result_can_be_overriden_by_setting():
@flow
def foo():
return get_run_context().result_factory, bar()

@task
def bar():
return get_run_context().result_factory

with temporary_settings({PREFECT_RESULTS_PERSIST_BY_DEFAULT: True}):
_, task_factory = foo()

assert task_factory.persist_result is True


def test_task_custom_persist_setting():
@flow
def foo():
Expand Down Expand Up @@ -534,6 +645,21 @@ def bar():
assert isinstance(task_factory.storage_block_id, uuid.UUID)


def test_task_can_opt_out_when_persist_result_default_is_overriden_by_setting():
@flow
def foo():
return get_run_context().result_factory, bar()

@task(persist_result=False)
def bar():
return get_run_context().result_factory

with temporary_settings({PREFECT_RESULTS_PERSIST_BY_DEFAULT: True}):
_, task_factory = foo()

assert task_factory.persist_result is False


def test_task_inherits_custom_serializer():
@flow(result_serializer="json")
def foo():
Expand Down

0 comments on commit 50bbdcc

Please sign in to comment.