diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py index 125bc6ca..b42b8b7e 100644 --- a/nemo_curator/modules/fuzzy_dedup.py +++ b/nemo_curator/modules/fuzzy_dedup.py @@ -1712,41 +1712,6 @@ def _merge_and_write(self, ddf, ddf_id, output_path, id_columns): et = time.time() print(f"Merge and write completed in {et - st} seconds", flush=True) - def _batched_merge_and_write( - self, ddf, ddf_id, output_path, id_columns, batch_size=32 - ): - total_batches = (ddf.npartitions + batch_size - 1) // batch_size - for batch_id, offset in enumerate(range(0, ddf.npartitions, batch_size)): - st = time.time() - subset_ddf = ddf.partitions[offset : offset + batch_size] - for tag in ["x", "y"]: - pair_ids = [] - for id_col in id_columns: - pair_ids.append(f"{id_col}_{tag}") - subset_ddf = subset_ddf.merge( - ddf_id, - left_on=pair_ids, - right_on=id_columns, - how="inner", - broadcast=True, - shuffle_method="p2p", - ) - subset_ddf = subset_ddf.drop( - columns=pair_ids, - ) - subset_ddf = subset_ddf.rename( - columns={"uid": f"{self.id_column}_{tag}"} - ) - - subset_ddf = subset_ddf[[self.left_id, self.right_id, "jaccard"]] - output_batch_path = os.path.join(output_path, f"{batch_id}.parquet") - subset_ddf.to_parquet(output_batch_path, write_index=False) - - et = time.time() - print( - f"batch_id = {batch_id}/{total_batches}, time = {et - st}", flush=True - ) - @staticmethod def _get_unique_ids_per_partition(df, id_columns): unique_df_ls = []