[FEA] dask-cudf doesn't support "corr"/correlation function like Pandas and cuDF #3363
Reproducer in a dev-nightly container as of 11/12/19:
import dask_cudf
import cudf
df = cudf.datasets.randomdata(10)
ddf = dask_cudf.from_cudf(df, 2)
ddf.x.corr(ddf.y)
---------------------------------------------------------------------------
AssertionError Traceback (most recent call last)
<ipython-input-3-4ce11b32272a> in <module>
4 df = cudf.datasets.randomdata(10)
5 ddf = dask_cudf.from_cudf(df, 2)
----> 6 ddf.x.corr(ddf.y)
/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in corr(self, other, method, min_periods, split_every)
2978 if method != "pearson":
2979 raise NotImplementedError("Only Pearson correlation has been implemented")
-> 2980 df = concat([self, other], axis=1)
2981 return cov_corr(
2982 df, min_periods, corr=True, scalar=True, split_every=split_every
/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/multi.py in concat(dfs, axis, join, interleave_partitions)
1046 if axis == 1:
1047 if all(df.known_divisions for df in dasks):
-> 1048 return concat_indexed_dataframes(dfs, axis=axis, join=join)
1049 elif (
1050 len(dasks) == len(dfs)
/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/multi.py in concat_indexed_dataframes(dfs, axis, join)
884 warn = axis != 0
885 meta = methods.concat(
--> 886 [df._meta for df in dfs], axis=axis, join=join, filter_warning=warn
887 )
888 empties = [strip_unknown_categories(df._meta) for df in dfs]
/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/methods.py in concat(dfs, axis, join, uniform, filter_warning)
354 func = concat_dispatch.dispatch(type(dfs[0]))
355 return func(
--> 356 dfs, axis=axis, join=join, uniform=uniform, filter_warning=filter_warning
357 )
358
/opt/conda/envs/rapids/lib/python3.7/site-packages/dask_cudf/backends.py in concat_cudf(dfs, axis, join, uniform, filter_warning, sort)
31 dfs, axis=0, join="outer", uniform=False, filter_warning=True, sort=None
32 ):
---> 33 assert axis == 0
34 assert join == "outer"
35 return cudf.concat(dfs)
AssertionError: |
In the following code: It looks like we need to change the However, it looks like this function might also use record arrays, which may not be supported in CuPy. We can probably have this function return a tuple/list/dict instead. |
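A minimal sketch of the tuple/list/dict suggestion. It assumes that "this function" is the per-partition correlation step (Dask's cov_corr_chunk, which appears in a later traceback in this thread). Each partition returns a plain dict of partial sums, the dicts are summed, and Pearson's r is computed once at the end, so no structured (record) dtype is needed. The function names are hypothetical, and the code is pure Python rather than NumPy/CuPy. It also uses raw sums of squares, which is simpler but less numerically robust than approaches built on per-chunk means. The mu_discrepancy step in the later traceback suggests Dask takes that route.

```python
import math
from typing import Dict, Iterable, Sequence

# Field names of the partial statistics carried between partitions (hypothetical).
KEYS = ("n", "sx", "sy", "sxx", "syy", "sxy")


def corr_chunk(xs: Sequence[float], ys: Sequence[float]) -> Dict[str, float]:
    """Per-partition partial sums, returned as a plain dict instead of a record array."""
    stats = dict.fromkeys(KEYS, 0.0)
    for x, y in zip(xs, ys):
        # Drop pairs where either side is missing, like a pairwise-complete corr.
        if math.isnan(x) or math.isnan(y):
            continue
        stats["n"] += 1
        stats["sx"] += x
        stats["sy"] += y
        stats["sxx"] += x * x
        stats["syy"] += y * y
        stats["sxy"] += x * y
    return stats


def corr_combine(parts: Iterable[Dict[str, float]]) -> Dict[str, float]:
    """Merge partial statistics from any number of partitions by summing fields."""
    total = dict.fromkeys(KEYS, 0.0)
    for part in parts:
        for key in KEYS:
            total[key] += part[key]
    return total


def corr_finalize(s: Dict[str, float]) -> float:
    """Pearson r from the combined sums (the 1/(n-1) factors cancel)."""
    n = s["n"]
    if n < 2:
        return math.nan
    cov = s["sxy"] - s["sx"] * s["sy"] / n
    var_x = s["sxx"] - s["sx"] ** 2 / n
    var_y = s["syy"] - s["sy"] ** 2 / n
    if var_x <= 0 or var_y <= 0:
        return math.nan
    return cov / math.sqrt(var_x * var_y)


# Usage: two "partitions" of the same pair of columns.
parts = [corr_chunk([1.0, 2.0, 3.0], [2.0, 4.0, 6.0]),
         corr_chunk([4.0, 5.0, 6.0], [8.0, 10.0, 12.0])]
print(corr_finalize(corr_combine(parts)))  # 1.0
```

Because the intermediate result is an ordinary mapping, the combine step works the same regardless of which array library produced the sums.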
@rjzamora any interest in diving in here? |
Sure - as long as you are okay with my suggestion here :) |
Re-opening this due to new information and cross-linking. This is partially dependent on dask/dask#5643, but it also currently fails for me. In a small test, I end up with a different error from dask/dask#5643. This test can get past that issue because the dataframes have known divisions, which leads to the generalized
Example:
import cudf
import dask_cudf
d1 = cudf.datasets.randomdata(100, dtypes={"a":float, "b":float})
d2 = cudf.datasets.randomdata(100, dtypes={"c":float, "d":float})
dd1 = dask_cudf.from_cudf(d1, 5)
dd2 = dask_cudf.from_cudf(d2, 5)
dd1.corr(dd2)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
168 try:
--> 169 yield
170 except Exception as e:
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/core.py in elemwise(op, *args, **kwargs)
4421 with raise_on_meta_error(funcname(op)):
-> 4422 meta = partial_by_order(*parts, function=op, other=other)
4423
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/utils.py in partial_by_order(*args, **kwargs)
1075 args2.insert(i, arg)
-> 1076 return function(*args2, **kwargs)
1077
/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/dataframe.py in __ne__(self, other)
893 def __ne__(self, other):
--> 894 return self._apply_op("__ne__", other)
895
/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/dataframe.py in _apply_op(self, fn, other, fill_value)
754 for k, col in enumerate(self._cols):
--> 755 result[col] = getattr(self._cols[col], fn)(other[k])
756 elif isinstance(other, DataFrame):
/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/series.py in __ne__(self, other)
1000 def __ne__(self, other):
-> 1001 return self._unordered_compare(other, "ne")
1002
/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/series.py in _unordered_compare(self, other, cmpops)
948 result_name = utils.get_result_name(self, other)
--> 949 other = self._normalize_binop_value(other)
950 outcol = self._column.unordered_compare(cmpops, other)
/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/series.py in _normalize_binop_value(self, other)
943 else:
--> 944 return self._column.normalize_binop_value(other)
945
/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/column/numerical.py in normalize_binop_value(self, other)
139 else:
--> 140 raise TypeError("cannot broadcast {}".format(type(other)))
141
TypeError: cannot broadcast <class 'str'>
During handling of the above exception, another exception occurred:
ValueError Traceback (most recent call last)
<ipython-input-36-b0dbaa32cc44> in <module>
6 dd1 = dask_cudf.from_cudf(d1, 5)
7 dd2 = dask_cudf.from_cudf(d2, 5)
----> 8 dd1.corr(dd2)
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/core.py in corr(self, method, min_periods, split_every)
4013 @derived_from(pd.DataFrame)
4014 def corr(self, method="pearson", min_periods=None, split_every=False):
-> 4015 if method != "pearson":
4016 raise NotImplementedError("Only Pearson correlation has been implemented")
4017 return cov_corr(self, min_periods, True, split_every=split_every)
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/core.py in <lambda>(self, other)
1333 return lambda self, other: elemwise(op, other, self)
1334 else:
-> 1335 return lambda self, other: elemwise(op, self, other)
1336
1337 def rolling(self, window, min_periods=None, center=False, win_type=None, axis=0):
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/core.py in elemwise(op, *args, **kwargs)
4420 ]
4421 with raise_on_meta_error(funcname(op)):
-> 4422 meta = partial_by_order(*parts, function=op, other=other)
4423
4424 result = new_dd_object(graph, _name, meta, divisions)
/opt/conda/envs/rapids/lib/python3.6/contextlib.py in __exit__(self, type, value, traceback)
97 value = type()
98 try:
---> 99 self.gen.throw(type, value, traceback)
100 except StopIteration as exc:
101 # Suppress StopIteration *unless* it's the same exception that
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
188 )
189 msg = msg.format(" in `{0}`".format(funcname) if funcname else "", repr(e), tb)
--> 190 raise ValueError(msg)
191
192
ValueError: Metadata inference failed in `ne`.
Original error is below:
------------------------
TypeError("cannot broadcast <class 'str'>",)
Traceback:
---------
File "/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/utils.py", line 169, in raise_on_meta_error
yield
File "/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/core.py", line 4422, in elemwise
meta = partial_by_order(*parts, function=op, other=other)
File "/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/utils.py", line 1076, in partial_by_order
return function(*args2, **kwargs)
File "/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/dataframe.py", line 894, in __ne__
return self._apply_op("__ne__", other)
File "/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/dataframe.py", line 755, in _apply_op
result[col] = getattr(self._cols[col], fn)(other[k])
File "/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/series.py", line 1001, in __ne__
return self._unordered_compare(other, "ne")
File "/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/series.py", line 949, in _unordered_compare
other = self._normalize_binop_value(other)
File "/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/series.py", line 944, in _normalize_binop_value
return self._column.normalize_binop_value(other)
File "/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/column/numerical.py", line 140, in normalize_binop_value
raise TypeError("cannot broadcast {}".format(type(other))) |
@rjzamora, thanks for pointing out that the above example shouldn't work. It incorrectly passes a DataFrame to DataFrame.corr, whose first argument is method, so dd2 ends up in the method != "pearson" check; that likely explains the binaryop (ne) error. This is representative of the workflow (correlation of two columns):
import cudf
import dask_cudf
d1 = cudf.datasets.randomdata(100, dtypes={"a":float, "b":float})
dd1 = dask_cudf.from_cudf(d1, 5)
dd1.a.corr(dd1.b).compute()
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-2-7816cf3671a0> in <module>
4 d1 = cudf.datasets.randomdata(100, dtypes={"a":float, "b":float})
5 dd1 = dask_cudf.from_cudf(d1, 5)
----> 6 dd1.a.corr(dd1.b).compute()
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
163 dask.base.compute
164 """
--> 165 (result,) = compute(self, traverse=False, **kwargs)
166 return result
167
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
434 keys = [x.__dask_keys__() for x in collections]
435 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 436 results = schedule(dsk, keys, **kwargs)
437 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
438
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
79 get_id=_thread_get_id,
80 pack_exception=pack_exception,
---> 81 **kwargs
82 )
83
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
484 _execute_task(task, data) # Re-execute locally
485 else:
--> 486 raise_exception(exc, tb)
487 res, worker_id = loads(res_info)
488 state["cache"][key] = res
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/local.py in reraise(exc, tb)
314 if exc.__traceback__ is not tb:
315 raise exc.with_traceback(tb)
--> 316 raise exc
317
318
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
220 try:
221 task, data = loads(task_info)
--> 222 result = _execute_task(task, data)
223 id = get_id()
224 result = dumps((result, id))
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
117 func, args = arg[0], arg[1:]
118 args2 = [_execute_task(a, cache) for a in args]
--> 119 return func(*args2)
120 elif not ishashable(arg):
121 return arg
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/core.py in cov_corr_chunk(df, corr)
5147 )
5148 mu_discrepancy[mask] = np.nan
-> 5149 m[idx] = np.nansum(mu_discrepancy, axis=0)
5150 m = m.T
5151 dtype.append(("m", m.dtype))
<__array_function__ internals> in nansum(*args, **kwargs)
TypeError: no implementation found for 'numpy.nansum' on types that implement __array_function__: [<class 'cupy.core.core.ndarray'>]
CuPy only recently implemented nansum, in v7.0, which is newer than the version we currently use. I'll test this against the master branch of CuPy and report back. Thanks for looking into it. |
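The TypeError above comes from NumPy's __array_function__ protocol (NEP-18). The protocol is on by default from NumPy 1.17; NumPy 1.16 required opting in with the NUMPY_EXPERIMENTAL_ARRAY_FUNCTION=1 environment variable. When np.nansum(x) is called, NumPy asks x's type for an implementation, and if that type answers NotImplemented, NumPy raises exactly this error. The toy DuckArray below is a hypothetical stand-in for a cupy.ndarray from before CuPy provided nansum. Registering a handler afterwards is roughly analogous to upgrading CuPy.

```python
import numpy as np

# Registry of NumPy functions the toy array type knows how to handle.
HANDLED_FUNCTIONS = {}


def implements(numpy_function):
    """Register an implementation of a NumPy function for DuckArray."""
    def decorator(func):
        HANDLED_FUNCTIONS[numpy_function] = func
        return func
    return decorator


class DuckArray:
    """Toy 1-D array type (hypothetical) that participates in NEP-18 dispatch."""

    def __init__(self, values):
        self.values = [float(v) for v in values]

    def __array_function__(self, func, types, args, kwargs):
        # Returning NotImplemented makes NumPy raise
        # "no implementation found for 'numpy.<name>' ...".
        if func not in HANDLED_FUNCTIONS:
            return NotImplemented
        if not all(issubclass(t, DuckArray) for t in types):
            return NotImplemented
        return HANDLED_FUNCTIONS[func](*args, **kwargs)


@implements(np.sum)
def _sum(arr, axis=None):
    return sum(arr.values)


a = DuckArray([1.0, float("nan"), 2.0])
try:
    np.nansum(a)  # no handler registered yet
except TypeError as exc:
    print("before:", exc)


@implements(np.nansum)
def _nansum(arr, axis=None):
    # NaN is the only float that is not equal to itself.
    return sum(v for v in arr.values if v == v)


print("after:", np.nansum(a))  # 3.0
```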
Right, I'm using |
Love it, @rjzamora. That solves the problem. Let's leave this issue open for tracking purposes, keep an eye on when CuPy v7.0 is considered the "stable" release, and then evaluate updating our CuPy dependency accordingly. |
Brief summary of the current status below: In a vanilla cuDF / Dask environment, we cannot use Dask-cuDF's correlation. To use correlation on indexed dataframes, we need the following:
The NumPy upgrade can be done via conda, and the CuPy upgrade is best done by cloning the repository and then running
To use correlation on unindexed dataframes, we also need:
Since general usage can include both, the functional requirements are:
@kkraus14 @shwina, do you have any strong feelings about updating our dependency to NumPy >= 1.17? I suspect Dask will take care of itself, and CuPy v7.0 is not yet considered the stable release. |
We haven't pinned a NumPy version in general for cuDF, so I'm somewhat hesitant to require such a new version. What chunk of code requires NumPy > 1.17? I'm fine with upgrading to CuPy >= 7.0 once there's a conda-forge release. |
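If the floors discussed here are adopted (NumPy >= 1.17 from the question above, CuPy >= 7.0 for nansum), a runtime check can report an unsuitable environment before a correlation fails deep inside a task. The sketch below compares only major.minor, so pre-release suffixes are ignored (7.0.0rc1 counts as 7.0). The names MINIMUM_VERSIONS, parse_major_minor, and meets_minimum are hypothetical, and the sketch is not part of cuDF or Dask.

```python
import importlib
import re
from typing import Optional, Tuple

# Version floors discussed in this thread (hypothetical constant).
MINIMUM_VERSIONS = {"numpy": (1, 17), "cupy": (7, 0)}


def parse_major_minor(version: str) -> Optional[Tuple[int, int]]:
    """Extract (major, minor) from strings like '1.17.3' or '7.0.0rc1'."""
    match = re.match(r"(\d+)\.(\d+)", version)
    if match is None:
        return None
    return int(match.group(1)), int(match.group(2))


def meets_minimum(package: str, version: str) -> bool:
    """True if `version` is at least the floor recorded for `package`."""
    parsed = parse_major_minor(version)
    return parsed is not None and parsed >= MINIMUM_VERSIONS[package]


# Usage: report on whatever is installed; missing packages are skipped.
for name in MINIMUM_VERSIONS:
    try:
        module = importlib.import_module(name)
    except ImportError:
        print(f"{name}: not installed")
        continue
    status = "ok" if meets_minimum(name, module.__version__) else "too old"
    print(f"{name} {module.__version__}: {status}")
```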
Ah, I guess I always just get 1.16 from another dependency, then. Good to know. This is the chunk of code that currently necessitates it: Without NumPy > 1.17, we don't leverage duck typing to create a CuPy array internal to Dask's |
From my perspective, this should be a dispatched function to allow for CuPy/NumPy-agnostic behavior. @mrocklin @pentschev do you agree? |
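A minimal sketch of what a dispatched function could look like. It uses the standard library's functools.singledispatch rather than Dask's own dispatch machinery, and the name nansum_dispatch is hypothetical. The thread later concludes that NEP-18 makes this unnecessary, so this only illustrates the alternative being proposed. The CuPy branch is registered only when CuPy imports, and it relies on cupy.nansum, which this thread says arrived in CuPy 7.0.

```python
from functools import singledispatch

import numpy as np


@singledispatch
def nansum_dispatch(arr, axis=None):
    """Fallback for array types with no registered implementation."""
    raise TypeError(f"no nansum registered for {type(arr).__name__}")


@nansum_dispatch.register(np.ndarray)
def _nansum_numpy(arr, axis=None):
    return np.nansum(arr, axis=axis)


# Register the CuPy implementation only when CuPy is importable.
try:
    import cupy
except ImportError:
    cupy = None

if cupy is not None:
    @nansum_dispatch.register(cupy.ndarray)
    def _nansum_cupy(arr, axis=None):
        # Assumes CuPy >= 7.0, where cupy.nansum exists.
        return cupy.nansum(arr, axis=axis)


print(nansum_dispatch(np.array([1.0, np.nan, 2.0])))  # 3.0
```

The trade-off is that every array library needs an explicit registration, whereas __array_function__ lets the array type advertise its own implementations.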
We've been relying on the I suspect that the 1.17 requirement that @beckernick is referring to is just that that is when |
To complement @mrocklin's comment, for that functionality we only need CuPy >= 6.4.0, and we can still use NumPy 1.16 if people are fine with setting the |
I think this is something different than |
I'm happy to enforce NumPy >= 1.17 for cuDF then, but does the |
cc @rjzamora, too. My understanding is consistent with Keith's. The NEP-18 flag doesn't solve the problem because if |
@pentschev I believe we also need CuPy >7.0 in order to dispatch to |
Confirmed this will work as expected, so there's no need for a Dask dispatch. Sorry for the false alarm 😅
|
zeros_like allows for CuPy array allocation:
import numpy as np
import cupy as cp
garr = cp.array(range(5))
print(type(np.zeros_like(garr)))
<class 'cupy.core.core.ndarray'>
But the issue is that, without the shape parameter, we currently switch codepaths. |
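The shape keyword of np.zeros_like (and of empty_like, ones_like, and full_like) was added in NumPy 1.17, which explains why the version matters here. With shape, the call stays inside zeros_like and can be dispatched to the input's array library through __array_function__, provided that library's own zeros_like accepts shape. Without it, a fallback such as np.zeros(shape, dtype=...) always allocates a NumPy array, even for a CuPy input. The helper name below is hypothetical. Catching TypeError is a simplification and could also hide unrelated type errors.

```python
import numpy as np


def zeros_like_with_shape(prototype, shape):
    """Zeros with `prototype`'s dtype (and, via dispatch, its array type) but a new shape.

    Hypothetical helper: on NumPy >= 1.17 the `shape` keyword keeps the call
    dispatchable through __array_function__; older NumPy rejects the keyword.
    """
    try:
        return np.zeros_like(prototype, shape=shape)
    except TypeError:
        # NumPy < 1.17 fallback: always returns a NumPy array, which is the
        # codepath switch that breaks for non-NumPy inputs such as CuPy arrays.
        return np.zeros(shape, dtype=prototype.dtype)


x = np.arange(5, dtype=np.float32)
z = zeros_like_with_shape(x, (3, 2))
print(type(z).__name__, z.shape, z.dtype)  # ndarray (3, 2) float32
```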
The |
As long as we enforce a sufficiently new numpy and CuPy version all should be well which I'm perfectly happy to do for cuDF. |
Now that CuPy v7.0 is officially released, are we ready to undo the CuPy version restriction from #3539 and also enforce NumPy > 1.17? I believe that would officially close this issue. cc @randerzander
CuPy 7.0 is available from conda-forge |
We now support CuPy 7.0.0 ( #3619 ) 🙂 |
This should be resolved as of now with CuPy >= 7. |
@kkraus14 are there unit tests covering that? If not, maybe it is worth keeping this issue open just for adding unit tests, so that any future regression can be spotted. Any idea if it works in groupby already? Or should that be a separate issue? |
It's important to note that this failure was the product of an issue with cuDF and upstream libraries. That said, IIRC @rjzamora included fixes in both cuDF and Dask, and both include tests. @pentschev also implemented |
@jakirkham thanks for clarifying. It was not obvious from reading this thread. It is always useful to reference the issue from the commit or PR so that this is clear. |
When attempting to perform a correlation like
sales_corr = sales['pr_review_rating', 'count'].corr(sales['pr_review_rating', 'mean'])
dask-cudf fails with the following error:
TypeError: cannot concatenate object of type "<class 'cudf.core.series.Series'>"; only pd.Series, pd.DataFrame, and pd.Panel (deprecated) objs are valid
It seems that we currently restrict this. However, dask-cudf should behave exactly like cuDF and Pandas.
cudf/python/dask_cudf/dask_cudf/backends.py
Lines 30 to 33 in 4613ba8