diff --git a/docs/user-guide/QualityFiltering.rst b/docs/user-guide/QualityFiltering.rst
index 46a8c9d8..ba2c34ad 100644
--- a/docs/user-guide/QualityFiltering.rst
+++ b/docs/user-guide/QualityFiltering.rst
@@ -153,6 +153,33 @@ Here is the ``WordCountFilter`` rewritten to use batches in the ``keep_document`
         pass_max = score <= self._max_words
         return pass_min & pass_max
 
+When you use the ``batched`` decorator, the Series returned from the function must preserve the index of the Series that was passed in.
+The index may not be contiguous, because filters applied earlier in the pipeline may have already removed rows.
+In the code above, the index is preserved automatically, so no change is required.
+However, when a function converts the Series into a different structure, such as a list, special care is needed to restore the index.
+The following example demonstrates what this mistake looks like and how to fix it.
+
+.. code-block:: python
+
+    class BuggyLengthFilter(DocumentFilter):
+
+        @batched
+        def score_document(self, documents: pd.Series):
+            scores = []
+            for document in documents:
+                scores.append(len(document))
+
+            return pd.Series(scores)  # Bad! Does not preserve the index
+
+    class CorrectLengthFilter(DocumentFilter):
+
+        @batched
+        def score_document(self, documents: pd.Series):
+            scores = []
+            for document in documents:
+                scores.append(len(document))
+
+            return pd.Series(scores, index=documents.index)  # Good! Preserves the index
+
 
 -----------------------------------------
diff --git a/nemo_curator/filters/classifier_filter.py b/nemo_curator/filters/classifier_filter.py
index 3ade004e..4f06c8b2 100644
--- a/nemo_curator/filters/classifier_filter.py
+++ b/nemo_curator/filters/classifier_filter.py
@@ -37,7 +37,7 @@ def __init__(self, model_path=None, label="__label__hq", alpha=3, seed=42):
         self._name = "fasttext_quality_filter"
 
     @batched
-    def score_document(self, df):
+    def score_document(self, df: pd.Series):
         model_attr = f"{self._name}_{self._model_path}"
         try:
             model = load_object_on_worker(model_attr, self._load_model, {})
@@ -56,7 +56,7 @@ def _score_document(text):
         return df.apply(_score_document)
 
     @batched
-    def keep_document(self, df):
+    def keep_document(self, df: pd.Series):
         return np.random.pareto(self._alpha, size=len(df)) > 1 - df
 
     def _load_model(self):
@@ -82,7 +82,7 @@ def __init__(self, model_path=None, min_langid_score=0.3):
         dask.config.set({"dataframe.convert-string": False})
 
     @batched
-    def score_document(self, df):
+    def score_document(self, df: pd.Series):
         model_attr = f"{self._name}_{self._model_path}"
         try:
             model = load_object_on_worker(model_attr, self._load_model, {})
diff --git a/nemo_curator/modifiers/__init__.py b/nemo_curator/modifiers/__init__.py
index 4c05a31e..f6511fdb 100644
--- a/nemo_curator/modifiers/__init__.py
+++ b/nemo_curator/modifiers/__init__.py
@@ -15,6 +15,7 @@
 from .c4 import BoilerPlateStringModifier
 from .doc_modifier import DocumentModifier
 from .fasttext import FastTextLabelModifier
+from .pii_modifier import PiiModifier
 from .unicode_reformatter import UnicodeReformatter
 
 __all__ = [
@@ -22,4 +23,5 @@
     "BoilerPlateStringModifier",
     "FastTextLabelModifier",
     "UnicodeReformatter",
+    "PiiModifier",
 ]
diff --git a/nemo_curator/modifiers/pii_modifier.py b/nemo_curator/modifiers/pii_modifier.py
index 23c713fb..c2a398b4 100644
--- a/nemo_curator/modifiers/pii_modifier.py
+++ b/nemo_curator/modifiers/pii_modifier.py
@@ -85,8 +85,8 @@ def modify_document(self, text: pd.Series, partition_info: Dict = None):
             logging.error(
                 f"Encountered error {str(e)} in partition {partition_info['number']}"
             )
-            return pd.Series([True])
-        output: pd.Series = pd.Series(output)
+            return pd.Series(True, index=text.index)  # scalar broadcasts to every row of the partition
+        output: pd.Series = pd.Series(output, index=text.index)
         return output
 
     def load_deidentifier(self):
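The index-preservation rule added to ``QualityFiltering.rst`` above is the same rule the ``pii_modifier.py`` hunk follows: once an earlier filter has dropped rows, the partition's index has gaps, and a Series rebuilt from a plain list silently resets it. A minimal standalone sketch of the failure mode (illustrative only; not part of the patch):

.. code-block:: python

    import pandas as pd

    # A first filter drops rows 0 and 2, so the surviving index has gaps.
    docs = pd.Series(["a", "bbbbbbbb", "cc", "dddddddddd"])
    survivors = docs[docs.str.len() >= 3]
    print(survivors.index.tolist())  # [1, 3]

    # Rebuilding scores from a list resets the index to [0, 1], so the
    # scores no longer line up with the documents they came from.
    bad_scores = pd.Series([len(d) for d in survivors])
    print(bad_scores.index.tolist())  # [0, 1]

    # Passing index=survivors.index keeps each score aligned with its document.
    good_scores = pd.Series([len(d) for d in survivors], index=survivors.index)
    print(good_scores.index.tolist())  # [1, 3]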
diff --git a/tests/test_filters.py b/tests/test_filters.py
index 50676f38..951c1977 100644
--- a/tests/test_filters.py
+++ b/tests/test_filters.py
@@ -282,6 +282,23 @@ def test_score_type(self, letter_count_data):
             expected_scores == scores.compute()
         ), f"Expected {expected_scores} but got {scores}"
 
+    def test_chain_filter(self, letter_count_data):
+        letter_count_filter = LetterCountFilter(min_count=4)
+        length_filter = BatchedLengthFilter(min_length=8, max_length=11)
+        filters = Sequential(
+            [
+                ScoreFilter(letter_count_filter, text_field="documents"),
+                ScoreFilter(length_filter, text_field="documents"),
+            ]
+        )
+        filtered_data = filters(letter_count_data)
+
+        expected_indices = [2]
+        expected_data = DocumentDataset(letter_count_data.df.loc[expected_indices])
+        assert all_equal(
+            expected_data, filtered_data
+        ), f"Expected {expected_data} but got {filtered_data}"
+
 
 class TestHeuristicFilters:
     def test_nonalpha(self):
diff --git a/tests/test_pii_accuracy.py b/tests/test_pii_accuracy.py
index 9431779a..7e7d5866 100644
--- a/tests/test_pii_accuracy.py
+++ b/tests/test_pii_accuracy.py
@@ -16,9 +16,17 @@
 import re
 from pathlib import Path
 
+import pandas as pd
 import pytest
+from dask import dataframe as dd
+from dask.distributed import Client, LocalCluster
 
+import nemo_curator as nc
+from nemo_curator.datasets import DocumentDataset
+from nemo_curator.filters import DocumentFilter
+from nemo_curator.modifiers import PiiModifier
 from nemo_curator.pii.algorithm import PiiDeidentifier
+from nemo_curator.utils.decorators import batched
 
 LOGGER = logging.getLogger(__name__)
@@ -118,3 +126,63 @@ def test_batch_accuracy(self):
         match = all(compare_outputs(x, y) for x, y in zip(outputs, targets))
         print("Matches:", "No" if not match else "Yes")
         assert match == True
+
+
+class BatchedLengthFilter(DocumentFilter):
+    """
+    Keeps documents whose length falls within a given range
+    """
+
+    def __init__(self, min_length=5, max_length=10):
+        super().__init__()
+        self.min_length = min_length
+        self.max_length = max_length
+
+    @batched
+    def score_document(self, df):
+        return df.str.len()
+
+    @batched
+    def keep_document(self, scores):
+        min_threshold = self.min_length <= scores
+        max_threshold = scores <= self.max_length
+        return min_threshold & max_threshold
+
+
+class TestPIIModule:
+    def test_filter_chain(self):
+        inputs = [
+            "Alice goes on a walk",
+            "Bob goes on a walk",
+            "Someone named Charlie goes on a walk",
+            "A human walking is David",
+            "A human walking is Eliza",
+        ]
+        targets = [
+            "***** goes on a walk",
+            "*** goes on a walk",
+            "A human walking is *****",
+            "A human walking is *****",
+        ]
+        input_df = pd.DataFrame({"text": inputs})
+        target_df = pd.DataFrame({"text": targets})
+        with LocalCluster(n_workers=1, threads_per_worker=1) as cluster:
+            with Client(cluster):
+                input_dataset = DocumentDataset(dd.from_pandas(input_df, npartitions=1))
+                pipeline = nc.Sequential(
+                    [
+                        nc.ScoreFilter(
+                            BatchedLengthFilter(min_length=0, max_length=25)
+                        ),
+                        nc.Modify(
+                            PiiModifier(
+                                language="en", anonymize_action="mask", device="cpu"
+                            )
+                        ),
+                    ]
+                )
+                output_dataset = pipeline(input_dataset)
+
+                output_df = output_dataset.df.compute().reset_index(drop=True)
+                match = all(output_df["text"] == target_df["text"])
+                assert match
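One detail worth noting in the ``pii_modifier.py`` error path: ``pd.Series`` only accepts a list whose length matches the index, while a scalar value broadcasts to every row. A short sketch of the difference (illustrative only; ``idx`` stands in for ``text.index``):

.. code-block:: python

    import pandas as pd

    idx = pd.RangeIndex(3)  # stands in for text.index of a three-row partition

    # A scalar broadcasts across the whole index, one value per row.
    print(pd.Series(True, index=idx).tolist())  # [True, True, True]

    # A one-element list only fits a one-row partition; anything larger raises.
    try:
        pd.Series([True], index=idx)
    except ValueError as err:
        print(err)  # Length of values (1) does not match length of index (3)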