From 980cc877d99c63297e41536e643d71731a4d36b2 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 5 Jul 2024 10:35:20 -0700 Subject: [PATCH 1/7] start adding dask-expr support Signed-off-by: rjzamora --- examples/slurm/start-slurm.sh | 1 - nemo_curator/modules/exact_dedup.py | 2 +- .../utils/fuzzy_dedup_utils/merge_utils.py | 13 +++++++-- .../utils/fuzzy_dedup_utils/shuffle_utils.py | 29 +++++++++++++++++-- tests/__init__.py | 20 ------------- 5 files changed, 39 insertions(+), 26 deletions(-) diff --git a/examples/slurm/start-slurm.sh b/examples/slurm/start-slurm.sh index ab407465..02c211f6 100644 --- a/examples/slurm/start-slurm.sh +++ b/examples/slurm/start-slurm.sh @@ -67,7 +67,6 @@ export CUDF_SPILL="1" export RMM_SCHEDULER_POOL_SIZE="1GB" export RMM_WORKER_POOL_SIZE="72GiB" export LIBCUDF_CUFILE_POLICY=OFF -export DASK_DATAFRAME__QUERY_PLANNING=False # ================================================================= diff --git a/nemo_curator/modules/exact_dedup.py b/nemo_curator/modules/exact_dedup.py index 2831f516..4e756c56 100644 --- a/nemo_curator/modules/exact_dedup.py +++ b/nemo_curator/modules/exact_dedup.py @@ -88,7 +88,7 @@ def _exact_dup_ids(self, df: dd.DataFrame): Get the id's for text/documents that are exact duplicates Parameters ---------- - df: dask.dataframe.core.DataFrame + df: dask.dataframe.DataFrame A dataframe with the following requirements: * A column where each row is the text from one document * A unique ID column for each document diff --git a/nemo_curator/utils/fuzzy_dedup_utils/merge_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/merge_utils.py index 70bf7300..ef347bf8 100644 --- a/nemo_curator/utils/fuzzy_dedup_utils/merge_utils.py +++ b/nemo_curator/utils/fuzzy_dedup_utils/merge_utils.py @@ -13,14 +13,13 @@ # limitations under the License. import math +import sys from operator import getitem import numpy as np import pandas as pd from dask.base import tokenize -from dask.dataframe.core import new_dd_object from dask.dataframe.shuffle import partitioning_index -from dask.highlevelgraph import HighLevelGraph from dask.utils import M from nemo_curator._compat import DASK_SHUFFLE_CAST_DTYPE @@ -36,6 +35,16 @@ def _split_part(part, nsplits): def text_bytes_aware_merge(text_df, right_df, broadcast=True, *, on): + + if "dask_expr" in sys.modules: + raise NotImplementedError( + "The text_bytes_aware_merge function is not supported when " + "query-planning is enabled." + ) + + from dask.dataframe.core import new_dd_object + from dask.highlevelgraph import HighLevelGraph + if not isinstance(on, list): on = [on] diff --git a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py index e104ee0c..850a8572 100644 --- a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py +++ b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py @@ -12,12 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import sys + import cudf import dask_cuda import numpy as np from dask import config -from dask.dataframe.shuffle import rearrange_by_column -from dask_cuda.explicit_comms.dataframe.shuffle import shuffle as explicit_comms_shuffle from packaging.version import Version from nemo_curator.utils.fuzzy_dedup_utils.output_map_utils import ( @@ -50,6 +50,10 @@ def rearange_by_column_direct( ): # Execute a "direct" shuffle operation without staging if config.get("explicit-comms", excomms_default): + from dask_cuda.explicit_comms.dataframe.shuffle import ( + shuffle as explicit_comms_shuffle, + ) + # Use explicit comms unless the user has # disabled it with the dask config system, # or we are using an older version of dask-cuda @@ -59,7 +63,28 @@ def rearange_by_column_direct( npartitions=npartitions, ignore_index=ignore_index, ) + + elif "dask_expr" in sys.modules: + from dask_expr._collection import new_collection + from dask_expr._shuffle import RearrangeByColumn + + # Use the internal dask-expr API + return new_collection( + RearrangeByColumn( + frame=df.expr, + partitioning_index=df[col].expr, + npartitions_out=npartitions, + ignore_index=ignore_index, + method="tasks", + # Prevent staged shuffling by setting max_branch + # to the number of input partitions + 1 + options={"max_branch": npartitions + 1}, + ) + ) + else: + from dask.dataframe.shuffle import rearrange_by_column + return rearrange_by_column( df, col=col, diff --git a/tests/__init__.py b/tests/__init__.py index ec57fdca..d9155f92 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -11,23 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import sys - -import dask - -# Disable query planning if possible -# https://github.com/NVIDIA/NeMo-Curator/issues/73 -if dask.config.get("dataframe.query-planning") is True or "dask_expr" in sys.modules: - raise NotImplementedError( - """ - NeMo Curator does not support query planning yet. - Please disable query planning before importing - `dask.dataframe` or `dask_cudf`. This can be done via: - `export DASK_DATAFRAME__QUERY_PLANNING=False`, or - importing `dask.dataframe/dask_cudf` after importing - `nemo_curator`. - """ - ) -else: - dask.config.set({"dataframe.query-planning": False}) From 66384e186df805b9349e4f0398b281a732487c8b Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 5 Jul 2024 10:51:07 -0700 Subject: [PATCH 2/7] add query_planning_enabled util Signed-off-by: rjzamora --- nemo_curator/_compat.py | 16 ++++++++++++++++ .../utils/fuzzy_dedup_utils/merge_utils.py | 5 ++--- .../utils/fuzzy_dedup_utils/shuffle_utils.py | 5 ++--- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/nemo_curator/_compat.py b/nemo_curator/_compat.py index a89426d5..9067dcb8 100644 --- a/nemo_curator/_compat.py +++ b/nemo_curator/_compat.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import sys + import dask from packaging.version import parse as parseVersion @@ -21,3 +23,17 @@ DASK_SHUFFLE_METHOD_ARG = _dask_version > parseVersion("2024.1.0") DASK_P2P_ERROR = _dask_version < parseVersion("2023.10.0") DASK_SHUFFLE_CAST_DTYPE = _dask_version > parseVersion("2023.12.0") + +# Query-planning check (and cache) +_DASK_QUERY_PLANNING_ENABLED = None + + +def query_planning_enabled(): + if _DASK_QUERY_PLANNING_ENABLED is None: + if _dask_version > parseVersion("2024.6.0"): + import dask.dataframe as dd + + _DASK_QUERY_PLANNING_ENABLED = dd.DASK_EXPR_ENABLED + else: + _DASK_QUERY_PLANNING_ENABLED = "dask_expr" in sys.modules + return _DASK_QUERY_PLANNING_ENABLED diff --git a/nemo_curator/utils/fuzzy_dedup_utils/merge_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/merge_utils.py index ef347bf8..f5bff316 100644 --- a/nemo_curator/utils/fuzzy_dedup_utils/merge_utils.py +++ b/nemo_curator/utils/fuzzy_dedup_utils/merge_utils.py @@ -13,7 +13,6 @@ # limitations under the License. import math -import sys from operator import getitem import numpy as np @@ -22,7 +21,7 @@ from dask.dataframe.shuffle import partitioning_index from dask.utils import M -from nemo_curator._compat import DASK_SHUFFLE_CAST_DTYPE +from nemo_curator._compat import DASK_SHUFFLE_CAST_DTYPE, query_planning_enabled def _split_part(part, nsplits): @@ -36,7 +35,7 @@ def _split_part(part, nsplits): def text_bytes_aware_merge(text_df, right_df, broadcast=True, *, on): - if "dask_expr" in sys.modules: + if query_planning_enabled(): raise NotImplementedError( "The text_bytes_aware_merge function is not supported when " "query-planning is enabled." diff --git a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py index 850a8572..af32494d 100644 --- a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py +++ b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py @@ -12,14 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import sys - import cudf import dask_cuda import numpy as np from dask import config from packaging.version import Version +from nemo_curator._compat import query_planning_enabled from nemo_curator.utils.fuzzy_dedup_utils.output_map_utils import ( build_partition, get_agg_text_bytes_df, @@ -64,7 +63,7 @@ def rearange_by_column_direct( ignore_index=ignore_index, ) - elif "dask_expr" in sys.modules: + elif query_planning_enabled(): from dask_expr._collection import new_collection from dask_expr._shuffle import RearrangeByColumn From 4200ea015a8614289f2f4c3183b78a0b07354793 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 5 Jul 2024 10:58:23 -0700 Subject: [PATCH 3/7] add global keyword Signed-off-by: rjzamora --- nemo_curator/_compat.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nemo_curator/_compat.py b/nemo_curator/_compat.py index 9067dcb8..6034067e 100644 --- a/nemo_curator/_compat.py +++ b/nemo_curator/_compat.py @@ -29,6 +29,8 @@ def query_planning_enabled(): + global _DASK_QUERY_PLANNING_ENABLED + if _DASK_QUERY_PLANNING_ENABLED is None: if _dask_version > parseVersion("2024.6.0"): import dask.dataframe as dd From 77682e909aecaed10eac238c38048b658059411d Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 5 Jul 2024 12:39:25 -0700 Subject: [PATCH 4/7] Forgot to remove top level query-planning check Signed-off-by: rjzamora --- nemo_curator/__init__.py | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/nemo_curator/__init__.py b/nemo_curator/__init__.py index b440156e..0508ada8 100644 --- a/nemo_curator/__init__.py +++ b/nemo_curator/__init__.py @@ -12,27 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -import sys - import dask -# Disable query planning if possible -# https://github.com/NVIDIA/NeMo-Curator/issues/73 -if dask.config.get("dataframe.query-planning") is True or "dask_expr" in sys.modules: - raise NotImplementedError( - """ - NeMo Curator does not support query planning yet. - Please disable query planning before importing - `dask.dataframe` or `dask_cudf`. This can be done via: - `export DASK_DATAFRAME__QUERY_PLANNING=False`, or - importing `dask.dataframe/dask_cudf` after importing - `nemo_curator`. 
- """ - ) -else: - dask.config.set({"dataframe.query-planning": False}) - - from .modules import * from .utils.distributed_utils import get_client From 57628c98c74f97244b487bdd5daade9a1976c3b6 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 8 Jul 2024 08:27:00 -0700 Subject: [PATCH 5/7] fix other shuffle-arg problems that don't 'work' with dask-expr Signed-off-by: rjzamora --- nemo_curator/modules/fuzzy_dedup.py | 12 ++++++------ .../utils/fuzzy_dedup_utils/output_map_utils.py | 15 ++++++++++++--- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py index 6694dd42..a05f063e 100644 --- a/nemo_curator/modules/fuzzy_dedup.py +++ b/nemo_curator/modules/fuzzy_dedup.py @@ -29,7 +29,6 @@ import numpy as np from cugraph import MultiGraph from dask import dataframe as dd -from dask.dataframe.shuffle import shuffle as dd_shuffle from dask.utils import M from tqdm import tqdm @@ -766,8 +765,7 @@ def map_buckets_with_anchors( transform_divisions=False, align_dataframes=False, ) - ddf_anchor_docs_with_bk = dd_shuffle( - ddf_anchor_docs_with_bk, + ddf_anchor_docs_with_bk = ddf_anchor_docs_with_bk.shuffle( self.id_fields, ignore_index=True, shuffle_method=shuffle_type, @@ -1343,8 +1341,7 @@ def _write_dedup_encoded_jaccard_pair(self, encoded_jaccard_pair_path): align_dataframes=False, ) - ddf = dd_shuffle( - ddf, + ddf = ddf.shuffle( [self.left_id, self.right_id], ignore_index=True, shuffle_method="tasks", @@ -1392,7 +1389,10 @@ def _write_dedup_parsed_id(self): unique_docs = ddf.map_partitions( ConnectedComponents._get_unique_ids_per_partition, id_columns=id_columns ) - unique_docs = unique_docs.drop_duplicates(split_out=ddf.npartitions // 4) + unique_docs = unique_docs.drop_duplicates( + # Dask does not guard against split_out=0 + split_out=max(ddf.npartitions // 4, 1) + ) unique_docs["uid"] = np.uint64(1) unique_docs["uid"] = unique_docs["uid"].cumsum() unique_docs["uid"] = unique_docs["uid"] - 1 diff --git a/nemo_curator/utils/fuzzy_dedup_utils/output_map_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/output_map_utils.py index 6a70e6d2..dc1f5795 100644 --- a/nemo_curator/utils/fuzzy_dedup_utils/output_map_utils.py +++ b/nemo_curator/utils/fuzzy_dedup_utils/output_map_utils.py @@ -19,7 +19,7 @@ import numba import numpy as np -from nemo_curator._compat import DASK_SHUFFLE_METHOD_ARG +from nemo_curator._compat import DASK_SHUFFLE_METHOD_ARG, query_planning_enabled def get_agg_text_bytes_df( @@ -32,11 +32,20 @@ def get_agg_text_bytes_df( """ Groupby bucket and calculate total bytes for a bucket. 
""" - shuffle_arg = "shuffle_method" if DASK_SHUFFLE_METHOD_ARG else "shuffle" + if query_planning_enabled(): + # `shuffle_method: bool` doesn't really make sense + # when query-planning is enabled, because dask-expr + # will ALWAYS use a shuffle-based reduction when + # `split_out>1` + shuffle_arg = {} + else: + shuffle_arg = { + "shuffle_method" if DASK_SHUFFLE_METHOD_ARG else "shuffle": shuffle + } agg_df = ( df[[agg_column, bytes_column]] .groupby([agg_column]) - .agg({bytes_column: "sum"}, split_out=n_partitions, **{shuffle_arg: shuffle}) + .agg({bytes_column: "sum"}, split_out=n_partitions, **shuffle_arg) ) agg_df = agg_df.reset_index(drop=False) # Doing a per partition sort From 8ba6e7af4091d1895070593363e5c214f89958b8 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 16 Sep 2024 12:11:58 -0700 Subject: [PATCH 6/7] remove name arg usage for now Signed-off-by: rjzamora --- nemo_curator/datasets/doc_dataset.py | 1 - 1 file changed, 1 deletion(-) diff --git a/nemo_curator/datasets/doc_dataset.py b/nemo_curator/datasets/doc_dataset.py index 48840270..b3c595cf 100644 --- a/nemo_curator/datasets/doc_dataset.py +++ b/nemo_curator/datasets/doc_dataset.py @@ -155,7 +155,6 @@ def from_pandas( npartitions=npartitions, chunksize=chunksize, sort=sort, - name=name, ) ) From 51e54ffd4e0832a428760b5cdbc6a71f636dec60 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 17 Sep 2024 09:47:13 -0700 Subject: [PATCH 7/7] fix bugs Signed-off-by: rjzamora --- nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py | 2 +- nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py b/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py index 40c42ec9..e434f62a 100644 --- a/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py +++ b/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py @@ -54,7 +54,7 @@ def main(args): dask_cudf.read_parquet(data_path, blocksize="2GB", aggregate_files=True) ) df = dask_cudf.concat(dfs, ignore_unknown_divisions=True) - df = df[~df.id_field.isna()] + df = df[~df[id_field].isna()] df = df.map_partitions( convert_str_id_to_int, id_column=id_field, diff --git a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py index 4b8d9286..7d05b399 100644 --- a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py +++ b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py @@ -74,7 +74,7 @@ def rearange_by_column_direct( return new_collection( RearrangeByColumn( frame=df.expr, - partitioning_index=df[col].expr, + partitioning_index=col, npartitions_out=npartitions, ignore_index=ignore_index, method="tasks",