Refactor result types names (#7106)
zanieb committed Oct 12, 2022
1 parent 2118b07 commit 665e803
Showing 8 changed files with 57 additions and 55 deletions.
24 changes: 12 additions & 12 deletions docs/concepts/results.md
@@ -474,43 +474,43 @@ Drawbacks of the JSON serializer:

## Result types

Prefect uses internal result types to capture information about the result attached to a state. The following types exist:
Prefect uses internal result types to capture information about the result attached to a state. The following types are used:

- `ResultLiteral`: Stores simple values inline.
- `ResultReference`: A reference to persisted values.
- `LiteralResult`: Stores simple values inline.
- `PersistedResult`: Stores a reference to a result persisted to storage.

All result types include a `get()` method that can be called to return the value of the result. This is done behind the scenes when the `result()` method is used on states or futures.

### Result literals
### Literal results

Result literals are used to represent [results stored in the Prefect database](#storage-of-results-in-prefect). The values contained by these results must always be JSON serializable.
Literal results are used to represent [results stored in the Prefect database](#storage-of-results-in-prefect). The values contained by these results must always be JSON serializable.

Example:
```
result = ResultLiteral(value=None)
result = LiteralResult(value=None)
result.json()
# {"type": "result", "value": "null"}
```

Result literals reduce the overhead required to persist simple results.
Literal results reduce the overhead required to persist simple results.

### Result references
### Persisted results

Result references contain all of the information needed to retrieve the result from storage. This includes:
The persisted result type contains all of the information needed to retrieve the result from storage. This includes:

- Storage: A reference to the [result storage](#result-storage-types) that can be used to read the serialized result.
- Key: Indicates where this specific result is in storage.

References contain additional metadata, for inspection for the result:
Persisted result types also contain metadata for inspection without retrieving the result:

- Serializer type: The name of the [result serializer](#result-serializer-types) type.

The `get()` method on result references retrieves the data from storage, deserializes it, and returns the original object.
The `get()` operation will cache the resolved object to reduce the overhead of subsequent calls.
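
A minimal sketch of this caching behavior, using only the standard library. `CachedReference`, its in-memory `storage` dict, and the `reads` counter are hypothetical names chosen for illustration and are not part of Prefect; JSON stands in for whichever serializer is configured.

```python
import json


class CachedReference:
    """Hypothetical stand-in for a persisted result with a resolved-object cache."""

    def __init__(self, storage: dict, key: str):
        self._storage = storage   # assumption: a dict plays the role of result storage
        self._key = key           # where this specific result lives in storage
        self._has_cached = False  # separate flag so a cached `None` is still a hit
        self._cache = None
        self.reads = 0            # counts storage reads, for demonstration only

    def get(self):
        # Return the cached object if a previous call already resolved it.
        if self._has_cached:
            return self._cache
        # Otherwise read from storage, deserialize, and remember the object.
        self.reads += 1
        obj = json.loads(self._storage[self._key])
        self._cache = obj
        self._has_cached = True
        return obj


storage = {"abc": json.dumps({"foo": "bar"})}
ref = CachedReference(storage, "abc")
first = ref.get()   # reads from storage
second = ref.get()  # served from the cache
```

After both calls, `ref.reads` is still 1 because the second call never touches storage.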

#### Result blob
#### Persisted result blob

When results are persisted to storage, they are always written as a JSON document. The schema for this is described by the `ResultBlob` type. The document contains:
When results are persisted to storage, they are always written as a JSON document. The schema for this is described by the `PersistedResultBlob` type. The document contains:

- The serialized data of the result.
- A full description of [result serializer](#result-serializer-types) that can be used to deserialize the result data.
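
As a rough illustration of such a document, the sketch below wraps a serialized payload and a serializer description in JSON. The field layout, the `write_blob` and `read_blob` helpers, and the base64-encoded pickle payload are assumptions made for this example; the actual schema is whatever `PersistedResultBlob` defines.

```python
import base64
import json
import pickle


def write_blob(obj) -> bytes:
    """Serialize `obj` and wrap it in a JSON document (hypothetical layout)."""
    # Pickle output is binary, so it is base64-encoded to fit inside JSON text.
    data = base64.b64encode(pickle.dumps(obj)).decode("ascii")
    document = {
        # Description of the serializer needed to decode `data` later.
        "serializer": {"type": "pickle"},
        # The serialized payload.
        "data": data,
    }
    return json.dumps(document).encode("utf-8")


def read_blob(content: bytes):
    """Parse the JSON document and deserialize the payload it carries."""
    document = json.loads(content)
    if document["serializer"]["type"] != "pickle":
        raise ValueError("unsupported serializer in this sketch")
    # Only unpickle content from a trusted source.
    return pickle.loads(base64.b64decode(document["data"]))


content = write_blob({"foo": "bar"})
restored = read_blob(content)
```

Because the serializer description travels with the data, a reader needs only the stored bytes to recover the original object.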
32 changes: 17 additions & 15 deletions src/prefect/results.py
@@ -246,23 +246,23 @@ async def create_result(self, obj: R) -> Union[R, "BaseResult[R]"]:
If persistence is disabled, the object is returned unaltered.
Literal types are converted into `ResultLiteral`.
Literal types are converted into `LiteralResult`.
Other types are serialized, persisted to storage, and a reference is returned.
"""
if obj is None:
# Always write nulls as result types to distinguish from unpersisted results
return await ResultLiteral.create(None)
return await LiteralResult.create(None)

if not self.persist_result:
# Attach the object directly if persistence is disabled; it will be dropped
# when sent to the API
return obj

if type(obj) in LITERAL_TYPES:
return await ResultLiteral.create(obj)
return await LiteralResult.create(obj)

return await ResultReference.create(
return await PersistedResult.create(
obj,
storage_block=self.storage_block,
storage_block_id=self.storage_block_id,
@@ -291,7 +291,7 @@ class Config:
extra = "forbid"


class ResultLiteral(BaseResult):
class LiteralResult(BaseResult):
"""
Result type for literal values like `None`, `True`, `False`.
@@ -309,9 +309,9 @@ async def get(self) -> R:
@classmethod
@sync_compatible
async def create(
cls: "Type[ResultLiteral]",
cls: "Type[LiteralResult]",
obj: R,
) -> "ResultLiteral[R]":
) -> "LiteralResult[R]":
if type(obj) not in LITERAL_TYPES:
raise TypeError(
f"Unsupported type {type(obj).__name__!r} for result literal. "
@@ -321,12 +321,12 @@ async def create(
return cls(value=obj)


class ResultReference(BaseResult):
class PersistedResult(BaseResult):
"""
Result type which stores a reference to a persisted result.
When created, the user's object is serialized and stored. The format for the content
is defined by `ResultBlob`. This reference contains metadata necessary for retrieval
is defined by `PersistedResultBlob`. This reference contains metadata necessary for retrieval
of the object, such as a reference to the storage block and the key where the
content was written.
"""
@@ -358,7 +358,7 @@ async def get(self, client: OrionClient) -> R:
storage_block: ReadableFileSystem = Block._from_block_document(block_document)
content = await storage_block.read_path(self.storage_key)

blob = ResultBlob.parse_raw(content)
blob = PersistedResultBlob.parse_raw(content)
obj = blob.serializer.loads(blob.data)
self._cache_object(obj)

@@ -367,21 +367,21 @@ async def get(self, client: OrionClient) -> R:
@classmethod
@sync_compatible
async def create(
cls: "Type[ResultReference]",
cls: "Type[PersistedResult]",
obj: R,
storage_block: WritableFileSystem,
storage_block_id: uuid.UUID,
serializer: Serializer,
cache_object: bool = True,
) -> "ResultReference[R]":
) -> "PersistedResult[R]":
"""
Create a new result reference from a user's object.
The object will be serialized and written to the storage block under a unique
key. It will then be cached on the returned result.
"""
data = serializer.dumps(obj)
blob = ResultBlob(serializer=serializer, data=data)
blob = PersistedResultBlob(serializer=serializer, data=data)

key = uuid.uuid4().hex
await storage_block.write_path(key, content=blob.to_bytes())
@@ -399,9 +399,11 @@ async def create(
return result


class ResultBlob(pydantic.BaseModel):
class PersistedResultBlob(pydantic.BaseModel):
"""
The format of the content stored in a result file.
The format of the content stored by a persisted result.
Typically, this is written to a file as bytes.
"""

serializer: Serializer
@@ -2,42 +2,42 @@

import pytest

from prefect.results import BaseResult, ResultLiteral
from prefect.results import BaseResult, LiteralResult

LITERAL_VALUES = [True, False, None]


@pytest.mark.parametrize("value", LITERAL_VALUES)
async def test_result_literal_create_and_get(value):
result = await ResultLiteral.create(value)
result = await LiteralResult.create(value)
assert await result.get() == value


@pytest.mark.parametrize("value", LITERAL_VALUES)
def test_result_literal_create_and_get_sync(value):
result = ResultLiteral.create(value)
result = LiteralResult.create(value)
assert result.get() == value


@pytest.mark.parametrize("value", LITERAL_VALUES)
async def test_result_literal_json_roundtrip(value):
result = await ResultLiteral.create(value)
result = await LiteralResult.create(value)
serialized = result.json()
deserialized = ResultLiteral.parse_raw(serialized)
deserialized = LiteralResult.parse_raw(serialized)
assert await deserialized.get() == value


@pytest.mark.parametrize("value", LITERAL_VALUES)
async def test_result_literal_json_roundtrip(value):
result = await ResultLiteral.create(value)
result = await LiteralResult.create(value)
serialized = result.json()
deserialized = BaseResult.parse_raw(serialized)
assert await deserialized.get() == value


async def test_result_literal_does_not_allow_unsupported_types():
with pytest.raises(TypeError, match="Unsupported type 'dict' for result literal"):
await ResultLiteral.create({"foo": "bar"})
await LiteralResult.create({"foo": "bar"})


async def test_result_literal_null_is_distinguishable_from_none():
@@ -46,7 +46,7 @@ async def test_result_literal_null_is_distinguishable_from_none():
because the user disabled persistence (for example) from cases where the result
is stored but is a null value.
"""
result = await ResultLiteral.create(None)
result = await LiteralResult.create(None)
assert result is not None
serialized = result.json()
assert serialized is not None
@@ -3,7 +3,7 @@
import pytest

from prefect.filesystems import LocalFileSystem
from prefect.results import ResultBlob, ResultReference
from prefect.results import PersistedResult, PersistedResultBlob
from prefect.serializers import JSONSerializer, PickleSerializer


@@ -16,7 +16,7 @@ async def storage_block(tmp_path):

@pytest.mark.parametrize("cache_object", [True, False])
async def test_result_reference_create_and_get(cache_object, storage_block):
result = await ResultReference.create(
result = await PersistedResult.create(
"test",
storage_block_id=storage_block._block_document_id,
storage_block=storage_block,
@@ -33,7 +33,7 @@ async def test_result_reference_create_and_get(cache_object, storage_block):


async def test_result_reference_create_uses_storage(storage_block):
result = await ResultReference.create(
result = await PersistedResult.create(
"test",
storage_block_id=storage_block._block_document_id,
storage_block=storage_block,
@@ -48,7 +48,7 @@ async def test_result_reference_create_uses_storage(storage_block):
async def test_result_reference_create_uses_serializer(storage_block):
serializer = PickleSerializer(picklelib="pickle")

result = await ResultReference.create(
result = await PersistedResult.create(
"test",
storage_block_id=storage_block._block_document_id,
storage_block=storage_block,
@@ -57,7 +57,7 @@ async def test_result_reference_create_uses_serializer(storage_block):

assert result.serializer_type == serializer.type
contents = await storage_block.read_path(result.storage_key)
blob = ResultBlob.parse_raw(contents)
blob = PersistedResultBlob.parse_raw(contents)
assert blob.serializer == serializer
assert serializer.loads(blob.data) == "test"

@@ -67,7 +67,7 @@ async def test_result_reference_file_blob_is_json(storage_block):
jsonlib="orjson", object_decoder=None, object_encoder=None
)

result = await ResultReference.create(
result = await PersistedResult.create(
"test",
storage_block_id=storage_block._block_document_id,
storage_block=storage_block,
@@ -79,8 +79,8 @@ async def test_result_reference_file_blob_is_json(storage_block):
# Should be readable by JSON
blob_dict = json.loads(contents)

# Should conform to the ResultBlob spec
blob = ResultBlob.parse_obj(blob_dict)
# Should conform to the PersistedResultBlob spec
blob = PersistedResultBlob.parse_obj(blob_dict)

assert blob.serializer
assert blob.data
6 changes: 3 additions & 3 deletions tests/results/test_result_factory.py
@@ -5,7 +5,7 @@
from prefect import flow, task
from prefect.context import get_run_context
from prefect.filesystems import LocalFileSystem
from prefect.results import ResultFactory, ResultLiteral, ResultReference
from prefect.results import LiteralResult, PersistedResult, ResultFactory
from prefect.serializers import JSONSerializer, PickleSerializer
from prefect.settings import PREFECT_LOCAL_STORAGE_PATH

@@ -39,13 +39,13 @@ async def factory(orion_client):
@pytest.mark.parametrize("value", [True, False, None])
async def test_create_result_literal(value, factory):
result = await factory.create_result(value)
assert isinstance(result, ResultLiteral)
assert isinstance(result, LiteralResult)
assert await result.get() == value


async def test_create_result_reference(factory):
result = await factory.create_result({"foo": "bar"})
assert isinstance(result, ResultReference)
assert isinstance(result, PersistedResult)
assert result.serializer_type == factory.serializer.type
assert result.storage_block_id == factory.storage_block_id
assert await result.get() == {"foo": "bar"}
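
The dispatch these factory tests exercise can be sketched without Prefect. This is an illustrative approximation of the `create_result` logic shown in `src/prefect/results.py` above, not the library's implementation: `Literal`, `Persisted`, and the `store` dict are hypothetical stand-ins, and the contents of `LITERAL_TYPES` are an assumption inferred from the `LiteralResult` docstring and the test values `True`, `False`, `None`.

```python
import uuid
from dataclasses import dataclass
from typing import Any

# Assumption: the diff does not show LITERAL_TYPES; inferred from the docstring.
LITERAL_TYPES = {type(None), bool}


@dataclass
class Literal:
    """Hypothetical stand-in for `LiteralResult`: stores the value inline."""
    value: Any


@dataclass
class Persisted:
    """Hypothetical stand-in for `PersistedResult`: stores only a storage key."""
    storage_key: str


def create_result(obj: Any, persist_result: bool, store: dict) -> Any:
    # Nulls are always wrapped so they differ from "no result was persisted".
    if obj is None:
        return Literal(None)
    # With persistence disabled, the object is returned unaltered.
    if not persist_result:
        return obj
    # Simple literal values are stored inline.
    if type(obj) in LITERAL_TYPES:
        return Literal(obj)
    # Everything else is written to storage under a unique key.
    key = uuid.uuid4().hex
    store[key] = obj
    return Persisted(storage_key=key)
```

Note the ordering: the `None` check runs before the persistence check, so a null result is wrapped even when persistence is disabled.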
10 changes: 5 additions & 5 deletions tests/results/test_result_fetch.py
@@ -2,7 +2,7 @@

from prefect import flow, task
from prefect.client.schemas import Completed
from prefect.results import ResultLiteral
from prefect.results import LiteralResult
from prefect.settings import PREFECT_ASYNC_FETCH_STATE_RESULT, temporary_settings


@@ -20,7 +20,7 @@ def disable_fetch_by_default():
async def test_async_result_raises_deprecation_warning():
# This test creates a state directly because flows do not yet return the new
# result types
state = Completed(data=await ResultLiteral.create(True))
state = Completed(data=await LiteralResult.create(True))
result = state.result(fetch=False)

with pytest.warns(
@@ -30,7 +30,7 @@ async def test_async_result_raises_deprecation_warning():
result = state.result()

# A result type is returned
assert isinstance(result, ResultLiteral)
assert isinstance(result, LiteralResult)
assert await result.get() is True


@@ -97,11 +97,11 @@ async def my_flow():
async def test_async_result_does_not_raise_warning_with_opt_out():
# This test creates a state directly because flows do not yet return the new
# result types
state = Completed(data=await ResultLiteral.create(True))
state = Completed(data=await LiteralResult.create(True))
result = state.result(fetch=False)

# A result type is returned
assert isinstance(result, ResultLiteral)
assert isinstance(result, LiteralResult)
assert await result.get() is True


4 changes: 2 additions & 2 deletions tests/test_flows.py
@@ -24,7 +24,7 @@
from prefect.orion.schemas.filters import FlowFilter, FlowRunFilter
from prefect.orion.schemas.sorting import FlowRunSort
from prefect.orion.schemas.states import StateType
from prefect.results import ResultReference
from prefect.results import PersistedResult
from prefect.settings import PREFECT_LOCAL_STORAGE_PATH, temporary_settings
from prefect.states import StateType, raise_state_exception
from prefect.task_runners import ConcurrentTaskRunner, SequentialTaskRunner
@@ -1329,7 +1329,7 @@ def foo():

# Check that the storage block uses local path
reference = state.result(fetch=False)
assert isinstance(reference, ResultReference)
assert isinstance(reference, PersistedResult)
storage_block = Block._from_block_document(
await orion_client.read_block_document(reference.storage_block_id)
)
4 changes: 2 additions & 2 deletions tests/test_states.py
@@ -3,7 +3,7 @@
from prefect.client.schemas import Completed, Crashed, Failed, Pending, Running
from prefect.exceptions import CrashedRun, FailedRun
from prefect.futures import PrefectFuture
from prefect.results import ResultFactory, ResultLiteral
from prefect.results import LiteralResult, ResultFactory
from prefect.states import (
is_state,
is_state_iterable,
@@ -85,7 +85,7 @@ async def test_value_error_if_all_multistates_are_not_failed(self, state_cls):
):
await raise_state_exception(state_cls(data=inner_states))

@pytest.mark.parametrize("value", ["foo", ResultLiteral(value="foo")])
@pytest.mark.parametrize("value", ["foo", LiteralResult(value="foo")])
async def test_raises_wrapper_with_message_if_result_is_string(
self, state_cls, value
):