Skip to content

Commit

Permalink
Remove unused function
Browse files Browse the repository at this point in the history
Signed-off-by: Vibhu Jawa <vibhujawa@gmail.com>
  • Loading branch information
VibhuJawa committed Oct 15, 2024
1 parent ec09cfd commit f039fc9
Showing 1 changed file with 0 additions and 35 deletions.
35 changes: 0 additions & 35 deletions nemo_curator/modules/fuzzy_dedup.py
Original file line number Diff line number Diff line change
Expand Up @@ -1712,41 +1712,6 @@ def _merge_and_write(self, ddf, ddf_id, output_path, id_columns):
et = time.time()
print(f"Merge and write completed in {et - st} seconds", flush=True)

def _batched_merge_and_write(
self, ddf, ddf_id, output_path, id_columns, batch_size=32
):
total_batches = (ddf.npartitions + batch_size - 1) // batch_size
for batch_id, offset in enumerate(range(0, ddf.npartitions, batch_size)):
st = time.time()
subset_ddf = ddf.partitions[offset : offset + batch_size]
for tag in ["x", "y"]:
pair_ids = []
for id_col in id_columns:
pair_ids.append(f"{id_col}_{tag}")
subset_ddf = subset_ddf.merge(
ddf_id,
left_on=pair_ids,
right_on=id_columns,
how="inner",
broadcast=True,
shuffle_method="p2p",
)
subset_ddf = subset_ddf.drop(
columns=pair_ids,
)
subset_ddf = subset_ddf.rename(
columns={"uid": f"{self.id_column}_{tag}"}
)

subset_ddf = subset_ddf[[self.left_id, self.right_id, "jaccard"]]
output_batch_path = os.path.join(output_path, f"{batch_id}.parquet")
subset_ddf.to_parquet(output_batch_path, write_index=False)

et = time.time()
print(
f"batch_id = {batch_id}/{total_batches}, time = {et - st}", flush=True
)

@staticmethod
def _get_unique_ids_per_partition(df, id_columns):
unique_df_ls = []
Expand Down

0 comments on commit f039fc9

Please sign in to comment.