Intelligent estimation of manifest entry size (#355)
* use parquet meta to estimate size

* enable intelligent size estimation

* simplify

* fix by array size estimator

* append content type params if intelligent estimation is enabled

* Few more changes

* Adding UTs

* Add invalid column UT

* Fix log messages

* Adding enums to perform each type of estimation

* address comments

* Adding delta size estimation

* rename manifest module

* Add more tests

* Fix requires content type params

* Upgrade dependencies and bump version

* Adding a case where files to sample is zero

* Export operation type

* Support case when parquet to pyarrow inflation is none

* Add caching in append_content_type_params to avoid redownloading parquet meta

* Only cache when the number of entries is high to avoid constant calls to actor

* Add json context to logs

* Ensure appropriate log level

* Fix circular imports

* Adding
raghumdani authored Sep 30, 2024
1 parent 7862733 commit b0b441e
Showing 32 changed files with 3,210 additions and 258 deletions.
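
The headline change is a set of new estimation knobs on CompactPartitionParams. As a hedged illustration (not an excerpt from the commit), the sketch below collects the estimation-related keys this change adds to the dict accepted by CompactPartitionParams.of; the values shown are the new defaults from constants.py, and every other parameter a real compaction run needs is elided.

from deltacat.compute.resource_estimation import ResourceEstimationMethod

# Hypothetical illustration: only the keys introduced by this commit are shown.
estimation_params = {
    # How task resources are estimated; DEFAULT preserves the previous behavior.
    "resource_estimation_method": ResourceEstimationMethod.DEFAULT.value,
    # Inflation factor from parquet uncompressed bytes to pyarrow table bytes.
    "parquet_to_pyarrow_inflation": 4,
    # Cap used when sizing the tasks that download parquet metadata (footers).
    "max_parquet_meta_size_bytes": 100_000_000,
}
# These keys would be merged into the full params dict passed to CompactPartitionParams.of.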
2 changes: 1 addition & 1 deletion deltacat/__init__.py
@@ -44,7 +44,7 @@

deltacat.logs.configure_deltacat_logger(logging.getLogger(__name__))

__version__ = "1.1.18"
__version__ = "1.1.19"


__all__ = [
72 changes: 70 additions & 2 deletions deltacat/compute/compactor/model/compact_partition_params.py
@@ -13,6 +13,10 @@
PartitionLocator,
SortKey,
)
from deltacat.compute.resource_estimation import (
ResourceEstimationMethod,
EstimateResourcesParams,
)
from deltacat.compute.compactor_v2.constants import (
MAX_RECORDS_PER_COMPACTED_FILE,
MIN_DELTA_BYTES_IN_BATCH,
@@ -23,6 +27,8 @@
TOTAL_MEMORY_BUFFER_PERCENTAGE,
DEFAULT_DISABLE_COPY_BY_REFERENCE,
DEFAULT_NUM_ROUNDS,
PARQUET_TO_PYARROW_INFLATION,
MAX_PARQUET_METADATA_SIZE,
)
from deltacat.constants import PYARROW_INFLATION_MULTIPLIER
from deltacat.compute.compactor.utils.sort_key import validate_sort_keys
@@ -104,6 +110,22 @@ def of(params: Optional[Dict]) -> CompactPartitionParams:
result.metrics_config = params.get("metrics_config")

result.num_rounds = params.get("num_rounds", DEFAULT_NUM_ROUNDS)
result.parquet_to_pyarrow_inflation = params.get(
"parquet_to_pyarrow_inflation", PARQUET_TO_PYARROW_INFLATION
)
result.resource_estimation_method = ResourceEstimationMethod[
params.get(
"resource_estimation_method", ResourceEstimationMethod.DEFAULT.value
)
]

# disable input split during rebase as the rebase files are already uniform
result.enable_input_split = (
params.get("rebase_source_partition_locator") is None
)
result.max_parquet_meta_size_bytes = params.get(
"max_parquet_meta_size_bytes", MAX_PARQUET_METADATA_SIZE
)

if not importlib.util.find_spec("memray"):
result.enable_profiler = False
@@ -414,13 +436,59 @@ def num_rounds(self, num_rounds: int) -> None:
self["num_rounds"] = num_rounds

@property
def parquet_to_pyarrow_inflation(self) -> int:
def parquet_to_pyarrow_inflation(self) -> float:
"""
The inflation factor applied to the parquet uncompressed_size_bytes to estimate the pyarrow table size.
"""
return self["parquet_to_pyarrow_inflation"]

@parquet_to_pyarrow_inflation.setter
def parquet_to_pyarrow_inflation(self, value: int) -> None:
def parquet_to_pyarrow_inflation(self, value: float) -> None:
self["parquet_to_pyarrow_inflation"] = value

@property
def enable_input_split(self) -> bool:
"""
When this is True, input split will always be enabled for parquet files.
The input split feature splits parquet files into individual row groups
so that they can be processed in parallel on different nodes.
By default, input split is enabled for incremental compaction and disabled for rebase or backfill.
"""
return self["enable_input_split"]

@enable_input_split.setter
def enable_input_split(self, value: bool) -> None:
self["enable_input_split"] = value

@property
def max_parquet_meta_size_bytes(self) -> int:
"""
The maximum size of the parquet metadata in bytes. Used for allocating tasks
to fetch parquet metadata.
"""
return self["max_parquet_meta_size_bytes"]

@max_parquet_meta_size_bytes.setter
def max_parquet_meta_size_bytes(self, value: int) -> None:
self["max_parquet_meta_size_bytes"] = value

@property
def resource_estimation_method(self) -> ResourceEstimationMethod:
return self["resource_estimation_method"]

@resource_estimation_method.setter
def resource_estimation_method(self, value: ResourceEstimationMethod) -> None:
self["resource_estimation_method"] = value

@property
def estimate_resources_params(self) -> EstimateResourcesParams:
return EstimateResourcesParams.of(
resource_estimation_method=self.resource_estimation_method,
previous_inflation=self.previous_inflation,
parquet_to_pyarrow_inflation=self.parquet_to_pyarrow_inflation,
average_record_size_bytes=self.average_record_size_bytes,
)

@staticmethod
def json_handler_for_compact_partition_params(obj):
"""
26 changes: 26 additions & 0 deletions deltacat/compute/compactor/model/compaction_session_audit_info.py
@@ -436,6 +436,22 @@ def compactor_version(self) -> str:
"""
return self.get("compactorVersion")

@property
def observed_input_inflation(self) -> float:
"""
The average inflation observed for input files only.
This only accounts for files in the source.
"""
return self.get("observedInputInflation")

@property
def observed_input_average_record_size_bytes(self) -> float:
"""
The average record size observed for input files only.
This only accounts for files in the source.
"""
return self.get("observedInputAverageRecordSizeBytes")

# Setters follow

def set_audit_url(self, audit_url: str) -> CompactionSessionAuditInfo:
@@ -756,6 +772,16 @@ def set_compactor_version(self, value: str) -> CompactionSessionAuditInfo:
self["compactorVersion"] = value
return self

def set_observed_input_inflation(self, value: float) -> CompactionSessionAuditInfo:
self["observedInputInflation"] = value
return self

def set_observed_input_average_record_size_bytes(
self, value: float
) -> CompactionSessionAuditInfo:
self["observedInputAverageRecordSizeBytes"] = value
return self

# High level methods to save stats
def save_step_stats(
self,
25 changes: 16 additions & 9 deletions deltacat/compute/compactor/model/delta_annotated.py
@@ -69,6 +69,7 @@ def rebatch(
estimation_function: Optional[
Callable[[ManifestEntry], float]
] = lambda entry: entry.meta.content_length,
enable_input_split: Optional[bool] = False,
) -> List[DeltaAnnotated]:
"""
Simple greedy algorithm to split/merge 1 or more annotated deltas into
@@ -86,13 +87,19 @@
new_da_bytes = 0
da_group_entry_count = 0

for delta_annotated in annotated_deltas:
split_annotated_deltas.extend(DeltaAnnotated._split_single(delta_annotated))
if enable_input_split:
for delta_annotated in annotated_deltas:
split_annotated_deltas.extend(
DeltaAnnotated._split_single(delta_annotated)
)

logger.info(
f"Split the {len(annotated_deltas)} annotated deltas "
f"into {len(split_annotated_deltas)} groups."
)
logger.info(
f"Split the {len(annotated_deltas)} annotated deltas "
f"into {len(split_annotated_deltas)} groups."
)
else:
logger.info("Skipping input split as it is disabled...")
split_annotated_deltas = annotated_deltas

for src_da in split_annotated_deltas:
src_da_annotations = src_da.annotations
@@ -107,7 +114,7 @@
# (i.e. the previous compaction round ran a rebase)
if new_da and src_da.locator != new_da.locator:
groups.append(new_da)
logger.info(
logger.debug(
f"Due to different delta locator, Appending group of {da_group_entry_count} elements "
f"and {new_da_bytes} bytes"
)
@@ -126,12 +133,12 @@
or da_group_entry_count >= min_file_counts
):
if new_da_bytes >= min_delta_bytes:
logger.info(
logger.debug(
f"Appending group of {da_group_entry_count} elements "
f"and {new_da_bytes} bytes to meet file size limit"
)
if da_group_entry_count >= min_file_counts:
logger.info(
logger.debug(
f"Appending group of {da_group_entry_count} elements "
f"and {da_group_entry_count} files to meet file count limit"
)
3 changes: 3 additions & 0 deletions deltacat/compute/compactor_v2/constants.py
@@ -41,6 +41,9 @@
# size in metadata to pyarrow table size.
PARQUET_TO_PYARROW_INFLATION = 4

# Maximum size of the parquet metadata
MAX_PARQUET_METADATA_SIZE = 100_000_000 # 100 MB

# By default, copy by reference is enabled
DEFAULT_DISABLE_COPY_BY_REFERENCE = False

14 changes: 9 additions & 5 deletions deltacat/compute/compactor_v2/private/compaction_utils.py
@@ -148,12 +148,8 @@ def _build_uniform_deltas(
input_deltas=input_deltas,
hash_bucket_count=params.hash_bucket_count,
compaction_audit=mutable_compaction_audit,
compact_partition_params=params,
deltacat_storage=params.deltacat_storage,
previous_inflation=params.previous_inflation,
min_delta_bytes=params.min_delta_bytes_in_batch,
min_file_counts=params.min_files_in_batch,
# disable input split during rebase as the rebase files are already uniform
enable_input_split=params.rebase_source_partition_locator is None,
deltacat_storage_kwargs=params.deltacat_storage_kwargs,
)
delta_discovery_end: float = time.monotonic()
@@ -400,6 +396,7 @@ def _merge(
deltacat_storage_kwargs=params.deltacat_storage_kwargs,
ray_custom_resources=params.ray_custom_resources,
memory_logs_enabled=params.memory_logs_enabled,
estimate_resources_params=params.estimate_resources_params,
)

def merge_input_provider(index, item) -> dict[str, MergeInput]:
@@ -463,6 +460,7 @@ def _hash_bucket(
primary_keys=params.primary_keys,
ray_custom_resources=params.ray_custom_resources,
memory_logs_enabled=params.memory_logs_enabled,
estimate_resources_params=params.estimate_resources_params,
)

def hash_bucket_input_provider(index, item) -> dict[str, HashBucketInput]:
@@ -537,6 +535,7 @@ def _run_local_merge(
ray_custom_resources=params.ray_custom_resources,
primary_keys=params.primary_keys,
memory_logs_enabled=params.memory_logs_enabled,
estimate_resources_params=params.estimate_resources_params,
)
local_merge_result = ray.get(
mg.merge.options(**local_merge_options).remote(local_merge_input)
@@ -666,6 +665,11 @@ def _write_new_round_completion_file(
f" and average record size={input_average_record_size_bytes}"
)

mutable_compaction_audit.set_observed_input_inflation(input_inflation)
mutable_compaction_audit.set_observed_input_average_record_size_bytes(
input_average_record_size_bytes
)

_update_and_upload_compaction_audit(
params,
mutable_compaction_audit,