diff --git a/nvtabular/ops/data_stats.py b/nvtabular/ops/data_stats.py index d62d203b075..3961d491f73 100644 --- a/nvtabular/ops/data_stats.py +++ b/nvtabular/ops/data_stats.py @@ -1,101 +1,101 @@ -# -# Copyright (c) 2020, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -import dask_cudf -import numpy as np -from nvtx import annotate - -from .operator import ColumnNames -from .stat_operator import StatOperator - - -class DataStats(StatOperator): - def __init__(self): - super().__init__() - self.col_names = [] - self.col_types = [] - self.col_dtypes = [] - self.output = {} - - @annotate("DataStats_fit", color="green", domain="nvt_python") - def fit(self, columns: ColumnNames, ddf: dask_cudf.DataFrame): - dask_stats = {} - - ddf_dtypes = ddf.head(1) - - # For each column, calculate the stats - for col in columns: - dask_stats[col] = {} - self.col_names.append(col) - # Get dtype for all - dtype = ddf_dtypes[col].dtype - self.col_dtypes.append(dtype) - - # Identify column type - if np.issubdtype(dtype, np.floating): - col_type = "conts" - else: - col_type = "cats" - self.col_types.append(col_type) - - # Get cardinality for cats - if col_type == "cats": - dask_stats[col]["cardinality"] = ddf[col].nunique() - - # if string, replace string for their lengths for the rest of the computations - if dtype == "object": - ddf[col] = ddf[col].map_partitions(lambda x: x.str.len(), meta=("x", int)) - # Add list support when cudf supports it: - # 
https://github.com/rapidsai/cudf/issues/7157 - # elif col_type == "cat_mh": - # ddf[col] = ddf[col].map_partitions(lambda x: x.list.len()) - - # Get min,max, and mean - dask_stats[col]["min"] = ddf[col].min() - dask_stats[col]["max"] = ddf[col].max() - dask_stats[col]["mean"] = ddf[col].mean() - - # Get std only for conts - if col_type == "conts": - dask_stats[col]["std"] = ddf[col].std() - - # Get Percentage of NaNs for all - dask_stats[col]["per_nan"] = 100 * (1 - ddf[col].count() / len(ddf[col])) - - return dask_stats - - def fit_finalize(self, dask_stats): - for i, col in enumerate(self.col_names): - # Add dtype - dask_stats[col]["dtype"] = str(self.col_dtypes[i]) - # Cast types for yaml - if isinstance(dask_stats[col]["mean"], np.floating): - dask_stats[col]["mean"] = dask_stats[col]["mean"].item() - if isinstance(dask_stats[col]["per_nan"], np.floating): - dask_stats[col]["per_nan"] = dask_stats[col]["per_nan"].item() - if self.col_types[i] == "conts": - if isinstance(dask_stats[col]["std"], np.floating): - dask_stats[col]["std"] = dask_stats[col]["std"].item() - else: - if isinstance(dask_stats[col]["cardinality"], np.integer): - dask_stats[col]["cardinality"] = dask_stats[col]["cardinality"].item() - self.output = dask_stats - - def clear(self): - self.output = {} - - # transform.__doc__ = Operator.transform.__doc__ - fit.__doc__ = StatOperator.fit.__doc__ - fit_finalize.__doc__ = StatOperator.fit_finalize.__doc__ +# +# Copyright (c) 2020, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +import dask_cudf +import numpy as np +from nvtx import annotate + +from .operator import ColumnNames +from .stat_operator import StatOperator + + +class DataStats(StatOperator): + def __init__(self): + super().__init__() + self.col_names = [] + self.col_types = [] + self.col_dtypes = [] + self.output = {} + + @annotate("DataStats_fit", color="green", domain="nvt_python") + def fit(self, columns: ColumnNames, ddf: dask_cudf.DataFrame): + dask_stats = {} + + ddf_dtypes = ddf.head(1) + + # For each column, calculate the stats + for col in columns: + dask_stats[col] = {} + self.col_names.append(col) + # Get dtype for all + dtype = ddf_dtypes[col].dtype + self.col_dtypes.append(dtype) + + # Identify column type + if np.issubdtype(dtype, np.floating): + col_type = "conts" + else: + col_type = "cats" + self.col_types.append(col_type) + + # Get cardinality for cats + if col_type == "cats": + dask_stats[col]["cardinality"] = ddf[col].nunique() + + # if string, replace each string with its length for the rest of the computations + if dtype == "object": + ddf[col] = ddf[col].map_partitions(lambda x: x.str.len(), meta=("x", int)) + # Add list support when cudf supports it: + # https://github.com/rapidsai/cudf/issues/7157 + # elif col_type == "cat_mh": + # ddf[col] = ddf[col].map_partitions(lambda x: x.list.len()) + + # Get min, max, and mean + dask_stats[col]["min"] = ddf[col].min() + dask_stats[col]["max"] = ddf[col].max() + dask_stats[col]["mean"] = ddf[col].mean() + + # Get std only for conts + if col_type == "conts": + dask_stats[col]["std"] = ddf[col].std() + + # Get Percentage of NaNs for all + dask_stats[col]["per_nan"] = 100 * (1 - ddf[col].count() / len(ddf[col])) + + return dask_stats + + def fit_finalize(self, dask_stats): + for i, col in enumerate(self.col_names): + # Add dtype + dask_stats[col]["dtype"] = str(self.col_dtypes[i]) + # Cast types for yaml + if
isinstance(dask_stats[col]["mean"], np.floating): + dask_stats[col]["mean"] = dask_stats[col]["mean"].item() + if isinstance(dask_stats[col]["per_nan"], np.floating): + dask_stats[col]["per_nan"] = dask_stats[col]["per_nan"].item() + if self.col_types[i] == "conts": + if isinstance(dask_stats[col]["std"], np.floating): + dask_stats[col]["std"] = dask_stats[col]["std"].item() + else: + if isinstance(dask_stats[col]["cardinality"], np.integer): + dask_stats[col]["cardinality"] = dask_stats[col]["cardinality"].item() + self.output = dask_stats + + def clear(self): + self.output = {} + + # transform.__doc__ = Operator.transform.__doc__ + fit.__doc__ = StatOperator.fit.__doc__ + fit_finalize.__doc__ = StatOperator.fit_finalize.__doc__ diff --git a/nvtabular/tools/inspector_script.py b/nvtabular/tools/inspector_script.py index 034077c71d1..5ba7e681a2e 100644 --- a/nvtabular/tools/inspector_script.py +++ b/nvtabular/tools/inspector_script.py @@ -1,172 +1,172 @@ -# -# Copyright (c) 2020, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import argparse -import contextlib -import json -import os - -import fsspec -import rmm -from dask.distributed import Client -from dask_cuda import LocalCUDACluster - -import nvtabular.tools.dataset_inspector as datains -from nvtabular.io import Dataset -from nvtabular.utils import device_mem_size, get_rmm_size - - -def setup_rmm_pool(client, device_pool_size): - # Initialize an RMM pool allocator. 
- # Note: RMM may require the pool size to be a multiple of 256. - device_pool_size = get_rmm_size(device_pool_size) - client.run(rmm.reinitialize, pool_allocator=True, initial_pool_size=device_pool_size) - return None - - -@contextlib.contextmanager -def managed_client(devices, device_limit, protocol): - client = Client( - LocalCUDACluster( - protocol=protocol, - n_workers=len(devices.split(",")), - enable_nvlink=(protocol == "ucx"), - device_memory_limit=device_limit, - ) - ) - try: - yield client - finally: - client.shutdown() - - -def parse_args(): - """ - Use the inspector script indicating the config fil, path, format, - gpus to use and (optional) the output file name - - python inspector_script.py -c config_file.json -d dataset_path -f parquet - -g "0,1,2,3,4" -o dataset_info.json - """ - parser = argparse.ArgumentParser(description=("Dataset Inspect Tool")) - # Config file - parser.add_argument( - "-c", - "--config_file", - type=str, - help="Dataset columns type (Required)", - ) - # Dataset path - parser.add_argument( - "--data_path", - default="0", - type=str, - help="Input dataset path (Required)", - ) - - # Number of GPUs to use - parser.add_argument( - "-d", - "--devices", - default=os.environ.get("CUDA_VISIBLE_DEVICES", "0"), - type=str, - help='Comma-separated list of visible devices (e.g. "0,1,2,3"). ' - "The number of visible devices dictates the number of Dask workers (GPU processes) " - "The CUDA_VISIBLE_DEVICES environment variable will be used by default", - ) - - # Device limit - parser.add_argument( - "--device-limit-frac", - default=0.8, - type=float, - help="Worker device-memory limit as a fraction of GPU capacity (Default 0.8). " - "The worker will try to spill data to host memory beyond this limit", - ) - - # RMM pool size - parser.add_argument( - "--device-pool-frac", - default=0.9, - type=float, - help="RMM pool size for each worker as a fraction of GPU capacity (Default 0.9). 
" - "If 0 is specified, the RMM pool will be disabled", - ) - - # Dataset format - parser.add_argument( - "-f", - "--format", - choices=["csv", "parquet"], - default="parquet", - type=str, - help="Dataset format (Default 'parquet')", - ) - - # Partition size - parser.add_argument( - "--part-mem-frac", - default=0.125, - type=float, - help="Maximum size desired for dataset partitions as a fraction " - "of GPU capacity (Default 0.125)", - ) - - # Protocol - parser.add_argument( - "-p", - "--protocol", - choices=["tcp", "ucx"], - default="tcp", - type=str, - help="Communication protocol to use (Default 'tcp')", - ) - - # Output file name - parser.add_argument( - "-o", - "--output_file", - default="dataset_info.json", - type=str, - help="Output file name (Default 'dataset_info.json')", - ) - args = parser.parse_args() - return args - - -def main(args): - # Get device configuration - device_size = device_mem_size(kind="total") - device_limit = int(args.device_limit_frac * device_size) - device_pool_size = int(args.device_pool_frac * device_size) - part_size = int(args.part_mem_frac * device_size) - - # Get dataset columns - with fsspec.open(args.config_file) as f: - config = json.load(f) - - # Create Dataset - dataset = Dataset(args.data_path, engine=args.format, part_size=part_size) - - # Call Inspector - with managed_client(args.devices, device_limit, args.protocol) as client: - setup_rmm_pool(client, device_pool_size) - a = datains.DatasetInspector(client) - a.inspect(dataset, config, args.output_file) - - -if __name__ == "__main__": - main(parse_args()) +# +# Copyright (c) 2020, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import argparse +import contextlib +import json +import os + +import fsspec +import rmm +from dask.distributed import Client +from dask_cuda import LocalCUDACluster + +import nvtabular.tools.dataset_inspector as datains +from nvtabular.io import Dataset +from nvtabular.utils import device_mem_size, get_rmm_size + + +def setup_rmm_pool(client, device_pool_size): + # Initialize an RMM pool allocator. + # Note: RMM may require the pool size to be a multiple of 256. + device_pool_size = get_rmm_size(device_pool_size) + client.run(rmm.reinitialize, pool_allocator=True, initial_pool_size=device_pool_size) + return None + + +@contextlib.contextmanager +def managed_client(devices, device_limit, protocol): + client = Client( + LocalCUDACluster( + protocol=protocol, + n_workers=len(devices.split(",")), + enable_nvlink=(protocol == "ucx"), + device_memory_limit=device_limit, + ) + ) + try: + yield client + finally: + client.shutdown() + + +def parse_args(): + """ + Use the inspector script indicating the config file, path, format, + gpus to use and (optional) the output file name + + python inspector_script.py -c config_file.json --data_path dataset_path -f parquet + -d "0,1,2,3,4" -o dataset_info.json + """ + parser = argparse.ArgumentParser(description=("Dataset Inspect Tool")) + # Config file + parser.add_argument( + "-c", + "--config_file", + type=str, + help="Dataset columns type (Required)", + ) + # Dataset path + parser.add_argument( + "--data_path", + default="0", + type=str, + help="Input dataset path (Required)", + ) + + # Number of GPUs to use +
parser.add_argument( + "-d", + "--devices", + default=os.environ.get("CUDA_VISIBLE_DEVICES", "0"), + type=str, + help='Comma-separated list of visible devices (e.g. "0,1,2,3"). ' + "The number of visible devices dictates the number of Dask workers (GPU processes) " + "The CUDA_VISIBLE_DEVICES environment variable will be used by default", + ) + + # Device limit + parser.add_argument( + "--device-limit-frac", + default=0.8, + type=float, + help="Worker device-memory limit as a fraction of GPU capacity (Default 0.8). " + "The worker will try to spill data to host memory beyond this limit", + ) + + # RMM pool size + parser.add_argument( + "--device-pool-frac", + default=0.9, + type=float, + help="RMM pool size for each worker as a fraction of GPU capacity (Default 0.9). " + "If 0 is specified, the RMM pool will be disabled", + ) + + # Dataset format + parser.add_argument( + "-f", + "--format", + choices=["csv", "parquet"], + default="parquet", + type=str, + help="Dataset format (Default 'parquet')", + ) + + # Partition size + parser.add_argument( + "--part-mem-frac", + default=0.125, + type=float, + help="Maximum size desired for dataset partitions as a fraction " + "of GPU capacity (Default 0.125)", + ) + + # Protocol + parser.add_argument( + "-p", + "--protocol", + choices=["tcp", "ucx"], + default="tcp", + type=str, + help="Communication protocol to use (Default 'tcp')", + ) + + # Output file name + parser.add_argument( + "-o", + "--output_file", + default="dataset_info.json", + type=str, + help="Output file name (Default 'dataset_info.json')", + ) + args = parser.parse_args() + return args + + +def main(args): + # Get device configuration + device_size = device_mem_size(kind="total") + device_limit = int(args.device_limit_frac * device_size) + device_pool_size = int(args.device_pool_frac * device_size) + part_size = int(args.part_mem_frac * device_size) + + # Get dataset columns + with fsspec.open(args.config_file) as f: + config = json.load(f) + + # Create 
Dataset + dataset = Dataset(args.data_path, engine=args.format, part_size=part_size) + + # Call Inspector + with managed_client(args.devices, device_limit, args.protocol) as client: + setup_rmm_pool(client, device_pool_size) + a = datains.DatasetInspector(client) + a.inspect(dataset, config, args.output_file) + + +if __name__ == "__main__": + main(parse_args()) diff --git a/tests/unit/test_tools.py b/tests/unit/test_tools.py index dd85948e4d1..69e1bb876e0 100644 --- a/tests/unit/test_tools.py +++ b/tests/unit/test_tools.py @@ -1,222 +1,222 @@ -import glob -import json -import os - -import cudf -import fsspec -import numpy as np -import pytest - -import nvtabular.tools.data_gen as datagen -import nvtabular.tools.dataset_inspector as datains -from nvtabular.io import Dataset - -json_sample = { - "conts": { - "cont_1": {"dtype": np.float32, "min_val": 0, "max_val": 1}, - "cont_2": {"dtype": np.float32, "min_val": 0, "max_val": 1}, - "cont_3": {"dtype": np.float32, "min_val": 0, "max_val": 1}, - "cont_4": {"dtype": np.float32, "min_val": 0, "max_val": 1}, - "cont_5": {"dtype": np.float32, "min_val": 0, "max_val": 1}, - }, - "cats": { - "cat_1": { - "dtype": None, - "cardinality": 50, - "min_entry_size": 1, - "max_entry_size": 5, - "multi_min": 2, - "multi_max": 5, - "multi_avg": 3, - }, - "cat_2": {"dtype": None, "cardinality": 50, "min_entry_size": 1, "max_entry_size": 5}, - "cat_3": {"dtype": None, "cardinality": 50, "min_entry_size": 1, "max_entry_size": 5}, - "cat_4": {"dtype": None, "cardinality": 50, "min_entry_size": 1, "max_entry_size": 5}, - "cat_5": {"dtype": None, "cardinality": 50, "min_entry_size": 1, "max_entry_size": 5}, - }, - "labels": {"lab_1": {"dtype": None, "cardinality": 2}}, -} - -distros = { - "cont_1": {"name": "powerlaw", "params": {"alpha": 0.1}}, - "cont_2": {"name": "powerlaw", "params": {"alpha": 0.2}}, - "cat_1": {"name": "powerlaw", "params": {"alpha": 0.1}}, - "cat_2": {"name": "powerlaw", "params": {"alpha": 0.2}}, -} - - 
-@pytest.mark.parametrize("num_rows", [1000, 10000]) -@pytest.mark.parametrize("distro", [None, distros]) -def test_powerlaw(num_rows, distro): - json_sample["num_rows"] = num_rows - cats = list(json_sample["cats"].keys())[1:] - - cols = datagen._get_cols_from_schema(json_sample, distros=distro) - - df_gen = datagen.DatasetGen(datagen.PowerLawDistro(0.1)) - df_pw = cudf.DataFrame() - for x in range(10): - df_pw_1 = df_gen.create_df(num_rows, cols) - df_pw = cudf.concat([df_pw, df_pw_1], axis=0) - sts, ps = df_gen.verify_df(df_pw[cats]) - assert all(s > 0.9 for s in sts) - - -@pytest.mark.parametrize("num_rows", [1000, 10000]) -@pytest.mark.parametrize("distro", [None, distros]) -def test_uniform(num_rows, distro): - json_sample["num_rows"] = num_rows - cats = list(json_sample["cats"].keys())[1:] - cols = datagen._get_cols_from_schema(json_sample, distros=distro) - - df_gen = datagen.DatasetGen(datagen.UniformDistro()) - df_uni = df_gen.create_df(num_rows, cols) - sts, ps = df_gen.verify_df(df_uni[cats]) - assert all(s > 0.9 for s in sts) - - -@pytest.mark.parametrize("num_rows", [1000, 10000]) -@pytest.mark.parametrize("distro", [None, distros]) -def test_width(num_rows, distro): - json_sample_1 = { - "conts": { - "cont_1": {"dtype": np.float32, "min_val": 0, "max_val": 1, "width": 20}, - } - } - json_sample_1["num_rows"] = num_rows - cols = datagen._get_cols_from_schema(json_sample_1, distros=distro) - - df_gen = datagen.DatasetGen(datagen.UniformDistro()) - df_uni = df_gen.create_df(num_rows, cols) - assert df_uni.shape[1] == 20 - - -@pytest.mark.parametrize("num_rows", [1000, 10000]) -@pytest.mark.parametrize("distro", [None, distros]) -def test_cat_rep(num_rows, distro): - json_sample["num_rows"] = num_rows - cats = list(json_sample["cats"].keys()) - cols = datagen._get_cols_from_schema(json_sample, distros=distro) - - df_gen = datagen.DatasetGen(datagen.UniformDistro()) - df_uni = df_gen.create_df(num_rows, cols, entries=True) - df_cats = df_uni[cats] - assert 
df_cats.shape[1] == len(cats) - assert df_cats.shape[0] == num_rows - cats_rep = cols["cats"] - for idx, cat in enumerate(cats[1:]): - assert df_uni[cat].nunique() == cats_rep[idx + 1].cardinality - assert df_uni[cat].str.len().min() == cats_rep[idx + 1].min_entry_size - assert df_uni[cat].str.len().max() == cats_rep[idx + 1].max_entry_size - check_ser = cudf.Series(df_uni[cats[0]]._column.elements.values_host) - assert check_ser.nunique() == cats_rep[0].cardinality - assert check_ser.str.len().min() == cats_rep[0].min_entry_size - assert check_ser.str.len().max() == cats_rep[0].max_entry_size - - -def test_json_convert(): - cols = datagen._get_cols_from_schema(json_sample) - assert len(cols["conts"]) == len(json_sample["conts"].keys()) - assert len(cols["cats"]) == len(json_sample["cats"].keys()) - assert len(cols["labels"]) == len(json_sample["labels"].keys()) - - -@pytest.mark.parametrize("num_rows", [1000, 100000]) -@pytest.mark.parametrize("distro", [None, distros]) -def test_full_df(num_rows, tmpdir, distro): - json_sample["num_rows"] = num_rows - cats = list(json_sample["cats"].keys()) - cols = datagen._get_cols_from_schema(json_sample, distros=distro) - - df_gen = datagen.DatasetGen(datagen.UniformDistro(), gpu_frac=0.00001) - df_files = df_gen.full_df_create(num_rows, cols, entries=True, output=tmpdir) - test_size = 0 - full_df = cudf.DataFrame() - for fi in df_files: - df = cudf.read_parquet(fi) - test_size = test_size + df.shape[0] - full_df = cudf.concat([full_df, df]) - assert test_size == num_rows - conts_rep = cols["conts"] - cats_rep = cols["cats"] - labels_rep = cols["labels"] - assert df.shape[1] == len(conts_rep) + len(cats_rep) + len(labels_rep) - for idx, cat in enumerate(cats[1:]): - dist = cats_rep[idx + 1].distro or df_gen.dist - if type(full_df[cat]._column) is not cudf.core.column.string.StringColumn: - sts, ps = dist.verify(full_df[cat].to_pandas()) - assert all(s > 0.9 for s in sts) - assert full_df[cat].nunique() == cats_rep[idx + 
1].cardinality - assert full_df[cat].str.len().min() == cats_rep[idx + 1].min_entry_size - assert full_df[cat].str.len().max() == cats_rep[idx + 1].max_entry_size - check_ser = cudf.Series(full_df[cats[0]]._column.elements.values_host) - dist = cats_rep[0].distro or df_gen.dist - if type(full_df[cat]._column) is not cudf.core.column.string.StringColumn: - sts, ps = dist.verify(full_df[cats[0].to_pandas()]) - assert all(s > 0.9 for s in sts) - assert check_ser.nunique() == cats_rep[0].cardinality - assert check_ser.str.len().min() == cats_rep[0].min_entry_size - assert check_ser.str.len().max() == cats_rep[0].max_entry_size - - -@pytest.mark.parametrize("engine", ["parquet"]) -@pytest.mark.parametrize("dist", ["uniform"]) -def test_inspect_datagen(tmpdir, datasets, engine, dist): - # Dataset - paths = glob.glob(str(datasets[engine]) + "/*." + engine.split("-")[0]) - - # Dataset columns type config - columns_dict = {} - columns_dict["cats"] = ["name-cat", "name-string"] if engine == "parquet" else ["name-string"] - columns_dict["conts"] = ["x", "y"] - columns_dict["labels"] = ["label"] - - # Create inspector and inspect - output_inspect1 = tmpdir + "/dataset_info1.json" - dataset = Dataset(paths, engine=engine) - a = datains.DatasetInspector() - a.inspect(dataset, columns_dict, output_inspect1) - assert os.path.isfile(output_inspect1) - - # Generate dataset using data_gen tool - output_datagen = tmpdir + "/datagen" - os.mkdir(output_datagen) - with fsspec.open(output_inspect1) as f: - output1 = json.load(f) - cols = datagen._get_cols_from_schema(output1) - if dist == "uniform": - df_gen = datagen.DatasetGen(datagen.UniformDistro(), gpu_frac=0.00001) - else: - df_gen = datagen.DatasetGen(datagen.PowerLawDistro(0.1), gpu_frac=0.00001) - - output_datagen_files = df_gen.full_df_create( - output1["num_rows"], cols, entries=True, output=output_datagen - ) - - # Inspect again and check output are the same - output_inspect2 = tmpdir + "/dataset_info2.json" - dataset = 
Dataset(output_datagen_files, engine=engine) - a.inspect(dataset, columns_dict, output_inspect2) - assert os.path.isfile(output_inspect2) - - # Compare json outputs - with fsspec.open(output_inspect2) as f: - output2 = json.load(f) - for k1 in output1.keys(): - if k1 == "num_rows": - assert output1[k1] == output2[k1] - else: - for k2 in output1[k1].keys(): - for k3 in output1[k1][k2].keys(): - if k3 == "dtype": - if output1[k1][k2][k3] == "object": - assert ( - output1[k1][k2][k3] == output2[k1][k2][k3] - or "int64" == output2[k1][k2][k3] - ) - else: - assert output1[k1][k2][k3] == output2[k1][k2][k3] - else: - assert output1[k1][k2][k3] == pytest.approx( - output2[k1][k2][k3], rel=1e-0, abs=1e-0 - ) +import glob +import json +import os + +import cudf +import fsspec +import numpy as np +import pytest + +import nvtabular.tools.data_gen as datagen +import nvtabular.tools.dataset_inspector as datains +from nvtabular.io import Dataset + +json_sample = { + "conts": { + "cont_1": {"dtype": np.float32, "min_val": 0, "max_val": 1}, + "cont_2": {"dtype": np.float32, "min_val": 0, "max_val": 1}, + "cont_3": {"dtype": np.float32, "min_val": 0, "max_val": 1}, + "cont_4": {"dtype": np.float32, "min_val": 0, "max_val": 1}, + "cont_5": {"dtype": np.float32, "min_val": 0, "max_val": 1}, + }, + "cats": { + "cat_1": { + "dtype": None, + "cardinality": 50, + "min_entry_size": 1, + "max_entry_size": 5, + "multi_min": 2, + "multi_max": 5, + "multi_avg": 3, + }, + "cat_2": {"dtype": None, "cardinality": 50, "min_entry_size": 1, "max_entry_size": 5}, + "cat_3": {"dtype": None, "cardinality": 50, "min_entry_size": 1, "max_entry_size": 5}, + "cat_4": {"dtype": None, "cardinality": 50, "min_entry_size": 1, "max_entry_size": 5}, + "cat_5": {"dtype": None, "cardinality": 50, "min_entry_size": 1, "max_entry_size": 5}, + }, + "labels": {"lab_1": {"dtype": None, "cardinality": 2}}, +} + +distros = { + "cont_1": {"name": "powerlaw", "params": {"alpha": 0.1}}, + "cont_2": {"name": "powerlaw", 
"params": {"alpha": 0.2}}, + "cat_1": {"name": "powerlaw", "params": {"alpha": 0.1}}, + "cat_2": {"name": "powerlaw", "params": {"alpha": 0.2}}, +} + + +@pytest.mark.parametrize("num_rows", [1000, 10000]) +@pytest.mark.parametrize("distro", [None, distros]) +def test_powerlaw(num_rows, distro): + json_sample["num_rows"] = num_rows + cats = list(json_sample["cats"].keys())[1:] + + cols = datagen._get_cols_from_schema(json_sample, distros=distro) + + df_gen = datagen.DatasetGen(datagen.PowerLawDistro(0.1)) + df_pw = cudf.DataFrame() + for x in range(10): + df_pw_1 = df_gen.create_df(num_rows, cols) + df_pw = cudf.concat([df_pw, df_pw_1], axis=0) + sts, ps = df_gen.verify_df(df_pw[cats]) + assert all(s > 0.9 for s in sts) + + +@pytest.mark.parametrize("num_rows", [1000, 10000]) +@pytest.mark.parametrize("distro", [None, distros]) +def test_uniform(num_rows, distro): + json_sample["num_rows"] = num_rows + cats = list(json_sample["cats"].keys())[1:] + cols = datagen._get_cols_from_schema(json_sample, distros=distro) + + df_gen = datagen.DatasetGen(datagen.UniformDistro()) + df_uni = df_gen.create_df(num_rows, cols) + sts, ps = df_gen.verify_df(df_uni[cats]) + assert all(s > 0.9 for s in sts) + + +@pytest.mark.parametrize("num_rows", [1000, 10000]) +@pytest.mark.parametrize("distro", [None, distros]) +def test_width(num_rows, distro): + json_sample_1 = { + "conts": { + "cont_1": {"dtype": np.float32, "min_val": 0, "max_val": 1, "width": 20}, + } + } + json_sample_1["num_rows"] = num_rows + cols = datagen._get_cols_from_schema(json_sample_1, distros=distro) + + df_gen = datagen.DatasetGen(datagen.UniformDistro()) + df_uni = df_gen.create_df(num_rows, cols) + assert df_uni.shape[1] == 20 + + +@pytest.mark.parametrize("num_rows", [1000, 10000]) +@pytest.mark.parametrize("distro", [None, distros]) +def test_cat_rep(num_rows, distro): + json_sample["num_rows"] = num_rows + cats = list(json_sample["cats"].keys()) + cols = datagen._get_cols_from_schema(json_sample, 
distros=distro) + + df_gen = datagen.DatasetGen(datagen.UniformDistro()) + df_uni = df_gen.create_df(num_rows, cols, entries=True) + df_cats = df_uni[cats] + assert df_cats.shape[1] == len(cats) + assert df_cats.shape[0] == num_rows + cats_rep = cols["cats"] + for idx, cat in enumerate(cats[1:]): + assert df_uni[cat].nunique() == cats_rep[idx + 1].cardinality + assert df_uni[cat].str.len().min() == cats_rep[idx + 1].min_entry_size + assert df_uni[cat].str.len().max() == cats_rep[idx + 1].max_entry_size + check_ser = cudf.Series(df_uni[cats[0]]._column.elements.values_host) + assert check_ser.nunique() == cats_rep[0].cardinality + assert check_ser.str.len().min() == cats_rep[0].min_entry_size + assert check_ser.str.len().max() == cats_rep[0].max_entry_size + + +def test_json_convert(): + cols = datagen._get_cols_from_schema(json_sample) + assert len(cols["conts"]) == len(json_sample["conts"].keys()) + assert len(cols["cats"]) == len(json_sample["cats"].keys()) + assert len(cols["labels"]) == len(json_sample["labels"].keys()) + + +@pytest.mark.parametrize("num_rows", [1000, 100000]) +@pytest.mark.parametrize("distro", [None, distros]) +def test_full_df(num_rows, tmpdir, distro): + json_sample["num_rows"] = num_rows + cats = list(json_sample["cats"].keys()) + cols = datagen._get_cols_from_schema(json_sample, distros=distro) + + df_gen = datagen.DatasetGen(datagen.UniformDistro(), gpu_frac=0.00001) + df_files = df_gen.full_df_create(num_rows, cols, entries=True, output=tmpdir) + test_size = 0 + full_df = cudf.DataFrame() + for fi in df_files: + df = cudf.read_parquet(fi) + test_size = test_size + df.shape[0] + full_df = cudf.concat([full_df, df]) + assert test_size == num_rows + conts_rep = cols["conts"] + cats_rep = cols["cats"] + labels_rep = cols["labels"] + assert df.shape[1] == len(conts_rep) + len(cats_rep) + len(labels_rep) + for idx, cat in enumerate(cats[1:]): + dist = cats_rep[idx + 1].distro or df_gen.dist + if type(full_df[cat]._column) is not 
cudf.core.column.string.StringColumn: + sts, ps = dist.verify(full_df[cat].to_pandas()) + assert all(s > 0.9 for s in sts) + assert full_df[cat].nunique() == cats_rep[idx + 1].cardinality + assert full_df[cat].str.len().min() == cats_rep[idx + 1].min_entry_size + assert full_df[cat].str.len().max() == cats_rep[idx + 1].max_entry_size + check_ser = cudf.Series(full_df[cats[0]]._column.elements.values_host) + dist = cats_rep[0].distro or df_gen.dist + if type(full_df[cat]._column) is not cudf.core.column.string.StringColumn: + sts, ps = dist.verify(full_df[cats[0].to_pandas()]) + assert all(s > 0.9 for s in sts) + assert check_ser.nunique() == cats_rep[0].cardinality + assert check_ser.str.len().min() == cats_rep[0].min_entry_size + assert check_ser.str.len().max() == cats_rep[0].max_entry_size + + +@pytest.mark.parametrize("engine", ["parquet"]) +@pytest.mark.parametrize("dist", ["uniform"]) +def test_inspect_datagen(tmpdir, datasets, engine, dist): + # Dataset + paths = glob.glob(str(datasets[engine]) + "/*." 
+ engine.split("-")[0]) + + # Dataset columns type config + columns_dict = {} + columns_dict["cats"] = ["name-cat", "name-string"] if engine == "parquet" else ["name-string"] + columns_dict["conts"] = ["x", "y"] + columns_dict["labels"] = ["label"] + + # Create inspector and inspect + output_inspect1 = tmpdir + "/dataset_info1.json" + dataset = Dataset(paths, engine=engine) + a = datains.DatasetInspector() + a.inspect(dataset, columns_dict, output_inspect1) + assert os.path.isfile(output_inspect1) + + # Generate dataset using data_gen tool + output_datagen = tmpdir + "/datagen" + os.mkdir(output_datagen) + with fsspec.open(output_inspect1) as f: + output1 = json.load(f) + cols = datagen._get_cols_from_schema(output1) + if dist == "uniform": + df_gen = datagen.DatasetGen(datagen.UniformDistro(), gpu_frac=0.00001) + else: + df_gen = datagen.DatasetGen(datagen.PowerLawDistro(0.1), gpu_frac=0.00001) + + output_datagen_files = df_gen.full_df_create( + output1["num_rows"], cols, entries=True, output=output_datagen + ) + + # Inspect again and check output are the same + output_inspect2 = tmpdir + "/dataset_info2.json" + dataset = Dataset(output_datagen_files, engine=engine) + a.inspect(dataset, columns_dict, output_inspect2) + assert os.path.isfile(output_inspect2) + + # Compare json outputs + with fsspec.open(output_inspect2) as f: + output2 = json.load(f) + for k1 in output1.keys(): + if k1 == "num_rows": + assert output1[k1] == output2[k1] + else: + for k2 in output1[k1].keys(): + for k3 in output1[k1][k2].keys(): + if k3 == "dtype": + if output1[k1][k2][k3] == "object": + assert ( + output1[k1][k2][k3] == output2[k1][k2][k3] + or "int64" == output2[k1][k2][k3] + ) + else: + assert output1[k1][k2][k3] == output2[k1][k2][k3] + else: + assert output1[k1][k2][k3] == pytest.approx( + output2[k1][k2][k3], rel=1e-0, abs=1e-0 + )