diff --git a/nvtabular/loader/backend.py b/nvtabular/loader/backend.py index 3f74dcffe72..05c3557af56 100644 --- a/nvtabular/loader/backend.py +++ b/nvtabular/loader/backend.py @@ -23,7 +23,6 @@ from nvtabular.io.shuffle import _shuffle_gdf from nvtabular.ops import _get_embedding_order -from nvtabular.workflow import BaseWorkflow def _num_steps(num_samples, step_size): @@ -155,21 +154,6 @@ def get_batch_div_chunk(self, chunks, batch_size): return chunks, spill -def _validate_workflows(workflows, cat_names, cont_names, label_names): - assert all([isinstance(w, BaseWorkflow) for w in workflows]) - # TODO: commenting out until it's clearer what the - # columns in workflow.columns_cts["final"]["ctx"] mean - # for workflow in workflows: - # assert set(workflow.columns_ctx["categorical"]["base"]) == set(cat_names) - # assert set(workflow.columns_ctx["continuous"]["base"]) == set(cont_names) - # assert set(workflow.columns_ctx["label"]["base"]) == set(label_names) - - # cat_names = workflow.columns_ctx["final"]["ctx"]["categorical"] - # cont_names = workflow.columns_ctx["final"]["ctx"]["continuous"] - # label_name = workflow.columns_ctx["final"]["ctx"]["label"][0] - return workflows - - # TODO: implement as metaclass and assign methods to children # to avoid having to do Dataset. calls? class DataLoader: @@ -184,15 +168,12 @@ def __init__( batch_size, shuffle, parts_per_chunk=1, - workflows=None, devices=None, ): self.data = dataset self.indices = cp.arange(dataset.to_ddf().npartitions) devices = devices or [0] - workflows = workflows or [] - self.workflows = _validate_workflows(workflows, cat_names, cont_names, label_names) self.cat_names = cat_names or [] self.cont_names = cont_names or [] @@ -298,20 +279,7 @@ def _get_next_batch(self): batch = next(self._batch_itr) return batch - def map(self, workflow): - """ - Map an NVTabular Workflow on to a data loader to do - online preprocessing - """ - workflows = self.workflows + [workflow] - self.workflows = _validate_workflows( - workflows, self.cat_names, self.cont_names, self.label_names - ) - # TODO: update cat/cont/label names after - def make_tensors(self, gdf, use_nnz=False): - for workflow in self.workflows: - gdf = workflow.apply_ops(gdf) split_idx = self._get_segment_lengths(len(gdf)) # map from big chunk to framework-specific tensors