Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MAINTENANCE] In ExecutionEngine: Make variable names and usage more descriptive of their purpose. #6342

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions great_expectations/execution_engine/execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,16 +335,16 @@ def resolve_metrics( # noqa: C901 - 16
accessor_domain_kwargs: dict
metric_provider_kwargs: dict
metric_to_resolve: MetricConfiguration
metric_dependencies: dict
resolved_metrics_by_metric_name: Dict[str, Any]
k: str
v: MetricConfiguration
for metric_to_resolve in metrics_to_resolve:
metric_dependencies = {}
resolved_metrics_by_metric_name = {}
for k, v in metric_to_resolve.metric_dependencies.items():
if v.id in metrics:
metric_dependencies[k] = metrics[v.id]
resolved_metrics_by_metric_name[k] = metrics[v.id]
elif self._caching and v.id in self._metric_cache: # type: ignore[operator] # TODO: update NoOpDict
metric_dependencies[k] = self._metric_cache[v.id]
resolved_metrics_by_metric_name[k] = self._metric_cache[v.id]
else:
raise ge_exceptions.MetricError(
message=f'Missing metric dependency: {str(k)} for metric "{metric_to_resolve.metric_name}".'
Expand All @@ -358,7 +358,7 @@ def resolve_metrics( # noqa: C901 - 16
"execution_engine": self,
"metric_domain_kwargs": metric_to_resolve.metric_domain_kwargs,
"metric_value_kwargs": metric_to_resolve.metric_value_kwargs,
"metrics": metric_dependencies,
"metrics": resolved_metrics_by_metric_name,
"runtime_configuration": runtime_configuration,
}
if metric_fn is None:
Expand All @@ -367,7 +367,7 @@ def resolve_metrics( # noqa: C901 - 16
metric_fn,
compute_domain_kwargs,
accessor_domain_kwargs,
) = metric_dependencies.pop("metric_partial_fn")
) = resolved_metrics_by_metric_name.pop("metric_partial_fn")
except KeyError as e:
raise ge_exceptions.MetricError(
message=f'Missing metric dependency: {str(e)} for metric "{metric_to_resolve.metric_name}".'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from great_expectations.core.metric_domain_types import MetricDomainTypes
from great_expectations.core.util import AzureUrl, GCSUrl, S3Url, sniff_s3_compression
from great_expectations.execution_engine import ExecutionEngine
from great_expectations.execution_engine.execution_engine import SplitDomainKwargs
from great_expectations.execution_engine.pandas_batch_data import PandasBatchData
from great_expectations.execution_engine.split_and_sample.pandas_data_sampler import (
PandasDataSampler,
Expand Down Expand Up @@ -602,15 +603,15 @@ class MetricDomainTypes.
- a dictionary of accessor_domain_kwargs, describing any accessors needed to
identify the domain within the compute domain
"""
data = self.get_domain_records(domain_kwargs)

table = domain_kwargs.get("table", None)
table: str = domain_kwargs.get("table", None)
if table:
raise ValueError(
"PandasExecutionEngine does not currently support multiple named tables."
)

split_domain_kwargs = self._split_domain_kwargs(
data: pd.DataFrame = self.get_domain_records(domain_kwargs=domain_kwargs)

split_domain_kwargs: SplitDomainKwargs = self._split_domain_kwargs(
domain_kwargs, domain_type, accessor_keys
)

Expand Down
13 changes: 6 additions & 7 deletions great_expectations/execution_engine/sparkdf_execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from great_expectations.execution_engine.bundled_metric_configuration import (
BundledMetricConfiguration,
)
from great_expectations.execution_engine.execution_engine import SplitDomainKwargs
from great_expectations.execution_engine.sparkdf_batch_data import SparkDFBatchData
from great_expectations.execution_engine.split_and_sample.sparkdf_data_sampler import (
SparkDataSampler,
Expand Down Expand Up @@ -614,15 +615,15 @@ class MetricDomainTypes.
- a dictionary of accessor_domain_kwargs, describing any accessors needed to
identify the domain within the compute domain
"""
data = self.get_domain_records(domain_kwargs)

table = domain_kwargs.get("table", None)
table: str = domain_kwargs.get("table", None)
if table:
raise ValueError(
"SparkDFExecutionEngine does not currently support multiple named tables."
)

split_domain_kwargs = self._split_domain_kwargs(
data: DataFrame = self.get_domain_records(domain_kwargs=domain_kwargs)

split_domain_kwargs: SplitDomainKwargs = self._split_domain_kwargs(
domain_kwargs, domain_type, accessor_keys
)

Expand Down Expand Up @@ -721,9 +722,7 @@ def resolve_metric_bundle(

for aggregate in aggregates.values():
domain_kwargs: dict = aggregate["domain_kwargs"]
df: Optional[DataFrame] = self.get_domain_records(
domain_kwargs=domain_kwargs,
)
df: DataFrame = self.get_domain_records(domain_kwargs=domain_kwargs)

assert len(aggregate["column_aggregates"]) == len(aggregate["ids"])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -788,12 +788,12 @@ class MetricDomainTypes.
Returns:
SqlAlchemy column
"""
selectable = self.get_domain_records(domain_kwargs)

split_domain_kwargs = self._split_domain_kwargs(
split_domain_kwargs: SplitDomainKwargs = self._split_domain_kwargs(
domain_kwargs, domain_type, accessor_keys
)

selectable: Selectable = self.get_domain_records(domain_kwargs=domain_kwargs)

return selectable, split_domain_kwargs.compute, split_domain_kwargs.accessor

def _split_column_metric_domain_kwargs( # type: ignore[override] # ExecutionEngine method is static
Expand Down Expand Up @@ -993,8 +993,8 @@ def resolve_metric_bundle(

for query in queries.values():
domain_kwargs: dict = query["domain_kwargs"]
selectable: Any = self.get_domain_records(
domain_kwargs=domain_kwargs,
selectable: Selectable = self.get_domain_records(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for cleaning up this Any!

domain_kwargs=domain_kwargs
)

assert len(query["select"]) == len(query["ids"])
Expand Down
Loading