From dbe76060c5b0b77880c8131112c001f35d7d32c1 Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Tue, 23 Apr 2024 11:34:56 -0700 Subject: [PATCH] Improve speed of AddId module (#36) * Add fast id method Signed-off-by: Ryan Wolf * Add type conversion Signed-off-by: Ryan Wolf * Fix off by one errors in tests Signed-off-by: Ryan Wolf --------- Signed-off-by: Ryan Wolf Signed-off-by: Nicole Luo --- nemo_curator/modules/add_id.py | 45 ++++++++++++++++++++++++--- nemo_curator/scripts/add_id.py | 6 ++-- nemo_curator/utils/module_utils.py | 5 +++ tests/test_add_id.py | 50 ++++++++++++++++++++++++++---- 4 files changed, 94 insertions(+), 12 deletions(-) diff --git a/nemo_curator/modules/add_id.py b/nemo_curator/modules/add_id.py index e8f30739..83da7bd2 100644 --- a/nemo_curator/modules/add_id.py +++ b/nemo_curator/modules/add_id.py @@ -12,22 +12,58 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Optional + import dask.dataframe as dd import numpy as np from dask import delayed from nemo_curator.datasets import DocumentDataset +from nemo_curator.utils.module_utils import count_digits class AddId: - def __init__(self, id_field, id_prefix="doc_id", start_index=0) -> None: + def __init__( + self, id_field, id_prefix: str = "doc_id", start_index: Optional[int] = None + ) -> None: self.id_field = id_field self.id_prefix = id_prefix self.start_index = start_index def __call__(self, dataset: DocumentDataset) -> DocumentDataset: + if self.start_index is None: + return self._add_id_fast(dataset) + else: + return self._add_id_ordered(dataset) + + def _add_id_fast(self, dataset: DocumentDataset) -> DocumentDataset: + meta = dataset.df.dtypes.to_dict() + meta[self.id_field] = "string" + + partition_zero_padding = count_digits(dataset.df.npartitions) + id_df = dataset.df.map_partitions( + self._add_id_fast_partition, + partition_zero_padding, + meta=meta, + ) + + return DocumentDataset(id_df) + + def _add_id_fast_partition(self, partition, global_padding, partition_info=None): + local_padding = count_digits(len(partition)) + global_id = partition_info["number"] + + id_column = [ + f"{self.id_prefix}-{local_id:0{local_padding}d}{global_id:0{global_padding}d}" + for local_id in range(len(partition)) + ] + partition[self.id_field] = id_column + + return partition + + def _add_id_ordered(self, dataset: DocumentDataset) -> DocumentDataset: original_meta = dataset.df.dtypes.to_dict() - original_meta[self.id_field] = "object" + original_meta[self.id_field] = "string" delayed_dataset = dataset.df.to_delayed() parition_lengths = [0] @@ -38,7 +74,7 @@ def __call__(self, dataset: DocumentDataset) -> DocumentDataset: delayed_id_dataset = [] for i, partition in enumerate(delayed_dataset): delayed_id_dataset.append( - delayed(self._add_id_to_partition)(partition, lower_id_bounds[i]) + delayed(self._add_id_ordered_partition)(partition, lower_id_bounds[i]) ) id_dataset = DocumentDataset( @@ -47,11 +83,12 @@ def __call__(self, dataset: DocumentDataset) -> DocumentDataset: return id_dataset - def _add_id_to_partition(self, partition, partition_start_id): + def _add_id_ordered_partition(self, partition, partition_start_id): id_column = [ f"{self.id_prefix}-{int(i + self.start_index):010d}" for i in range(partition_start_id, len(partition) + partition_start_id) ] partition[self.id_field] = id_column + partition[self.id_field] = partition[self.id_field].astype("string") return partition diff --git a/nemo_curator/scripts/add_id.py b/nemo_curator/scripts/add_id.py index 4e49663a..3e91e806 100644 --- a/nemo_curator/scripts/add_id.py +++ b/nemo_curator/scripts/add_id.py @@ -79,8 +79,10 @@ def attach_args( parser.add_argument( "--starting-index", type=int, - default=0, - help="Starting index from which to start indexing the documents", + default=None, + help="If supplied, determines the starting index from which to start " + "indexing the documents. By default, it is unspecified, and uses an id" + " scheme that is fast to calculate and is not guaranteed to be ordered.", ) parser.add_argument( "--output-data-dir", diff --git a/nemo_curator/utils/module_utils.py b/nemo_curator/utils/module_utils.py index dc4a693d..388a949f 100644 --- a/nemo_curator/utils/module_utils.py +++ b/nemo_curator/utils/module_utils.py @@ -11,7 +11,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import math def is_batched(function): return hasattr(function, "batched") and function.batched + + +def count_digits(num): + return math.floor(math.log10(num)) + 1 diff --git a/tests/test_add_id.py b/tests/test_add_id.py index 458b4868..42a8575e 100644 --- a/tests/test_add_id.py +++ b/tests/test_add_id.py @@ -16,7 +16,7 @@ import pandas as pd import pytest -import nemo_curator +import nemo_curator as nc from nemo_curator.datasets import DocumentDataset @@ -41,10 +41,10 @@ def two_partition_dataset(): ) -class TestPrepareTaskData: +class TestAddId: def test_basic_id(self, single_partition_dataset): id_field = "id" - add_id = nemo_curator.AddId(id_field) + add_id = nc.AddId(id_field, start_index=0) id_dataset = add_id(single_partition_dataset) actual_ids = id_dataset.df[id_field].compute() expected_ids = pd.Series( @@ -63,7 +63,7 @@ def test_basic_id(self, single_partition_dataset): def test_two_partitions(self, two_partition_dataset): id_field = "id" - add_id = nemo_curator.AddId(id_field) + add_id = nc.AddId(id_field, start_index=0) id_dataset = add_id(two_partition_dataset) actual_ids = id_dataset.df[id_field].compute() expected_ids = pd.Series( @@ -83,7 +83,7 @@ def test_two_partitions(self, two_partition_dataset): def test_id_prefix(self, two_partition_dataset): id_field = "id" id_prefix = "my_id" - add_id = nemo_curator.AddId(id_field, id_prefix=id_prefix) + add_id = nc.AddId(id_field, id_prefix=id_prefix, start_index=0) id_dataset = add_id(two_partition_dataset) actual_ids = id_dataset.df[id_field].compute() expected_ids = pd.Series( @@ -103,7 +103,7 @@ def test_id_prefix(self, two_partition_dataset): def test_start_index(self, two_partition_dataset): id_field = "id" start_index = 13 - add_id = nemo_curator.AddId(id_field, start_index=start_index) + add_id = nc.AddId(id_field, start_index=start_index) id_dataset = add_id(two_partition_dataset) actual_ids = id_dataset.df[id_field].compute() expected_ids = pd.Series( @@ -119,3 +119,41 @@ def test_start_index(self, two_partition_dataset): assert all( expected_ids == actual_ids ), f"Expected: {expected_ids}, got: {actual_ids}" + + def test_fast_id_single_partition(self, single_partition_dataset): + id_field = "id" + add_id = nc.AddId(id_field) + id_dataset = add_id(single_partition_dataset) + actual_ids = id_dataset.df[id_field].compute() + expected_ids = pd.Series( + [ + "doc_id-00", + "doc_id-10", + "doc_id-20", + "doc_id-30", + "doc_id-40", + ] + ) + + assert all( + expected_ids == actual_ids + ), f"Expected: {expected_ids}, got: {actual_ids}" + + def test_fast_id_two_partitions(self, two_partition_dataset): + id_field = "id" + add_id = nc.AddId(id_field) + id_dataset = add_id(two_partition_dataset) + actual_ids = id_dataset.df[id_field].compute() + expected_ids = pd.Series( + [ + "doc_id-00", + "doc_id-10", + "doc_id-20", + "doc_id-01", + "doc_id-11", + ] + ) + + assert all( + expected_ids == actual_ids + ), f"Expected: {expected_ids}, got: {actual_ids}"