Refactor executors #77

Merged 23 commits on Jan 27, 2021
16 changes: 11 additions & 5 deletions .github/workflows/ci.yaml
@@ -19,14 +19,20 @@ jobs:
         run: |
           python -m pip install --upgrade pip
           pip install .[dev]
-      - name: Run pre-commit
-        uses: pre-commit/action@v2.0.0
       - name: Test with pytest
         run: |
           py.test tests -v --cov=rechunker --cov-config .coveragerc --cov-report term-missing
           coverage xml
       - name: Codecov
         uses: codecov/codecov-action@v1
-      - name: Check type hints
-        run: |
-          mypy rechunker
+
+  lint:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v2
+      - name: Set up Python 3.8
+        uses: actions/setup-python@v2
+        with:
+          python-version: 3.8
+      - name: Run pre-commit
+        uses: pre-commit/action@v2.0.0
9 changes: 9 additions & 0 deletions .pre-commit-config.yaml
@@ -1,4 +1,9 @@
 repos:
+  - repo: https://github.com/pycqa/isort
+    rev: 5.6.4
+    hooks:
+      - id: isort
+        args: ["--profile", "black", "--filter-files"]
   - repo: https://github.com/python/black
     rev: 19.10b0
     hooks:
@@ -9,3 +14,7 @@ repos:
     hooks:
       - id: flake8
         language_version: python3
+  - repo: https://github.com/pre-commit/mirrors-mypy
+    rev: v0.790
+    hooks:
+      - id: mypy
2 changes: 1 addition & 1 deletion docs/conf.py
@@ -17,9 +17,9 @@
 print("sys.path:", sys.path)


-import rechunker
 import sphinx_pangeo_theme  # noqa: F401

+import rechunker

 # -- Project information -----------------------------------------------------
2 changes: 1 addition & 1 deletion rechunker/__init__.py
@@ -4,4 +4,4 @@
 except ImportError:
     __version__ = "unknown"

-from .api import rechunk, Rechunked
+from .api import Rechunked, rechunk
2 changes: 1 addition & 1 deletion rechunker/algorithm.py
@@ -1,5 +1,5 @@
 """Core rechunking algorithm stuff."""
-from typing import Sequence, Optional, List, Tuple
+from typing import List, Optional, Sequence, Tuple

 from rechunker.compat import prod

33 changes: 13 additions & 20 deletions rechunker/api.py
@@ -3,21 +3,21 @@
 import textwrap
 from typing import Union

-import zarr
 import dask
 import dask.array
 import xarray
-
-from rechunker.algorithm import rechunking_plan
-from rechunker.types import ArrayProxy, CopySpec, Executor
+import zarr
 from xarray.backends.zarr import (
+    DIMENSION_KEY,
     encode_zarr_attr_value,
     encode_zarr_variable,
     extract_zarr_variable_encoding,
-    DIMENSION_KEY,
 )
 from xarray.conventions import encode_dataset_coordinates

+from rechunker.algorithm import rechunking_plan
+from rechunker.types import ArrayProxy, CopySpec, CopySpecExecutor
+

 class Rechunked:
     """
@@ -178,25 +178,25 @@ def _validate_options(options):
     )


-def _get_executor(name: str) -> Executor:
+def _get_executor(name: str) -> CopySpecExecutor:
     # converts a string name into a Executor instance
     # imports are conditional to avoid hard dependencies
     if name.lower() == "dask":
-        from rechunker.executors.dask import DaskExecutor
+        from rechunker.pipeline import DaskCopySpecExecutor

-        return DaskExecutor()
+        return DaskCopySpecExecutor()
     elif name.lower() == "beam":
         from rechunker.executors.beam import BeamExecutor

         return BeamExecutor()
     elif name.lower() == "prefect":
-        from rechunker.executors.prefect import PrefectExecutor
+        from rechunker.pipeline import PrefectCopySpecExecutor

-        return PrefectExecutor()
+        return PrefectCopySpecExecutor()
     elif name.lower() == "python":
-        from rechunker.executors.python import PythonExecutor
+        from rechunker.pipeline import PythonCopySpecExecutor

-        return PythonExecutor()
+        return PythonCopySpecExecutor()
     elif name.lower() == "pywren":
         from rechunker.executors.pywren import PywrenExecutor

@@ -213,7 +213,7 @@ def rechunk(
     target_options=None,
     temp_store=None,
     temp_options=None,
-    executor: Union[str, Executor] = "dask",
+    executor: Union[str, CopySpecExecutor] = "dask",
 ) -> Rechunked:
     """
     Rechunk a Zarr Array or Group, a Dask Array, or an Xarray Dataset
@@ -285,13 +285,6 @@
     """
     if isinstance(executor, str):
         executor = _get_executor(executor)
-    if isinstance(source, (dask.array.Array, xarray.Dataset)):
-        from rechunker.executors.dask import DaskExecutor
-
-        if not isinstance(executor, DaskExecutor):
-            raise NotImplementedError(
-                f"Executor type {type(executor)} not supported for source {type(source)}."
-            )

     copy_spec, intermediate, target = _setup_rechunk(
         source=source,
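Note: with this refactor, rechunk accepts either an executor name or a CopySpecExecutor instance, and the old guard that forced dask and xarray sources onto the DaskExecutor is removed. A minimal usage sketch of the string dispatch follows; the array shape, chunk sizes, and store paths are invented for illustration, not taken from this PR.

# Usage sketch for the executor dispatch above; shapes, chunk sizes, and
# store paths are hypothetical.
import zarr

from rechunker import rechunk

source = zarr.ones((4000, 4000), chunks=(4000, 40))

# executor may be a string resolved by _get_executor ("dask", "beam",
# "prefect", "python", "pywren") or a CopySpecExecutor instance.
plan = rechunk(
    source,
    target_chunks=(40, 4000),
    max_mem="100MB",
    target_store="target.zarr",
    temp_store="temp.zarr",
    executor="python",
)
plan.execute()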
2 changes: 1 addition & 1 deletion rechunker/compat.py
@@ -1,5 +1,5 @@
-from functools import reduce
 import operator
+from functools import reduce
 from typing import Sequence

5 changes: 5 additions & 0 deletions rechunker/executors/__init__.py
@@ -0,0 +1,5 @@
+from .dask import DaskPipelineExecutor
+from .prefect import PrefectPipelineExecutor
+from .python import PythonPipelineExecutor
+
+__all__ = ["PythonPipelineExecutor", "DaskPipelineExecutor", "PrefectPipelineExecutor"]
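The new rechunker.executors package re-exports the three pipeline executors, which share the PipelineExecutor interface seen in dask.py below (pipelines_to_plan plus execute_plan). The sketch below drives one directly; the Stage(func, map_args) construction is an assumption inferred from the attribute accesses in _make_stage, not a documented API.

# Sketch of the shared PipelineExecutor interface; Stage(func, map_args)
# is an assumption inferred from _make_stage in dask.py below.
from rechunker.executors import DaskPipelineExecutor
from rechunker.types import Stage


def setup():
    print("setting up")


def copy_chunk(key):
    print(f"copying chunk {key}")


# one pipeline: setup runs first, then copy_chunk is mapped over three keys
pipeline = [Stage(setup), Stage(copy_chunk, map_args=[0, 1, 2])]

executor = DaskPipelineExecutor()
plan = executor.pipelines_to_plan([pipeline])  # a single dask.delayed object
executor.execute_plan(plan)  # equivalent to plan.compute()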
4 changes: 2 additions & 2 deletions rechunker/executors/beam.py
@@ -8,10 +8,10 @@
     chunk_keys,
     split_into_direct_copies,
 )
-from rechunker.types import CopySpec, Executor, ReadableArray, WriteableArray
+from rechunker.types import CopySpec, CopySpecExecutor, ReadableArray, WriteableArray


-class BeamExecutor(Executor[beam.PTransform]):
+class BeamExecutor(CopySpecExecutor[beam.PTransform]):
     """An execution engine based on Apache Beam.

     Supports copying between any arrays that implement ``__getitem__`` and
165 changes: 76 additions & 89 deletions rechunker/executors/dask.py
@@ -1,116 +1,103 @@
-import uuid
-from typing import Iterable, Tuple
+from functools import reduce
+from typing import Iterable

 import dask
 import dask.array
 from dask.delayed import Delayed
-from dask.optimization import fuse

-from rechunker.types import Array, CopySpec, Executor
+from rechunker.types import (
+    MultiStagePipeline,
+    ParallelPipelines,
+    PipelineExecutor,
+    Stage,
+)


-class DaskExecutor(Executor[Delayed]):
+class DaskPipelineExecutor(PipelineExecutor[Delayed]):
     """An execution engine based on dask.

     Supports zarr and dask arrays as inputs. Outputs must be zarr arrays.

     Execution plans for DaskExecutors are dask.delayed objects.
     """

-    def prepare_plan(self, specs: Iterable[CopySpec]) -> Delayed:
-        return _copy_all(specs)
+    def pipelines_to_plan(self, pipelines: ParallelPipelines) -> Delayed:
+        return _make_pipelines(pipelines)

     def execute_plan(self, plan: Delayed, **kwargs):
         return plan.compute(**kwargs)


-def _direct_array_copy(
-    source: Array, target: Array, chunks: Tuple[int, ...]
-) -> Delayed:
-    """Direct copy between arrays."""
-    if isinstance(source, dask.array.Array):
-        source_read = source
+def _make_pipelines(pipelines: ParallelPipelines) -> Delayed:
+    pipelines_delayed = [_make_pipeline(pipeline) for pipeline in pipelines]
+    return _merge(*pipelines_delayed)
+
+
+def _make_pipeline(pipeline: MultiStagePipeline) -> Delayed:
+    stages_delayed = [_make_stage(stage) for stage in pipeline]
+    d = reduce(_add_upstream, stages_delayed)
+    return d
+
+
+def _make_stage(stage: Stage) -> Delayed:
+    if stage.map_args is None:
+        return dask.delayed(stage.func)()
     else:
-        source_read = dask.array.from_zarr(source, chunks=chunks)
-    target_store_delayed = dask.array.store(
-        source_read, target, lock=False, compute=False
+        name = stage.func.__name__ + "-" + dask.base.tokenize(stage.func)
+        dsk = {(name, i): (stage.func, arg) for i, arg in enumerate(stage.map_args)}
+        # create a barrier
+        top_key = "stage-" + dask.base.tokenize(stage.func, stage.map_args)
+
+        def merge_all(*args):
+            # this function is dependent on its arguments but doesn't actually do anything
+            return None
+
+        dsk.update({top_key: (merge_all, *list(dsk))})
+        return Delayed(top_key, dsk)
+
+
+def _merge_task(*args):
+    pass
+
+
+def _merge(*args: Iterable[Delayed]) -> Delayed:
+    name = "merge-" + dask.base.tokenize(*args)
+    # mypy doesn't like arg.key
+    keys = [getattr(arg, "key") for arg in args]
+    new_task = (_merge_task, *keys)
+    # mypy doesn't like arg.dask
+    graph = dask.base.merge(
+        *[dask.utils.ensure_dict(getattr(arg, "dask")) for arg in args]
     )
-    return target_store_delayed
+    graph[name] = new_task
+    d = Delayed(name, graph)
+    return d


-def _chunked_array_copy(spec: CopySpec) -> Delayed:
-    """Chunked copy between arrays."""
-    if spec.intermediate.array is None:
-        target_store_delayed = _direct_array_copy(
-            spec.read.array, spec.write.array, spec.read.chunks,
-        )
+def _add_upstream(first: Delayed, second: Delayed):
+    upstream_key = first.key
+    dsk = second.dask
+    top_layer = _get_top_layer(dsk)
+    new_top_layer = {}

-        # fuse
-        target_dsk = dask.utils.ensure_dict(target_store_delayed.dask)
-        dsk_fused, _ = fuse(target_dsk)
+    for key, value in top_layer.items():
+        new_top_layer[key] = ((lambda a, b: a), value, upstream_key)

-        return Delayed(target_store_delayed.key, dsk_fused)
+    dsk_new = dask.base.merge(
+        dask.utils.ensure_dict(first.dask), dask.utils.ensure_dict(dsk), new_top_layer
+    )
+
+    return Delayed(second.key, dsk_new)
+
+
+def _get_top_layer(dsk):
+    if hasattr(dsk, "layers"):
+        # this is a HighLevelGraph
+        top_layer_key = list(dsk.layers)[0]
+        top_layer = dsk.layers[top_layer_key]
     else:
-        # do intermediate store
-        int_store_delayed = _direct_array_copy(
-            spec.read.array, spec.intermediate.array, spec.read.chunks,
-        )
-        target_store_delayed = _direct_array_copy(
-            spec.intermediate.array, spec.write.array, spec.write.chunks,
-        )
-
-        # now do some hacking to chain these together into a single graph.
-        # get the two graphs as dicts
-        int_dsk = dask.utils.ensure_dict(int_store_delayed.dask)
-        target_dsk = dask.utils.ensure_dict(target_store_delayed.dask)
-
-        # find the root store key representing the read
-        root_keys = []
-        for key in target_dsk:
-            if isinstance(key, str):
-                if key.startswith("from-zarr"):
-                    root_keys.append(key)
-        assert len(root_keys) == 1
-        root_key = root_keys[0]
-
-        # now rewrite the graph
-        target_dsk[root_key] = (
-            lambda a, *b: a,
-            target_dsk[root_key],
-            *int_dsk[int_store_delayed.key],
-        )
-        target_dsk.update(int_dsk)
-
-        # fuse
-        dsk_fused, _ = fuse(target_dsk)
-        return Delayed(target_store_delayed.key, dsk_fused)
-
-
-def _barrier(*args):
-    return None
-
-
-def _copy_all(specs: Iterable[CopySpec],) -> Delayed:
-
-    stores_delayed = [_chunked_array_copy(spec) for spec in specs]
-
-    if len(stores_delayed) == 1:
-        return stores_delayed[0]
-
-    # This next block makes a task that
-    # 1. depends on each of the component arrays
-    # 2. but doesn't require transmitting large dependencies (depend on barrier_name,
-    #    rather than on part.key directly) to compute the result
-    always_new_token = uuid.uuid1().hex
-    barrier_name = "barrier-" + always_new_token
-    dsk2 = {
-        (barrier_name, i): (_barrier, part.key) for i, part in enumerate(stores_delayed)
-    }
-
-    name = "rechunked-" + dask.base.tokenize([x.name for x in stores_delayed])
-    dsk = dask.base.merge(*[x.dask for x in stores_delayed], dsk2)
-    dsk[name] = (_barrier,) + tuple(
-        (barrier_name, i) for i, _ in enumerate(stores_delayed)
-    )
-    return Delayed(name, dsk)
+        # could this go wrong?
+        first_key = next(iter(dsk))
+        first_task = first_key[0].split("-")[0]
+        top_layer = {k: v for k, v in dsk.items() if k[0].startswith(first_task + "-")}
+    return top_layer
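The heart of _add_upstream is the rewrite ((lambda a, b: a), value, upstream_key): each task in the downstream stage's top layer keeps its own result (a) while gaining a dependency on the upstream stage's terminal key (b), so stages execute in order without passing data between them. Below is a standalone toy illustration of the same rewrite on a hand-built dask graph; every name and value here is invented for the demo and is not rechunker code.

# Toy illustration of the dependency-injection trick used by _add_upstream:
# wrap a task as ((lambda a, b: a), task, upstream_key) so it depends on the
# upstream key but still returns its own value.
import dask
import dask.utils
from dask.delayed import Delayed


def produce():
    print("upstream stage ran first")
    return "token"


def consume(x):
    return x * 2


upstream = dask.delayed(produce)()

# a hand-built "stage": two independent tasks
dsk = {
    "consume-0": (consume, 1),
    "consume-1": (consume, 2),
}

# rewrite each top-layer task so it also depends on the upstream key
dsk = {key: ((lambda a, b: a), task, upstream.key) for key, task in dsk.items()}

# merge in the upstream graph and gather both downstream results
dsk.update(dask.utils.ensure_dict(upstream.dask))
dsk["gather"] = (sum, ["consume-0", "consume-1"])

print(Delayed("gather", dsk).compute())  # produce() runs first; prints 6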