Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CompressedSerializer for compression of other result serializers #7164

Merged
merged 2 commits into from
Oct 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/concepts/results.md
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,19 @@ You may configure the result serializer using:
- A type name, e.g. `"json"` or `"pickle"` — this corresponds to an instance with default values
- An instance, e.g. `JSONSerializer(jsonlib="orjson")`

#### Compressing results

Prefect provides a `CompressedSerializer` that _wraps_ another serializer and compresses the bytes it generates. The compressed serializer uses `lzma` compression by default. We also test other compression schemes provided in the Python standard library, such as `bz2` and `zlib`, but you should be able to use any compression library that provides `compress` and `decompress` methods.

You may configure compression of results using:

- A type name prefixed with `compressed/`, e.g. `"compressed/json"` or `"compressed/pickle"`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love this!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to do a generalized dispatch implementation but it got really scary :D

- An instance, e.g. `CompressedSerializer(serializer="pickle", compressionlib="lzma")`

Note that the `"compressed/<serializer-type>"` shortcut will only work for serializers provided by Prefect.
If you are using custom serializers, you must pass a full instance.


### Storage of results in Prefect

The Prefect API does not store your results in most cases for the following reasons:
Expand Down
77 changes: 77 additions & 0 deletions src/prefect/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,80 @@ def loads(self, blob: bytes) -> Any:
if self.object_decoder:
kwargs["object_hook"] = from_qualified_name(self.object_decoder)
return json.loads(blob.decode(), **kwargs)


class CompressedSerializer(Serializer):
    """
    Wraps another serializer, compressing its output.
    Uses `lzma` by default. See `compressionlib` for using alternative libraries.

    `dumps` base64-encodes the compressed bytes; `loads` base64-decodes the
    blob before decompressing it and handing it to the wrapped serializer.

    Attributes:
        serializer: The serializer to use before compression. A string value is
            treated as a serializer type name and cast with `Serializer(type=...)`.
        compressionlib: The import path of a compression module to use.
            Must have methods `compress(bytes) -> bytes` and `decompress(bytes) -> bytes`.
    """

    type: Literal["compressed"] = "compressed"

    serializer: Serializer
    compressionlib: str = "lzma"

    @pydantic.validator("serializer", pre=True)
    def cast_type_names_to_serializers(cls, value):
        """
        Allow the wrapped serializer to be given as a type name string.
        Any non-string value is passed through unchanged.
        """
        if isinstance(value, str):
            return Serializer(type=value)
        return value

    @pydantic.validator("compressionlib")
    def check_compressionlib(cls, value):
        """
        Check that the given compression library is importable and has
        compress/decompress methods.

        Raises:
            ValueError: If the library cannot be imported or is missing a
                callable `compress` or `decompress` attribute.
        """
        try:
            compresser = from_qualified_name(value)
        except (ImportError, AttributeError) as exc:
            raise ValueError(
                f"Failed to import requested compression library: {value!r}."
            ) from exc

        # `compress` is checked first so the reported error matches the first
        # missing method.
        for method in ("compress", "decompress"):
            if not callable(getattr(compresser, method, None)):
                raise ValueError(
                    f"Compression library at {value!r} does not have a {method!r} method."
                )

        return value

    def dumps(self, obj: Any) -> bytes:
        """Serialize `obj` with the wrapped serializer, compress, then base64-encode."""
        blob = self.serializer.dumps(obj)
        compresser = from_qualified_name(self.compressionlib)
        return base64.encodebytes(compresser.compress(blob))

    def loads(self, blob: bytes) -> Any:
        """Base64-decode `blob`, decompress it, then load with the wrapped serializer."""
        compresser = from_qualified_name(self.compressionlib)
        uncompressed = compresser.decompress(base64.decodebytes(blob))
        return self.serializer.loads(uncompressed)


class CompressedPickleSerializer(CompressedSerializer):
    """
    Compressed serializer whose wrapped serializer defaults to `PickleSerializer`.

    Selected by the `compressed/pickle` type name.
    """

    type: Literal["compressed/pickle"] = "compressed/pickle"
    # The default is built through a factory rather than given as a literal instance.
    serializer: Serializer = pydantic.Field(default_factory=PickleSerializer)


class CompressedJSONSerializer(CompressedSerializer):
    """
    Compressed serializer whose wrapped serializer defaults to `JSONSerializer`.

    Selected by the `compressed/json` type name.
    """

    type: Literal["compressed/json"] = "compressed/json"
    # The default is built through a factory rather than given as a literal instance.
    serializer: Serializer = pydantic.Field(default_factory=JSONSerializer)
19 changes: 17 additions & 2 deletions tests/results/test_flow_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
from prefect.filesystems import LocalFileSystem
from prefect.flows import flow
from prefect.results import LiteralResult
from prefect.serializers import JSONSerializer, PickleSerializer, Serializer
from prefect.serializers import (
CompressedSerializer,
JSONSerializer,
PickleSerializer,
Serializer,
)
from prefect.settings import PREFECT_HOME
from prefect.testing.utilities import (
assert_uses_result_serializer,
Expand Down Expand Up @@ -95,7 +100,17 @@ def foo():

@pytest.mark.parametrize(
"serializer",
["json", "pickle", JSONSerializer(), PickleSerializer(), MyIntSerializer()],
[
"json",
"pickle",
JSONSerializer(),
PickleSerializer(),
MyIntSerializer(),
"int-custom",
"compressed/pickle",
"compressed/json",
CompressedSerializer(serializer=MyIntSerializer()),
],
)
async def test_flow_result_serializer(serializer, orion_client):
@flow(result_serializer=serializer, persist_result=True)
Expand Down
9 changes: 8 additions & 1 deletion tests/results/test_task_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,14 @@ def bar():

@pytest.mark.parametrize(
"serializer",
["json", "pickle", JSONSerializer(), PickleSerializer()],
[
"json",
"pickle",
JSONSerializer(),
PickleSerializer(),
"compressed/pickle",
"compressed/json",
],
)
@pytest.mark.parametrize("source", ["child", "parent"])
async def test_task_result_serializer(orion_client, source, serializer):
Expand Down
47 changes: 47 additions & 0 deletions tests/test_serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pytest

from prefect.serializers import (
CompressedSerializer,
JSONSerializer,
PickleSerializer,
Serializer,
Expand Down Expand Up @@ -298,3 +299,49 @@ def test_does_not_allow_object_hook_collision(self):
def test_does_not_allow_default_collision(self):
with pytest.raises(pydantic.ValidationError):
JSONSerializer(dumps_kwargs={"default": "foo"})


class TestCompressedSerializer:
    """Tests for `CompressedSerializer` and its `compressed/<type>` shorthands."""

    @pytest.mark.parametrize("data", SERIALIZER_TEST_CASES)
    def test_simple_roundtrip(self, data):
        # Whatever goes in through `dumps` must come back out of `loads`.
        wrapper = CompressedSerializer(serializer="pickle")
        blob = wrapper.dumps(data)
        restored = wrapper.loads(blob)
        assert restored == data

    @pytest.mark.parametrize("lib", ["bz2", "lzma", "zlib"])
    def test_allows_stdlib_compression_libraries(self, lib):
        wrapper = CompressedSerializer(compressionlib=lib, serializer="pickle")
        blob = wrapper.dumps("test")
        restored = wrapper.loads(blob)
        assert restored == "test"

    def test_uses_alternative_compression_library(self, monkeypatch):
        # Replace zlib's functions with mocks so the calls can be counted.
        fake_compress = MagicMock(return_value=b"test")
        fake_decompress = MagicMock(return_value=PickleSerializer().dumps("test"))
        monkeypatch.setattr("zlib.compress", fake_compress)
        monkeypatch.setattr("zlib.decompress", fake_decompress)

        wrapper = CompressedSerializer(compressionlib="zlib", serializer="pickle")
        wrapper.dumps("test")
        wrapper.loads(b"test")

        fake_compress.assert_called_once()
        fake_decompress.assert_called_once()

    def test_uses_given_serializer(self, monkeypatch):
        # Same setup as above, but the wrapped serializer is JSON.
        fake_compress = MagicMock(return_value=b"test")
        fake_decompress = MagicMock(return_value=JSONSerializer().dumps("test"))
        monkeypatch.setattr("zlib.compress", fake_compress)
        monkeypatch.setattr("zlib.decompress", fake_decompress)

        wrapper = CompressedSerializer(compressionlib="zlib", serializer="json")
        wrapper.dumps("test")
        wrapper.loads(b"test")

        fake_compress.assert_called_once()
        fake_decompress.assert_called_once()

    def test_pickle_shorthand(self):
        built = Serializer(type="compressed/pickle")
        assert isinstance(built, CompressedSerializer)
        assert isinstance(built.serializer, PickleSerializer)

    def test_json_shorthand(self):
        built = Serializer(type="compressed/json")
        assert isinstance(built, CompressedSerializer)
        assert isinstance(built.serializer, JSONSerializer)