
StoreToZarr _invert_meshgrid() assertion error. #520

Closed · kbren opened this issue May 1, 2023 · 18 comments

@kbren commented May 1, 2023

I've been running the beam-refactor branch following the example of netcdf_zarr_sequential.ipynb with a dataset with dimensions (time, height, lat, lon) and am running into an error at the following assertion.

assert all(
    np.equal(actual, expected).all() for actual, expected in zip(arrays, np.meshgrid(*xi))
)

I'm wondering whether the _invert_meshgrid() function generalizes to more than two dimensions. Specifically, np.meshgrid() only acts on the first two dimensions, so the dimensions of "actual" and "expected" do not match. When I comment out this assertion, things seem to run.
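
For reference, here is a minimal numpy snippet (variable names purely illustrative) of the axis handling being described: with the default indexing="xy", np.meshgrid reorders only the first two dimensions of its outputs, whereas indexing="ij" preserves the input order.

import numpy as np

x = np.arange(2)
y = np.arange(3)
z = np.arange(4)

# default indexing="xy" swaps the first two axes of every output grid
print([g.shape for g in np.meshgrid(x, y, z)])                 # [(3, 2, 4), (3, 2, 4), (3, 2, 4)]
# Cartesian indexing="ij" keeps the input order
print([g.shape for g in np.meshgrid(x, y, z, indexing="ij")])  # [(2, 3, 4), (2, 3, 4), (2, 3, 4)]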

See the full error message below:

Traceback
  Traceback (most recent call last):
  File "/homedirs/katibren/develop/pangeo-forge-recipes/pangeo_forge_recipes/rechunking.py", line 197, in combine_fragments
    starts = _invert_meshgrid(*starts_cube[::-1])[::-1]
  File "/homedirs/katibren/develop/pangeo-forge-recipes/pangeo_forge_recipes/rechunking.py", line 128, in _invert_meshgrid
    assert all(
AssertionError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1418, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 625, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/homedirs/katibren/miniconda3/envs/pangeo-forge-beam/lib/python3.10/site-packages/apache_beam/transforms/core.py", line -1, in <lambda>
  File "/homedirs/katibren/develop/pangeo-forge-recipes/pangeo_forge_recipes/rechunking.py", line 200, in combine_fragments
    raise ValueError("Cannot combine fragments because they do not form a regular hypercube.")
ValueError: Cannot combine fragments because they do not form a regular hypercube.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/homedirs/katibren/develop/pangeo-forge-recipes/docs/pangeo_forge_recipes/tutorials/xarray_zarr/zarr_zarr_debug.py", line 105, in <module>
    with beam.Pipeline() as p:
  File "/homedirs/katibren/miniconda3/envs/pangeo-forge-beam/lib/python3.10/site-packages/apache_beam/pipeline.py", line 600, in __exit__
    self.result = self.run()
  File "/homedirs/katibren/miniconda3/envs/pangeo-forge-beam/lib/python3.10/site-packages/apache_beam/pipeline.py", line 577, in run
    return self.runner.run_pipeline(self, self._options)
  File "/homedirs/katibren/miniconda3/envs/pangeo-forge-beam/lib/python3.10/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/homedirs/katibren/miniconda3/envs/pangeo-forge-beam/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 202, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File "/homedirs/katibren/miniconda3/envs/pangeo-forge-beam/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 224, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/homedirs/katibren/miniconda3/envs/pangeo-forge-beam/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 455, in run_stages
    bundle_results = self._execute_bundle(
  File "/homedirs/katibren/miniconda3/envs/pangeo-forge-beam/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 783, in _execute_bundle
    self._run_bundle(
  File "/homedirs/katibren/miniconda3/envs/pangeo-forge-beam/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1012, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File "/homedirs/katibren/miniconda3/envs/pangeo-forge-beam/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1348, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/homedirs/katibren/miniconda3/envs/pangeo-forge-beam/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 379, in push
    response = self.worker.do_instruction(request)
  File "/homedirs/katibren/miniconda3/envs/pangeo-forge-beam/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 624, in do_instruction
    return getattr(self, request_type)(
  File "/homedirs/katibren/miniconda3/envs/pangeo-forge-beam/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 662, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/homedirs/katibren/miniconda3/envs/pangeo-forge-beam/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1062, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/homedirs/katibren/miniconda3/envs/pangeo-forge-beam/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 232, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 526, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1420, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1508, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1418, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 625, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/homedirs/katibren/miniconda3/envs/pangeo-forge-beam/lib/python3.10/site-packages/apache_beam/transforms/core.py", line -1, in <lambda>
  File "/homedirs/katibren/develop/pangeo-forge-recipes/pangeo_forge_recipes/rechunking.py", line 200, in combine_fragments
    raise ValueError("Cannot combine fragments because they do not form a regular hypercube.")
ValueError: Cannot combine fragments because they do not form a regular hypercube. [while running 'Create|OpenWithXarray|PreprocessMyDataset|StoreToZarr/StoreToZarr/Rechunk/MapTuple(combine_fragments)']
@norlandrhagen (Contributor)

Might be related to this issue: #517

@cisaacstern (Member) commented May 1, 2023

Thanks for raising this issue. I agree it may be related to #517, but difficult to say for sure without a minimal reproducible example.

@kbren, could you post a self-contained example of the recipe you're working on, which when executed raises this error?

Thanks!

@cisaacstern (Member)

@kbren, is this issue still present with pangeo-forge-recipes==0.10.0?

#517 (which we suspected might be related) is fixed in that release, so I am hoping that may also have solved this problem.

@kbren (Author) commented Jul 17, 2023

@cisaacstern thanks for the heads up! I'm just getting back from vacation. I'll check to see if this is resolved, but it might take me a few days to get to it.

@ta-hill (Contributor) commented Aug 24, 2023

Hello, I've been encountering this same issue. I've been working on this with @kbren, and we are still unsure of the specific circumstances that cause this error to be raised. Here's the context of our setup:

We are working with files that are split by month and contain hourly data. For example, the month of January looks like this:

<xarray.Dataset>
Dimensions:  (time: 744, height: 3, sn: 9, we: 7)
Coordinates:
  * height   (height) int64 0 1 2
  * sn       (sn) int64 0 1 2 3 4 5 6 7 8
  * time     (time) datetime64[ns] 2010-01-01 ... 2010-01-31T23:00:00
  * we       (we) int64 0 1 2 3 4 5 6
Data variables:
    fvar     (time, height, sn, we) int64 dask.array<chunksize=(744, 3, 9, 7), meta=np.ndarray>

We are concatenating on the 'time' dimension across multiple months. All of the other dimensions are consistent across files. Here's a sample of the pipeline we are running:

with beam.Pipeline(options=phandle.options.pipeline_options) as p:
    (
        p
        | beam.Create(side_args["pattern"].items())
        | OpenWithXarray(file_type=side_args["pattern"].file_type)
        | Preprocess()
        | StoreToZarr(
            target_root=side_args["target_root"],
            store_name=side_args["store_name"],
            combine_dims=side_args["pattern"].combine_dim_keys,
            target_chunks=side_args["target_chunk"],
        )
    )

The issue depends on what we specify as target_chunks for StoreToZarr. For example, these target chunks work fine:

{"time": 1000, "sn": 4}

If we also try to chunk across the 'we' dimension with target chunks {"time": 1000, "sn": 4, "we": 4}, the error is raised:

ValueError: Cannot combine fragments because they do not form a regular hypercube. [while running 'StoreToZarr/Rechunk/MapTuple(combine_fragments)']

That error is raised in combine_fragments based on the output of _invert_meshgrid:

starts_cube = [np.array(item[1]).reshape(shape) for item in dims_starts_sizes]
sizes_cube = [np.array(item[2]).reshape(shape) for item in dims_starts_sizes]
try:
    # reversing order is necessary here because _sort_by_speed_of_varying puts the
    # arrays into the opposite order as wanted by np.meshgrid
    starts = _invert_meshgrid(*starts_cube[::-1])[::-1]
    sizes = _invert_meshgrid(*sizes_cube[::-1])[::-1]
except AssertionError:
    raise ValueError("Cannot combine fragments because they do not form a regular hypercube.")

I've also saved the inputs passed to _invert_meshgrid for target_chunks that work and for attempts that don't, which I can share if it's helpful.

Perhaps relevant: if we set the target chunk for time to 24, chunking with {"time": 24, "sn": 4, "we": 4} works, as do whatever chunk sizes we want for the sn and we dimensions.

TLDR: What are the constraints on the target chunks that can be used in the StoreToZarr transform?

Thank You!

@cisaacstern (Member) commented Aug 24, 2023

@tom-the-hill thanks for the detailed report.

TLDR: What are the constraints on the target chunks that can be used in the StoreToZarr transform?

I don't think this is fully defined at this point. In particular, I note that the place we should be testing our expectations of combine_fragments errors...

def test_combine_fragments_errors():

...does not actually test for this particular error. It would be a great contribution if you and/or @kbren would consider extending our testing in this area to capture this error. I believe having a test that triggers this error would go a long way to building our shared understanding of how it arises.

If you are interested in working on this, I'm happy to review/discuss approaches on GitHub and/or video. In brief, we'd need to:

  • Add a block in that test something like:

    with pytest.raises(ValueError, match="do not form a regular hypercube"):
        _ = combine_fragments(group, [(index0, ds), (index1, ds)])
  • And feed it some variation(s) of [(index0, ds), (index1, ds)] inputs that trigger this error.

  • Where it gets a bit tricky is that combine_fragments is not aware of target_chunks per se. That is handled by split_fragment, which generates the input fragments for combine_fragments. While admittedly a bit dense, the following block of code from one of the other rechunking tests demonstrates how to get from target_chunks through to calling split_fragment (using synthetic test data):

    target_chunks = {"time": time_chunks}
    nt, dayparam = nt_dayparam
    ds = make_ds(nt=nt)
    dsets, _, _ = split_up_files_by_variable_and_day(ds, dayparam)
    # replicates indexes created by IndexItems transform.
    time_positions = {t: i for i, t in enumerate(ds.time.values)}
    merge_dim = Dimension("variable", CombineOp.MERGE)
    concat_dim = Dimension("time", CombineOp.CONCAT)
    indexes = [
        Index(
            {
                merge_dim: Position((0 if "bar" in ds.data_vars else 1)),
                concat_dim: IndexedPosition(time_positions[ds.time[0].values], dimsize=nt),
            }
        )
        for ds in dsets
    ]
    # split the (mock indexed) datasets into sub-fragments.
    # the splits list are nested tuples which are a bit confusing for humans to think about.
    # create a namedtuple to help remember the structure of these tuples and cast the
    # elements of splits list to this more descriptive type.
    splits = [
        list(split_fragment((index, ds), target_chunks=target_chunks))
        for index, ds in zip(indexes, dsets)
    ]

    If you click through to the continuation of this ☝️ test, you can see how the outputs of split_fragment are passed on to combine_fragments.

@ta-hill (Contributor) commented Sep 6, 2023

@cisaacstern Sorry for the delayed response. Thank you for pointing me to that test; it really helped me understand the process of indexing, splitting, and combining.
With minimal changes to the existing test test_split_and_combine_fragments_with_merge_dim, I believe I can illustrate my question. The only changes were to pass target_chunks instead of time_chunks and to comment out the assertions checking time_chunks.

def test_split_and_combine_fragments_with_merge_dim(nt_dayparam, target_chunks, do_print=False):
    """Test if sub-fragments split from datasets with merge dims can be combined with each other."""

    def dprint(s):
        if do_print:
            print(s)
        else:
            pass
    #target_chunks = {"time": time_chunks}
    nt, dayparam = nt_dayparam
    ds = make_ds(nt=nt)
    dprint(f"original ds {ds}")
    dsets, _, _ = split_up_files_by_variable_and_day(ds, dayparam)
    dprint(f"{dsets = }")

    # replicates indexes created by IndexItems transform.
    time_positions = {t: i for i, t in enumerate(ds.time.values)}
    merge_dim = Dimension("variable", CombineOp.MERGE)
    concat_dim = Dimension("time", CombineOp.CONCAT)
    indexes = [
        Index(
            {
                merge_dim: Position((0 if "bar" in ds.data_vars else 1)),
                concat_dim: IndexedPosition(time_positions[ds.time[0].values], dimsize=nt),
            }
        )
        for ds in dsets
    ]
    dprint(f"{indexes = }")

    # split the (mock indexed) datasets into sub-fragments.
    # the splits list are nested tuples which are a bit confusing for humans to think about.
    # create a namedtuple to help remember the structure of these tuples and cast the
    # elements of splits list to this more descriptive type.
    splits = [
        list(split_fragment((index, ds), target_chunks=target_chunks))
        for index, ds in zip(indexes, dsets)
    ]
    dprint(f"{splits = }")
    Subfragment = namedtuple("Subfragment", "groupkey, content")
    subfragments = list(itertools.chain(*[[Subfragment(*s) for s in split] for split in splits]))

    # combine subfragments, starting by grouping subfragments by groupkey.
    # replicates behavior of `... | beam.GroupByKey() | beam.MapTuple(combine_fragments)`
    # in the `Rechunk` transform.
    groupkeys = set([sf.groupkey for sf in subfragments])
    grouped_subfragments: dict[GroupKey, list[Subfragment]] = {g: [] for g in groupkeys}
    for sf in subfragments:
        grouped_subfragments[sf.groupkey].append(sf)

    dprint(f"{grouped_subfragments = }")
    for g in sorted(groupkeys):
        # just confirms that grouping logic within this test is correct
        assert all([sf.groupkey == g for sf in grouped_subfragments[g]])
        # for the merge dimension of each subfragment in the current group, assert that there
        # is only one positional value present. this verifies that `split_fragments` has not
        # grouped distinct merge dimension positional values together under the same groupkey.
        merge_position_vals = [sf.content[0][merge_dim].value for sf in grouped_subfragments[g]]
        assert all([v == merge_position_vals[0] for v in merge_position_vals])
        # now actually try to combine the fragments
        _, ds_combined = combine_fragments(
            g,
            [sf.content for sf in grouped_subfragments[g]],
        )
        dprint(f"{ds_combined = }")
        # ensure vars are *not* combined (we only want to concat, not merge)
        assert len([k for k in ds_combined.data_vars.keys()]) == 1
        # check that time chunking is correct
        #if nt % time_chunks == 0:
        #    assert len(ds_combined.time) == time_chunks
        #else:
        #    # if `nt` is not evenly divisible by `time_chunks`, all chunks will be of
        #    # `len(time_chunks)` except the last one, which will be the length of the remainder
        #    assert len(ds_combined.time) in [time_chunks, nt % time_chunks]

Running that test with the parameter nt_dayparam=(8, "4D"), to emulate having two files with 4 time samples each, I got the following results for different target chunks.
For chunking on 1 or 2 dimensions, any chunk sizes seemed to work, e.g. target_chunks={"time": 1} and target_chunks={"time": 3, "lat": 5}.

However, when chunking by 3 dimensions, the test only passes when the chunk size for time evenly divides the number of time samples per file (for this example, "4D", i.e. 4 time samples). For example, target_chunks={"time": 1, "lat": 5, "lon": 7} works (as do time chunks of 2 or 4), but target_chunks={"time": 3, "lat": 5, "lon": 7} fails and raises ValueError: Cannot combine fragments because they do not form a regular hypercube.

So if chunking by 3 or more dimensions, it seems that the time chunk size must evenly divide the size of that dimension per file?
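
To make the arithmetic behind that observation concrete, here is a small self-contained sketch (the fragments helper is purely illustrative, not part of pangeo-forge-recipes) of how a time chunk that does not evenly divide the per-file length produces unequal pieces at file boundaries, which seems like one plausible way the regular-hypercube check could fail once several dimensions are chunked at the same time:

# A purely illustrative helper: for a list of per-file lengths along one dimension
# and a target chunk size, list the (chunk_index, fragment_length) pieces each file
# contributes after splitting on chunk boundaries.
def fragments(file_lengths, chunk_size):
    pieces = []
    offset = 0
    for n in file_lengths:
        start = offset
        while start < offset + n:
            chunk_idx = start // chunk_size
            end = min((chunk_idx + 1) * chunk_size, offset + n)
            pieces.append((chunk_idx, end - start))
            start = end
        offset += n
    return pieces

# Two files of 4 time samples each, as in the test above:
print(fragments([4, 4], 4))  # [(0, 4), (1, 4)] -> every chunk is built from equal-sized pieces
print(fragments([4, 4], 3))  # [(0, 3), (1, 1), (1, 2), (2, 2)] -> chunk 1 is built from unequal pieces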

@cisaacstern (Member)

Incredible investigation, @tom-the-hill!

So if chunking by 3 or more dimensions, it seems that the first chunk size must evenly divide into the size of that dimension per file?

I think the work you've done above makes you perhaps our resident expert on this subject! So I'd defer to you on that 😄 .

Could you take what you've illustrated above and turn it into a PR? Ideally we'd use pytest parametrization (other examples should be available in the same testing module, or elsewhere in the tests) to preserve the existing test coverage, while also adding the test cases you describe here.

Once we can see the test in a PR, I could envision perhaps adding a more descriptive error along the lines you've described, but it would help to see the parametrized test first.
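
For concreteness, a parametrization along those lines might look roughly like the sketch below. The parameter values come from the discussion above; how nt_dayparam is supplied (fixture vs. additional parametrization) and which cases should be marked xfail are assumptions:

import pytest

@pytest.mark.parametrize(
    "target_chunks",
    [
        {"time": 1},
        {"time": 3, "lat": 5},
        {"time": 4, "lat": 5, "lon": 7},
        # case reported above: the time chunk (3) does not evenly divide the
        # per-file time length (4) and three dimensions are chunked at once
        pytest.param(
            {"time": 3, "lat": 5, "lon": 7},
            marks=pytest.mark.xfail(reason="fragments do not form a regular hypercube"),
        ),
    ],
)
def test_split_and_combine_fragments_with_merge_dim(nt_dayparam, target_chunks):
    # body as in the modified test above, with the time_chunks assertions
    # generalized to the parametrized target_chunks
    ...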

@cisaacstern (Member)

xref leap-stc/cmip6-leap-feedstock#72 (comment), which appears to be the same issue.

@cisaacstern (Member)

Per a meeting I had this week with @kbren and @tom-the-hill, the next action point here is for Tom to open a PR with a failing test that minimally reproduces this error. Our working hypothesis is that this is a corner case bug which needs to be fixed (not user error), and the failing test will give us something concrete to work against.

@jbusecke (Contributor)

Let me know if I can help in any way here. This has quite a high priority on my end.

@thodson-usgs (Contributor)

I think I'm hitting this one also with my ssebop recipe, which fails if the time chunk != 1.

@jbusecke (Contributor)

This is becoming a major roadblock for both the Pangeo/ESGF CMIP6 Zarr 2.0 data and our data ingestion at LEAP and seems like a central issue for PGF. @cisaacstern is there any way we can carve out some time to tackle this?

@cisaacstern (Member)

Thanks for weighing in, all!

And apologies for the delay here.

I have blocked off tomorrow to work on this.

@thodson-usgs (Contributor)

Thanks, @cisaacstern! Although in my case, ignorance was to blame. I did get this working once I set target_chunk equal to a factor of len(time)! 🤕

@cisaacstern (Member)

Thanks all for your patience. To the best of my understanding, this is now fixed by #708 and released in 0.10.6. Please re-open this issue or open a new one if you find otherwise. Thanks for everyone's collaboration on this!

@rabernat (Contributor)

Charles this is huge! This bug has been bugging us for almost a year!!!

🚀 🚀 🚀

@cisaacstern (Member)

Credit to @jbusecke and @ta-hill for major contributions towards the fix!
