Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zep PostgresDatasource returns a list of batches. #6341

Merged
merged 3 commits into from
Nov 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions great_expectations/zep/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,12 @@
from great_expectations.zep.metadatasource import MetaDatasource

# BatchRequestOptions is a dict that is composed into a BatchRequest that specifies the
# Batches one wants returned. In the simple case the keys represent dimensions one can
# slice the data along and the values are the values. One can also namespace these key/value
# pairs, hence the Dict[str, BatchRequestValue], allowed values. For example:
# options = {
# "month": "3"
# "year_splitter": {
# "year": "2020"
# }
# }
# The month key is in the global namespace while the year key is in the year_splitter namespace.
# Batches one wants returned. The keys represent dimensions one can slice the data along
# and the values are the realized. If a value is None or unspecified, the batch_request
# will capture all data along this dimension. For example, if we have a year and month
# splitter and we want to query all months in the year 2020, the batch request options
# would look like:
# options = { "year": 2020 }
BatchRequestOptions: TypeAlias = Dict[str, Any]


Expand Down
225 changes: 160 additions & 65 deletions great_expectations/zep/postgres_datasource.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from __future__ import annotations

import dataclasses
import itertools
from datetime import datetime
from pprint import pformat as pf
from typing import Any, Dict, List, Optional, Type
from typing import Dict, Iterable, List, Optional, Type, cast

import dateutil.tz
from typing_extensions import ClassVar

from great_expectations.core.batch_spec import SqlAlchemyDatasourceBatchSpec
Expand All @@ -21,12 +24,28 @@ class PostgresDatasourceError(Exception):
pass


class BatchRequestError(Exception):
    """Raised when a batch request's fields or options do not match its data asset."""


# For our year splitter we default the range to the last 2 years.
_CURRENT_YEAR = datetime.now(dateutil.tz.tzutc()).year
_DEFAULT_YEAR_RANGE = range(_CURRENT_YEAR - 1, _CURRENT_YEAR + 1)
_DEFAULT_MONTH_RANGE = range(1, 13)


@dataclasses.dataclass(frozen=True)
class ColumnSplitter:
    """Describes how a table asset is split into batches along one column.

    Attributes:
        method_name: The execution-engine splitter method to invoke
            (e.g. "split_on_year_and_month").
        column_name: The column whose values define the split.
        param_defaults: Maps each splitter parameter to the default values
            used when a batch request leaves that parameter unspecified.
    """

    method_name: str
    column_name: str
    param_defaults: Dict[str, Iterable]

    @property
    def param_names(self) -> List[str]:
        """The names of the parameters this splitter accepts."""
        return list(self.param_defaults.keys())


class TableAsset(DataAsset):
Expand Down Expand Up @@ -65,64 +84,158 @@ def get_batch_request(
Args:
options: A dict that can be used to limit the number of batches returned from the asset.
The dict structure depends on the asset type. A template of the dict can be obtained by
calling batch_request_template.
calling batch_request_options_template.

Returns:
A BatchRequest object that can be used to obtain a batch list from a Datasource by calling the
get_batch_list_from_batch_request method.
"""
if options is not None and not self._valid_batch_request_options(options):
raise BatchRequestError(
"Batch request options should have a subset of keys:\n"
f"{list(self.batch_request_options_template().keys())}\n"
f"but actually has the form:\n{pf(options)}\n"
)
return BatchRequest(
datasource_name=self.datasource.name,
data_asset_name=self.name,
options=options or {},
)

def batch_request_template(
def _valid_batch_request_options(self, options: BatchRequestOptions) -> bool:
return set(options.keys()).issubset(
Comment on lines +105 to +106
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Edit

On closer look, I don't think my suggestion below makes sense given how open batch_request_options_template is.

Leaving the comment up in-case it inspires someone to try this technique elsewhere.


This may not be that useful. But we might be to do 2 things to improve the type narrowing here.

  1. Define a more specific PostgresBatchOptions TypedDict
  2. Make this _valid_batch_request_options method a TypeGuard for the PostgresBatchOptions type.

I'm not totally sure how a TypedDict works with extra keys though.

from typing_extensions import TypedDict, TypeGuard

class PostgresBatchOptions(TypedDict):
    foo: str
    bar: int

def _valid_batch_request_options(self, options: BatchRequestOptions) -> TypeGuard[PostgresBatchOptions]:
        return set(options.keys()).issubset(
            set(self.batch_request_options_template().keys())
        )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra arguments are not supported in TypedDicts. Here's the github issue about it with some interesting discussion: python/mypy#4617

Originally I had implemented BatchOptions using generics and had it more strongly typed. I abandoned that after getting some UX feedback and have reverted it to this untyped dict which I'm validating at runtime for a particular DataAsset. To help with the pain of "what can I put here" I've made the batch_request_options_template method. It would be nice to make better checks in static analysis but I'm not sure how to do that with the current requirements.

I am not familiar with TypeGuards and will look into that.

set(self.batch_request_options_template().keys())
)

def validate_batch_request(self, batch_request: BatchRequest) -> None:
    """Raise BatchRequestError unless batch_request targets this asset with valid options.

    Args:
        batch_request: The BatchRequest to check against this asset.

    Raises:
        BatchRequestError: If the request names a different datasource/asset
            or carries option keys this asset does not understand.
    """
    names_match = (
        batch_request.datasource_name == self.datasource.name
        and batch_request.data_asset_name == self.name
    )
    if names_match and self._valid_batch_request_options(batch_request.options):
        return
    # Build an example of a well-formed request so the error is actionable.
    expect_batch_request_form = BatchRequest(
        datasource_name=self.datasource.name,
        data_asset_name=self.name,
        options=self.batch_request_options_template(),
    )
    raise BatchRequestError(
        "BatchRequest should have form:\n"
        f"{pf(dataclasses.asdict(expect_batch_request_form))}\n"
        f"but actually has form:\n{pf(dataclasses.asdict(batch_request))}\n"
    )

def batch_request_options_template(
    self,
) -> BatchRequestOptions:
    """A BatchRequestOptions template for get_batch_request.

    Returns:
        A BatchRequestOptions dictionary with the correct shape that get_batch_request
        will understand. All the option values are defaulted to None.
    """
    # Without a column splitter there are no slicing dimensions to offer.
    if not self.column_splitter:
        return {}
    return {p: None for p in self.column_splitter.param_names}

# This asset type will support a variety of splitters
def add_year_and_month_splitter(
    self,
    column_name: str,
    default_year_range: Iterable[int] = _DEFAULT_YEAR_RANGE,
    default_month_range: Iterable[int] = _DEFAULT_MONTH_RANGE,
) -> TableAsset:
    """Associates a year month splitter with this DataAsset

    Args:
        column_name: A column name of the date column where year and month will be parsed out.
        default_year_range: When this splitter is used, say in a BatchRequest, if no value for
            year is specified, we query over all the years in this default range.
        default_month_range: When this splitter is used, say in a BatchRequest, if no value for
            month is specified, we query over all months in this range.

    Returns:
        This TableAsset so we can use this method fluently.
    """
    self.column_splitter = ColumnSplitter(
        method_name="split_on_year_and_month",
        column_name=column_name,
        param_defaults={"year": default_year_range, "month": default_month_range},
    )
    return self

def fully_specified_batch_requests(
    self, batch_request: BatchRequest
) -> List[BatchRequest]:
    """Populates a batch request's unspecified params producing a list of batch requests

    This method does NOT validate the batch_request. If necessary call
    TableAsset.validate_batch_request before calling this method.

    Args:
        batch_request: A batch request whose options may leave splitter
            parameters unspecified (missing or None).

    Returns:
        A list of batch requests in which every splitter parameter has a
        concrete value; each unspecified parameter is expanded over the
        splitter's default values.
    """
    if self.column_splitter is None:
        # Currently batch_request.options is completely determined by the presence of a
        # column splitter. If column_splitter is None, then there are no specifiable options
        # so we return early.
        # In the future, if there are options that are not determined by the column splitter
        # this check will have to be generalized.
        return [batch_request]

    # Partition the template's option names into those the request specifies
    # and those left unspecified.
    specified_options: List[str] = []
    unspecified_options: List[str] = []
    for option_name in self.batch_request_options_template():
        if batch_request.options.get(option_name) is not None:
            specified_options.append(option_name)
        else:
            unspecified_options.append(option_name)

    if not unspecified_options:
        return [batch_request]

    # All options are defined by the splitter, so we look at its default values to fill
    # in the option values: expand the cartesian product of the defaults for
    # every unspecified option.
    default_option_values = [
        self.column_splitter.param_defaults[option] for option in unspecified_options
    ]
    batch_requests: List[BatchRequest] = []
    for option_values in itertools.product(*default_option_values):
        # Start from the caller-specified options, then fill in one concrete
        # value per unspecified option.
        options = {name: batch_request.options[name] for name in specified_options}
        options.update(zip(unspecified_options, option_values))
        batch_requests.append(
            BatchRequest(
                datasource_name=batch_request.datasource_name,
                data_asset_name=batch_request.data_asset_name,
                options=options,
            )
        )
    return batch_requests


class PostgresDatasource(Datasource):
# class var definitions
asset_types: ClassVar[List[Type[DataAsset]]] = [TableAsset]

def __init__(self, name: str, connection_str: str) -> None:
"""Initializes the PostgresDatasource.

Args:
name: The name of this datasource.
connection_str: The SQLAlchemy connection string used to connect to the database.
For example: "postgresql+psycopg2://postgres:@localhost/test_database"
"""
self.name = name
self.execution_engine = SqlAlchemyExecutionEngine(
connection_string=connection_str
Expand Down Expand Up @@ -168,55 +281,37 @@ def get_batch_list_from_batch_request(
A list of batches that match the options specified in the batch request.
"""
# We translate the batch_request into a BatchSpec to hook into GX core.
# NOTE: We only produce 1 batch right now
data_asset = self.get_asset(batch_request.data_asset_name)

# We look at the splitters on the data asset and verify that the passed in batch request provides the
# correct arguments to specify the batch
batch_identifiers: Dict[str, Any] = {}
batch_spec_kwargs: Dict[str, Any] = {
"type": "table",
"data_asset_name": data_asset.name,
"table_name": data_asset.table_name,
"batch_identifiers": batch_identifiers,
}
if data_asset.column_splitter:
column_splitter = data_asset.column_splitter
batch_spec_kwargs["splitter_method"] = column_splitter.method_name
batch_spec_kwargs["splitter_kwargs"] = {
"column_name": column_splitter.column_name
data_asset.validate_batch_request(batch_request)

batch_list: List[Batch] = []
column_splitter = data_asset.column_splitter
for request in data_asset.fully_specified_batch_requests(batch_request):
batch_spec_kwargs = {
"type": "table",
"data_asset_name": data_asset.name,
"table_name": data_asset.table_name,
"batch_identifiers": {},
}
try:
param_lookup = (
batch_request.options[column_splitter.name]
if column_splitter.name
else batch_request.options
)
except KeyError as e:
raise PostgresDatasourceError(
"One must specify the batch request options in this form: "
f"{pf(data_asset.batch_request_template())}. It was specified like {pf(batch_request.options)}"
) from e

column_splitter_kwargs = {}
for param_name in column_splitter.template_params:
column_splitter_kwargs[param_name] = (
param_lookup[param_name] if param_name in param_lookup else None
if column_splitter:
batch_spec_kwargs["splitter_method"] = column_splitter.method_name
batch_spec_kwargs["splitter_kwargs"] = {
"column_name": column_splitter.column_name
}
# mypy infers that batch_spec_kwargs["batch_identifiers"] is a collection, but
# it is hardcoded to a dict above, so we cast it here.
cast(Dict, batch_spec_kwargs["batch_identifiers"]).update(
{column_splitter.column_name: request.options}
)
batch_spec_kwargs["batch_identifiers"].update(
{column_splitter.column_name: column_splitter_kwargs}
data, _ = self.execution_engine.get_batch_data_and_markers(
batch_spec=SqlAlchemyDatasourceBatchSpec(**batch_spec_kwargs)
)
batch_list.append(
Batch(
datasource=self,
data_asset=data_asset,
batch_request=batch_request,
data=data,
)

# Now, that we've verified the arguments, we can create the batch_spec and then the batch.
batch_spec = SqlAlchemyDatasourceBatchSpec(**batch_spec_kwargs)
data, _ = self.execution_engine.get_batch_data_and_markers(
batch_spec=batch_spec
)
return [
Batch(
datasource=self,
data_asset=data_asset,
batch_request=batch_request,
data=data,
)
]
return batch_list
Loading