
Port EOOffshore CCMP v0.2.1.NRT recipe to use beam-refactor branch #453

derekocallaghan opened this issue Dec 20, 2022 · 3 comments
Following the approach in #445, this will port the EOOffshore CCMP recipe (pangeo-forge/staged-recipes#145) to use the beam-refactor branch. It will involve writing a custom PTransform for computing new wind speed and direction variables, etc.

derekocallaghan commented Dec 20, 2022

Here's the current draft of the recipe port, containing a new IcsWindSpeedDirection transform, which appears to run successfully:

import apache_beam as beam
import dask
import dask.array as da
from dataclasses import dataclass, field
from datetime import datetime
from metpy.calc import wind_direction, wind_speed
import pandas as pd
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern, prune_pattern
from pangeo_forge_recipes.transforms import _add_keys, OpenURLWithFSSpec, OpenWithXarray, StoreToZarr
from tempfile import TemporaryDirectory
import xarray as xr


missing_dates = [
    pd.Timestamp(d)
    for d in [
        "2017-03-11",
        "2017-03-12",
        "2017-04-10",
        "2017-05-25",
        "2017-05-26",
        "2018-01-04",
        "2020-07-09",
        "2020-07-13",
        "2020-07-15",
        "2020-08-04",
        "2020-08-09",
        "2020-08-11",
        "2020-10-22",
        "2020-10-23",
    ]
]

dates = pd.date_range("2015-01-16", "2021-09-30", freq="D")

# Drop missing dates
dates = dates.drop(missing_dates)


def make_url(time):
    year = time.strftime("%Y")
    month = time.strftime("%m")
    day = time.strftime("%d")
    url = f"https://data.remss.com/ccmp/v02.1.NRT/Y{year}/M{month}/CCMP_RT_Wind_Analysis_{year}{month}{day}_V02.1_L3.0_RSS.nc"
    return url


# Daily products with 6-hourly values
NITEMS_PER_FILE = 4

pattern = FilePattern(
    make_url,
    ConcatDim(name="time", keys=dates, nitems_per_file=NITEMS_PER_FILE),
)

pattern_pruned = prune_pattern(pattern)


def ics_wind_speed_direction(ds: xr.Dataset) -> xr.Dataset:
    """
    Selects a subset for the Irish Continental Shelf (ICS) region, and computes wind speed and
    direction for the u and v components in the specified product. Dask arrays are
    created for delayed execution.
    """
    # ICS grid
    geospatial_lat_min = 45.75
    geospatial_lat_max = 58.25
    geospatial_lon_min = 333.85
    geospatial_lon_max = 355.35
    icds = ds.sel(
        latitude=slice(geospatial_lat_min, geospatial_lat_max),
        longitude=slice(geospatial_lon_min, geospatial_lon_max),
    )

    # Remove subset of original attrs as they're no longer relevant
    for attr in ["base_date", "date_created", "history"]:
        del icds.attrs[attr]

    # Update the grid attributes
    icds.attrs.update(
        {
            "geospatial_lat_min": geospatial_lat_min,
            "geospatial_lat_max": geospatial_lat_max,
            "geospatial_lon_min": geospatial_lon_min,
            "geospatial_lon_max": geospatial_lon_max,
        }
    )
    u = icds.uwnd
    v = icds.vwnd
    # Original wind speed 'units': 'm s-1' attribute not accepted by MetPy,
    # use the unit contained in ERA5 data
    ccmp_wind_speed_units = u.units
    era5_wind_speed_units = "m s**-1"
    u.attrs["units"] = era5_wind_speed_units
    v.attrs["units"] = era5_wind_speed_units

    variables = [
        {
            "name": "wind_speed",
            "metpy_fn": wind_speed,
            "attrs": {"long_name": "Wind speed", "units": ccmp_wind_speed_units},
        },
        {
            "name": "wind_direction",
            "metpy_fn": wind_direction,
            "attrs": {"long_name": "Wind direction", "units": "degree"},
        },
    ]

    # CCMP provides u/v at a single height, 10m
    for variable in variables:
        icds[variable["name"]] = (
            xr.DataArray(
                da.from_delayed(
                    dask.delayed(variable["metpy_fn"](u, v).values), u.shape, dtype=u.dtype
                ),
                coords=u.coords,
                dims=u.dims,
            )
            .assign_coords(height=10)
            .expand_dims(["height"])
        )
        icds[variable["name"]].attrs.update(variable["attrs"])

    icds.height.attrs.update(
        {
            "long_name": "Height above the surface",
            "standard_name": "height",
            "units": "m",
        }
    )
    # Restore units
    for variable in ["uwnd", "vwnd"]:
        icds[variable].attrs["units"] = ccmp_wind_speed_units

    icds.attrs["eooffshore_zarr_creation_time"] = datetime.strftime(
        datetime.now(), "%Y-%m-%dT%H:%M:%SZ"
    )
    icds.attrs[
        "eooffshore_zarr_details"
    ] = "EOOffshore Project: Concatenated CCMP v0.2.1.NRT 6-hourly wind products provided by Remote Sensing Systems (RSS), for Irish Continental Shelf. Wind speed and direction have been calculated from the uwnd and vwnd variables. CCMP Version-2 vector wind analyses are produced by Remote Sensing Systems. Data are available at www.remss.com."
    return icds


@dataclass
class IcsWindSpeedDirection(beam.PTransform):
    """
    Selects a subset for the Irish Continental Shelf (ICS) region, and computes wind speed and
    direction for the u and v components in the specified product. Dask arrays are
    created for delayed execution.
    """

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        return pcoll | beam.Map(_add_keys(ics_wind_speed_direction))


# TODO: I'm seeing issues where the required 8000 chunk size is used for the pruned recipe
# I think XarrayZarrRecipe reduces this to the total number of time observations when 
# they're < specified chunk size. I need to investigate further.
# For now, chunk size is based on the pruned pattern size
# target_chunks = {"time": 8000, "latitude": -1, "longitude": -1}
target_chunks = {"time": pattern_pruned.dims["time"] * NITEMS_PER_FILE, "latitude": -1, "longitude": -1}

target_path = "./ccmp.zarr"

transforms = (
    beam.Create(pattern_pruned.items())
    | OpenURLWithFSSpec(cache_url="./testcache")
    | OpenWithXarray(file_type=pattern_pruned.file_type)
    | IcsWindSpeedDirection()
    | StoreToZarr(
        target_url=target_path,
        combine_dims=pattern.combine_dim_keys,
        target_chunks=target_chunks,
    )
)

with beam.Pipeline() as p:
    p | transforms

ds = xr.open_zarr(target_path)
print(ds)

I've confirmed the expected output as I did with the CCMP staged recipe:

In [86]: ds = xr.open_zarr('./ccmp.zarr')

In [87]: ds
Out[87]: 
<xarray.Dataset>
Dimensions:         (height: 1, latitude: 50, longitude: 86, time: 8)
Coordinates:
  * height          (height) int64 10
  * latitude        (latitude) float32 45.88 46.12 46.38 ... 57.62 57.88 58.12
  * longitude       (longitude) float32 333.9 334.1 334.4 ... 354.6 354.9 355.1
  * time            (time) datetime64[ns] 2015-01-16 2015-01-16 ... 2015-01-17
Data variables:
    nobs            (time, latitude, longitude) float32 dask.array<chunksize=(8, 50, 86), meta=np.ndarray>
    uwnd            (time, latitude, longitude) float32 dask.array<chunksize=(8, 50, 86), meta=np.ndarray>
    vwnd            (time, latitude, longitude) float32 dask.array<chunksize=(8, 50, 86), meta=np.ndarray>
    wind_direction  (height, time, latitude, longitude) float32 dask.array<chunksize=(1, 8, 50, 86), meta=np.ndarray>
    wind_speed      (height, time, latitude, longitude) float32 dask.array<chunksize=(1, 8, 50, 86), meta=np.ndarray>
Attributes: (12/35)
    Conventions:                    CF-1.6
    comment:                        none
    contact:                        Remote Sensing Systems, support@remss.com
    contributor_name:               Carl Mears, Joel Scott, Frank Wentz, Ross...
    contributor_role:               Co-Investigator, Software Engineer, Proje...
    creator_email:                  support@remss.com
    ...                             ...
    publisher_email:                support@remss.com
    publisher_name:                 Remote Sensing Systems
    publisher_url:                  http://www.remss.com/
    references:                     Mears et al., Journal of Geophysical Rese...
    summary:                        CCMP_RT V2.1 has been created using the s...
    title:                          RSS CCMP_RT V2.1 derived surface winds (L...

In [88]: ds.time.values
Out[88]: 
array(['2015-01-16T00:00:00.000000000', '2015-01-16T00:00:00.000000000',
       '2015-01-16T00:00:00.000000000', '2015-01-16T00:00:00.000000000',
       '2015-01-17T00:00:00.000000000', '2015-01-17T00:00:00.000000000',
       '2015-01-17T00:00:00.000000000', '2015-01-17T00:00:00.000000000'],
      dtype='datetime64[ns]')

In [89]: ds.resample(time='D').mean().wind_speed.isel(latitude=0,longitude=0).compute()
Out[89]: 
<xarray.DataArray 'wind_speed' (time: 2, height: 1)>
array([[10.636068],
       [14.07321 ]], dtype=float32)
Coordinates:
  * height     (height) int64 10
    latitude   float32 45.88
    longitude  float32 333.9
  * time       (time) datetime64[ns] 2015-01-16 2015-01-17
Attributes:
    long_name:  Wind speed
    units:      m s-1

In [90]: ds.eooffshore_zarr_details
Out[90]: 'EOOffshore Project: Concatenated CCMP v0.2.1.NRT 6-hourly wind products provided by Remote Sensing Systems (RSS), for Irish Continental Shelf. Wind speed and direction have been calculated from the uwnd and vwnd variables. CCMP Version-2 vector wind analyses are produced by Remote Sensing Systems. Data are available at www.remss.com.'

FYI @rabernat:

  • I rechecked the code after yesterday's call, and I'd forgotten to specify target_chunks to StoreToZarr. Once I did this, the time chunking was as expected.
  • I think that XarrayZarrRecipe currently reduces the (time) chunk size when the pruned pattern total size is lower (here, 8) than the specified chunk size (e.g. 8000). With the Beam version, if I specify the full size (8000) it looks like this reduction isn't happening, so I've based the size on the pruned pattern for now. I'll look into this and get back to you.
  • As mentioned yesterday, I'm currently using pangeo_forge_recipes.transforms._add_keys in the new transform.
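For reference, the interim chunk-size workaround in the recipe above amounts to the following minimal sketch (the pruned file count of 2 assumes `prune_pattern`'s default number of kept keys; adjust if a different `nkeep` is used):

```python
# Sketch of the interim target_chunks calculation used in the recipe above.
# Assumes prune_pattern keeps its default of 2 pattern keys (2 daily files).
nitems_per_file = 4  # 6-hourly values per daily CCMP file
pruned_nfiles = 2    # prune_pattern default number of kept keys (assumption)
target_chunks = {
    "time": pruned_nfiles * nitems_per_file,  # 8 time steps in the pruned store
    "latitude": -1,   # -1 means a single chunk spanning the dimension
    "longitude": -1,
}
print(target_chunks["time"])  # 8
```

Once the reduction behaviour in the Beam version is confirmed, the full `{"time": 8000, ...}` spec should replace this.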

derekocallaghan commented
It looks like the time values aren't being stored correctly in the output Zarr, as seen in the output above:

In [88]: ds.time.values
Out[88]: 
array(['2015-01-16T00:00:00.000000000', '2015-01-16T00:00:00.000000000',
       '2015-01-16T00:00:00.000000000', '2015-01-16T00:00:00.000000000',
       '2015-01-17T00:00:00.000000000', '2015-01-17T00:00:00.000000000',
       '2015-01-17T00:00:00.000000000', '2015-01-17T00:00:00.000000000'],
      dtype='datetime64[ns]')

These should be 6-hourly values as generated by the original recipe e.g.:

In [4]: ds.time.values
Out[4]: 
array(['2015-01-16T00:00:00.000000000', '2015-01-16T06:00:00.000000000',
       '2015-01-16T12:00:00.000000000', '2015-01-16T18:00:00.000000000',
       '2015-01-17T00:00:00.000000000', '2015-01-17T06:00:00.000000000',
       '2015-01-17T12:00:00.000000000', '2015-01-17T18:00:00.000000000'],
      dtype='datetime64[ns]')
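As an illustrative interim workaround (not part of the recipe above; `restore_six_hourly_times` is a hypothetical helper), the 6-hourly index could be rebuilt from the repeated daily timestamps, assuming 4 evenly spaced steps per day:

```python
import numpy as np
import pandas as pd
import xarray as xr


def restore_six_hourly_times(ds: xr.Dataset) -> xr.Dataset:
    """Hypothetical workaround: rebuild a 6-hourly time index, assuming the
    stored (repeated) daily timestamps represent 4 evenly spaced steps/day."""
    start = pd.Timestamp(ds.time.values[0])
    new_times = pd.date_range(start, periods=ds.sizes["time"], freq="6h")
    return ds.assign_coords(time=new_times)


# Example: repeated daily timestamps, as in the broken Zarr output above
times = np.repeat(pd.date_range("2015-01-16", periods=2, freq="D").values, 4)
ds = xr.Dataset({"wind_speed": ("time", np.zeros(8))}, coords={"time": times})
fixed = restore_six_hourly_times(ds)
# fixed.time: 2015-01-16T00, T06, T12, T18, 2015-01-17T00, ...
```

Note this assumes a contiguous date range, so the missing dates dropped earlier in the recipe would break it; it only masks the underlying StoreToZarr behaviour, which still needs fixing.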

derekocallaghan commented
This might be a candidate for the proposed separate issue for migrating tutorials from notebooks to scripts (with CI testing).
