[FEA] Groupby rolling standard deviation #8696

Closed
beckernick opened this issue Jul 8, 2021 · 1 comment · Fixed by #9097
Labels: feature request (New feature or request), libcudf (Affects libcudf (C++/CUDA) code), Python (Affects Python cuDF API)

beckernick commented Jul 8, 2021

Today, I can calculate rolling mean, sum, and a variety of other aggregations. I can also calculate rolling mean and sum on a per-group basis (groupby.rolling / grouped window). I'd also like to calculate the groupby rolling standard deviation.

To use the same example as in the related #8695 (rolling standard deviation), I might have a large set of sensor data. To make sure my sensors are behaving within normal range, I'd like to measure the rolling standard deviation and post-process the results to alert me if any window's standard deviation drifts more than some threshold beyond an acceptable range. However, in this case, I have many sensors, so I need to measure the rolling standard deviation for each individual sensor. (In the example below, I'm assuming the data is pre-sorted in the relevant order, as the pandas groupby preserves the existing sort order by default.)

Spark differentiates between the sample and population standard deviation (stddev_samp vs stddev_pop), while pandas instead parameterizes the std function with a degrees-of-freedom argument (ddof); see the short sketch after the Spark output below.

Filing this as a separate issue per a chat with @jrhemstad, as the implementations may be different enough to warrant discussing them separately.

Pandas (the imports and Spark session setup below are shared with the Spark example):

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as F
import pandas as pd
import numpy as np

np.random.seed(12)

spark = SparkSession.builder \
    .master("local") \
    .getOrCreate()

nrows = 100
keycol = [0] * (nrows//2) + [1] * (nrows//2)

df = pd.DataFrame({
    "key": keycol,
    "a": np.random.randint(0, 100, nrows),
    "b": np.random.randint(0, 100, nrows),
    "c": np.random.randint(0, 100, nrows),
})
print(df.groupby("key").a.rolling(4).std().head(10))
key   
0    0          NaN
     1          NaN
     2          NaN
     3    33.511192
     4    11.789826
     5    31.712248
     6    40.008332
     7    32.501282
     8    23.879908
     9    22.051077
Name: a, dtype: float64

Spark:

sdf = spark.createDataFrame(df)
sdf.createOrReplaceTempView("df")

sdf.withColumn(
    "std",
    F.stddev_samp("a").over(Window.partitionBy("key").rowsBetween(-3, 0))
).show(10)
+---+---+---+---+------------------+
|key|  a|  b|  c|               std|
+---+---+---+---+------------------+
|  0| 75| 68| 89|              null|
|  0| 27| 25| 93| 33.94112549695428|
|  0|  6| 44| 30|35.369478367654786|
|  0|  2| 22| 19| 33.51119216023208|
|  0|  3| 69| 66|11.789826122551597|
|  0| 67|  9| 98|31.712247896777463|
|  0| 76| 59| 47| 40.00833246545858|
|  0| 48| 27| 59| 32.50128202599604|
|  0| 22| 28|  8|23.879907872519105|
|  0| 49|  2| 41|22.051077071199945|
+---+---+---+---+------------------+
only showing top 10 rows
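
For reference, a quick sketch of how the two parameterizations line up on the data above (reusing df, sdf, F, and Window from the snippets above; ddof/min_periods are standard pandas arguments and stddev_pop/stddev_samp are standard Spark functions, so nothing cuDF-specific is assumed here):

# pandas: one method, parameterized by delta degrees of freedom
df.groupby("key").a.rolling(4).std(ddof=1)   # sample std (the default)
df.groupby("key").a.rolling(4).std(ddof=0)   # population std

# pandas emits NaN until a full window is available; min_periods=1 makes it
# evaluate the partial leading windows, like Spark's rowsBetween(-3, 0) above
df.groupby("key").a.rolling(4, min_periods=1).std()

# Spark: two distinct functions over the same window specification
w = Window.partitionBy("key").rowsBetween(-3, 0)
sdf.withColumn("std_samp", F.stddev_samp("a").over(w)) \
   .withColumn("std_pop", F.stddev_pop("a").over(w)) \
   .show(10)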

revans2 commented Jul 9, 2021

On the Spark side of things, the standard deviation and variance calculations are based on calculating M2, COUNT, and AVERAGE, then combining them to get the desired result. We are just starting to add support for this in group-by aggregations (#8605), but have not finished plumbing it into the plugin yet. We need the M2-based approach because Spark does this in a distributed way: not all of the data for the calculation may be in memory at once, or even on the same GPU at once.
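
To make the M2-based approach concrete, here is a small Python sketch of the standard pairwise-merge formulas (Chan et al.). This is illustrative only, not the Spark plugin code, and the helper names are made up for the example:

import math

def partial_stats(values):
    """Compute (count, mean, M2) for one partition of the data."""
    n = len(values)
    mean = sum(values) / n
    m2 = sum((x - mean) ** 2 for x in values)
    return n, mean, m2

def merge(a, b):
    """Combine two (count, mean, M2) triples with the pairwise update."""
    n_a, mean_a, m2_a = a
    n_b, mean_b, m2_b = b
    n = n_a + n_b
    delta = mean_b - mean_a
    mean = mean_a + delta * n_b / n
    m2 = m2_a + m2_b + delta * delta * n_a * n_b / n
    return n, mean, m2

def stddev_samp(stats):
    n, _, m2 = stats
    return math.sqrt(m2 / (n - 1)) if n > 1 else float("nan")

# Two partitions of the same column produce the same result as one pass.
left, right = [75, 27, 6], [2, 3, 67]
print(stddev_samp(merge(partial_stats(left), partial_stats(right))))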

For most window operations, the cudf code is structured so that each CUDA thread handles calculating the result for an entire window. Spark also tends to keep all of the data for a given window cached in memory, which means there are no distributed special cases we have to take into account. So if we have a window aggregation that lets us produce the same results as the Spark functions, we are happy to use it.
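
As a toy illustration of that per-window execution model (plain Python standing in for what each CUDA thread would do; this is not libcudf code):

import statistics

def rolling_std(values, preceding=3, following=0):
    """Each window [i - preceding, i + following] is evaluated on its own,
    mirroring one-thread-per-window; windows with fewer than 2 rows yield None."""
    out = []
    for i in range(len(values)):
        window = values[max(0, i - preceding): i + following + 1]
        out.append(statistics.stdev(window) if len(window) > 1 else None)
    return out

# Matches the leading rows of the Spark output above for key 0.
print(rolling_std([75, 27, 6, 2, 3]))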

That said, there are some special cases where we could save a lot of GPU memory and get orders of magnitude better performance for running window aggregations. These are windows spanning unbounded preceding rows up to the current row. For these cases we are going to start using a scan or a segmented scan instead of a window aggregation directly. I am going to need to sit down and do some math, but I think we can do it with just an M2 aggregation as a scan/segmented scan. Although we would not ask for this until we had a customer with a use case for it.
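
A rough sketch of what that scan-style formulation could look like, using Welford's online update as the running per-group state (speculative Python, not what libcudf implements):

import math

def running_stddev_samp(values):
    """Unbounded-preceding-to-current-row window as one left-to-right scan:
    carry (count, mean, M2) and emit the sample std at every row."""
    n, mean, m2 = 0, 0.0, 0.0
    out = []
    for x in values:
        n += 1
        delta = x - mean
        mean += delta / n
        m2 += delta * (x - mean)
        out.append(math.sqrt(m2 / (n - 1)) if n > 1 else None)
    return out

# A segmented scan would simply reset (n, mean, m2) at each group boundary.
print(running_stddev_samp([75, 27, 6, 2, 3]))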

beckernick added this to the Test milestone on Jul 14, 2021
isVoid self-assigned this on Aug 23, 2021
rapids-bot pushed a commit that referenced this issue on Sep 15, 2021
…rolling.std` (#9097)

Closes #8695
Closes #8696 

This PR creates bindings for rolling aggregations for variance and standard deviation. Unlike pandas, the underlying implementation in libcudf computes each window independently of other windows.

Authors:
  - Michael Wang (https://github.com/isVoid)

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Sheilah Kirui (https://github.com/skirui-source)

URL: #9097