Add max_file_size parameter to chunked parquet dataset writer (NVIDIA#10718)

Resolves: NVIDIA#10144 

This PR introduces a `max_file_size` parameter; when it is specified, the parquet dataset writer creates multiple parquet files, each always smaller than `max_file_size`.
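
A rough usage sketch (not taken from this PR; the dataframe and paths are made up, and it assumes `path` and `partition_cols` keep their existing positions in the constructor):

```python
import cudf
from cudf.io.parquet import ParquetDatasetWriter

df = cudf.DataFrame({"a": [1, 1, 2, 2], "b": [10, 20, 30, 40]})

# file_name_prefix must accompany max_file_size; output files are named
# <file_name_prefix>_<n>.parquet and each is kept under the requested size.
cw = ParquetDatasetWriter(
    "./dataset_root",
    partition_cols=["a"],
    max_file_size="100 MB",
    file_name_prefix="chunk",
)
cw.write_table(df)
cw.close()
```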

Authors:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - https://github.com/brandon-b-miller
  - Vukasin Milovanovic (https://github.com/vuule)

URL: rapidsai/cudf#10718
galipremsagar authored May 9, 2022
1 parent ac7492e commit 566f29a
Showing 2 changed files with 297 additions and 22 deletions.
258 changes: 236 additions & 22 deletions python/cudf/cudf/io/parquet.py
@@ -1,5 +1,6 @@
# Copyright (c) 2019-2022, NVIDIA CORPORATION.

import math
import warnings
from collections import defaultdict
from contextlib import ExitStack
@@ -16,6 +17,31 @@
from cudf.utils import ioutils
from cudf.utils.utils import _cudf_nvtx_annotate

BYTE_SIZES = {
"kb": 1000,
"mb": 1000000,
"gb": 1000000000,
"tb": 1000000000000,
"pb": 1000000000000000,
"kib": 1024,
"mib": 1048576,
"gib": 1073741824,
"tib": 1099511627776,
"pib": 1125899906842624,
"b": 1,
"": 1,
"k": 1000,
"m": 1000000,
"g": 1000000000,
"t": 1000000000000,
"p": 1000000000000000,
"ki": 1024,
"mi": 1048576,
"gi": 1073741824,
"ti": 1099511627776,
"pi": 1125899906842624,
}


@_cudf_nvtx_annotate
def _write_parquet(
@@ -672,6 +698,23 @@ def _generate_filename():
return uuid4().hex + ".parquet"


def _get_estimated_file_size(df):
# NOTE: This is purely a guesstimation method,
# and the y = mx + c model was arrived at
# after extensive experimentation with parquet file size
# vs dataframe size.
df_mem_usage = df.memory_usage().sum()
# Parquet file size of a dataframe with all unique values
# seems to be roughly 1/1.5 times its GPU memory footprint for >10000 rows,
# and roughly 0.6 times otherwise.
# Y(file_size) = M(0.6) * X(df_mem_usage) + C(705)
file_size = int((df_mem_usage * 0.6) + 705)
# 1000 Bytes accounted for row-group metadata.
# A parquet file takes roughly ~810 Bytes of metadata per column.
file_size = file_size + 1000 + (810 * df.shape[1])
return file_size
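
To make the heuristic concrete, here is an illustrative trace with made-up numbers (not part of the diff):

```python
# Hypothetical: a 3-column dataframe occupying 1,000,000 bytes in GPU memory.
df_mem_usage = 1_000_000
num_columns = 3

estimate = int(df_mem_usage * 0.6 + 705)  # 600705
estimate += 1000 + 810 * num_columns      # add row-group and per-column metadata
print(estimate)                           # 604135
```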


@_cudf_nvtx_annotate
def _get_partitioned(
df,
@@ -684,17 +727,10 @@ def _get_partitioned(
):
fs = ioutils._ensure_filesystem(fs, root_path, **kwargs)
fs.mkdirs(root_path, exist_ok=True)
if not (set(df._data) - set(partition_cols)):
raise ValueError("No data left to save outside partition columns")

part_names, part_offsets, _, grouped_df = df.groupby(
partition_cols
)._grouped()
if not preserve_index:
grouped_df.reset_index(drop=True, inplace=True)
grouped_df.drop(columns=partition_cols, inplace=True)
# Copy the entire keys df in one operation rather than using iloc
part_names = part_names.to_pandas().to_frame(index=False)
part_names, grouped_df, part_offsets = _get_groups_and_offsets(
df, partition_cols, preserve_index
)

full_paths = []
metadata_file_paths = []
@@ -712,9 +748,94 @@
return full_paths, metadata_file_paths, grouped_df, part_offsets, filename


@_cudf_nvtx_annotate
def _get_groups_and_offsets(
df,
partition_cols,
preserve_index=False,
**kwargs,
):

if not (set(df._data) - set(partition_cols)):
raise ValueError("No data left to save outside partition columns")

part_names, part_offsets, _, grouped_df = df.groupby(
partition_cols
)._grouped()
if not preserve_index:
grouped_df.reset_index(drop=True, inplace=True)
grouped_df.drop(columns=partition_cols, inplace=True)
# Copy the entire keys df in one operation rather than using iloc
part_names = part_names.to_pandas().to_frame(index=False)

return part_names, grouped_df, part_offsets


ParquetWriter = libparquet.ParquetWriter


def _parse_bytes(s):
"""Parse byte string to numbers
Utility function vendored from Dask.
>>> _parse_bytes('100')
100
>>> _parse_bytes('100 MB')
100000000
>>> _parse_bytes('100M')
100000000
>>> _parse_bytes('5kB')
5000
>>> _parse_bytes('5.4 kB')
5400
>>> _parse_bytes('1kiB')
1024
>>> _parse_bytes('1e6')
1000000
>>> _parse_bytes('1e6 kB')
1000000000
>>> _parse_bytes('MB')
1000000
>>> _parse_bytes(123)
123
>>> _parse_bytes('5 foos')
Traceback (most recent call last):
...
ValueError: Could not interpret 'foos' as a byte unit
"""
if isinstance(s, (int, float)):
return int(s)
s = s.replace(" ", "")
if not any(char.isdigit() for char in s):
s = "1" + s

for i in range(len(s) - 1, -1, -1):
if not s[i].isalpha():
break
index = i + 1

prefix = s[:index]
suffix = s[index:]

try:
n = float(prefix)
except ValueError as e:
raise ValueError(
"Could not interpret '%s' as a number" % prefix
) from e

try:
multiplier = BYTE_SIZES[suffix.lower()]
except KeyError as e:
raise ValueError(
"Could not interpret '%s' as a byte unit" % suffix
) from e

result = n * multiplier
return int(result)


class ParquetDatasetWriter:
@_cudf_nvtx_annotate
def __init__(
@@ -724,6 +845,8 @@ def __init__(
index=None,
compression=None,
statistics="ROWGROUP",
max_file_size=None,
file_name_prefix=None,
) -> None:
"""
Write a parquet file or dataset incrementally
@@ -744,6 +867,15 @@ def __init__(
Name of the compression to use. Use ``None`` for no compression.
statistics : {'ROWGROUP', 'PAGE', 'NONE'}, default 'ROWGROUP'
Level at which column statistics should be included in file.
max_file_size : int or str, default None
A file size that cannot be exceeded by the writer.
If the input is an int, it is interpreted as a size in bytes.
The size can also be a str in the form of "10 MB", "1 GB", etc.
If this parameter is used, it is mandatory to pass
`file_name_prefix`.
file_name_prefix : str
This is a prefix to file names generated only when
`max_file_size` is specified.
Examples
@@ -791,27 +923,109 @@ def __init__(
# Map of partition_col values to their ParquetWriter's index
# in self._chunked_writers for reverse lookup
self.path_cw_map: Dict[str, int] = {}
self.filename = None
self.filename = file_name_prefix
self.max_file_size = max_file_size
if max_file_size is not None:
if file_name_prefix is None:
raise ValueError(
"file_name_prefix cannot be None if max_file_size is "
"passed"
)
self.max_file_size = _parse_bytes(max_file_size)

self._file_sizes: Dict[str, int] = {}
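
A quick illustrative check of the validation above (hypothetical call; it assumes `path` and `partition_cols` keep their existing constructor positions):

```python
from cudf.io.parquet import ParquetDatasetWriter

# max_file_size without file_name_prefix is rejected in __init__.
try:
    ParquetDatasetWriter("./dataset", partition_cols=["a"], max_file_size="100 MB")
except ValueError as err:
    print(err)  # file_name_prefix cannot be None if max_file_size is passed
```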

@_cudf_nvtx_annotate
def write_table(self, df):
"""
Write a dataframe to the file/dataset
"""
(
paths,
metadata_file_paths,
grouped_df,
offsets,
self.filename,
) = _get_partitioned(
df,
self.path,
self.partition_cols,
(part_names, grouped_df, part_offsets,) = _get_groups_and_offsets(
df=df,
partition_cols=self.partition_cols,
preserve_index=self.common_args["index"],
filename=self.filename,
)
fs = ioutils._ensure_filesystem(None, self.path)
fs.mkdirs(self.path, exist_ok=True)

full_paths = []
metadata_file_paths = []
full_offsets = [0]

for idx, keys in enumerate(part_names.itertuples(index=False)):
subdir = fs.sep.join(
[
f"{name}={val}"
for name, val in zip(self.partition_cols, keys)
]
)
prefix = fs.sep.join([self.path, subdir])
fs.mkdirs(prefix, exist_ok=True)
current_offset = (part_offsets[idx], part_offsets[idx + 1])
num_chunks = 1
parts = 1

if self.max_file_size is not None:
# get the current partition
start, end = current_offset
sliced_df = grouped_df[start:end]

current_file_size = _get_estimated_file_size(sliced_df)
if current_file_size > self.max_file_size:
# if the file is too large, compute metadata for
# smaller chunks
parts = math.ceil(current_file_size / self.max_file_size)
new_offsets = list(
range(start, end, int((end - start) / parts))
)[1:]
new_offsets.append(end)
num_chunks = len(new_offsets)
parts = len(new_offsets)
full_offsets.extend(new_offsets)
else:
full_offsets.append(end)

curr_file_num = 0
num_chunks = 0
while num_chunks < parts:
new_file_name = f"{self.filename}_{curr_file_num}.parquet"
new_full_path = fs.sep.join([prefix, new_file_name])

# If a file with this name already exists and adding this chunk
# would push it past max_file_size, move on to the next file number
while new_full_path in self._file_sizes and (
self._file_sizes[new_full_path]
+ (current_file_size / parts)
) > (self.max_file_size):
curr_file_num += 1
new_file_name = (
f"{self.filename}_{curr_file_num}.parquet"
)
new_full_path = fs.sep.join([prefix, new_file_name])

self._file_sizes[new_full_path] = self._file_sizes.get(
new_full_path, 0
) + (current_file_size / parts)
full_paths.append(new_full_path)
metadata_file_paths.append(
fs.sep.join([subdir, new_file_name])
)
num_chunks += 1
curr_file_num += 1
else:
self.filename = self.filename or _generate_filename()
full_path = fs.sep.join([prefix, self.filename])
full_paths.append(full_path)
metadata_file_paths.append(
fs.sep.join([subdir, self.filename])
)
full_offsets.append(current_offset[1])

paths, metadata_file_paths, offsets = (
full_paths,
metadata_file_paths,
full_offsets,
)
existing_cw_batch = defaultdict(dict)
new_cw_paths = []
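
The chunk-splitting arithmetic in `write_table` can be traced with made-up numbers (illustrative only, not part of the diff): a partition spanning rows [0, 9000) whose estimated size is 250 MB against a 100 MB limit splits into three chunks.

```python
import math

start, end = 0, 9_000            # row range of one partition
current_file_size = 250_000_000  # estimated bytes for that slice
max_file_size = 100_000_000      # limit requested by the caller

parts = math.ceil(current_file_size / max_file_size)                   # 3
new_offsets = list(range(start, end, int((end - start) / parts)))[1:]  # [3000, 6000]
new_offsets.append(end)
print(parts, new_offsets)        # 3 [3000, 6000, 9000]
```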

