Skip to content

Commit

Permalink
Remove workflow code from dataloaders
Browse files Browse the repository at this point in the history
We should be doing online transforms like
```KerasSequenceLoader(workflow.transform(dataset), ...```  instead of
```KerasSequenceLoader(dataset, workflows=[workflow], ...``` now
  • Loading branch information
benfred committed Dec 16, 2020
1 parent 82f1c17 commit b21ffd8
Showing 1 changed file with 0 additions and 32 deletions.
32 changes: 0 additions & 32 deletions nvtabular/loader/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

from nvtabular.io.shuffle import _shuffle_gdf
from nvtabular.ops import _get_embedding_order
from nvtabular.workflow import BaseWorkflow


def _num_steps(num_samples, step_size):
Expand Down Expand Up @@ -155,21 +154,6 @@ def get_batch_div_chunk(self, chunks, batch_size):
return chunks, spill


def _validate_workflows(workflows, cat_names, cont_names, label_names):
assert all([isinstance(w, BaseWorkflow) for w in workflows])
# TODO: commenting out until it's clearer what the
# columns in workflow.columns_cts["final"]["ctx"] mean
# for workflow in workflows:
# assert set(workflow.columns_ctx["categorical"]["base"]) == set(cat_names)
# assert set(workflow.columns_ctx["continuous"]["base"]) == set(cont_names)
# assert set(workflow.columns_ctx["label"]["base"]) == set(label_names)

# cat_names = workflow.columns_ctx["final"]["ctx"]["categorical"]
# cont_names = workflow.columns_ctx["final"]["ctx"]["continuous"]
# label_name = workflow.columns_ctx["final"]["ctx"]["label"][0]
return workflows


# TODO: implement as metaclass and assign methods to children
# to avoid having to do Dataset.<method> calls?
class DataLoader:
Expand All @@ -184,15 +168,12 @@ def __init__(
batch_size,
shuffle,
parts_per_chunk=1,
workflows=None,
devices=None,
):
self.data = dataset
self.indices = cp.arange(dataset.to_ddf().npartitions)

devices = devices or [0]
workflows = workflows or []
self.workflows = _validate_workflows(workflows, cat_names, cont_names, label_names)

self.cat_names = cat_names or []
self.cont_names = cont_names or []
Expand Down Expand Up @@ -298,20 +279,7 @@ def _get_next_batch(self):
batch = next(self._batch_itr)
return batch

def map(self, workflow):
"""
Map an NVTabular Workflow on to a data loader to do
online preprocessing
"""
workflows = self.workflows + [workflow]
self.workflows = _validate_workflows(
workflows, self.cat_names, self.cont_names, self.label_names
)
# TODO: update cat/cont/label names after

def make_tensors(self, gdf, use_nnz=False):
for workflow in self.workflows:
gdf = workflow.apply_ops(gdf)
split_idx = self._get_segment_lengths(len(gdf))

# map from big chunk to framework-specific tensors
Expand Down

0 comments on commit b21ffd8

Please sign in to comment.