
Include merge dim positions in group keys emitted by split_fragments #521

Merged

Conversation

norlandrhagen
Contributor

Opening up a PR to start working on a bugfix for the error described in Issue #517.

The error from #517 occurs at L185 of rechunking.py, in the combine_fragments function.

As @rabernat pointed out in #517, the error might be upstream in the split_fragments function.

Basically, it splits up the original dataset pieces into fragments,
and then puts them back together. Each merge dim should end up in a separate group.

So if I'm understanding this correctly, split_fragments does not take the MergeDim into account when building group keys, so the PCollections it emits group fragments from multiple variables together. When fed into combine_fragments, this produces the error: Expected a hypercube of shape [1] but got 2 fragments
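To make the mismatch concrete, here is a minimal sketch of the failing size check (the fragment contents are hypothetical stand-ins; the arithmetic mirrors the traceback quoted further down the thread):

import functools
import operator

# Hypothetical: two single-variable fragments land under one concat-only
# group key like (('time', 0),) because the merge dim is missing from it.
fragments = ["ds_foo_chunk0", "ds_bar_chunk0"]  # stand-ins for datasets

shape = [1]  # hypercube shape implied by the concat dims alone
total_size = functools.reduce(operator.mul, shape)  # == 1
assert len(fragments) != total_size  # -> the ValueError reported in #517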

There is the beginning of a test named test_split_fragment_merge_dim in test_rechunking.py.

cc @cisaacstern

@cisaacstern
Member

cisaacstern commented May 25, 2023

@norlandrhagen thanks for getting this started.

Would it be okay with you if I take a shot at finishing this PR?

@norlandrhagen
Contributor Author

@cisaacstern that would be great!

@cisaacstern
Member

As of the last commit, the test added here now replicates the bug reported in #517:

pytest -vx tests/test_rechunking.py -k test_split_and_combine_fragments_with_merge_dim
        ...
        total_size = functools.reduce(operator.mul, shape)
        if len(fragments) != total_size:
            # this error path is currently untested
>           raise ValueError(
                "Cannot combine fragments. "
                f"Expected a hypercube of shape {shape} but got {len(fragments)} fragments."
            )
E           ValueError: Cannot combine fragments. Expected a hypercube of shape [1] but got 2 fragments.

Thanks again to @norlandrhagen for kicking this off, and @rabernat for pairing on this yesterday. 🙏

Now that the test is in place, I'll work on fixing the bug!

@cisaacstern
Member

Summary to date:

  • I believe the simplest way to make combine_fragments capable of merge (as well as concat) is to keep the existing concat logic unchanged, and to simply first merge any fragments which require merging, then pass those merged_fragments downstream to the existing concat logic (a sketch of this approach follows the list). The work here in rechunking.py reflects this approach.
  • I've parametrized the xarray -> zarr end-to-end test with the multivariable file patterns. Before the current changes to rechunking.py were added, this raised the same error as reported in [beam-refactor] StoreToZarr - Cannot Combine Fragments Possible Error #517. Following addition of these changes, all of the multivariable end-to-end tests pass. 🎉
  • All but one of the parametrizations of the unit test also pass. The failing case in the unit tests is for (nt=10, resample="2D", time_chunks=2). It fails on the check of equality between sizes and expected_sizes... I suspect this may have to do with the way I am mocking the IndexedPositions emitted by the IndexItems transform in the unit test, because AFAICT this combination of parameters is also covered (and passes) in the end-to-end testing. Going to keep digging a bit on this.
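A minimal sketch of the merge-first idea from the first bullet, over simplified hypothetical inputs (the real implementation in rechunking.py operates on richer index objects):

import itertools
import xarray as xr

def merge_groups(fragments):
    """fragments: iterable of (concat_key, xr.Dataset) pairs, where
    concat_key is e.g. ('time', 3) and each dataset holds one variable."""
    keyed = sorted(fragments, key=lambda item: item[0])
    # Merge all single-variable datasets sharing a concat position; the
    # merged fragments then flow into the unchanged concat logic.
    return [
        (key, xr.merge([ds for _, ds in group]))
        for key, group in itertools.groupby(keyed, key=lambda item: item[0])
    ]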

@cisaacstern
Member

I suspect this may have to do with the way I am mocking the IndexedPositions emitted by the IndexItems transform in the unit test

Correction... the IndexedPositions look correct, but the subfragments generated by split_fragment in the unit test are a bit suspicious, being of length 14, with group keys:

[(('time', 0),), (('time', 0),), (('time', 0),), (('time', 0),), (('time', 1),), (('time', 1),), (('time', 1),), (('time', 1),), (('time', 1),), (('time', 1),), (('time', 2),), (('time', 2),), (('time', 2),), (('time', 2),)]

Note that (('time', 1),) appears 6 times... which is confusing to me. Getting closer to the issue here I think... 🤔

@norlandrhagen
Contributor Author

Super exciting progress @cisaacstern !

@cisaacstern
Member

Thanks to @rabernat for a thoughtful offline critique of this PR. In brief, Ryan pointed out that we parallelize writes horizontally across merge dimensions; combining merge dimensions in the combine_fragments step, as reflected in recent work here, would therefore result in loss of parallelism and/or OOM errors (the latter in the case of recipes with large merge dimensions). The better solution, based on Ryan's suggestion, is to ensure that split_fragments splits fragments which share a concat dim key, but have distinct merge indexes, into separate groups. I am reworking this PR to achieve that goal now.
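A hedged sketch of what that regrouping implies for the emitted group keys (the helper name is hypothetical, and the attribute/import details are my reading of the codebase, not a verbatim excerpt): each key carries the concat-dim chunk index plus the position of every merge dim, so fragments of different variables can no longer collide.

from pangeo_forge_recipes.patterns import CombineOp

def group_key_for(index, time_chunk_index):
    """Hypothetical helper: build a group key from a fragment's Index."""
    merge_positions = tuple(
        (dim.name, index[dim].value)
        for dim in index
        if dim.operation == CombineOp.MERGE
    )
    # e.g. (('time', 3), ('variable', 1))
    return (("time", time_chunk_index),) + merge_positions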

@cisaacstern
Member

cisaacstern commented Jun 27, 2023

I believe this is now quite close to the intended behavior. The relevant integration tests appear to all be passing, and most of the unit tests are as well. Two unit test cases are still failing; I just need to figure out whether that's a problem with some assumption in the unit test itself, or whether I'm actually catching a meaningful corner case.

@rabernat
Contributor

Great progress Charles! Let me know if I can be helpful here.

@cisaacstern
Member

Thanks so much to @jbusecke for pairing on this, which helped me understand the specific failure mode of the failing test cases (which do appear to be a meaningful bug, and not a testing mistake...):

  • All failing cases involve the parametrization nt_dayparam=(10, "2D"), wherein an initial aggregate dataset of 10 daily time steps and 2 variables is split into a collection of 2-day, single-variable datasets, with a total collection size of

    (10 time steps / 2 steps per input) * (2 variables) = 10 datasets
    
  • Each of the 10 datasets is of length 2 in the time dimension. Therefore, in the case of the target_chunks={"time": 1} parametrization, we would expect split_fragments to divide these 10 datasets into a total of 20 subfragments, each of length 1 in the time dimension. Indeed these subfragment datasets are emitted as expected, and they are grouped correctly according to variable, but their time-dimension group keys are incorrect. As shown in the table below, the time keys ('time', 1), ('time', 2), ('time', 3), and ('time', 4) each have two time steps grouped under them. IIUC, this is incorrect, as there should only be one time step per time key for the chunking scheme target_chunks={"time": 1}. This would seem to be a bug in the splitting logic, which I will now try to track down.

    Code for generating table
    # this code was run from within a debugger console, opened from a breakpoint set
    # within `test_rechunking::test_split_and_combine_fragments_with_merge_dim`, which
    # was run with the `1-nt_dayparam1` parametrization (the same as described above)
    print("nfragment | len(ds.time) | date | vars | groupkey")
    print("--------- | ------------ | ---- | ---- | --------")
    for i, sf in enumerate(subfragments):
        ds = sf.content[1]
        print(
            i, "|",
            len(ds.time), "|", str(ds.time[0].values)[:10], "|",
            [k for k in ds.data_vars.keys()], "|",
            f"`{sf.groupkey}`",
        )
    nfragment | len(ds.time) | date | vars | groupkey
    --------- | ------------ | ---- | ---- | --------
    0 | 1 | 2010-01-01 | ['bar'] | `(('time', 0), ('variable', 0))`
    1 | 1 | 2010-01-02 | ['bar'] | `(('time', 1), ('variable', 0))`
    2 | 1 | 2010-01-03 | ['bar'] | `(('time', 1), ('variable', 0))`
    3 | 1 | 2010-01-04 | ['bar'] | `(('time', 2), ('variable', 0))`
    4 | 1 | 2010-01-05 | ['bar'] | `(('time', 2), ('variable', 0))`
    5 | 1 | 2010-01-06 | ['bar'] | `(('time', 3), ('variable', 0))`
    6 | 1 | 2010-01-07 | ['bar'] | `(('time', 3), ('variable', 0))`
    7 | 1 | 2010-01-08 | ['bar'] | `(('time', 4), ('variable', 0))`
    8 | 1 | 2010-01-09 | ['bar'] | `(('time', 4), ('variable', 0))`
    9 | 1 | 2010-01-10 | ['bar'] | `(('time', 5), ('variable', 0))`
    10 | 1 | 2010-01-01 | ['foo'] | `(('time', 0), ('variable', 1))`
    11 | 1 | 2010-01-02 | ['foo'] | `(('time', 1), ('variable', 1))`
    12 | 1 | 2010-01-03 | ['foo'] | `(('time', 1), ('variable', 1))`
    13 | 1 | 2010-01-04 | ['foo'] | `(('time', 2), ('variable', 1))`
    14 | 1 | 2010-01-05 | ['foo'] | `(('time', 2), ('variable', 1))`
    15 | 1 | 2010-01-06 | ['foo'] | `(('time', 3), ('variable', 1))`
    16 | 1 | 2010-01-07 | ['foo'] | `(('time', 3), ('variable', 1))`
    17 | 1 | 2010-01-08 | ['foo'] | `(('time', 4), ('variable', 1))`
    18 | 1 | 2010-01-09 | ['foo'] | `(('time', 4), ('variable', 1))`
    19 | 1 | 2010-01-10 | ['foo'] | `(('time', 5), ('variable', 1))`
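    As a worked check, assuming the time group key is the subfragment's array offset divided by the target chunk size, the expected keys for this parametrization would be:

    time_chunks = 1
    n_time_steps = 10
    expected_time_keys = [offset // time_chunks for offset in range(n_time_steps)]
    # [0, 1, ..., 9]: one time step per ('time', k) key per variable,
    # not the duplicated keys 1-4 shown in the table above.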

@cisaacstern
Member

@rabernat, the test added in f0d3159 reproduces the behavior documented in #521 (comment) above. To your eye, is this indeed a bug, or have I misunderstood what group keys to expect in this situation?

@rabernat
Contributor

Trying to parse this example scenario, which is indeed pretty complicated. But the crux of it seems to be

  • IIUC, this is incorrect, as there should only be one time step per time key,

Based on my reading, this conclusion seems correct.

@rabernat
Contributor

The bug in question seems purely related to the rechunking logic along the concat dim. So it's suspicious that it only arises once you bring in a merge dim. That seems like an important clue.

Comment on lines 91 to 92
offset = 1
index = Index([(dimension, IndexedPosition(offset, dimsize=nt_total))])
rabernat
Contributor

This doesn't quite seem right to me. If each dataset has two days in it (nt=2), the only possible offsets are even (0, 2, 4, 6, and 8). So it feels like we are creating inconsistent test data here.
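The same point as quick arithmetic (a tiny illustrative check, with nt=2 as in the parametrization under discussion):

nt = 2  # time steps per input dataset
valid_offsets = [i * nt for i in range(5)]  # [0, 2, 4, 6, 8]
assert 1 not in valid_offsets  # offset = 1 cannot occur when nt = 2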

Comment on lines 29 to 31
# replicates indexes created by IndexItems transform.
unique_times = np.unique([ds.time[0].values for ds in dsets])
time_positions = {t: i for i, t in enumerate(unique_times)}
rabernat
Contributor

This logic seems wrong. The index in IndexedPosition needs to be an actual offset from the beginning of the array.
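A hedged sketch of the correction (not necessarily the PR's exact fix), reusing dsets from the test above and assuming every input has a constant length nt along time: scale the ordinal position by nt so the mocked value is a true array offset.

import numpy as np

nt = 2  # time steps per input dataset, per the test parametrization
unique_times = np.unique([ds.time[0].values for ds in dsets])
time_positions = {t: i * nt for i, t in enumerate(unique_times)}
# each value is now an offset from the start of the full time axis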

@cisaacstern
Member

@rabernat thanks so much for the review, your two inline comments were a serious a-ha moment for me. I thought I must've been misunderstanding something, and it turns out I was.

To recap (for my future self and any others reading this), there are actually 3 indexing spaces in play here:

  • FilePattern index space: the arrangement of input files as defined by file pattern CombineOps.
  • Array index space: dataset-level indexing.
  • Chunk index space: zarr chunk indexing.

I had misunderstood IndexedPosition.value to be referencing the FilePattern index space, when it in fact references the array index space. Fixing this misunderstanding, per your comment, in the "possible bug test" b813097 allows the test to pass (and reveals that this is not, in fact, a bug).
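A small illustration of the distinction, reusing the names from the test snippet quoted earlier (dimension is the concat Dimension for "time"; the values are hypothetical, for five 2-step inputs totalling 10 steps):

sequence_position = 2                 # FilePattern index space: the 3rd input
array_offset = sequence_position * 2  # array index space: offset 4
index = Index([(dimension, IndexedPosition(array_offset, dimsize=10))])
# chunk index space then follows as array_offset // target_chunk_size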

I'll now remove the possible bug test, and fix this issue in the unit test. 🚀

@rabernat
Contributor

I had misunderstood the IndexedPosition.value to be referencing an FilePattern index space, when it is in fact referencing array index space.

I am not super happy about how these types look, but I do believe this is documented.

@dataclass(frozen=True, order=True)
class Position:
    """
    :param indexed: If True, this position represents an offset within a dataset.
        If False, it is a position within a sequence.
    """

@cisaacstern cisaacstern changed the title from "Cannot Combine Fragments Error - Issue 517 - Testing" to "Include merge dim positions in group keys emitted by split_fragments" on Jun 29, 2023
@cisaacstern
Member

@rabernat AFAICT this is good to go. Would love your final review, in case there's anything I've overlooked.

Two other gut checks are in progress.

IMHO the testing here is pretty robust, so I don't think we should wait on either of these items to merge, but whenever they both complete, they'll be great further verification of these changes.

@rabernat rabernat left a comment

Nice work!

@rabernat rabernat merged commit e8e6609 into pangeo-forge:beam-refactor Jun 29, 2023
@cisaacstern
Member

🥳

@norlandrhagen
Contributor Author

Incredible @cisaacstern! Super exciting to have this PR merged in.

@jbusecke
Contributor

Awesome @cisaacstern!

@alxmrs
Contributor

alxmrs commented Jun 30, 2023

So great to see the release within reach!!
