diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index b1b79c8..6f7cc17 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -19,14 +19,20 @@ jobs: run: | python -m pip install --upgrade pip pip install .[dev] - - name: Run pre-commit - uses: pre-commit/action@v2.0.0 - name: Test with pytest run: | py.test tests -v --cov=rechunker --cov-config .coveragerc --cov-report term-missing coverage xml - name: Codecov uses: codecov/codecov-action@v1 - - name: Check type hints - run: | - mypy rechunker + + lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Set up Python 3.8 + uses: actions/setup-python@v2 + with: + python-version: 3.8 + - name: Run pre-commit + uses: pre-commit/action@v2.0.0 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d354670..0d011a5 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,4 +1,9 @@ repos: +- repo: https://github.com/pycqa/isort + rev: 5.6.4 + hooks: + - id: isort + args: ["--profile", "black", "--filter-files"] - repo: https://github.com/python/black rev: 19.10b0 hooks: @@ -9,3 +14,7 @@ repos: hooks: - id: flake8 language_version: python3 +- repo: https://github.com/pre-commit/mirrors-mypy + rev: v0.790 + hooks: + - id: mypy diff --git a/docs/conf.py b/docs/conf.py index 2f444d3..37da4a4 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -17,9 +17,9 @@ print("sys.path:", sys.path) -import rechunker import sphinx_pangeo_theme # noqa: F401 +import rechunker # -- Project information ----------------------------------------------------- diff --git a/rechunker/__init__.py b/rechunker/__init__.py index 43b9c60..44018dc 100644 --- a/rechunker/__init__.py +++ b/rechunker/__init__.py @@ -4,4 +4,4 @@ except ImportError: __version__ = "unknown" -from .api import rechunk, Rechunked +from .api import Rechunked, rechunk diff --git a/rechunker/algorithm.py b/rechunker/algorithm.py index 2bd3a64..d4abf55 100644 --- a/rechunker/algorithm.py +++ b/rechunker/algorithm.py @@ -1,5 +1,5 @@ """Core rechunking algorithm stuff.""" -from typing import Sequence, Optional, List, Tuple +from typing import List, Optional, Sequence, Tuple from rechunker.compat import prod diff --git a/rechunker/api.py b/rechunker/api.py index aeb1366..a04d5b1 100644 --- a/rechunker/api.py +++ b/rechunker/api.py @@ -3,21 +3,21 @@ import textwrap from typing import Union -import zarr import dask import dask.array import xarray - -from rechunker.algorithm import rechunking_plan -from rechunker.types import ArrayProxy, CopySpec, Executor +import zarr from xarray.backends.zarr import ( + DIMENSION_KEY, encode_zarr_attr_value, encode_zarr_variable, extract_zarr_variable_encoding, - DIMENSION_KEY, ) from xarray.conventions import encode_dataset_coordinates +from rechunker.algorithm import rechunking_plan +from rechunker.types import ArrayProxy, CopySpec, CopySpecExecutor + class Rechunked: """ @@ -178,25 +178,25 @@ def _validate_options(options): ) -def _get_executor(name: str) -> Executor: +def _get_executor(name: str) -> CopySpecExecutor: # converts a string name into a Executor instance # imports are conditional to avoid hard dependencies if name.lower() == "dask": - from rechunker.executors.dask import DaskExecutor + from rechunker.pipeline import DaskCopySpecExecutor - return DaskExecutor() + return DaskCopySpecExecutor() elif name.lower() == "beam": from rechunker.executors.beam import BeamExecutor return BeamExecutor() elif name.lower() == "prefect": - from rechunker.executors.prefect import PrefectExecutor 
+ from rechunker.pipeline import PrefectCopySpecExecutor - return PrefectExecutor() + return PrefectCopySpecExecutor() elif name.lower() == "python": - from rechunker.executors.python import PythonExecutor + from rechunker.pipeline import PythonCopySpecExecutor - return PythonExecutor() + return PythonCopySpecExecutor() elif name.lower() == "pywren": from rechunker.executors.pywren import PywrenExecutor @@ -213,7 +213,7 @@ def rechunk( target_options=None, temp_store=None, temp_options=None, - executor: Union[str, Executor] = "dask", + executor: Union[str, CopySpecExecutor] = "dask", ) -> Rechunked: """ Rechunk a Zarr Array or Group, a Dask Array, or an Xarray Dataset @@ -285,13 +285,6 @@ def rechunk( """ if isinstance(executor, str): executor = _get_executor(executor) - if isinstance(source, (dask.array.Array, xarray.Dataset)): - from rechunker.executors.dask import DaskExecutor - - if not isinstance(executor, DaskExecutor): - raise NotImplementedError( - f"Executor type {type(executor)} not supported for source {type(source)}." - ) copy_spec, intermediate, target = _setup_rechunk( source=source, diff --git a/rechunker/compat.py b/rechunker/compat.py index 7663465..6438a21 100644 --- a/rechunker/compat.py +++ b/rechunker/compat.py @@ -1,5 +1,5 @@ -from functools import reduce import operator +from functools import reduce from typing import Sequence diff --git a/rechunker/executors/__init__.py b/rechunker/executors/__init__.py index e69de29..97582ee 100644 --- a/rechunker/executors/__init__.py +++ b/rechunker/executors/__init__.py @@ -0,0 +1,5 @@ +from .dask import DaskPipelineExecutor +from .prefect import PrefectPipelineExecutor +from .python import PythonPipelineExecutor + +__all__ = ["PythonPipelineExecutor", "DaskPipelineExecutor", "PrefectPipelineExecutor"] diff --git a/rechunker/executors/beam.py b/rechunker/executors/beam.py index e6153cc..a2d51b0 100644 --- a/rechunker/executors/beam.py +++ b/rechunker/executors/beam.py @@ -8,10 +8,10 @@ chunk_keys, split_into_direct_copies, ) -from rechunker.types import CopySpec, Executor, ReadableArray, WriteableArray +from rechunker.types import CopySpec, CopySpecExecutor, ReadableArray, WriteableArray -class BeamExecutor(Executor[beam.PTransform]): +class BeamExecutor(CopySpecExecutor[beam.PTransform]): """An execution engine based on Apache Beam. Supports copying between any arrays that implement ``__getitem__`` and diff --git a/rechunker/executors/dask.py b/rechunker/executors/dask.py index 2946c26..dcf9727 100644 --- a/rechunker/executors/dask.py +++ b/rechunker/executors/dask.py @@ -1,15 +1,19 @@ -import uuid -from typing import Iterable, Tuple +from functools import reduce +from typing import Iterable import dask import dask.array from dask.delayed import Delayed -from dask.optimization import fuse -from rechunker.types import Array, CopySpec, Executor +from rechunker.types import ( + MultiStagePipeline, + ParallelPipelines, + PipelineExecutor, + Stage, +) -class DaskExecutor(Executor[Delayed]): +class DaskPipelineExecutor(PipelineExecutor[Delayed]): """An execution engine based on dask. Supports zarr and dask arrays as inputs. Outputs must be zarr arrays. @@ -17,100 +21,83 @@ class DaskExecutor(Executor[Delayed]): Execution plans for DaskExecutors are dask.delayed objects. 
""" - def prepare_plan(self, specs: Iterable[CopySpec]) -> Delayed: - return _copy_all(specs) + def pipelines_to_plan(self, pipelines: ParallelPipelines) -> Delayed: + return _make_pipelines(pipelines) def execute_plan(self, plan: Delayed, **kwargs): return plan.compute(**kwargs) -def _direct_array_copy( - source: Array, target: Array, chunks: Tuple[int, ...] -) -> Delayed: - """Direct copy between arrays.""" - if isinstance(source, dask.array.Array): - source_read = source +def _make_pipelines(pipelines: ParallelPipelines) -> Delayed: + pipelines_delayed = [_make_pipeline(pipeline) for pipeline in pipelines] + return _merge(*pipelines_delayed) + + +def _make_pipeline(pipeline: MultiStagePipeline) -> Delayed: + stages_delayed = [_make_stage(stage) for stage in pipeline] + d = reduce(_add_upstream, stages_delayed) + return d + + +def _make_stage(stage: Stage) -> Delayed: + if stage.map_args is None: + return dask.delayed(stage.func)() else: - source_read = dask.array.from_zarr(source, chunks=chunks) - target_store_delayed = dask.array.store( - source_read, target, lock=False, compute=False + name = stage.func.__name__ + "-" + dask.base.tokenize(stage.func) + dsk = {(name, i): (stage.func, arg) for i, arg in enumerate(stage.map_args)} + # create a barrier + top_key = "stage-" + dask.base.tokenize(stage.func, stage.map_args) + + def merge_all(*args): + # this function is dependent on its arguments but doesn't actually do anything + return None + + dsk.update({top_key: (merge_all, *list(dsk))}) + return Delayed(top_key, dsk) + + +def _merge_task(*args): + pass + + +def _merge(*args: Iterable[Delayed]) -> Delayed: + name = "merge-" + dask.base.tokenize(*args) + # mypy doesn't like arg.key + keys = [getattr(arg, "key") for arg in args] + new_task = (_merge_task, *keys) + # mypy doesn't like arg.dask + graph = dask.base.merge( + *[dask.utils.ensure_dict(getattr(arg, "dask")) for arg in args] ) - return target_store_delayed + graph[name] = new_task + d = Delayed(name, graph) + return d -def _chunked_array_copy(spec: CopySpec) -> Delayed: - """Chunked copy between arrays.""" - if spec.intermediate.array is None: - target_store_delayed = _direct_array_copy( - spec.read.array, spec.write.array, spec.read.chunks, - ) +def _add_upstream(first: Delayed, second: Delayed): + upstream_key = first.key + dsk = second.dask + top_layer = _get_top_layer(dsk) + new_top_layer = {} - # fuse - target_dsk = dask.utils.ensure_dict(target_store_delayed.dask) - dsk_fused, _ = fuse(target_dsk) + for key, value in top_layer.items(): + new_top_layer[key] = ((lambda a, b: a), value, upstream_key) - return Delayed(target_store_delayed.key, dsk_fused) + dsk_new = dask.base.merge( + dask.utils.ensure_dict(first.dask), dask.utils.ensure_dict(dsk), new_top_layer + ) + return Delayed(second.key, dsk_new) + + +def _get_top_layer(dsk): + if hasattr(dsk, "layers"): + # this is a HighLevelGraph + top_layer_key = list(dsk.layers)[0] + top_layer = dsk.layers[top_layer_key] else: - # do intermediate store - int_store_delayed = _direct_array_copy( - spec.read.array, spec.intermediate.array, spec.read.chunks, - ) - target_store_delayed = _direct_array_copy( - spec.intermediate.array, spec.write.array, spec.write.chunks, - ) - - # now do some hacking to chain these together into a single graph. 
- # get the two graphs as dicts - int_dsk = dask.utils.ensure_dict(int_store_delayed.dask) - target_dsk = dask.utils.ensure_dict(target_store_delayed.dask) - - # find the root store key representing the read - root_keys = [] - for key in target_dsk: - if isinstance(key, str): - if key.startswith("from-zarr"): - root_keys.append(key) - assert len(root_keys) == 1 - root_key = root_keys[0] - - # now rewrite the graph - target_dsk[root_key] = ( - lambda a, *b: a, - target_dsk[root_key], - *int_dsk[int_store_delayed.key], - ) - target_dsk.update(int_dsk) - - # fuse - dsk_fused, _ = fuse(target_dsk) - return Delayed(target_store_delayed.key, dsk_fused) - - -def _barrier(*args): - return None - - -def _copy_all(specs: Iterable[CopySpec],) -> Delayed: - - stores_delayed = [_chunked_array_copy(spec) for spec in specs] - - if len(stores_delayed) == 1: - return stores_delayed[0] - - # This next block makes a task that - # 1. depends on each of the component arrays - # 2. but doesn't require transmitting large dependencies (depend on barrier_name, - # rather than on part.key directly) to compute the result - always_new_token = uuid.uuid1().hex - barrier_name = "barrier-" + always_new_token - dsk2 = { - (barrier_name, i): (_barrier, part.key) for i, part in enumerate(stores_delayed) - } - - name = "rechunked-" + dask.base.tokenize([x.name for x in stores_delayed]) - dsk = dask.base.merge(*[x.dask for x in stores_delayed], dsk2) - dsk[name] = (_barrier,) + tuple( - (barrier_name, i) for i, _ in enumerate(stores_delayed) - ) - return Delayed(name, dsk) + # could this go wrong? + first_key = next(iter(dsk)) + first_task = first_key[0].split("-")[0] + top_layer = {k: v for k, v in dsk.items() if k[0].startswith(first_task + "-")} + return top_layer diff --git a/rechunker/executors/prefect.py b/rechunker/executors/prefect.py index 712a70c..54b041e 100644 --- a/rechunker/executors/prefect.py +++ b/rechunker/executors/prefect.py @@ -1,17 +1,9 @@ -from typing import Iterable, Tuple - import prefect -from rechunker.executors.util import chunk_keys, split_into_direct_copies -from rechunker.types import ( - CopySpec, - Executor, - ReadableArray, - WriteableArray, -) +from rechunker.types import ParallelPipelines, PipelineExecutor -class PrefectExecutor(Executor[prefect.Flow]): +class PrefectPipelineExecutor(PipelineExecutor[prefect.Flow]): """An execution engine based on Prefect. Supports copying between any arrays that implement ``__getitem__`` and @@ -21,33 +13,45 @@ class PrefectExecutor(Executor[prefect.Flow]): Execution plans for PrefectExecutor are prefect.Flow objects. """ - def prepare_plan(self, specs: Iterable[CopySpec]) -> prefect.Flow: - return _make_flow(specs) + def pipelines_to_plan(self, pipelines: ParallelPipelines) -> prefect.Flow: + return _make_flow(pipelines) def execute_plan(self, plan: prefect.Flow, **kwargs): - return plan.run(**kwargs) + state = plan.run(**kwargs) + return state + + +class MappedTaskWrapper(prefect.Task): + def __init__(self, stage, **kwargs): + self.stage = stage + super().__init__(**kwargs) + + def run(self, key): + return self.stage.func(key) + +class SingleTaskWrapper(prefect.Task): + def __init__(self, stage, **kwargs): + self.stage = stage + super().__init__(**kwargs) -@prefect.task -def _copy_chunk( - source: ReadableArray, target: WriteableArray, key: Tuple[int, ...] 
-) -> None: - target[key] = source[key] + def run(self): + return self.stage.func() -def _make_flow(specs: Iterable[CopySpec]) -> prefect.Flow: +def _make_flow(pipelines: ParallelPipelines) -> prefect.Flow: with prefect.Flow("Rechunker") as flow: # iterate over different arrays in the group - for spec in specs: - copy_tasks = [] + for pipeline in pipelines: + stage_tasks = [] # iterate over the different stages of the array copying - for (source, target, chunks) in split_into_direct_copies(spec): - keys = list(chunk_keys(source.shape, chunks)) - copy_task = _copy_chunk.map( - prefect.unmapped(source), prefect.unmapped(target), keys - ) - copy_tasks.append(copy_task) + for stage in pipeline: + if stage.map_args is None: + stage_task = SingleTaskWrapper(stage) + else: + stage_task = MappedTaskWrapper(stage).map(stage.map_args) + stage_tasks.append(stage_task) # create dependence between stages - for n in range(len(copy_tasks) - 1): - copy_tasks[n + 1].set_upstream(copy_tasks[n]) + for n in range(len(stage_tasks) - 1): + stage_tasks[n + 1].set_upstream(stage_tasks[n]) return flow diff --git a/rechunker/executors/python.py b/rechunker/executors/python.py index 8ba834e..fa69d4d 100644 --- a/rechunker/executors/python.py +++ b/rechunker/executors/python.py @@ -1,17 +1,14 @@ from functools import partial +from typing import Callable, Iterable -from typing import Callable, Iterable, Tuple - -from rechunker.executors.util import chunk_keys, split_into_direct_copies -from rechunker.types import CopySpec, Executor, ReadableArray, WriteableArray - +from rechunker.types import ParallelPipelines, PipelineExecutor # PythonExecutor represents delayed execution tasks as functions that require # no arguments. Task = Callable[[], None] -class PythonExecutor(Executor[Task]): +class PythonPipelineExecutor(PipelineExecutor[Task]): """An execution engine based on Python loops. Supports copying between any arrays that implement ``__getitem__`` and @@ -20,25 +17,21 @@ class PythonExecutor(Executor[Task]): Execution plans for PythonExecutor are functions that accept no arguments. """ - def prepare_plan(self, specs: Iterable[CopySpec]) -> Task: + def pipelines_to_plan(self, pipelines: ParallelPipelines) -> Task: tasks = [] - for spec in specs: - for direct_spec in split_into_direct_copies(spec): - tasks.append(partial(_direct_array_copy, *direct_spec)) + for pipeline in pipelines: + for stage in pipeline: + if stage.map_args is None: + tasks.append(stage.func) + else: + for arg in stage.map_args: + tasks.append(partial(stage.func, arg)) return partial(_execute_all, tasks) def execute_plan(self, plan: Task, **kwargs): plan() -def _direct_array_copy( - source: ReadableArray, target: WriteableArray, chunks: Tuple[int, ...] 
-) -> None: - """Direct copy between arrays.""" - for key in chunk_keys(source.shape, chunks): - target[key] = source[key] - - def _execute_all(tasks: Iterable[Task]) -> None: for task in tasks: task() diff --git a/rechunker/executors/pywren.py b/rechunker/executors/pywren.py index 5bdb34d..781dfd2 100644 --- a/rechunker/executors/pywren.py +++ b/rechunker/executors/pywren.py @@ -1,19 +1,18 @@ from functools import partial - from typing import Callable, Iterable, Tuple -from rechunker.executors.util import chunk_keys, split_into_direct_copies -from rechunker.types import CopySpec, Executor, ReadableArray, WriteableArray - import pywren_ibm_cloud as pywren from pywren_ibm_cloud.executor import FunctionExecutor +from rechunker.executors.util import chunk_keys, split_into_direct_copies +from rechunker.types import CopySpec, CopySpecExecutor, ReadableArray, WriteableArray + # PywrenExecutor represents delayed execution tasks as functions that require # a FunctionExecutor. Task = Callable[[FunctionExecutor], None] -class PywrenExecutor(Executor[Task]): +class PywrenExecutor(CopySpecExecutor[Task]): """An execution engine based on Pywren. Supports zarr arrays as inputs. Outputs must be zarr arrays. diff --git a/rechunker/executors/util.py b/rechunker/executors/util.py index be20310..9dada6e 100644 --- a/rechunker/executors/util.py +++ b/rechunker/executors/util.py @@ -1,6 +1,5 @@ import itertools import math - from typing import Iterator, NamedTuple, Tuple from rechunker.types import CopySpec, ReadableArray, WriteableArray diff --git a/rechunker/pipeline.py b/rechunker/pipeline.py new file mode 100644 index 0000000..ec721b0 --- /dev/null +++ b/rechunker/pipeline.py @@ -0,0 +1,96 @@ +import itertools +import math +from typing import Any, Iterable, Iterator, Tuple, TypeVar + +import dask +import numpy as np + +from .executors.dask import DaskPipelineExecutor +from .executors.prefect import PrefectPipelineExecutor +from .executors.python import PythonPipelineExecutor +from .types import ( + CopySpec, + CopySpecExecutor, + MultiStagePipeline, + ParallelPipelines, + ReadableArray, + Stage, + WriteableArray, +) + + +def chunk_keys( + shape: Tuple[int, ...], chunks: Tuple[int, ...] +) -> Iterator[Tuple[slice, ...]]: + """Iterator over array indexing keys of the desired chunk sized. + + The union of all keys indexes every element of an array of shape ``shape`` + exactly once. Each array resulting from indexing is of shape ``chunks``, + except possibly for the last arrays along each dimension (if ``chunks`` + do not even divide ``shape``). + """ + ranges = [range(math.ceil(s / c)) for s, c in zip(shape, chunks)] + for indices in itertools.product(*ranges): + yield tuple( + slice(c * i, min(c * (i + 1), s)) for i, s, c in zip(indices, shape, chunks) + ) + + +def copy_stage( + source: ReadableArray, target: WriteableArray, chunks: Tuple[int, ...] 
+) -> Stage: + # use a closure to eliminate extra arguments + def _copy_chunk(chunk_key): + # calling np.asarray here allows the source to be a dask array + # TODO: could we asyncify this to operate in a streaming fashion + # make sure this is not happening inside a dask scheduler + print(f"_copy_chunk({chunk_key})") + with dask.config.set(scheduler="single-threaded"): + data = np.asarray(source[chunk_key]) + target[chunk_key] = data + + keys = list(chunk_keys(source.shape, chunks)) + return Stage(_copy_chunk, keys) + + +def spec_to_pipeline(spec: CopySpec) -> MultiStagePipeline: + pipeline = [] + if spec.intermediate.array is None: + pipeline.append(copy_stage(spec.read.array, spec.write.array, spec.read.chunks)) + else: + pipeline.append( + copy_stage(spec.read.array, spec.intermediate.array, spec.read.chunks) + ) + pipeline.append( + copy_stage(spec.intermediate.array, spec.write.array, spec.write.chunks) + ) + return pipeline + + +def specs_to_pipelines(specs: Iterable[CopySpec]) -> ParallelPipelines: + return [spec_to_pipeline(spec) for spec in specs] + + +T = TypeVar("T") + + +class CopySpecToPipelinesMixin(CopySpecExecutor): + def prepare_plan(self, specs: Iterable[CopySpec]) -> T: + pipelines = specs_to_pipelines(specs) + return self.pipelines_to_plan(pipelines) + + def pipelines_to_plan(self, pipelines: ParallelPipelines) -> Any: + """Transform ParallelPiplines to an execution plan""" + raise NotImplementedError + + +class PythonCopySpecExecutor(PythonPipelineExecutor, CopySpecToPipelinesMixin): + pass + + +class DaskCopySpecExecutor(DaskPipelineExecutor, CopySpecToPipelinesMixin): + pass + + +class PrefectCopySpecExecutor(PrefectPipelineExecutor, CopySpecToPipelinesMixin): + pass diff --git a/rechunker/types.py b/rechunker/types.py index d2769b5..aa44431 100644 --- a/rechunker/types.py +++ b/rechunker/types.py @@ -1,5 +1,14 @@ """Types definitions used by executors.""" -from typing import Any, Generic, Iterable, NamedTuple, Optional, Tuple, TypeVar +from typing import ( + Any, + Callable, + Generic, + Iterable, + NamedTuple, + Optional, + Tuple, + TypeVar, +) # TODO: replace with Protocols, once Python 3.8+ is required Array = Any @@ -47,18 +56,54 @@ class CopySpec(NamedTuple): write: ArrayProxy +class Stage(NamedTuple): + """A Stage is when single function is mapped over multiple imputs. + + Attributes + ---------- + func: Callable + A function to be called in this stage. Accepts either 0 or 1 arguments. + map_args: List, Optional + Arguments which will be mapped to the function + """ + + func: Callable + map_args: Optional[Iterable] = None + # TODO: figure out how to make optional, like for a dataclass + # annotations: Dict = {} + + +# A MultiStagePipeline contains one or more stages, to be executed in sequence +MultiStagePipeline = Iterable[Stage] + +# ParallelPipelines contains one or more MultiStagePipelines, to be executed in parallel +ParallelPipelines = Iterable[MultiStagePipeline] + T = TypeVar("T") -class Executor(Generic[T]): - """Base class for execution engines. +class PipelineExecutor(Generic[T]): + """Base class for pipeline-based execution engines. Executors prepare and execute scheduling plans, in whatever form is most convenient for users of that executor. 
""" - # TODO: add support for multi-stage copying plans (in the form of a new, - # dedicated method) + def pipelines_to_plan(self, pipelines: ParallelPipelines) -> T: + """Convert pipeline specifications into a plan.""" + raise NotImplementedError + + def execute_plan(self, plan: T, **kwargs): + """Execute a plan.""" + raise NotImplementedError + + +class CopySpecExecutor(Generic[T]): + """Base class for copy-spec execution engines. + + Executors prepare and execute scheduling plans, in whatever form is most + convenient for users of that executor. + """ def prepare_plan(self, specs: Iterable[CopySpec]) -> T: """Convert copy specifications into a plan.""" diff --git a/setup.cfg b/setup.cfg index 9a326ca..d514d49 100644 --- a/setup.cfg +++ b/setup.cfg @@ -26,5 +26,7 @@ ignore_missing_imports = True ignore_missing_imports = True [mypy-pywren_ibm_cloud.*] ignore_missing_imports = True +[mypy-numpy.*] +ignore_missing_imports = True [mypy-zarr.*] ignore_missing_imports = True diff --git a/setup.py b/setup.py index 91d3962..4724d73 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,6 @@ import os + from setuptools import find_packages, setup -from setuptools import setup here = os.path.dirname(__file__) with open(os.path.join(here, "README.md"), encoding="utf-8") as f: diff --git a/tests/test_algorithm.py b/tests/test_algorithm.py index b4df1de..fd6189f 100644 --- a/tests/test_algorithm.py +++ b/tests/test_algorithm.py @@ -1,13 +1,12 @@ #!/usr/bin/env python """Tests for `rechunker` package.""" -from rechunker.compat import prod - -import pytest -from hypothesis import given, assume import hypothesis.strategies as st +import pytest +from hypothesis import assume, given from rechunker.algorithm import consolidate_chunks, rechunking_plan +from rechunker.compat import prod @pytest.mark.parametrize("shape, chunks", [((8, 8), (1, 2))]) diff --git a/tests/test_compat.py b/tests/test_compat.py index de2600b..ab20008 100644 --- a/tests/test_compat.py +++ b/tests/test_compat.py @@ -1,4 +1,5 @@ import numpy as np + from rechunker.compat import prod diff --git a/tests/test_pipelines.py b/tests/test_pipelines.py new file mode 100644 index 0000000..56427c4 --- /dev/null +++ b/tests/test_pipelines.py @@ -0,0 +1,45 @@ +""" +Test ParallelPiplines and related executors +""" + +import pytest + +from rechunker.executors.dask import DaskPipelineExecutor +from rechunker.executors.prefect import PrefectPipelineExecutor +from rechunker.executors.python import PythonPipelineExecutor +from rechunker.types import Stage + + +@pytest.fixture +def example_pipeline(tmpdir_factory): + + tmp = tmpdir_factory.mktemp("pipeline_data") + + def func0(): + tmp.join("func0.log").ensure(file=True) + assert not tmp.join("func1_a.log").check(file=True) + + stage0 = Stage(func0) + + def func1(arg): + tmp.join(f"func1_{arg}.log").ensure(file=True) + + stage1 = Stage(func1, ["a", "b", 3]) + + # MultiStagePipeline + pipeline = [stage0, stage1] + # ParallelPipelines + pipelines = [pipeline] + return pipelines, tmp + + +@pytest.mark.parametrize( + "Executor", [PythonPipelineExecutor, DaskPipelineExecutor, PrefectPipelineExecutor] +) +def test_pipeline(example_pipeline, Executor): + pipeline, tmpdir = example_pipeline + executor = Executor() + plan = executor.pipelines_to_plan(pipeline) + executor.execute_plan(plan) + for fname in ["func0.log", "func1_a.log", "func1_b.log", "func1_3.log"]: + assert tmpdir.join(fname).check(file=True) diff --git a/tests/test_rechunk.py b/tests/test_rechunk.py index fd5cc90..3350809 100644 --- 
a/tests/test_rechunk.py +++ b/tests/test_rechunk.py @@ -1,19 +1,18 @@ -from functools import partial import importlib -import pytest - +from functools import partial from pathlib import Path -import zarr -import dask.array as dsa + import dask +import dask.array as dsa import dask.core -import xarray -import numpy import fsspec +import numpy +import pytest +import xarray +import zarr from rechunker import api - _DIMENSION_KEY = "_ARRAY_DIMENSIONS" @@ -50,7 +49,7 @@ def test_invalid_executor(): [{"a": (20, 10), "b": (20,)}, {"a": {"x": 20, "y": 10}, "b": {"x": 20}}], ) @pytest.mark.parametrize("max_mem", ["10MB"]) -@pytest.mark.parametrize("executor", ["dask"]) +@pytest.mark.parametrize("executor", ["dask", "python", "prefect"]) @pytest.mark.parametrize("target_store", ["target.zarr", "mapper.target.zarr"]) @pytest.mark.parametrize("temp_store", ["temp.zarr", "mapper.temp.zarr"]) def test_rechunk_dataset( @@ -105,7 +104,12 @@ def test_rechunk_dataset( executor=executor, ) assert isinstance(rechunked, api.Rechunked) - rechunked.execute() + with dask.config.set(scheduler="single-threaded"): + rechunked.execute() + + # check zarr store directly + # zstore = zarr.open_group(target_store) + # print(zstore.tree()) # Validate encoded variables dst = xarray.open_zarr(target_store, decode_cf=False) @@ -479,36 +483,6 @@ def test_rechunk_invalid_source(tmp_path): ) -@pytest.mark.parametrize( - "source,target_chunks", - [ - (sample_xarray_dataset(), {"a": (10, 5, 4), "b": (100,)}), - (dsa.ones((20, 10), chunks=(5, 5)), (10, 10)), - ], -) -@pytest.mark.parametrize( - "executor", - [ - "python", - requires_beam("beam"), - requires_prefect("prefect"), - requires_pywren("pywren"), - ], -) -def test_unsupported_executor(tmp_path, source, target_chunks, executor): - with pytest.raises( - NotImplementedError, match="Executor type .* not supported for source", - ): - api.rechunk( - source, - target_chunks=target_chunks, - max_mem=1600, - target_store=str(tmp_path / "target.zarr"), - temp_store=str(tmp_path / "temp.zarr"), - executor=executor, - ) - - def test_rechunk_no_target_chunks(rechunk_args): rechunk_args = dict(rechunk_args) if _is_collection(rechunk_args["source"]): @@ -549,8 +523,8 @@ def test_no_intermediate_fused(tmp_path): def test_pywren_function_executor(tmp_path): pytest.importorskip("pywren_ibm_cloud") from rechunker.executors.pywren import ( - pywren_local_function_executor, PywrenExecutor, + pywren_local_function_executor, ) # Create a Pywren function exectutor that we manage ourselves
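
Notes on the new pipeline API introduced by this patch (the snippets below are illustrative sketches built on the interfaces it adds, not part of the patch itself).

The patch splits the old Executor base class into two layers: PipelineExecutor, which runs generic Stage objects organised into MultiStagePipeline / ParallelPipelines, and CopySpecExecutor, which still consumes CopySpec objects. A Stage is a function plus optional map_args it is mapped over; stages within a pipeline run in sequence, while pipelines run in parallel. A minimal sketch of driving the pure-Python backend directly, mirroring the new tests/test_pipelines.py (function names are made up for illustration):

    from rechunker.executors.python import PythonPipelineExecutor
    from rechunker.types import Stage


    def setup():
        # Stage with no map_args: called exactly once, with no arguments.
        print("setting up")


    def copy_piece(key):
        # Stage with map_args: called once per argument.
        print(f"copying piece {key}")


    pipeline = [Stage(setup), Stage(copy_piece, ["a", "b", "c"])]  # a MultiStagePipeline
    pipelines = [pipeline]  # ParallelPipelines

    executor = PythonPipelineExecutor()
    plan = executor.pipelines_to_plan(pipelines)
    executor.execute_plan(plan)  # setup(), then copy_piece("a"), ("b"), ("c")

Importing via the direct module path avoids pulling in the optional dask and prefect backends, which the new rechunker/executors/__init__.py imports at module level.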
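
chunk_keys, now also defined in the new rechunker/pipeline.py, produces the slice tuples a copy stage is mapped over; the last slice along each dimension is shortened when the chunks do not evenly divide the shape. For example (expected output shown as comments; note that importing rechunker.pipeline also imports the dask and prefect executors at module level):

    from rechunker.pipeline import chunk_keys

    keys = list(chunk_keys(shape=(5, 4), chunks=(2, 4)))
    # [(slice(0, 2), slice(0, 4)),
    #  (slice(2, 4), slice(0, 4)),
    #  (slice(4, 5), slice(0, 4))]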
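
In the Dask backend, _make_stage turns a mapped Stage into a flat dict graph with one task per element of map_args plus a no-op "barrier" task that depends on all of them; _add_upstream then chains consecutive stages by rewriting the downstream stage's top layer to also depend on the upstream key. A self-contained sketch of the same barrier pattern using plain dask primitives (names and values are illustrative):

    import dask
    from dask.delayed import Delayed


    def work(arg):
        return arg * 2


    def barrier(*args):
        # Depends on every mapped task but does no work itself.
        return None


    map_args = [1, 2, 3]
    name = "work-" + dask.base.tokenize(work)
    dsk = {(name, i): (work, arg) for i, arg in enumerate(map_args)}

    top_key = "stage-" + dask.base.tokenize(work, map_args)
    dsk[top_key] = (barrier,) + tuple(dsk)  # barrier depends on all mapped tasks

    stage = Delayed(top_key, dsk)
    stage.compute()  # runs work(1), work(2), work(3), then barrier(...)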
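
CopySpecToPipelinesMixin is the bridge between the old copy-spec interface and the new pipeline executors: its prepare_plan converts CopySpecs to pipelines via spec_to_pipeline and defers to pipelines_to_plan, which is exactly how DaskCopySpecExecutor, PrefectCopySpecExecutor and PythonCopySpecExecutor are assembled. A hypothetical new backend would plug in the same way; ListPipelineExecutor below is invented for illustration and is not part of rechunker:

    from functools import partial

    from rechunker.pipeline import CopySpecToPipelinesMixin
    from rechunker.types import ParallelPipelines, PipelineExecutor


    class ListPipelineExecutor(PipelineExecutor[list]):
        """Toy backend whose plan is a list of zero-argument callables."""

        def pipelines_to_plan(self, pipelines: ParallelPipelines) -> list:
            plan = []
            for pipeline in pipelines:
                for stage in pipeline:
                    if stage.map_args is None:
                        plan.append(stage.func)
                    else:
                        plan.extend(partial(stage.func, arg) for arg in stage.map_args)
            return plan

        def execute_plan(self, plan: list, **kwargs):
            for task in plan:
                task()


    class ListCopySpecExecutor(ListPipelineExecutor, CopySpecToPipelinesMixin):
        # prepare_plan(specs) comes from the mixin:
        # CopySpecs -> ParallelPipelines -> pipelines_to_plan
        pass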
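
Because the DaskExecutor-only guard in rechunker.api.rechunk is removed (and test_rechunk_dataset now runs against the dask, python and prefect executors), non-dask executors can be selected for dask and xarray sources. A hedged end-to-end sketch, with illustrative store paths:

    import dask.array as dsa

    from rechunker import rechunk

    source = dsa.ones((4000, 4000), chunks=(1000, 4000))
    plan = rechunk(
        source,
        target_chunks=(4000, 1000),
        max_mem="100MB",
        target_store="target.zarr",  # illustrative local store paths
        temp_store="temp.zarr",
        executor="python",  # previously only "dask" was accepted for dask/xarray sources
    )
    plan.execute()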