Skip to content

Commit

Permalink
[time-window-partitions][perf] stop validating when unnecessary (dags…
Browse files Browse the repository at this point in the history
…ter-io#15550)

## Summary & Motivation

When we create a TimeWindowPartitionsSubset, there's an implicit
contract that it doesn't contain invalid partition keys. Sometimes, we
create this subset using time windows directly, but sometimes
(especially in the partition mapping case), we create this subset with
explicit keys (which then need to be converted into time windows later
on).

This short-circuits some of the checking logic in the conversion from
regular partition string to time window.

This is especially important in cases where the start_offset /
end_offset are non-zero, as these end up generating very expensive calls
to get_last_partition_window / get_first_partition_window

## How I Tested These Changes
  • Loading branch information
OwenKephart committed Jul 28, 2023
1 parent 8c75d6e commit 8c1ec5c
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ def time_window_for_partition_key(self, partition_key: str) -> TimeWindow:
def time_windows_for_partition_keys(
self,
partition_keys: Sequence[str],
validate: bool = True,
) -> Sequence[TimeWindow]:
if len(partition_keys) == 0:
return []
Expand All @@ -348,20 +349,21 @@ def time_windows_for_partition_keys(
)
partition_key_time_windows.append(next(cur_windows_iterator))

start_time_window = self.get_first_partition_window()
end_time_window = self.get_last_partition_window()
if validate:
start_time_window = self.get_first_partition_window()
end_time_window = self.get_last_partition_window()

if end_time_window is None or start_time_window is None:
check.failed("No partitions in the PartitionsDefinition")
if start_time_window is None or end_time_window is None:
check.failed("No partitions in the PartitionsDefinition")

start_timestamp = start_time_window.start.timestamp()
end_timestamp = end_time_window.end.timestamp()
start_timestamp = start_time_window.start.timestamp()
end_timestamp = end_time_window.end.timestamp()

partition_key_time_windows = [
tw
for tw in partition_key_time_windows
if tw.start.timestamp() >= start_timestamp and tw.end.timestamp() <= end_timestamp
]
partition_key_time_windows = [
tw
for tw in partition_key_time_windows
if tw.start.timestamp() >= start_timestamp and tw.end.timestamp() <= end_timestamp
]
return partition_key_time_windows

def start_time_for_partition_key(self, partition_key: str) -> datetime:
Expand Down Expand Up @@ -1395,6 +1397,7 @@ def included_time_windows(self) -> Sequence[TimeWindow]:
result_time_windows, _ = self._add_partitions_to_time_windows(
initial_windows=[],
partition_keys=list(check.not_none(self._included_partition_keys)),
validate=False,
)
self._included_time_windows = result_time_windows
return self._included_time_windows
Expand Down Expand Up @@ -1474,14 +1477,17 @@ def get_partition_key_ranges(
]

def _add_partitions_to_time_windows(
self, initial_windows: Sequence[TimeWindow], partition_keys: Sequence[str]
self,
initial_windows: Sequence[TimeWindow],
partition_keys: Sequence[str],
validate: bool = True,
) -> Tuple[Sequence[TimeWindow], int]:
"""Merges a set of partition keys into an existing set of time windows, returning the
minimized set of time windows and the number of partitions added.
"""
result_windows = [*initial_windows]
time_windows = self._partitions_def.time_windows_for_partition_keys(
list(partition_keys),
list(partition_keys), validate=validate
)
num_added_partitions = 0
for window in sorted(time_windows):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,20 +326,6 @@ def test_daily_to_daily_many_to_one():
["2021-05-05", "2021-05-06"],
datetime(2021, 5, 6, 1),
),
(
DailyPartitionsDefinition(start_date="2022-01-01"),
DailyPartitionsDefinition(start_date="2021-01-01"),
["2021-12-31"],
[],
datetime(2022, 1, 1, 1),
),
(
DailyPartitionsDefinition(start_date="2022-01-01"),
DailyPartitionsDefinition(start_date="2021-01-01"),
["2021-12-30"],
[],
datetime(2022, 12, 31, 1),
),
(
DailyPartitionsDefinition(start_date="2022-01-01"),
DailyPartitionsDefinition(start_date="2021-01-01"),
Expand Down

0 comments on commit 8c1ec5c

Please sign in to comment.