diff --git a/nvtabular/ops/bucketize.py b/nvtabular/ops/bucketize.py index 6e5e27e6ab2..11a5dfbcc8e 100644 --- a/nvtabular/ops/bucketize.py +++ b/nvtabular/ops/bucketize.py @@ -45,6 +45,5 @@ def transform(self, columns, gdf: cudf.DataFrame): val = 0 for boundary in b: val += (gdf[col] >= boundary).astype("int") - new_col = f"{col}_{self._id}" - new_gdf[new_col] = val + new_gdf[col] = val return new_gdf diff --git a/nvtabular/ops/hashed_cross.py b/nvtabular/ops/hashed_cross.py index e2839bd183e..f37601d4e50 100644 --- a/nvtabular/ops/hashed_cross.py +++ b/nvtabular/ops/hashed_cross.py @@ -25,16 +25,15 @@ def __init__(self, num_buckets): self.num_buckets = num_buckets @annotate("HashedCross_op", color="darkgreen", domain="nvt_python") - def op_logic(self, columns, gdf: cudf.DataFrame): + def transform(self, columns, gdf: cudf.DataFrame): new_gdf = cudf.DataFrame() - for cross in columns: - val = 0 - for column in cross: - val ^= gdf[column].hash_values() # or however we want to do this aggregation - # TODO: support different size buckets per cross - val = val % self.bucket_size - new_gdf["_X_".join(cross)] = val + val = 0 + for column in columns: + val ^= gdf[column].hash_values() # or however we want to do this aggregation + # TODO: support different size buckets per cross + val = val % self.num_buckets + new_gdf["_X_".join(columns)] = val return new_gdf def output_column_names(self, columns): - return ["_X_".join(cross) for cross in columns] + return ["_X_".join(columns)] diff --git a/nvtabular/ops/join_external.py b/nvtabular/ops/join_external.py index 2c0631a09e4..773ef78ec2c 100644 --- a/nvtabular/ops/join_external.py +++ b/nvtabular/ops/join_external.py @@ -88,7 +88,7 @@ def __init__( cache="host", **kwargs, ): - super().__init__(replace=False) + super(JoinExternal).__init__() self.on = on self.df_ext = df_ext self.on_ext = on_ext or self.on @@ -155,9 +155,9 @@ def transform( return new_gdf def output_column_names(self, columns): - if self.ext_columns: - return columns + self.ext_columns - return columns + self._ext.columns + if self.columns_ext: + return list(set(columns + self.columns_ext)) + return list(set(columns + list(self._ext.columns))) def _detect_format(data): diff --git a/tests/unit/test_ops.py b/tests/unit/test_ops.py index 449e6d77d5e..20214db89c1 100644 --- a/tests/unit/test_ops.py +++ b/tests/unit/test_ops.py @@ -14,7 +14,6 @@ # limitations under the License. # import math -import os import string import cudf @@ -27,158 +26,30 @@ import nvtabular as nvt import nvtabular.io +from nvtabular import ColumnGroup from nvtabular import ops as ops -from tests.conftest import get_cats, mycols_csv, mycols_pq +from tests.conftest import mycols_csv, mycols_pq @pytest.mark.parametrize("gpu_memory_frac", [0.01, 0.1]) @pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"]) # TODO: dask workflow doesn't support min/max on string columns, so won't work # with op_columns=None -@pytest.mark.parametrize("op_columns", [["x"]]) -def test_minmax(tmpdir, client, df, dataset, gpu_memory_frac, engine, op_columns): - cat_names = ["name-cat", "name-string"] if engine == "parquet" else ["name-string"] - cont_names = ["x", "y"] - label_name = ["label"] - - config = nvtabular.workflow.get_new_config() - config["PP"]["all"] = [ops.MinMax(columns=op_columns)] - - processor = nvtabular.Workflow( - cat_names=cat_names, cont_names=cont_names, label_name=label_name, config=config - ) - processor.update_stats(dataset) - x_min = df["x"].min() - - assert x_min == pytest.approx(processor.stats["mins"]["x"], 1e-2) - x_max = df["x"].max() - assert x_max == pytest.approx(processor.stats["maxs"]["x"], 1e-2) - if not op_columns: - name_min = min(df["name-string"].tolist()) - name_max = max(df["name-string"].tolist()) - assert name_min == processor.stats["mins"]["name-string"] - y_max = df["y"].max() - y_min = df["y"].min() - assert y_max == processor.stats["maxs"]["y"] - assert name_max == processor.stats["maxs"]["name-string"] - assert y_min == processor.stats["mins"]["y"] - - -@pytest.mark.parametrize("gpu_memory_frac", [0.01, 0.1]) -@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"]) -@pytest.mark.parametrize("op_columns", [["x"], None]) -def test_moments(tmpdir, df, dataset, gpu_memory_frac, engine, op_columns): - cat_names = ["name-cat", "name-string"] if engine == "parquet" else ["name-string"] - cont_names = ["x", "y", "id"] - label_name = ["label"] - - config = nvt.workflow.get_new_config() - config["PP"]["continuous"] = [ops.Moments(columns=op_columns)] - - processor = nvtabular.Workflow( - cat_names=cat_names, cont_names=cont_names, label_name=label_name, config=config - ) - - processor.update_stats(dataset) - - assert df.x.count() == processor.stats["counts"]["x"] - assert df.x.count() == 4321 - - # Check mean and std - assert math.isclose(df.x.mean(), processor.stats["means"]["x"], rel_tol=1e-4) - assert math.isclose(df.x.std(), processor.stats["stds"]["x"], rel_tol=1e-3) - if not op_columns: - assert math.isclose(df.y.mean(), processor.stats["means"]["y"], rel_tol=1e-4) - assert math.isclose(df.id.mean(), processor.stats["means"]["id"], rel_tol=1e-4) - - assert math.isclose(df.y.std(), processor.stats["stds"]["y"], rel_tol=1e-3) - assert math.isclose(df.id.std(), processor.stats["stds"]["id"], rel_tol=1e-3) - - -@pytest.mark.parametrize("gpu_memory_frac", [0.01, 0.1]) -@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"]) -@pytest.mark.parametrize("op_columns", [["name-string"], None]) -def test_encoder(tmpdir, df, dataset, gpu_memory_frac, engine, op_columns): - cat_names = ["name-cat", "name-string"] if engine == "parquet" else ["name-string"] - cont_names = ["x", "y", "id"] - label_name = ["label"] - - encoder = ops.GroupbyStatistics(columns=op_columns) - config = nvt.workflow.get_new_config() - config["PP"]["categorical"] = [encoder] - - processor = nvt.Workflow( - cat_names=cat_names, cont_names=cont_names, label_name=label_name, config=config - ) - processor.update_stats(dataset) - - if engine == "parquet" and not op_columns: - cats_expected0 = df["name-cat"].unique().values_host - cats0 = get_cats(processor, "name-cat") - assert cats0.tolist() == [None] + cats_expected0.tolist() - - cats_expected1 = df["name-string"].unique().values_host - cats1 = get_cats(processor, "name-string") - assert cats1.tolist() == [None] + cats_expected1.tolist() - - -@pytest.mark.parametrize("engine", ["parquet"]) -@pytest.mark.parametrize("groups", [[["name-cat", "name-string"], "name-cat"], "name-string"]) -@pytest.mark.parametrize("concat_groups", [True, False]) -def test_multicolumn_cats(tmpdir, df, dataset, engine, groups, concat_groups): - cat_names = ["name-cat", "name-string"] - cont_names = ["x", "y", "id"] - label_name = ["label"] - - encoder = ops.GroupbyStatistics( - columns=groups, - cont_names=None if concat_groups else ["x"], - stats=None if concat_groups else ["count", "mean"], - out_path=str(tmpdir), - concat_groups=concat_groups, - ) - config = nvt.workflow.get_new_config() - config["PP"]["categorical"] = [encoder] - - processor = nvt.Workflow( - cat_names=cat_names, cont_names=cont_names, label_name=label_name, config=config - ) - processor.update_stats(dataset) - - groups = [groups] if isinstance(groups, str) else groups - for group in groups: - group = [group] if isinstance(group, str) else group - prefix = "unique." if concat_groups else "cat_stats." - fn = prefix + "_".join(group) + ".parquet" - cudf.read_parquet(os.path.join(tmpdir, "categories", fn)) - - -@pytest.mark.parametrize("engine", ["parquet"]) -@pytest.mark.parametrize("groups", [[["name-cat", "name-string"]], "name-string"]) -@pytest.mark.parametrize("kfold", [3]) -def test_groupby_folds(tmpdir, df, dataset, engine, groups, kfold): - cat_names = ["name-cat", "name-string"] - cont_names = ["x", "y", "id"] - label_name = ["label"] - - gb_stats = ops.GroupbyStatistics( - columns=None, - out_path=str(tmpdir), - kfold=kfold, - fold_groups=groups, - stats=["count", "sum"], - cont_names=["y"], - ) - config = nvt.workflow.get_new_config() - config["PP"]["categorical"] = [gb_stats] - - processor = nvt.Workflow( - cat_names=cat_names, cont_names=cont_names, label_name=label_name, config=config - ) - processor.update_stats(dataset) - for group, path in processor.stats["categories"].items(): - df = cudf.read_parquet(path) - assert "__fold__" in df.columns +@pytest.mark.parametrize("op_columns", [["x"], ["x", "y"]]) +def test_normalize_minmax(tmpdir, client, df, dataset, gpu_memory_frac, engine, op_columns): + cont_features = op_columns >> ops.NormalizeMinMax() + processor = nvtabular.Workflow(cont_features) + processor.fit(dataset) + new_gdf = processor.transform(dataset).to_ddf().compute() + for col in op_columns: + col_min = df[col].min() + assert col_min == pytest.approx(processor.column_group.op.mins[col], 1e-2) + col_max = df[col].max() + assert col_max == pytest.approx(processor.column_group.op.maxs[col], 1e-2) + df[col] = (df[col] - processor.column_group.op.mins[col]) / ( + processor.column_group.op.maxs[col] - processor.column_group.op.mins[col] + ) + assert np.all((df[col] - new_gdf[col]).abs().values <= 1e-2) @pytest.mark.parametrize("cat_groups", ["Author", [["Author", "Engaging-User"]]]) @@ -260,45 +131,27 @@ def test_target_encode_multi(tmpdir, npartitions): @pytest.mark.parametrize("gpu_memory_frac", [0.01, 0.1]) @pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"]) -@pytest.mark.parametrize("op_columns", [["x"], None]) -def test_median(tmpdir, df, dataset, gpu_memory_frac, engine, op_columns): - cat_names = ["name-cat", "name-string"] if engine == "parquet" else ["name-string"] - cont_names = ["x", "y", "id"] - label_name = ["label"] - - config = nvt.workflow.get_new_config() - config["PP"]["continuous"] = [ops.Median(columns=op_columns)] - - processor = nvt.Workflow( - cat_names=cat_names, cont_names=cont_names, label_name=label_name, config=config - ) - - processor.update_stats(dataset) - - # Check median (TODO: Improve the accuracy) - x_median = df.x.dropna().quantile(0.5, interpolation="linear") - assert math.isclose(x_median, processor.stats["medians"]["x"], rel_tol=1e1) - if not op_columns: - y_median = df.y.dropna().quantile(0.5, interpolation="linear") - id_median = df.id.dropna().quantile(0.5, interpolation="linear") - assert math.isclose(y_median, processor.stats["medians"]["y"], rel_tol=1e1) - assert math.isclose(id_median, processor.stats["medians"]["id"], rel_tol=1e1) +@pytest.mark.parametrize("op_columns", [["x"], ["x", "y"]]) +def test_fill_median(tmpdir, df, dataset, gpu_memory_frac, engine, op_columns): + cont_features = op_columns >> nvt.ops.FillMedian() + processor = nvt.Workflow(cont_features) + processor.fit(dataset) + new_gdf = processor.transform(dataset).to_ddf().compute() + for col in op_columns: + col_median = df[col].dropna().quantile(0.5, interpolation="linear") + assert math.isclose(col_median, processor.column_group.op.medians[col], rel_tol=1e1) + assert np.all((df[col].fillna(col_median) - new_gdf[col]).abs().values <= 1e-2) @pytest.mark.parametrize("gpu_memory_frac", [0.01, 0.1]) @pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"]) -@pytest.mark.parametrize("op_columns", [["x"], None]) +@pytest.mark.parametrize("op_columns", [["x"], ["x", "y"]]) def test_log(tmpdir, df, dataset, gpu_memory_frac, engine, op_columns): - cont_names = ["x", "y", "id"] - log_op = ops.LogOp(columns=op_columns) - - columns_ctx = {} - columns_ctx["continuous"] = {} - columns_ctx["continuous"]["base"] = cont_names - - for gdf in dataset.to_iter(): - new_gdf = log_op.apply_op(gdf, columns_ctx, "continuous") - assert new_gdf[cont_names] == np.log(gdf[cont_names].astype(np.float32)) + cont_features = op_columns >> nvt.ops.LogOp() + processor = nvt.Workflow(cont_features) + processor.fit(dataset) + new_gdf = processor.transform(dataset).to_ddf().compute() + assert new_gdf[op_columns] == np.log(df[op_columns].astype(np.float32)) @pytest.mark.parametrize("gpu_memory_frac", [0.01, 0.1]) @@ -311,23 +164,18 @@ def test_hash_bucket(tmpdir, df, dataset, gpu_memory_frac, engine, op_columns): num_buckets = 10 else: num_buckets = {column: 10 for column in op_columns} - hash_bucket_op = ops.HashBucket(num_buckets) - columns_ctx = {} - columns_ctx["categorical"] = {} - columns_ctx["categorical"]["base"] = cat_names + hash_features = cat_names >> ops.HashBucket(num_buckets) + processor = nvt.Workflow(hash_features) + processor.fit(dataset) + new_gdf = processor.transform(dataset).to_ddf().compute() # check sums for determinancy - checksums = [] - for gdf in dataset.to_iter(): - new_gdf = hash_bucket_op.apply_op(gdf, columns_ctx, "categorical") - assert np.all(new_gdf[cat_names].values >= 0) - assert np.all(new_gdf[cat_names].values <= 9) - checksums.append(new_gdf[cat_names].sum().values) - - for checksum, gdf in zip(checksums, dataset.to_iter()): - new_gdf = hash_bucket_op.apply_op(gdf, columns_ctx, "categorical") - assert np.all(new_gdf[cat_names].sum().values == checksum) + assert np.all(new_gdf[cat_names].values >= 0) + assert np.all(new_gdf[cat_names].values <= 9) + checksum = new_gdf[cat_names].sum().values + new_gdf = processor.transform(dataset).to_ddf().compute() + np.all(new_gdf[cat_names].sum().values == checksum) def test_hash_bucket_lists(tmpdir): @@ -339,193 +187,120 @@ def test_hash_bucket_lists(tmpdir): } ) cat_names = ["Authors"] # , "Engaging User"] - cont_names = [] - label_name = ["Post"] - processor = nvt.Workflow(cat_names=cat_names, cont_names=cont_names, label_name=label_name) - processor.add_preprocess(ops.HashBucket(num_buckets=10)) - processor.finalize() - processor.apply(nvt.Dataset(df), output_format=None) - df_out = processor.get_ddf().compute(scheduler="synchronous") + dataset = nvt.Dataset(df) + hash_features = cat_names >> ops.HashBucket(num_buckets=10) + processor = nvt.Workflow(hash_features) + processor.fit(dataset) + new_gdf = processor.transform(dataset).to_ddf().compute() # check to make sure that the same strings are hashed the same - authors = df_out["Authors"].to_arrow().to_pylist() + authors = new_gdf["Authors"].to_arrow().to_pylist() assert authors[0][0] == authors[1][0] # 'User_A' assert authors[2][1] == authors[3][0] # 'User_C' - # make sure we get the embedding sizes - assert nvt.ops.get_embedding_sizes(processor)["Authors"][0] == 10 + # ToDo: make sure we get the embedding sizes + # assert nvt.ops.get_embedding_sizes(processor)["Authors"][0] == 10 @pytest.mark.parametrize("engine", ["parquet"]) def test_fill_missing(tmpdir, df, dataset, engine): - op = nvt.ops.FillMissing(42) - cont_names = ["x", "y"] - columns_ctx = {} - columns_ctx["continuous"] = {} - columns_ctx["continuous"]["base"] = cont_names + cont_features = cont_names >> nvt.ops.FillMissing(fill_val=42) + for col in cont_names: idx = np.random.choice(df.shape[0] - 1, int(df.shape[0] * 0.2)) df[col].iloc[idx] = None - transformed = cudf.concat([op.apply_op(df, columns_ctx, "continuous")]) - assert_eq(transformed[cont_names], df[cont_names].fillna(42)) + df = df.reset_index() + dataset = nvt.Dataset(df) + processor = nvt.Workflow(cont_features) + processor.fit(dataset) + new_gdf = processor.transform(dataset).to_ddf().compute() + for col in cont_names: + assert np.all((df[col].fillna(42) - new_gdf[col]).abs().values <= 1e-2) + assert new_gdf[col].isna().sum() == 0 @pytest.mark.parametrize("engine", ["parquet"]) def test_dropna(tmpdir, df, dataset, engine): - dropna = ops.Dropna() columns = mycols_pq if engine == "parquet" else mycols_csv + dropna_features = columns >> ops.Dropna() - columns_ctx = {} - columns_ctx["all"] = {} - columns_ctx["all"]["base"] = columns - - for gdf in dataset.to_iter(): - new_gdf = dropna.apply_op(gdf, columns_ctx, "all") - assert new_gdf.columns.all() == gdf.columns.all() - assert new_gdf.isnull().all().sum() < 1, "null values exist" + processor = nvt.Workflow(dropna_features) + processor.fit(dataset) + new_gdf = processor.transform(dataset).to_ddf().compute() + assert new_gdf.columns.all() == df.columns.all() + assert new_gdf.isnull().all().sum() < 1, "null values exist" @pytest.mark.parametrize("gpu_memory_frac", [0.01, 0.1]) @pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"]) -@pytest.mark.parametrize("op_columns", [["x"], None]) +@pytest.mark.parametrize("op_columns", [["x"], ["x", "y"]]) def test_normalize(tmpdir, df, dataset, gpu_memory_frac, engine, op_columns): - cat_names = ["name-cat", "name-string"] if engine == "parquet" else ["name-string"] - cont_names = ["x", "y"] - label_name = ["label"] - - config = nvt.workflow.get_new_config() - config["PP"]["continuous"] = [ops.Moments(columns=op_columns)] - - processor = nvtabular.Workflow( - cat_names=cat_names, cont_names=cont_names, label_name=label_name, config=config - ) - - processor.update_stats(dataset) - - op = ops.Normalize() - - columns_ctx = {} - columns_ctx["continuous"] = {} - columns_ctx["continuous"]["base"] = op_columns or cont_names - - new_gdf = op.apply_op(df, columns_ctx, "continuous", stats_context=processor.stats) - df["x"] = (df["x"] - processor.stats["means"]["x"]) / processor.stats["stds"]["x"] - assert new_gdf["x"].equals(df["x"]) + cont_features = op_columns >> ops.Normalize() + processor = nvtabular.Workflow(cont_features) + processor.fit(dataset) + + new_gdf = processor.transform(dataset).to_ddf().compute() + for col in op_columns: + assert math.isclose(df[col].mean(), processor.column_group.op.means[col], rel_tol=1e-4) + assert math.isclose(df[col].std(), processor.column_group.op.stds[col], rel_tol=1e-4) + df[col] = (df[col] - processor.column_group.op.means[col]) / processor.column_group.op.stds[ + col + ] + assert np.all((df[col] - new_gdf[col]).abs().values <= 1e-2) @pytest.mark.parametrize("gpu_memory_frac", [0.1]) @pytest.mark.parametrize("engine", ["parquet"]) -@pytest.mark.parametrize("op_columns", [["x"], None]) +@pytest.mark.parametrize("op_columns", [["x"]]) def test_normalize_upcastfloat64(tmpdir, dataset, gpu_memory_frac, engine, op_columns): df = cudf.DataFrame( {"x": [1.9e10, 2.3e16, 3.4e18, 1.6e19], "label": [1, 0, 1, 0]}, dtype="float32" ) - cat_names = [] - cont_names = ["x"] - label_name = ["label"] - - config = nvt.workflow.get_new_config() - config["PP"]["continuous"] = [ops.Moments(columns=op_columns)] - - processor = nvtabular.Workflow( - cat_names=cat_names, cont_names=cont_names, label_name=label_name, config=config - ) - - processor.update_stats(dataset) - - op = ops.Normalize() - - columns_ctx = {} - columns_ctx["continuous"] = {} - columns_ctx["continuous"]["base"] = op_columns or cont_names - - new_gdf = op.apply_op(df, columns_ctx, "continuous", stats_context=processor.stats) - df["x"] = (df["x"] - processor.stats["means"]["x"]) / processor.stats["stds"]["x"] - assert new_gdf["x"].equals(df["x"]) - - -@pytest.mark.parametrize("gpu_memory_frac", [0.01, 0.1]) -@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"]) -@pytest.mark.parametrize("op_columns", [["x"], None]) -def test_normalize_minmax(tmpdir, df, dataset, gpu_memory_frac, engine, op_columns): - cat_names = ["name-cat", "name-string"] if engine == "parquet" else ["name-string"] - cont_names = ["x", "y"] - label_name = ["label"] - - config = nvt.workflow.get_new_config() - config["PP"]["continuous"] = [ops.MinMax()] - - processor = nvtabular.Workflow( - cat_names=cat_names, cont_names=cont_names, label_name=label_name, config=config - ) - - processor.update_stats(dataset) + cont_features = op_columns >> ops.Normalize() + processor = nvtabular.Workflow(cont_features) + dataset = nvt.Dataset(df) + processor.fit(dataset) - op = ops.NormalizeMinMax() + new_gdf = processor.transform(dataset).to_ddf().compute() - columns_ctx = {} - columns_ctx["continuous"] = {} - columns_ctx["continuous"]["base"] = cont_names - - new_gdf = op.apply_op(df, columns_ctx, "continuous", stats_context=processor.stats) - df["x"] = (df["x"] - processor.stats["mins"]["x"]) / ( - processor.stats["maxs"]["x"] - processor.stats["mins"]["x"] - ) - assert new_gdf["x"].equals(df["x"]) + for col in op_columns: + assert math.isclose(df[col].mean(), processor.column_group.op.means[col], rel_tol=1e-4) + assert math.isclose(df[col].std(), processor.column_group.op.stds[col], rel_tol=1e-4) + df[col] = (df[col] - processor.column_group.op.means[col]) / processor.column_group.op.stds[ + col + ] + assert np.all((df[col] - new_gdf[col]).abs().values <= 1e-2) @pytest.mark.parametrize("gpu_memory_frac", [0.1]) @pytest.mark.parametrize("engine", ["parquet"]) def test_lambdaop(tmpdir, df, dataset, gpu_memory_frac, engine, client): - cat_names = ["name-cat", "name-string"] - cont_names = ["x", "y"] - label_name = ["label"] - columns = mycols_pq if engine == "parquet" else mycols_csv - df_copy = df.copy() - config = nvt.workflow.get_new_config() - - processor = nvtabular.Workflow( - cat_names=cat_names, - cont_names=cont_names, - label_name=label_name, - config=config, - client=client, - ) - - columns_ctx = {} - columns_ctx["continuous"] = {} - columns_ctx["continuous"]["base"] = cont_names - columns_ctx["all"] = {} - columns_ctx["all"]["base"] = columns - # Substring # Replacement - op = ops.LambdaOp( - op_name="slice", - f=lambda col: col.str.slice(1, 3), - columns=["name-cat", "name-string"], - replace=True, - ) + substring = ColumnGroup(["name-cat", "name-string"]) >> (lambda col: col.str.slice(1, 3)) + processor = nvtabular.Workflow(substring) + processor.fit(dataset) + new_gdf = processor.transform(dataset).to_ddf().compute() - new_gdf = op.apply_op(df, columns_ctx, "all", stats_context=None) assert new_gdf["name-cat"].equals(df_copy["name-cat"].str.slice(1, 3)) assert new_gdf["name-string"].equals(df_copy["name-string"].str.slice(1, 3)) - # No Replacement - df = df_copy.copy() - op = ops.LambdaOp( - op_name="slice", - f=lambda col: col.str.slice(1, 3), - columns=["name-cat", "name-string"], - replace=False, + # No Replacement from old API (skipped for other examples) + substring = ( + ColumnGroup(["name-cat", "name-string"]) + >> (lambda col: col.str.slice(1, 3)) + >> ops.Rename(postfix="_slice") ) - new_gdf = op.apply_op(df, columns_ctx, "all", stats_context=None) + processor = nvtabular.Workflow(["name-cat", "name-string"] + substring) + processor.fit(dataset) + new_gdf = processor.transform(dataset).to_ddf().compute() + assert new_gdf["name-cat_slice"].equals(df_copy["name-cat"].str.slice(1, 3)) assert new_gdf["name-string_slice"].equals(df_copy["name-string"].str.slice(1, 3)) assert new_gdf["name-cat"].equals(df_copy["name-cat"]) @@ -533,166 +308,44 @@ def test_lambdaop(tmpdir, df, dataset, gpu_memory_frac, engine, client): # Replace # Replacement - df = df_copy.copy() - op = ops.LambdaOp( - op_name="replace", - f=lambda col: col.str.replace("e", "XX"), - columns=["name-cat", "name-string"], - replace=True, - ) + oplambda = ColumnGroup(["name-cat", "name-string"]) >> (lambda col: col.str.replace("e", "XX")) + processor = nvtabular.Workflow(oplambda) + processor.fit(dataset) + new_gdf = processor.transform(dataset).to_ddf().compute() - new_gdf = op.apply_op(df, columns_ctx, "all", stats_context=None) assert new_gdf["name-cat"].equals(df_copy["name-cat"].str.replace("e", "XX")) assert new_gdf["name-string"].equals(df_copy["name-string"].str.replace("e", "XX")) - # No Replacement - df = df_copy.copy() - op = ops.LambdaOp( - op_name="replace", - f=lambda col: col.str.replace("e", "XX"), - columns=["name-cat", "name-string"], - replace=False, - ) - new_gdf = op.apply_op(df, columns_ctx, "all", stats_context=None) - assert new_gdf["name-cat_replace"].equals(df_copy["name-cat"].str.replace("e", "XX")) - assert new_gdf["name-string_replace"].equals(df_copy["name-string"].str.replace("e", "XX")) - assert new_gdf["name-cat"].equals(df_copy["name-cat"]) - assert new_gdf["name-string"].equals(df_copy["name-string"]) - # astype # Replacement - df = df_copy.copy() - op = ops.LambdaOp( - op_name="astype", f=lambda col: col.astype(float), columns=["id"], replace=True - ) - new_gdf = op.apply_op(df, columns_ctx, "all", stats_context=None) + oplambda = ColumnGroup(["id"]) >> (lambda col: col.astype(float)) + processor = nvtabular.Workflow(oplambda) + processor.fit(dataset) + new_gdf = processor.transform(dataset).to_ddf().compute() + assert new_gdf["id"].dtype == "float64" # Workflow # Replacement - import glob - - processor = nvt.Workflow(cat_names=cat_names, cont_names=cont_names, label_name=label_name) - - processor.add_preprocess( - [ - ops.LambdaOp( - op_name="slice", - f=lambda col: col.astype(str).str.slice(0, 1), - columns=["name-cat"], - replace=True, - ), - ops.Categorify(), - ] - ) - processor.finalize() - processor.update_stats(dataset) - outdir = tmpdir.mkdir("out1") - processor.write_to_dataset( - outdir, dataset, out_files_per_proc=10, shuffle=nvt.io.Shuffle.PER_PARTITION, apply_ops=True + oplambda = ( + ColumnGroup(["name-cat"]) + >> (lambda col: col.astype(str).str.slice(0, 1)) + >> ops.Categorify() ) + processor = nvtabular.Workflow(oplambda) + processor.fit(dataset) + new_gdf = processor.transform(dataset).to_ddf().compute() + assert is_integer_dtype(new_gdf["name-cat"].dtype) - dataset_2 = nvtabular.io.Dataset( - glob.glob(str(outdir) + "/*.parquet"), part_mem_fraction=gpu_memory_frac + oplambda = ( + ColumnGroup(["name-cat", "name-string"]) >> ops.Categorify() >> (lambda col: col + 100) ) - df_pp = cudf.concat(list(dataset_2.to_iter()), axis=0) - assert is_integer_dtype(df_pp["name-cat"].dtype) + processor = nvtabular.Workflow(oplambda) + processor.fit(dataset) + new_gdf = processor.transform(dataset).to_ddf().compute() - processor = nvt.Workflow(cat_names=cat_names, cont_names=cont_names, label_name=label_name) - - processor.add_preprocess( - [ - ops.Categorify(), - ops.LambdaOp(op_name="add100", f=lambda col: col + 100, replace=True), - ] - ) - processor.finalize() - processor.update_stats(dataset) - outdir = tmpdir.mkdir("out2") - processor.write_to_dataset( - outdir, dataset, out_files_per_proc=10, shuffle=nvt.io.Shuffle.PER_PARTITION, apply_ops=True - ) - - dataset_2 = nvtabular.io.Dataset( - glob.glob(str(outdir) + "/*.parquet"), part_mem_fraction=gpu_memory_frac - ) - df_pp = cudf.concat(list(dataset_2.to_iter()), axis=0) - assert is_integer_dtype(df_pp["name-cat"].dtype) - assert np.sum(df_pp["name-cat"] < 100) == 0 - - # Workflow - # No Replacement - processor = nvt.Workflow(cat_names=cat_names, cont_names=cont_names, label_name=label_name) - - processor.add_preprocess( - [ - ops.LambdaOp( - op_name="slice", - f=lambda col: col.astype(str).str.slice(0, 1), - columns=["name-cat"], - replace=False, - ), - ops.Categorify(), - ] - ) - processor.finalize() - processor.update_stats(dataset) - outdir = tmpdir.mkdir("out3") - processor.write_to_dataset( - outdir, dataset, out_files_per_proc=10, shuffle=nvt.io.Shuffle.PER_PARTITION, apply_ops=True - ) - dataset_2 = nvtabular.io.Dataset( - glob.glob(str(outdir) + "/*.parquet"), part_mem_fraction=gpu_memory_frac - ) - df_pp = cudf.concat(list(dataset_2.to_iter()), axis=0) - - assert df_pp["name-cat"].dtype == "O" - print(df_pp) - assert is_integer_dtype(df_pp["name-cat_slice"].dtype) - assert np.sum(df_pp["name-cat_slice"] == 0) == 0 - - processor = nvt.Workflow(cat_names=cat_names, cont_names=cont_names, label_name=label_name) - - processor.add_preprocess( - [ - ops.Categorify(), - ops.LambdaOp(op_name="add100", f=lambda col: col + 100, replace=False), - ] - ) - processor.finalize() - processor.update_stats(dataset) - outdir = tmpdir.mkdir("out4") - processor.write_to_dataset( - outdir, dataset, out_files_per_proc=10, shuffle=nvt.io.Shuffle.PER_PARTITION, apply_ops=True - ) - - dataset_2 = nvtabular.io.Dataset( - glob.glob(str(outdir) + "/*.parquet"), part_mem_fraction=gpu_memory_frac - ) - df_pp = cudf.concat(list(dataset_2.to_iter()), axis=0) - assert is_integer_dtype(df_pp["name-cat_add100"].dtype) - assert np.sum(df_pp["name-cat_add100"] < 100) == 0 - - processor = nvt.Workflow(cat_names=cat_names, cont_names=cont_names, label_name=label_name) - - processor.add_preprocess( - [ - ops.LambdaOp(op_name="mul0", f=lambda col: col * 0, columns=["x"], replace=False), - ops.LambdaOp(op_name="add100", f=lambda col: col + 100, replace=False), - ] - ) - processor.finalize() - processor.update_stats(dataset) - outdir = tmpdir.mkdir("out5") - processor.write_to_dataset( - outdir, dataset, out_files_per_proc=10, shuffle=nvt.io.Shuffle.PER_PARTITION, apply_ops=True - ) - - dataset_2 = nvtabular.io.Dataset( - glob.glob(str(outdir) + "/*.parquet"), part_mem_fraction=gpu_memory_frac - ) - df_pp = cudf.concat(list(dataset_2.to_iter()), axis=0) - assert np.sum(df_pp["x_mul0_add100"] < 100) == 0 + assert is_integer_dtype(new_gdf["name-cat"].dtype) + assert np.sum(new_gdf["name-cat"] < 100) == 0 @pytest.mark.parametrize("freq_threshold", [0, 1, 2]) @@ -910,11 +563,12 @@ def test_join_external(tmpdir, df, dataset, engine, kind_ext, cache, how, drop_d # Define Op on = "id" + columns_left = list(df.columns) columns_ext = ["id", "new_col", "new_col_2"] df_ext_check = df_ext_check[columns_ext] if drop_duplicates: df_ext_check.drop_duplicates(ignore_index=True, inplace=True) - merge_op = ops.JoinExternal( + joined = nvt.ColumnGroup(columns_left) >> nvt.ops.JoinExternal( df_ext, on, how=how, @@ -922,61 +576,58 @@ def test_join_external(tmpdir, df, dataset, engine, kind_ext, cache, how, drop_d cache=cache, drop_duplicates_ext=drop_duplicates, ) - columns = mycols_pq if engine == "parquet" else mycols_csv - columns_ctx = {} - columns_ctx["all"] = {} - columns_ctx["all"]["base"] = columns - # Iterate, apply op, and check result - for gdf in dataset.to_iter(): - new_gdf = merge_op.apply_op(gdf, columns_ctx, "all") - check_gdf = gdf.merge(df_ext_check, how=how, on=on) - assert len(check_gdf) == len(new_gdf) - assert (new_gdf["id"] + shift).all() == new_gdf["new_col"].all() - assert gdf["id"].all() == new_gdf["id"].all() - assert "new_col_2" in new_gdf.columns - assert "new_col_3" not in new_gdf.columns + gdf = df.reset_index() + dataset = nvt.Dataset(gdf) + processor = nvt.Workflow(joined) + processor.fit(dataset) + new_gdf = processor.transform(dataset).to_ddf().compute().reset_index() + + check_gdf = gdf.merge(df_ext_check, how=how, on=on) + assert len(check_gdf) == len(new_gdf) + assert (new_gdf["id"] + shift).all() == new_gdf["new_col"].all() + assert gdf["id"].all() == new_gdf["id"].all() + assert "new_col_2" in new_gdf.columns + assert "new_col_3" not in new_gdf.columns @pytest.mark.parametrize("gpu_memory_frac", [0.1]) @pytest.mark.parametrize("engine", ["parquet"]) def test_filter(tmpdir, df, dataset, gpu_memory_frac, engine, client): - cont_names = ["x", "y"] - - columns = mycols_pq if engine == "parquet" else mycols_csv - columns_ctx = {} - columns_ctx["all"] = {} - columns_ctx["all"]["base"] = columns - - filter_op = ops.Filter(f=lambda df: df[df["y"] > 0.5]) - new_gdf = filter_op.apply_op(df, columns_ctx, "all", target_cols=columns) - assert new_gdf.columns.all() == df.columns.all() + filtered = cont_names >> ops.Filter(f=lambda df: df[df["y"] > 0.5]) + processor = nvtabular.Workflow(filtered) + processor.fit(dataset) + new_gdf = processor.transform(dataset).to_ddf().compute().reset_index() + filter_df = df[df["y"] > 0.5].reset_index() + for col in cont_names: + assert np.all((new_gdf[col] - filter_df[col]).abs().values <= 1e-2) # return isnull() rows - columns_ctx["continuous"] = {} - columns_ctx["continuous"]["base"] = cont_names - for col in cont_names: idx = np.random.choice(df.shape[0] - 1, int(df.shape[0] * 0.2)) df[col].iloc[idx] = None - filter_op = ops.Filter(f=lambda df: df[df.x.isnull()]) - new_gdf = filter_op.apply_op(df, columns_ctx, "all", target_cols=columns) - assert new_gdf.columns.all() == df.columns.all() + dataset = nvt.Dataset(df) + filtered = cont_names >> ops.Filter(f=lambda df: df[df.x.isnull()]) + processor = nvtabular.Workflow(filtered) + processor.fit(dataset) + new_gdf = processor.transform(dataset).to_ddf().compute() assert new_gdf.shape[0] < df.shape[0], "null values do not exist" # again testing filtering by returning a series rather than a df - filter_op = ops.Filter(f=lambda df: df.x.isnull()) - new_gdf = filter_op.apply_op(df, columns_ctx, "all", target_cols=columns) - assert new_gdf.columns.all() == df.columns.all() + filtered = cont_names >> ops.Filter(f=lambda df: df.x.isnull()) + processor = nvtabular.Workflow(filtered) + processor.fit(dataset) + new_gdf = processor.transform(dataset).to_ddf().compute() assert new_gdf.shape[0] < df.shape[0], "null values do not exist" # if the filter returns an invalid type we should get an exception immediately # (rather than causing problems downstream in the workflow) - filter_op = ops.Filter(f=lambda df: "some invalid value") + filtered = cont_names >> ops.Filter(f=lambda df: "some invalid value") + processor = nvtabular.Workflow(filtered) with pytest.raises(ValueError): - filter_op.apply_op(df, columns_ctx, "all", target_cols=columns) + new_gdf = processor.transform(dataset).to_ddf().compute() def test_difference_lag(): @@ -984,78 +635,61 @@ def test_difference_lag(): {"userid": [0, 0, 0, 1, 1, 2], "timestamp": [1000, 1005, 1100, 2000, 2001, 3000]} ) - columns = ["userid", "timestamp"] - columns_ctx = {} - columns_ctx["all"] = {} - columns_ctx["all"]["base"] = columns - - op = ops.DifferenceLag("userid", shift=[1, -1], columns=["timestamp"]) - new_gdf = op.apply_op(df, columns_ctx, "all", target_cols=["timestamp"]) + diff_features = ["timestamp"] >> ops.DifferenceLag(partition_cols=["userid"], shift=[1, -1]) + dataset = nvt.Dataset(df) + processor = nvtabular.Workflow(diff_features) + processor.fit(dataset) + new_gdf = processor.transform(dataset).to_ddf().compute() - assert new_gdf["timestamp_DifferenceLag_1"][0] is None - assert new_gdf["timestamp_DifferenceLag_1"][1] == 5 - assert new_gdf["timestamp_DifferenceLag_1"][2] == 95 - assert new_gdf["timestamp_DifferenceLag_1"][3] is None + assert new_gdf["timestamp_difference_lag_1"][0] is None + assert new_gdf["timestamp_difference_lag_1"][1] == 5 + assert new_gdf["timestamp_difference_lag_1"][2] == 95 + assert new_gdf["timestamp_difference_lag_1"][3] is None - assert new_gdf["timestamp_DifferenceLag_-1"][0] == -5 - assert new_gdf["timestamp_DifferenceLag_-1"][1] == -95 - assert new_gdf["timestamp_DifferenceLag_-1"][2] is None - assert new_gdf["timestamp_DifferenceLag_-1"][3] == -1 - assert new_gdf["timestamp_DifferenceLag_-1"][5] is None + assert new_gdf["timestamp_difference_lag_-1"][0] == -5 + assert new_gdf["timestamp_difference_lag_-1"][1] == -95 + assert new_gdf["timestamp_difference_lag_-1"][2] is None + assert new_gdf["timestamp_difference_lag_-1"][3] == -1 + assert new_gdf["timestamp_difference_lag_-1"][5] is None @pytest.mark.parametrize("gpu_memory_frac", [0.01, 0.1]) @pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"]) -@pytest.mark.parametrize("use_dict", [True, False]) -def test_hashed_cross(tmpdir, df, dataset, gpu_memory_frac, engine, use_dict): +def test_hashed_cross(tmpdir, df, dataset, gpu_memory_frac, engine): # TODO: add tests for > 2 features, multiple crosses, etc. - cat_names = ("name-string", "id") + cat_names = ["name-string", "id"] num_buckets = 10 - if use_dict: - hashed_cross_op = ops.HashedCross({cat_names: num_buckets}) - else: - hashed_cross_op = ops.HashedCross([cat_names], [num_buckets]) - - columns_ctx = {} - columns_ctx["categorical"] = {} - columns_ctx["categorical"]["base"] = list(cat_names) + hashed_cross = cat_names >> ops.HashedCross(num_buckets) + dataset = nvt.Dataset(df) + processor = nvtabular.Workflow(hashed_cross) + processor.fit(dataset) + new_gdf = processor.transform(dataset).to_ddf().compute() # check sums for determinancy - checksums = [] - for gdf in dataset.to_iter(): - new_gdf = hashed_cross_op.apply_op(gdf, columns_ctx, "categorical") - new_column_name = "_X_".join(cat_names) - assert np.all(new_gdf[new_column_name].values >= 0) - assert np.all(new_gdf[new_column_name].values <= 9) - checksums.append(new_gdf[new_column_name].sum()) - - for checksum, gdf in zip(checksums, dataset.to_iter()): - new_gdf = hashed_cross_op.apply_op(gdf, columns_ctx, "categorical") - assert new_gdf[new_column_name].sum() == checksum + new_column_name = "_X_".join(cat_names) + assert np.all(new_gdf[new_column_name].values >= 0) + assert np.all(new_gdf[new_column_name].values <= 9) + checksum = new_gdf[new_column_name].sum() + new_gdf = processor.transform(dataset).to_ddf().compute() + assert new_gdf[new_column_name].sum() == checksum @pytest.mark.parametrize("gpu_memory_frac", [0.01, 0.1]) @pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"]) -@pytest.mark.parametrize("use_dict", [True, False]) -def test_bucketized(tmpdir, df, dataset, gpu_memory_frac, engine, use_dict): +def test_bucketized(tmpdir, df, dataset, gpu_memory_frac, engine): cont_names = ["x", "y"] boundaries = [[-1, 0, 1], [-4, 100]] - if use_dict: - bucketize_op = ops.Bucketize( - {name: boundary for name, boundary in zip(cont_names, boundaries)} - ) - else: - bucketize_op = ops.Bucketize(boundaries, cont_names) - - columns_ctx = {} - columns_ctx["continuous"] = {} - columns_ctx["continuous"]["base"] = list(cont_names) - for gdf in dataset.to_iter(): - new_gdf = bucketize_op.apply_op(gdf, columns_ctx, "continuous") - for col, bs in zip(cont_names, boundaries): - assert np.all(new_gdf[col].values >= 0) - assert np.all(new_gdf[col].values <= len(bs)) - # TODO: add checks for correctness here that don't just - # repeat the existing logic + bucketize_op = ops.Bucketize({name: boundary for name, boundary in zip(cont_names, boundaries)}) + + bucket_features = cont_names >> bucketize_op + processor = nvtabular.Workflow(bucket_features) + processor.fit(dataset) + new_gdf = processor.transform(dataset).to_ddf().compute() + + for col, bs in zip(cont_names, boundaries): + assert np.all(new_gdf[col].values >= 0) + assert np.all(new_gdf[col].values <= len(bs)) + # TODO: add checks for correctness here that don't just + # repeat the existing logic