Merge pull request #4 from leap-stc/limit-concurrency-and-upstream-cleanup

Refactor to upstream changes and limit concurrency
jbusecke authored May 2, 2024
2 parents 067530d + 49fc434 commit 82838f0
Showing 3 changed files with 18 additions and 104 deletions.
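
In short: feedstock/recipe.py drops roughly a hundred lines of bespoke helpers (Copy, InjectAttrs, get_pangeo_forge_build_attrs, find_recipe_meta) in favor of their upstream equivalents in leap-data-management-utils, downloads are throttled with max_concurrency=1, and Dataflow Prime is switched on. The copy targets are now resolved by the new get_catalog_store_urls helper. A minimal sketch of how it is used; the exact return shape is an assumption inferred from this diff, not documented here:

    # Assumed behavior, inferred from the diff below: get_catalog_store_urls
    # parses feedstock/catalog.yaml and maps each recipe id to its store URL.
    from leap_data_management_utils.data_management_transforms import get_catalog_store_urls

    catalog_store_urls = get_catalog_store_urls("feedstock/catalog.yaml")
    # e.g. {"metaflux-daily": "gs://.../daily.zarr", "metaflux-monthly": "gs://.../monthly.zarr"}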
2 changes: 1 addition & 1 deletion configs/config_dataflow.py
@@ -6,7 +6,7 @@

c.Bake.prune = 0
c.Bake.bakery_class = "pangeo_forge_runner.bakery.dataflow.DataflowBakery"
-c.DataflowBakery.use_dataflow_prime = False
+c.DataflowBakery.use_dataflow_prime = True
c.DataflowBakery.use_public_ips = True
c.DataflowBakery.service_account_email = (
"julius-leap-dataflow@leap-pangeo.iam.gserviceaccount.com"
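Note: flipping use_dataflow_prime to True opts the bakery's jobs into Dataflow Prime, Google Cloud's serverless Dataflow tier with vertical autoscaling. The commit does not state the motivation for the switch, so treat this as context rather than rationale.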
119 changes: 16 additions & 103 deletions feedstock/recipe.py
@@ -19,111 +19,24 @@
    ConsolidateMetadata,
    ConsolidateDimensionCoordinates,
)
-from ruamel.yaml import YAML
-
-yaml = YAML(typ="safe")
-
-
-# copied from cmip feedstock (TODO: move to central repo?)
-@dataclass
-class Copy(beam.PTransform):
-    target: str
-
-    def _copy(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore:
-        import os
-        import zarr
-        import gcsfs
-
-        # Do we need the gs:// prefix?
-        # TODO: Determine this dynamically from zarr.storage.FSStore
-        source = f"gs://{os.path.normpath(store.path)}/"  # FIXME: more elegant. `.copytree` needs a trailing slash
-        if self.target is False:
-            # don't do anything
-            return store
-        else:
-            fs = gcsfs.GCSFileSystem()  # FIXME: How can we generalize this?
-            fs.cp(source, self.target, recursive=True)
-            # return a new store with the new path that behaves exactly like the input
-            # to this stage (so we can slot this stage right before testing/logging stages)
-            return zarr.storage.FSStore(self.target)
-
-    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
-        return pcoll | "Copying Store" >> beam.Map(self._copy)
-
-
-@dataclass
-class InjectAttrs(beam.PTransform):
-    inject_attrs: dict
-
-    def _update_zarr_attrs(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore:
-        # TODO: Can we get a warning here if the store does not exist?
-        attrs = zarr.open(store, mode="a").attrs
-        attrs.update(self.inject_attrs)
-        # ? Should we consolidate here? We are explicitly doing that later...
-        return store
-
-    def expand(
-        self, pcoll: beam.PCollection[zarr.storage.FSStore]
-    ) -> beam.PCollection[zarr.storage.FSStore]:
-        return pcoll | "Injecting Attributes" >> beam.Map(self._update_zarr_attrs)
-
-
-def get_pangeo_forge_build_attrs() -> dict[str, Any]:
-    """Get build information (git hash and time) to add to the recipe output."""
-    # Set up injection attributes.
-    # This is for demonstration purposes only and should be discussed with the broader LEAP/PGF community:
-    # - Bake in information from the top level of the meta.yaml
-    # - Add a timestamp
-    # - Add the git hash
-    # - Add a link to the meta.yaml on main
-    # - Add the recipe id
-
-    git_url_hash = f"{os.environ['GITHUB_SERVER_URL']}/{os.environ['GITHUB_REPOSITORY']}/commit/{os.environ['GITHUB_SHA']}"
-    timestamp = datetime.now(timezone.utc).isoformat()
-
-    return {
-        "pangeo_forge_build_git_hash": git_url_hash,
-        "pangeo_forge_build_timestamp": timestamp,
-    }
-
-
-# TODO: Both these stages are generally useful. They should at least be in the utils package, maybe in recipes?
-
-
-def find_recipe_meta(catalog_meta: List[Dict[str, str]], r_id: str) -> Dict[str, str]:
-    # Iterate over each dictionary in the list
-    for d in catalog_meta:
-        # Check if the 'id' key matches the search id
-        if d["id"] == r_id:
-            return d
-    print(
-        f"Could not find {r_id=}. Got the following recipe_ids: {[d['id'] for d in catalog_meta]}"
-    )
-    return None  # Return None if no matching dictionary is found
-
+from leap_data_management_utils.data_management_transforms import (
+    Copy,
+    InjectAttrs,
+    get_catalog_store_urls,
+)

-# load the global config values (we will have to decide where these ultimately live)
-catalog_meta = yaml.load(open("feedstock/catalog.yaml"))
+# parse the catalog store locations (this is where the data is copied to after a successful write (and maybe testing))
+catalog_store_urls = get_catalog_store_urls("feedstock/catalog.yaml")

-print("DETECTING GITHUB ACTIONS RUN")
+# if not run in a github workflow, assume local testing and deactivate the copy stage by setting all urls to False (see https://github.com/leap-stc/leap-data-management-utils/blob/b5762a17cbfc9b5036e1cd78d62c4e6a50c9691a/leap_data_management_utils/data_management_transforms.py#L121-L145)
if os.getenv("GITHUB_ACTIONS") == "true":
    print("Running inside GitHub Actions.")
-
-    # Get final store path from catalog.yaml input
-    target_daily = find_recipe_meta(catalog_meta["stores"], "metaflux-daily")["url"]
-    target_monthly = find_recipe_meta(catalog_meta["stores"], "metaflux-monthly")["url"]
-    pgf_build_attrs = get_pangeo_forge_build_attrs()
else:
    print("Running locally. Deactivating final copy stage.")
-    # this deactivates the final copy stage for local testing execution
-    target_daily = False
-    target_monthly = False
-    pgf_build_attrs = {}
+    catalog_store_urls = {k: False for k in catalog_store_urls.keys()}

print("Final output locations")
-print(f"{target_daily=}")
-print(f"{target_monthly=}")
-print(f"{pgf_build_attrs=}")
+print(f"{catalog_store_urls=}")

# Common Parameters
years = range(2001, 2022)
@@ -136,16 +49,16 @@
pattern_monthly = pattern_from_file_sequence(input_urls_monthly, concat_dim="time")
METAFLUX_GPP_RECO_monthly = (
    beam.Create(pattern_monthly.items())
-    | OpenURLWithFSSpec()  # open_kwargs=open_kwargs
+    | OpenURLWithFSSpec(max_concurrency=1)
    | OpenWithXarray()
    | StoreToZarr(
        store_name="METAFLUX_GPP_RECO_monthly.zarr",
        combine_dims=pattern_monthly.combine_dim_keys,
    )
-    | InjectAttrs(pgf_build_attrs)
+    | InjectAttrs()
    | ConsolidateDimensionCoordinates()
    | ConsolidateMetadata()
-    | Copy(target=target_monthly)
+    | Copy(target=catalog_store_urls["metaflux-monthly"])
)

## daily version
@@ -155,14 +68,14 @@
pattern_daily = pattern_from_file_sequence(input_urls_daily, concat_dim="time")
METAFLUX_GPP_RECO_daily = (
    beam.Create(pattern_daily.items())
-    | OpenURLWithFSSpec()  # open_kwargs=open_kwargs
+    | OpenURLWithFSSpec(max_concurrency=1)
    | OpenWithXarray()
    | StoreToZarr(
        store_name="METAFLUX_GPP_RECO_daily.zarr",
        combine_dims=pattern_daily.combine_dim_keys,
    )
-    | InjectAttrs(pgf_build_attrs)
+    | InjectAttrs()
    | ConsolidateDimensionCoordinates()
    | ConsolidateMetadata()
-    | Copy(target=target_daily)
+    | Copy(target=catalog_store_urls["metaflux-daily"])
)
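
Two behavioral notes on the pipelines above. First, OpenURLWithFSSpec(max_concurrency=1) caps simultaneous downloads, which is the "limit concurrency" half of this PR; the parameter comes from the pinned pangeo-forge-recipes release used here. Second, InjectAttrs() is now called with no arguments, so any injected attributes (for example the build metadata previously produced by get_pangeo_forge_build_attrs) would have to come from defaults inside leap-data-management-utils; that behavior lives upstream and is assumed, not shown in this diff. A minimal, self-contained sketch of the throttled open stage, with hypothetical file URLs:

    # Sketch only: example.com URLs are hypothetical; the transforms are the
    # same ones recipe.py imports from pangeo_forge_recipes.
    import apache_beam as beam
    from pangeo_forge_recipes.patterns import pattern_from_file_sequence
    from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray

    urls = [f"https://example.com/metaflux_{year}.nc" for year in (2001, 2002)]
    pattern = pattern_from_file_sequence(urls, concat_dim="time")

    with beam.Pipeline() as p:
        (
            p
            | beam.Create(pattern.items())
            | OpenURLWithFSSpec(max_concurrency=1)  # at most one concurrent transfer
            | OpenWithXarray()  # yields (index, xarray.Dataset) pairs downstream
        )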
1 change: 1 addition & 0 deletions feedstock/requirements.txt
@@ -1,3 +1,4 @@
pangeo-forge-recipes==0.10.7
apache-beam[gcp]
gcsfs
+leap-data-management-utils==0.0.5
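
The new pin supplies the Copy, InjectAttrs, and get_catalog_store_urls imports added to feedstock/recipe.py above. gcsfs stays in the requirements, presumably because the upstream Copy still needs it for GCS-to-GCS transfers, assuming it retains the gcsfs-based implementation that was removed from this file.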
