Commit
Consolidate DataDocument related handling into deprecated module (#6900)

* Consolidate `DataDocument` related handling into `deprecated` module

* Restore `blocks` file

* Fix DataDocument reference in postgres migration

* Remove `data` schema from docs

* Add deprecated import

* Fix missed postgres import

* Fix merge
zanieb committed Sep 28, 2022
1 parent d0b35fb commit 8ca3c83
Showing 24 changed files with 272 additions and 270 deletions.
3 changes: 0 additions & 3 deletions docs/api-ref/orion/schemas/data.md

This file was deleted.

1 change: 1 addition & 0 deletions src/prefect/__init__.py
@@ -32,6 +32,7 @@

# Import modules that register types
import prefect.serializers
+import prefect.deprecated.data_documents
import prefect.packaging
import prefect.blocks.kubernetes
import prefect.blocks.notifications
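
The comment in this hunk is the reason for the new import: importing the module runs its registration decorators. A minimal sketch of that side effect, using only names introduced in this commit:

from prefect.deprecated.data_documents import lookup_serializer

# The import above has already executed the module's @register_serializer
# decorators, so built-in encodings such as "json" resolve without any setup.
json_serializer = lookup_serializer("json")
print(json_serializer.dumps({"answer": 42}))  # b'{"answer": 42}'
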
2 changes: 1 addition & 1 deletion src/prefect/agent.py
@@ -12,12 +12,12 @@

from prefect.blocks.core import Block
from prefect.client import OrionClient, get_client
+from prefect.deprecated.data_documents import DataDocument
from prefect.engine import propose_state
from prefect.exceptions import Abort, ObjectNotFound
from prefect.infrastructure import Infrastructure, InfrastructureResult, Process
from prefect.logging import get_logger
from prefect.orion.schemas.core import BlockDocument, FlowRun, WorkQueue
-from prefect.orion.schemas.data import DataDocument
from prefect.orion.schemas.states import Failed, Pending
from prefect.settings import PREFECT_AGENT_PREFETCH_SECONDS
6 changes: 3 additions & 3 deletions src/prefect/client/orion.py
@@ -23,6 +23,7 @@
    State,
    TaskRun,
)
+from prefect.deprecated.data_documents import DataDocument
from prefect.logging import get_logger
from prefect.orion.api.server import ORION_API_VERSION, create_app
from prefect.orion.schemas.actions import LogCreate, WorkQueueCreate, WorkQueueUpdate
@@ -32,7 +33,6 @@
    BlockType,
    QueueFilter,
)
-from prefect.orion.schemas.data import DataDocument
from prefect.orion.schemas.filters import LogFilter
from prefect.settings import (
    PREFECT_API_KEY,
@@ -1423,7 +1423,7 @@ async def set_flow_run_state(
        flow_run_id: UUID,
        state: State,
        force: bool = False,
-        backend_state_data: schemas.data.DataDocument = None,
+        backend_state_data: DataDocument = None,
    ) -> OrchestrationResult:
        """
        Set the state of a flow run.
@@ -1606,7 +1606,7 @@ async def set_task_run_state(
        task_run_id: UUID,
        state: State,
        force: bool = False,
-        backend_state_data: schemas.data.DataDocument = None,
+        backend_state_data: DataDocument = None,
    ) -> OrchestrationResult:
        """
        Set the state of a task run.
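
For context, a hedged sketch of calling the updated signature (the flow run ID and payload are made up; `get_client`, `set_flow_run_state`, and `DataDocument` appear in this diff, and `Completed` is assumed to come from `prefect.orion.schemas.states`):

from uuid import UUID

from prefect.client import get_client
from prefect.deprecated.data_documents import DataDocument
from prefect.orion.schemas.states import Completed


async def complete_flow_run(flow_run_id: UUID) -> None:
    # `backend_state_data` is now annotated with the deprecated DataDocument
    # class instead of the removed `schemas.data.DataDocument` path.
    async with get_client() as client:
        await client.set_flow_run_state(
            flow_run_id=flow_run_id,
            state=Completed(),
            backend_state_data=DataDocument.encode("json", {"value": 42}),
        )
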
3 changes: 2 additions & 1 deletion src/prefect/client/schemas.py
@@ -4,6 +4,7 @@

from pydantic import Field

+from prefect.deprecated.data_documents import DataDocument
from prefect.orion import schemas

R = TypeVar("R")
@@ -16,7 +17,7 @@ class State(schemas.states.State.subclass(exclude_fields=["data"]), Generic[R]):
    This client-side extension adds a `result` interface.
    """

-    data: Optional[schemas.data.DataDocument[R]] = Field(
+    data: Optional[DataDocument[R]] = Field(
        default=None,
    )
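
A short usage sketch of the updated field (the literal value is illustrative; `StateType` is assumed to live in `prefect.orion.schemas.states`):

from prefect.client.schemas import State
from prefect.deprecated.data_documents import DataDocument
from prefect.orion.schemas.states import StateType

# The generic parameter flows through to the document, so State[int]
# carries a DataDocument[int] in its `data` field.
state: State[int] = State(
    type=StateType.COMPLETED,
    data=DataDocument.encode("json", 7),
)
assert state.data.decode() == 7
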
Empty file.
211 changes: 211 additions & 0 deletions src/prefect/deprecated/data_documents.py
@@ -0,0 +1,211 @@
import base64
import json
from typing import TYPE_CHECKING, Any, Dict, Generic, Tuple, Type, TypeVar, Union

import cloudpickle
from typing_extensions import Protocol

from prefect.orion.utilities.schemas import PrefectBaseModel

if TYPE_CHECKING:
    from prefect.packaging.base import PackageManifest
    from prefect.results import _Result


T = TypeVar("T", bound="DataDocument") # Generic for DataDocument class types
D = TypeVar("D", bound=Any) # Generic for DataDocument data types

_SERIALIZERS: Dict[str, "Serializer"] = {}
D = TypeVar("D")


class Serializer(Protocol[D]):
    """
    Define a serializer that can encode data of type 'D' into bytes
    """

    @staticmethod
    def dumps(data: D, **kwargs: Any) -> bytes:
        raise NotImplementedError

    @staticmethod
    def loads(blob: bytes) -> D:
        raise NotImplementedError


def register_serializer(
    encoding: Union[str, Tuple[str, ...]], serializer: Serializer = None
):
    """Register dispatch of `func` on arguments of encoding `encoding`"""

    def wrapper(serializer):
        if isinstance(encoding, tuple):
            for e in encoding:
                register_serializer(e, serializer)
        else:
            _SERIALIZERS[encoding] = serializer
        return serializer

    return wrapper(serializer) if serializer is not None else wrapper


def lookup_serializer(encoding: str) -> Serializer:
    """Return the serializer implementation for the given ``encoding``"""
    try:
        return _SERIALIZERS[encoding]
    except KeyError:
        raise ValueError(f"Unregistered encoding {encoding!r}")


class DataDocument(PrefectBaseModel, Generic[D]):
    """
    A data document includes an encoding string and a blob of encoded data
    Subclasses can define the expected type for the blob's underlying type using the
    generic variable `D`.
    For example `DataDocument[str]` indicates that a string should be passed when
    creating the document and a string will be returned when it is decoded.
    """

    encoding: str
    blob: bytes

    # A cache for the decoded data, see `DataDocument.decode`
    _data: D
    __slots__ = ["_data"]

    @classmethod
    def encode(
        cls: Type["DataDocument"], encoding: str, data: D, **kwargs: Any
    ) -> "DataDocument[D]":
        """
        Create a new data document
        A serializer must be registered for the given `encoding`
        """
        # Dispatch encoding
        blob = lookup_serializer(encoding).dumps(data, **kwargs)

        inst = cls(blob=blob, encoding=encoding)
        inst._cache_data(data)
        return inst

    def decode(self) -> D:
        """
        Get the data from a data document
        A serializer must be registered for the document's encoding
        """
        if self.has_cached_data():
            return self._data

        # Dispatch decoding
        data = lookup_serializer(self.encoding).loads(self.blob)

        self._cache_data(data)
        return data

    def _cache_data(self, data) -> None:
        # Use object's setattr to avoid a pydantic 'field does not exist' error
        # See https://github.com/samuelcolvin/pydantic/issues/655
        object.__setattr__(self, "_data", data)

    def has_cached_data(self):
        return hasattr(self, "_data")

    def __str__(self) -> str:
        if self.has_cached_data():
            return repr(self._data)
        else:
            return repr(self)

    def __repr__(self) -> str:
        return f"{type(self).__name__}(encoding={self.encoding!r})"


@register_serializer("json")
class DocumentJSONSerializer:
    """
    Serializes data to JSON.
    Input types must be compatible with the stdlib json library.
    Wraps the `json` library to serialize to UTF-8 bytes instead of string types.
    """

    @staticmethod
    def dumps(data: Any) -> bytes:
        return json.dumps(data).encode()

    @staticmethod
    def loads(blob: bytes) -> Any:
        return json.loads(blob.decode())


@register_serializer("text")
class TextSerializer:
    @staticmethod
    def dumps(data: str) -> bytes:
        return data.encode()

    @staticmethod
    def loads(blob: bytes) -> str:
        return blob.decode()


@register_serializer("cloudpickle")
class DocumentPickleSerializer:
    """
    Serializes arbitrary objects using the pickle protocol.
    Wraps `cloudpickle` to encode bytes in base64 for safe transmission.
    """

    @staticmethod
    def dumps(data: Any) -> bytes:
        data_bytes = cloudpickle.dumps(data)

        return base64.encodebytes(data_bytes)

    @staticmethod
    def loads(blob: bytes) -> Any:
        return cloudpickle.loads(base64.decodebytes(blob))
        # TODO: Consider adding python version data to pickle payloads to raise
        # more helpful errors for users.
        # A TypeError("expected bytes-like object, not int") will be raised if
        # a document is deserialized by Python 3.7 and serialized by 3.8+


@register_serializer("package-manifest")
class PackageManifestSerializer:
    """
    Serializes a package manifest.
    """

    @staticmethod
    def dumps(data: "PackageManifest") -> bytes:
        return data.json().encode()

    @staticmethod
    def loads(blob: bytes) -> "PackageManifest":
        from prefect.packaging.base import PackageManifest

        return PackageManifest.parse_raw(blob)


@register_serializer("result")
class ResultSerializer:
    """
    Serializes a result object
    """

    @staticmethod
    def dumps(data: "_Result") -> bytes:
        return data.json().encode()

    @staticmethod
    def loads(blob: bytes) -> "_Result":
        from prefect.results import _Result

        return _Result.parse_raw(blob)
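
The module above is relocated rather than rewritten. As a usage sketch (not part of the commit), here is a round trip through the built-in "json" serializer plus a custom registration; the "reversed-text" encoding is invented purely for illustration:

from prefect.deprecated.data_documents import DataDocument, register_serializer

# Round-trip through the "json" serializer registered at import time.
doc = DataDocument.encode("json", {"nums": [1, 2, 3]})
assert doc.blob == b'{"nums": [1, 2, 3]}'
assert doc.decode() == {"nums": [1, 2, 3]}  # served from the _data cache


@register_serializer("reversed-text")
class ReversedTextSerializer:
    """Hypothetical serializer used only for this example."""

    @staticmethod
    def dumps(data: str) -> bytes:
        return data[::-1].encode()

    @staticmethod
    def loads(blob: bytes) -> str:
        return blob.decode()[::-1]


doc = DataDocument.encode("reversed-text", "hello")
assert doc.blob == b"olleh"

restored = DataDocument(encoding="reversed-text", blob=b"olleh")
assert restored.decode() == "hello"  # nothing cached yet, so loads() runs
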
2 changes: 1 addition & 1 deletion src/prefect/engine.py
@@ -39,6 +39,7 @@
    TaskRunContext,
)
from prefect.deployments import load_flow_from_flow_run
+from prefect.deprecated.data_documents import DataDocument
from prefect.exceptions import (
    Abort,
    MappingLengthMismatch,
@@ -57,7 +58,6 @@
    task_run_logger,
)
from prefect.orion.schemas.core import TaskRunInput, TaskRunResult
-from prefect.orion.schemas.data import DataDocument
from prefect.orion.schemas.filters import FlowRunFilter
from prefect.orion.schemas.responses import SetStateStatus
from prefect.orion.schemas.sorting import FlowRunSort
(postgres migration file, revision 25f4b90a7a42)
@@ -12,6 +12,18 @@
from sqlalchemy import Text

import prefect
+from prefect.orion.utilities.schemas import PrefectBaseModel
+
+
+class DataDocument(PrefectBaseModel):
+    """
+    DataDocuments were deprecated in September 2022 and this stub is included here
+    to simplify removal from the library.
+    """
+
+    encoding: str
+    blob: bytes
+

# revision identifiers, used by Alembic.
revision = "25f4b90a7a42"
@@ -248,9 +260,7 @@ def upgrade():
        ),
        sa.Column(
            "flow_data",
-            prefect.orion.utilities.database.Pydantic(
-                prefect.orion.schemas.data.DataDocument
-            ),
+            prefect.orion.utilities.database.Pydantic(DataDocument),
            nullable=True,
        ),
        sa.Column("flow_runner_type", sa.String(), nullable=True),
@@ -485,9 +495,7 @@ def upgrade():
        ),
        sa.Column(
            "data",
-            prefect.orion.utilities.database.Pydantic(
-                prefect.orion.schemas.data.DataDocument
-            ),
+            prefect.orion.utilities.database.Pydantic(DataDocument),
            nullable=True,
        ),
        sa.Column(
@@ -692,9 +700,7 @@ def upgrade():
        ),
        sa.Column(
            "data",
-            prefect.orion.utilities.database.Pydantic(
-                prefect.orion.schemas.data.DataDocument
-            ),
+            prefect.orion.utilities.database.Pydantic(DataDocument),
            nullable=True,
        ),
        sa.Column(
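
A minimal sketch of why the inline stub above is sufficient for the migration (assumption: the `Pydantic(...)` column type only needs a model that can round-trip the stored JSON, not the full deprecated implementation):

from prefect.orion.utilities.schemas import PrefectBaseModel


class DataDocument(PrefectBaseModel):
    """Stub mirroring the migration: only the two persisted fields."""

    encoding: str
    blob: bytes


# Roughly what the column type does when writing and reading the column.
stored = DataDocument(encoding="json", blob=b'{"x": 1}').json()
restored = DataDocument.parse_raw(stored)
assert restored.blob == b'{"x": 1}'
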