Skip to content

Commit

Permalink
Merge pull request #170 from pangeo-forge/npz/feature/cut-0.9recipe-support
Browse files Browse the repository at this point in the history

Remove support for pfr<=0.9.x
  • Loading branch information
ranchodeluxe authored Feb 15, 2024
2 parents 0eda0ca + 626a5d9 commit 028903c
Show file tree
Hide file tree
Showing 12 changed files with 35 additions and 106 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/dataflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ jobs:
matrix:
python-version: ["3.9", "3.10", "3.11"]
recipes-version: [
"pangeo-forge-recipes==0.9.4",
"pangeo-forge-recipes==0.10.0",
# save some cycles and infer it might
# work on all versions between lowest and highest
# "pangeo-forge-recipes==0.10.0",
# "pangeo-forge-recipes==0.10.3",
"pangeo-forge-recipes==0.10.4",
]
Expand Down
3 changes: 1 addition & 2 deletions .github/workflows/flink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ jobs:
matrix:
python-version: [ "3.9", "3.10", "3.11" ]
recipes-version: [
"pangeo-forge-recipes==0.9.4",
"pangeo-forge-recipes==0.10.0",
# save some cycles and infer it might
# work on all versions between lowest and highest
# "pangeo-forge-recipes==0.10.0",
# "pangeo-forge-recipes==0.10.3",
"pangeo-forge-recipes==0.10.4",
]
Expand Down
3 changes: 1 addition & 2 deletions .github/workflows/unit-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ jobs:
matrix:
python-version: ["3.9", "3.10", "3.11"]
recipes-version: [
"pangeo-forge-recipes==0.9.4",
"pangeo-forge-recipes==0.10.0",
# save some cycles and infer it might
# work on all versions between lowest and highest
# "pangeo-forge-recipes==0.10.0",
# "pangeo-forge-recipes==0.10.3",
"pangeo-forge-recipes==0.10.4",
]
Expand Down
1 change: 0 additions & 1 deletion MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
include LICENSE
include pangeo_forge_runner/commands/pangeo-forge-recipes-0.9-requirements.txt
14 changes: 14 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
coverage:
precision: 2
round: down
status:
project:
default:
target: 95
informational: true
patch: off
changes: off
ignore:
- "setup.py"
- "tests/*"
- "**/__init__.py"
6 changes: 0 additions & 6 deletions docs/reference/storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,3 @@ where `pangeo-forge-runner` puts intermediate & final data products.
```{eval-rst}
.. autoconfigurable:: pangeo_forge_runner.storage.InputCacheStorage
```

## MetadataCacheStorage

```{eval-rst}
.. autoconfigurable:: pangeo_forge_runner.storage.MetadataCacheStorage
```
66 changes: 12 additions & 54 deletions pangeo_forge_runner/commands/bake.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,18 @@
from pathlib import Path

import escapism
from apache_beam import Pipeline, PTransform
from apache_beam import Pipeline
from traitlets import Bool, Type, Unicode, validate

from .. import Feedstock
from ..bakery.base import Bakery
from ..bakery.flink import FlinkOperatorBakery
from ..bakery.local import LocalDirectBakery
from ..plugin import get_injections, get_injectionspecs_from_entrypoints
from ..storage import InputCacheStorage, MetadataCacheStorage, TargetStorage
from ..storage import InputCacheStorage, TargetStorage
from ..stream_capture import redirect_stderr, redirect_stdout
from .base import BaseCommand, common_aliases, common_flags

PFR_0_9_REQUIREMENTS_FILE_PATH = (
Path(__file__).parent / "pangeo-forge-recipes-0.9-requirements.txt"
)


class Bake(BaseCommand):
"""
Expand Down Expand Up @@ -184,18 +180,13 @@ def start(self):
# with appropriate config from config file / commandline / defaults.
target_storage = TargetStorage(parent=self)
input_cache_storage = InputCacheStorage(parent=self)
metadata_cache_storage = MetadataCacheStorage(parent=self)

self.log.info(
f"Target Storage is {target_storage}\n", extra={"status": "setup"}
)
self.log.info(
f"Input Cache Storage is {input_cache_storage}\n", extra={"status": "setup"}
)
self.log.info(
f"Metadata Cache Storage is {metadata_cache_storage}\n",
extra={"status": "setup"},
)

injection_specs = get_injectionspecs_from_entrypoints()

Expand Down Expand Up @@ -236,17 +227,17 @@ def start(self):
self.log.info(f"Baking only recipe_id='{self.recipe_id}'")
recipes = {k: r for k, r in recipes.items() if k == self.recipe_id}

if self.prune:
# Prune recipes to only run on certain items if we are asked to
if hasattr(next(iter(recipes.values())), "copy_pruned"):
# pangeo-forge-recipes version < 0.10 has a `copy_pruned` method
recipes = {k: r.copy_pruned() for k, r in recipes.items()}

bakery: Bakery = self.bakery_class(parent=self)

extra_options = {}

for name, recipe in recipes.items():
if hasattr(recipe, "to_beam"):
# Catch recipes following pre-0.10 conventions and throw
raise ValueError(
"Unsupported recipe: please update to support pfr >=0.10 conventions."
)

if len(recipes) > 1:
recipe_name_hash = hashlib.sha256(name.encode()).hexdigest()[:5]
per_recipe_unique_job_name = (
Expand All @@ -261,16 +252,9 @@ def start(self):
else:
per_recipe_unique_job_name = None

# if pangeo-forge-recipes is <=0.9, we have to specify a requirements.txt
# file even if it isn't present, as the image used otherwise will not have pangeo-forge-recipes
if isinstance(recipe, PTransform):
requirements_path = feedstock.feedstock_dir / "requirements.txt"
if requirements_path.exists():
extra_options["requirements_file"] = str(requirements_path)
else:
extra_options["requirements_file"] = str(
PFR_0_9_REQUIREMENTS_FILE_PATH
)
requirements_path = feedstock.feedstock_dir / "requirements.txt"
if requirements_path.exists():
extra_options["requirements_file"] = str(requirements_path)

pipeline_options = bakery.get_pipeline_options(
job_name=(per_recipe_unique_job_name or self.job_name),
Expand All @@ -285,33 +269,7 @@ def start(self):
# Chain our recipe to the pipeline. This mutates the `pipeline` object!
# We expect `recipe` to either be a beam PTransform, or an object with a 'to_beam'
# method that returns a transform.
if isinstance(recipe, PTransform):
# This means we are in pangeo-forge-recipes >=0.9
pipeline | recipe
elif hasattr(recipe, "to_beam"):
# We are in pangeo-forge-recipes <=0.9
# The import has to be here, as this import is not valid in pangeo-forge-recipes>=0.9
# NOTE: `StorageConfig` only requires a target; input and metadata caches are optional,
# so those are handled conditionally if provided.
from pangeo_forge_recipes.storage import StorageConfig

recipe.storage_config = StorageConfig(
target_storage.get_forge_target(job_name=self.job_name),
)
for attrname, optional_storage in zip(
("cache", "metadata"),
(input_cache_storage, metadata_cache_storage),
):
if not optional_storage.is_default():
setattr(
recipe.storage_config,
attrname,
optional_storage.get_forge_target(
job_name=self.job_name
),
)
# with configured storage now attached, compile recipe to beam
pipeline | recipe.to_beam()
pipeline | recipe

# Some bakeries are blocking - if Beam is configured to use them, calling
# pipeline.run() blocks. Some are not. We handle that here, and provide
Expand Down

This file was deleted.

8 changes: 0 additions & 8 deletions pangeo_forge_runner/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,3 @@ class InputCacheStorage(StorageTargetConfig):
"""

pangeo_forge_target_class = "CacheFSSpecTarget"


class MetadataCacheStorage(StorageTargetConfig):
"""
Storage configuration for caching metadata during recipe baking
"""

pangeo_forge_target_class = "MetadataTarget"
12 changes: 0 additions & 12 deletions tests/integration/flink/test_flink_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import time
from importlib.metadata import version

import pytest
import xarray as xr
from packaging.version import parse as parse_version

Expand All @@ -19,12 +18,6 @@ def test_flink_bake(minio_service, flinkversion, pythonversion, beamversion):
pfr_version = parse_version(version("pangeo-forge-recipes"))
if pfr_version >= parse_version("0.10"):
recipe_version_ref = str(pfr_version)
else:
recipe_version_ref = "0.9.x"
pytest.xfail(
f"{pfr_version = }, which is < 0.10. "
"Flink tests timeout with this recipes version, so we xfail this test."
)

bucket = "s3://gpcp-out"
config = {
Expand All @@ -47,11 +40,6 @@ def test_flink_bake(minio_service, flinkversion, pythonversion, beamversion):
"fsspec_args": fsspec_args,
"root_path": bucket + "/input-cache/{job_name}",
},
"MetadataCacheStorage": {
"fsspec_class": "s3fs.S3FileSystem",
"fsspec_args": fsspec_args,
"root_path": bucket + "/metadata-cache/{job_name}",
},
"FlinkOperatorBakery": {
"flink_version": flinkversion,
"job_manager_resources": {"memory": "1024m", "cpu": 0.30},
Expand Down
8 changes: 3 additions & 5 deletions tests/integration/test_dataflow_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ def test_dataflow_integration():
if pfr_version >= parse_version("0.10"):
recipe_version_ref = str(pfr_version)
else:
recipe_version_ref = "0.9.x"
raise ValueError(
f"Unsupported pfr_version: {pfr_version}. Please upgrade to 0.10 or newer."
)
bucket = "gs://pangeo-forge-runner-ci-testing"
config = {
"Bake": {
Expand All @@ -30,10 +32,6 @@ def test_dataflow_integration():
"fsspec_class": "gcsfs.GCSFileSystem",
"root_path": bucket + "/input-cache/{job_name}",
},
"MetadataCacheStorage": {
"fsspec_class": "gcsfs.GCSFileSystem",
"root_path": bucket + "/metadata-cache/{job_name}",
},
}

with tempfile.NamedTemporaryFile("w", suffix=".json") as f:
Expand Down
12 changes: 3 additions & 9 deletions tests/unit/test_bake.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ def recipes_version_ref(request):
if pfr_version >= parse_version("0.10"):
recipes_version_ref = "0.10.x"
else:
recipes_version_ref = "0.9.x"
raise ValueError(
f"Unsupported pfr_version: {pfr_version}. Please upgrade to 0.10 or newer."
)
return (
recipes_version_ref
if not request.param == "dict_object"
Expand Down Expand Up @@ -168,11 +170,6 @@ def test_gpcp_bake(
"fsspec_args": fsspec_args,
"root_path": "s3://gpcp/input-cache/",
},
"MetadataCacheStorage": {
"fsspec_class": "s3fs.S3FileSystem",
"fsspec_args": fsspec_args,
"root_path": "s3://gpcp/metadata-cache/",
},
}

if no_input_cache:
Expand Down Expand Up @@ -208,9 +205,6 @@ def test_gpcp_bake(
if expected_error:
assert proc.returncode == 1
stdout[-1] == expected_error
elif no_input_cache and recipes_version_ref == "0.9.x":
# no_input_cache is only supported in 0.10.x and above
assert proc.returncode == 1
else:
assert proc.returncode == 0

Expand Down

0 comments on commit 028903c

Please sign in to comment.