Skip to content

Commit

Permalink
Standardize join internals around DataFrame (#11184)
Browse files Browse the repository at this point in the history
This PR un-deprecates the `BaseIndex.join` method, which was erroneously deprecated, and reimplements it by converting the index objects to DataFrames under the hood. This reimplementation allows us to simplify much of the internals around merging, providing a single main code path implemented only for DataFrames. The behavior of `cudf.merge(Series, DataFrame)` is preserved via an explicit upcast of the Series to a DataFrame. These changes allow further simplification of the (currently extremely complex) internals of merging, which will hopefully eventually allow us to extract a fast and simple merge function for internal use from the complexities of the public `DataFrame.merge` API. This change also removes the remaining vestigial accesses to `Frame._index`, which will enable us to remove that attribute.

Authors:
  - Vyas Ramasubramani (https://github.com/vyasr)

Approvers:
  - Michael Wang (https://github.com/isVoid)

URL: #11184
  • Loading branch information
vyasr authored Jul 5, 2022
1 parent 2dd6ade commit 36ec9a7
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 181 deletions.
64 changes: 33 additions & 31 deletions python/cudf/cudf/core/_base_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from __future__ import annotations

import pickle
import warnings
from functools import cached_property
from typing import Any, Set, TypeVar

Expand Down Expand Up @@ -717,7 +716,7 @@ def difference(self, other, sort=None):
other.names = self.names
difference = cudf.core.index._index_from_data(
cudf.DataFrame._from_data(self._data)
._merge(
.merge(
cudf.DataFrame._from_data(other._data),
how="leftanti",
on=self.name,
Expand Down Expand Up @@ -1010,7 +1009,7 @@ def _intersection(self, other, sort=None):
other_unique.names = self.names
intersection_result = cudf.core.index._index_from_data(
cudf.DataFrame._from_data(self.unique()._data)
._merge(
.merge(
cudf.DataFrame._from_data(other_unique._data),
how="inner",
on=self.name,
Expand Down Expand Up @@ -1168,21 +1167,18 @@ def join(
(1, 2)],
names=['a', 'b'])
"""
warnings.warn(
"Index.join is deprecated and will be removed", FutureWarning
)

if isinstance(self, cudf.MultiIndex) and isinstance(
other, cudf.MultiIndex
):
raise TypeError(
"Join on level between two MultiIndex objects is ambiguous"
)
self_is_multi = isinstance(self, cudf.MultiIndex)
other_is_multi = isinstance(other, cudf.MultiIndex)
if level is not None:
if self_is_multi and other_is_multi:
raise TypeError(
"Join on level between two MultiIndex objects is ambiguous"
)

if level is not None and not is_scalar(level):
raise ValueError("level should be an int or a label only")
if not is_scalar(level):
raise ValueError("level should be an int or a label only")

if isinstance(other, cudf.MultiIndex):
if other_is_multi:
if how == "left":
how = "right"
elif how == "right":
Expand All @@ -1193,34 +1189,40 @@ def join(
lhs = self.copy(deep=False)
rhs = other.copy(deep=False)

on = level
# In case of MultiIndex, it will be None as
# we don't need to update name
left_names = lhs.names
right_names = rhs.names
# There should be no `None` values in Joined indices,
# so essentially it would be `left/right` or 'inner'
# in case of MultiIndex
if isinstance(lhs, cudf.MultiIndex):
if level is not None and isinstance(level, int):
on = lhs._data.select_by_index(level).names[0]
right_names = (on,) if on is not None else right_names
on = right_names[0]
on = (
lhs._data.select_by_index(level).names[0]
if isinstance(level, int)
else level
)

if on is not None:
rhs.names = (on,)
on = rhs.names[0]
if how == "outer":
how = "left"
elif how == "right":
how = "inner"
else:
# Both are normal indices
right_names = left_names
on = right_names[0]
on = rhs.names[0]
rhs.names = lhs.names

lhs.names = left_names
rhs.names = right_names
lhs = lhs.to_frame()
rhs = rhs.to_frame()

output = lhs._merge(rhs, how=how, on=on, sort=sort)
output = lhs.merge(rhs, how=how, on=on, sort=sort)

return output
# If both inputs were MultiIndexes, the output is a MultiIndex.
# Otherwise, the output is only a MultiIndex if there are multiple
# columns
if self_is_multi and other_is_multi:
return cudf.MultiIndex._from_data(output._data)
else:
return cudf.core.index._index_from_data(output._data)

def rename(self, name, inplace=False):
"""
Expand Down
50 changes: 41 additions & 9 deletions python/cudf/cudf/core/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
_indices_from_labels,
doc_reset_index_template,
)
from cudf.core.join import Merge, MergeSemi
from cudf.core.missing import NA
from cudf.core.multiindex import MultiIndex
from cudf.core.resample import DataFrameResampler
Expand Down Expand Up @@ -3659,9 +3660,37 @@ def merge(
- For outer joins, the result will be the union of categories
from both sides.
"""
# Compute merge
gdf_result = super()._merge(
right,
if indicator:
raise NotImplementedError(
"Only indicator=False is currently supported"
)

if lsuffix or rsuffix:
raise ValueError(
"The lsuffix and rsuffix keywords have been replaced with the "
"``suffixes=`` keyword. "
"Please provide the following instead: \n\n"
" suffixes=('%s', '%s')"
% (lsuffix or "_x", rsuffix or "_y")
)
else:
lsuffix, rsuffix = suffixes

lhs, rhs = self, right
merge_cls = Merge
if how == "right":
# Merge doesn't support right, so just swap
how = "left"
lhs, rhs = right, self
left_on, right_on = right_on, left_on
left_index, right_index = right_index, left_index
suffixes = (suffixes[1], suffixes[0])
elif how in {"leftsemi", "leftanti"}:
merge_cls = MergeSemi

return merge_cls(
lhs,
rhs,
on=on,
left_on=left_on,
right_on=right_on,
Expand All @@ -3671,10 +3700,7 @@ def merge(
sort=sort,
indicator=indicator,
suffixes=suffixes,
lsuffix=lsuffix,
rsuffix=rsuffix,
)
return gdf_result
).perform_merge()

@_cudf_nvtx_annotate
def join(
Expand Down Expand Up @@ -6988,15 +7014,21 @@ def from_pandas(obj, nan_as_null=None):

@_cudf_nvtx_annotate
def merge(left, right, *args, **kwargs):
return super(type(left), left)._merge(right, *args, **kwargs)
if isinstance(left, Series):
left = left.to_frame()
return left.merge(right, *args, **kwargs)


# a bit of fanciness to inject docstring with left parameter
merge_doc = DataFrame.merge.__doc__
if merge_doc is not None:
idx = merge_doc.find("right")
merge.__doc__ = "".join(
[merge_doc[:idx], "\n\tleft : DataFrame\n\t", merge_doc[idx:]]
[
merge_doc[:idx],
"\n\tleft : Series or DataFrame\n\t",
merge_doc[idx:],
]
)


Expand Down
63 changes: 0 additions & 63 deletions python/cudf/cudf/core/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
serialize_columns,
)
from cudf.core.column_accessor import ColumnAccessor
from cudf.core.join import Merge, MergeSemi
from cudf.core.mixins import BinaryOperand, Scannable
from cudf.core.window import Rolling
from cudf.utils import ioutils
Expand Down Expand Up @@ -1558,68 +1557,6 @@ def abs(self):
"""
return self._unaryop("abs")

@_cudf_nvtx_annotate
def _merge(
    self,
    right,
    on=None,
    left_on=None,
    right_on=None,
    left_index=False,
    right_index=False,
    how="inner",
    sort=False,
    indicator=False,
    suffixes=("_x", "_y"),
    lsuffix=None,
    rsuffix=None,
):
    """Merge ``self`` with ``right`` and return the merge result.

    A ``Merge`` (or, for ``how`` of ``"leftsemi"``/``"leftanti"``, a
    ``MergeSemi``) object is built from the arguments and its
    ``perform_merge()`` result is returned. A ``how="right"`` request is
    rewritten as a ``"left"`` merge with the two operands, their key
    arguments and the suffixes swapped.

    Raises
    ------
    NotImplementedError
        If ``indicator`` is truthy.
    ValueError
        If ``lsuffix`` or ``rsuffix`` is truthy.
    """
    if indicator:
        raise NotImplementedError(
            "Only indicator=False is currently supported"
        )

    if lsuffix or rsuffix:
        suggested = (lsuffix or "_x", rsuffix or "_y")
        raise ValueError(
            (
                "The lsuffix and rsuffix keywords have been replaced with the "
                "``suffixes=`` keyword. "
                "Please provide the following instead: \n\n"
                " suffixes=('%s', '%s')"
            )
            % suggested
        )

    # The two names are not used again below; the unpack itself fails for
    # anything other than a two-element iterable.
    lsuffix, rsuffix = suffixes

    if how == "right":
        # Merge doesn't support right, so express it as a left merge with
        # every left/right argument swapped.
        how = "left"
        left_frame, right_frame = right, self
        left_on, right_on = right_on, left_on
        left_index, right_index = right_index, left_index
        suffixes = (suffixes[1], suffixes[0])
        joiner = Merge
    else:
        left_frame, right_frame = self, right
        joiner = MergeSemi if how in {"leftsemi", "leftanti"} else Merge

    # TODO: the two isinstance checks below indicates that `_merge` should
    # not be defined in `Frame`, but in `IndexedFrame`.
    index_type = cudf.core._base_index.BaseIndex
    merge_op = joiner(
        left_frame,
        right_frame,
        on=on,
        left_on=left_on,
        right_on=right_on,
        left_index=left_index,
        right_index=right_index,
        lhs_is_index=isinstance(left_frame, index_type),
        rhs_is_index=isinstance(right_frame, index_type),
        how=how,
        sort=sort,
        indicator=indicator,
        suffixes=suffixes,
    )
    return merge_op.perform_merge()

@_cudf_nvtx_annotate
def _is_sorted(self, ascending=None, null_position=None):
"""
Expand Down
8 changes: 8 additions & 0 deletions python/cudf/cudf/core/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,14 @@ def _split(self, splits):
def _binaryop(self, other, op: str):
return self._as_int64()._binaryop(other, op=op)

def join(
    self, other, how="left", level=None, return_indexers=False, sort=False
):
    """Join with ``other`` via the object returned by ``self._as_int64()``.

    Every argument is forwarded positionally, in signature order, to that
    object's ``join`` method, and its result is returned unchanged.
    """
    # TODO: pandas can join RangeIndex objects directly and, depending on
    # the type of join, intelligently produce a RangeIndex output. That
    # still needs to be implemented here for the supported special cases.
    as_int64 = self._as_int64()
    return as_int64.join(other, how, level, return_indexers, sort)


# Patch in all binops and unary ops, which bypass __getattr__ on the instance
# and prevent the above overload from working.
Expand Down
20 changes: 7 additions & 13 deletions python/cudf/cudf/core/join/_join_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

if TYPE_CHECKING:
from cudf.core.column import ColumnBase
from cudf.core.frame import Frame


class _Indexer:
Expand All @@ -35,24 +34,19 @@ def __init__(self, name: Any):


class _ColumnIndexer(_Indexer):
def get(self, obj: Frame) -> ColumnBase:
def get(self, obj: cudf.DataFrame) -> ColumnBase:
return obj._data[self.name]

def set(self, obj: Frame, value: ColumnBase, validate=False):
def set(self, obj: cudf.DataFrame, value: ColumnBase, validate=False):
obj._data.set_by_label(self.name, value, validate=validate)


class _IndexIndexer(_Indexer):
def get(self, obj: Frame) -> ColumnBase:
if obj._index is not None:
return obj._index._data[self.name]
raise KeyError

def set(self, obj: Frame, value: ColumnBase, validate=False):
if obj._index is not None:
obj._index._data.set_by_label(self.name, value, validate=validate)
else:
raise KeyError
def get(self, obj: cudf.DataFrame) -> ColumnBase:
return obj._index._data[self.name]

def set(self, obj: cudf.DataFrame, value: ColumnBase, validate=False):
obj._index._data.set_by_label(self.name, value, validate=validate)


def _match_join_keys(
Expand Down
Loading

0 comments on commit 36ec9a7

Please sign in to comment.