feature: add interpolation functionality (#109)
* feat: add interpolation

* feat(interpolation): add support for multiple partitions and target columns

* test: add interpolation zero fill test

* test: add additional interpolation tests

* chore: convert linear interpolation to use spark native functions

* chore: allow for interpolation to be called directly from the TSDF object

* chore: update series fill logic

* chore: change default behaviour for target_cols

* chore: rename to be more consistent with pandas and the tsdf class

* chore(interpolation): make show if interpolated column optional

* chore(interpolation): remove caching

* Troubleshooting (#2)

* Refactor interpolation code to remove joins and double `resample`
* Added additional test coverage to interpolation code
* Updated `test` folder structure

Co-authored-by: Guanjie Shen <guanjie.shen@databricks.com>

* chore: add additional comments

* chore: update branches in test.yml

* fix: update interpolate_column params

* chore: add interpolation details in readme.md

* chore: update main readme.md

* chore: update main readme.md

* Merge branch 'master' of github.com:guanjieshen/tempo

* chore: make readme more consistent

* chore: add build and downloads badge to readme

* changes

* fix: fourier test java error

* fix: try to configure netty changes so tests for fourier will work

* change

* housekeeping: organize imports on tsdf.py

* chore(interpolation): change back to bfill, change forward to ffill

* interpolation: add the ability to call interpolate after resample

* housekeeping: add missing type hint

* chore(interpolate): update readme

* chore: update interpolation documentation to be more clear

* adding one unit test

Co-authored-by: Guanjie Shen <guanjie.shen@databricks.com>
Co-authored-by: Ricardo Portilla <ricardo.portilla@databricks.com>
3 people authored Jan 20, 2022
1 parent 4eb3e8a commit c43888f
Showing 12 changed files with 1,211 additions and 12 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
@@ -20,13 +20,13 @@ jobs:
- name: Set Spark env
run: |
export SPARK_LOCAL_IP=127.0.0.1
export SPARK_SUBMIT_OPTS="--illegal-access=permit -Dio.netty.tryReflectionSetAccessible=true"
- name: Generate coverage report
working-directory: ./python
run: |
pip install -r requirements.txt
pip install coverage
- coverage run -m unittest
+ coverage run -m unittest discover -s tests -p '*_tests.py'
coverage xml
- name: Publish test coverage
uses: codecov/codecov-action@v1
4 changes: 3 additions & 1 deletion .gitignore
@@ -1,6 +1,8 @@
- # ignore IntelliJ/PyCharm IDE files
+ # ignore IntelliJ/PyCharm/VSCode IDE files
.idea
*.iml
.vscode


# coverage files
.coverage
74 changes: 74 additions & 0 deletions README.md
@@ -8,7 +8,9 @@
## Project Description
The purpose of this project is to make time series manipulation with Spark simpler. Operations covered under this package include AS OF joins, rolling statistics with user-specified window lengths, featurization of time series using lagged values, and Delta Lake optimization on time and partition fields.

[![image](https://github.com/databrickslabs/tempo/workflows/build/badge.svg)](https://github.com/databrickslabs/tempo/actions?query=workflow%3Abuild)
[![codecov](https://codecov.io/gh/databrickslabs/tempo/branch/master/graph/badge.svg)](https://codecov.io/gh/databrickslabs/tempo)
[![Downloads](https://pepy.tech/badge/dbl-tempo/month)](https://pepy.tech/project/dbl-tempo)
[![PyPI version](https://badge.fury.io/py/dbl-tempo.svg)](https://badge.fury.io/py/dbl-tempo)

## Using the Project
@@ -164,6 +166,7 @@ moving_avg = watch_accel_tsdf.withRangeStats("y", rangeBackWindowSecs=600)
moving_avg.select('event_ts', 'x', 'y', 'z', 'mean_y').show(10, False)
```


#### 6 - Fourier Transform

A method for transforming a time series into the frequency domain, based on a designated data column.
@@ -178,7 +181,78 @@ valueCol = name of the time domain data column which will be transformed
ft_df = tsdf.fourier_transform(timestep=1, valueCol="data_col")
display(ft_df)
```
#### 7 - Interpolation

Interpolate a series to fill in missing values using a specified function. The following interpolation methods are supported (a sample input DataFrame is sketched after this list):

- Zero Fill: `zero`
- Null Fill: `null`
- Backwards Fill: `bfill`
- Forwards Fill: `ffill`
- Linear Fill: `linear`
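
For illustration, here is a minimal sketch of an input DataFrame with gaps that these methods could fill; the schema, column names, and values are hypothetical and chosen only to show the shape of data `interpolate` works on:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# Hypothetical 30-second series with one null value and one missing interval;
# all names and values here are illustrative only.
input_df = spark.createDataFrame(
    [
        ("A", "a1", "2022-01-01 00:00:00", 1.0),
        ("A", "a1", "2022-01-01 00:00:30", None),  # null value to be filled
        ("A", "a1", "2022-01-01 00:01:30", 4.0),   # the 00:01:00 interval is missing
    ],
    ["partition_a", "partition_b", "event_ts", "columnA"],
).withColumn("event_ts", col("event_ts").cast("timestamp"))
```

With `method="linear"`, one would expect the null at 00:00:30 to become 2.0 and a generated 00:01:00 row to be filled with 3.0; `zero` would fill both with 0.0, and `ffill` with the preceding 1.0.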

The `interpolate` method can be used either in conjunction with `resample` or independently.

If `interpolate` is not chained after a `resample` operation, the method first re-samples the input dataset at the given frequency and then performs interpolation on the re-sampled time-series dataset.

Possible values for frequency include patterns such as '1 minute', '4 hours', or '2 days', or simply 'sec', 'min', or 'day'. The accepted aggregation functions are 'floor', 'ceil', 'min', 'max', and 'mean'.

`NULL` values after re-sampling are treated the same as missing values. The ability to specify `NULL` as a valid value is not currently supported.

Valid column data types for interpolation are: `["int", "bigint", "float", "double"]`.
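
A target column of any other type would need to be cast first. A minimal sketch, assuming a hypothetical `columnA` that arrived as a string:

```python
from pyspark.sql.functions import col

# Cast a string-typed measurement to double so it becomes a valid
# interpolation target.
input_df = input_df.withColumn("columnA", col("columnA").cast("double"))
```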

```python
# Create an instance of the TSDF class
input_tsdf = TSDF(
    input_df,
    partition_cols=["partition_a", "partition_b"],
    ts_col="event_ts",
)


# The following chain of operations will:
# 1. Aggregate all valid numeric columns using mean into 30 second intervals
# 2. Interpolate any missing intervals or null values using linear fill
# Note: when chaining interpolate after a resample, there is no need to provide
# a freq or func parameter; only method is required.
interpolated_tsdf = input_tsdf.resample(freq="30 seconds", func="mean").interpolate(
    method="linear"
)

# The following interpolation call will:
# 1. Aggregate columnA and columnB using mean into 30 second intervals
# 2. Interpolate any missing intervals or null values using linear fill
interpolated_tsdf = input_tsdf.interpolate(
    freq="30 seconds",
    func="mean",
    target_cols=["columnA", "columnB"],
    method="linear"
)

# Alternatively, it is also possible to override default TSDF parameters,
# e.g. partition_cols, ts_col
interpolated_tsdf = input_tsdf.interpolate(
    partition_cols=["partition_c"],
    ts_col="other_event_ts",
    freq="30 seconds",
    func="mean",
    target_cols=["columnA", "columnB"],
    method="linear"
)

# The show_interpolated flag can be set to `True` to add a boolean column per
# target column, indicating for each row whether that value was interpolated.
interpolated_tsdf = input_tsdf.interpolate(
    partition_cols=["partition_c"],
    ts_col="other_event_ts",
    freq="30 seconds",
    func="mean",
    method="linear",
    target_cols=["columnA", "columnB"],
    show_interpolated=True,
)

```
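
To sanity-check the output, the wrapped Spark DataFrame can be shown directly. Note that `.df` as the accessor and the flag-column naming in the comment below are assumptions for illustration, not guaranteed API:

```python
# Inspect the interpolated series; with show_interpolated=True each target
# column is expected to gain a companion boolean flag (exact naming assumed,
# e.g. is_interpolated_columnA).
interpolated_tsdf.df.show(10, False)
```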

## Project Support
Please note that all projects in the /databrickslabs github account are provided for your exploration only, and are not formally supported by Databricks with Service Level Agreements (SLAs). They are provided AS-IS and we do not make any guarantees of any kind. Please do not submit a support ticket relating to any issues arising from the use of these projects.
73 changes: 73 additions & 0 deletions python/README.md
@@ -8,7 +8,9 @@
## Project Description
The purpose of this project is to make time series manipulation with Spark simpler. Operations covered under this package include AS OF joins, rolling statistics with user-specified window lengths, featurization of time series using lagged values, and Delta Lake optimization on time and partition fields.

[![image](https://github.com/databrickslabs/tempo/workflows/build/badge.svg)](https://github.com/databrickslabs/tempo/actions?query=workflow%3Abuild)
[![codecov](https://codecov.io/gh/databrickslabs/tempo/branch/master/graph/badge.svg)](https://codecov.io/gh/databrickslabs/tempo)
[![Downloads](https://pepy.tech/badge/dbl-tempo/month)](https://pepy.tech/project/dbl-tempo)

## Using the Project

@@ -181,6 +183,77 @@ ft_df = tsdf.fourier_transform(timestep=1, valueCol="data_col")
display(ft_df)
```

#### 8 - Interpolation

Interpolate a series to fill in missing values using a specified function. The following interpolation methods are supported:

- Zero Fill: `zero`
- Null Fill: `null`
- Backwards Fill: `bfill`
- Forwards Fill: `ffill`
- Linear Fill: `linear`

The `interpolate` method can be used either in conjunction with `resample` or independently.

If `interpolate` is not chained after a `resample` operation, the method first re-samples the input dataset at the given frequency and then performs interpolation on the re-sampled time-series dataset.

Possible values for frequency include patterns such as '1 minute', '4 hours', or '2 days', or simply 'sec', 'min', or 'day'. The accepted aggregation functions are 'floor', 'ceil', 'min', 'max', and 'mean'.

`NULL` values after re-sampling are treated the same as missing values. The ability to specify `NULL` as a valid value is not currently supported.

Valid column data types for interpolation are: `["int", "bigint", "float", "double"]`.

```python
# Create an instance of the TSDF class
input_tsdf = TSDF(
    input_df,
    partition_cols=["partition_a", "partition_b"],
    ts_col="event_ts",
)


# The following chain of operations will:
# 1. Aggregate all valid numeric columns using mean into 30 second intervals
# 2. Interpolate any missing intervals or null values using linear fill
# Note: when chaining interpolate after a resample, there is no need to provide
# a freq or func parameter; only method is required.
interpolated_tsdf = input_tsdf.resample(freq="30 seconds", func="mean").interpolate(
    method="linear"
)

# The following interpolation call will:
# 1. Aggregate columnA and columnB using mean into 30 second intervals
# 2. Interpolate any missing intervals or null values using linear fill
interpolated_tsdf = input_tsdf.interpolate(
    freq="30 seconds",
    func="mean",
    target_cols=["columnA", "columnB"],
    method="linear"
)

# Alternatively, it is also possible to override default TSDF parameters,
# e.g. partition_cols, ts_col
interpolated_tsdf = input_tsdf.interpolate(
    partition_cols=["partition_c"],
    ts_col="other_event_ts",
    freq="30 seconds",
    func="mean",
    target_cols=["columnA", "columnB"],
    method="linear"
)

# The show_interpolated flag can be set to `True` to add a boolean column per
# target column, indicating for each row whether that value was interpolated.
interpolated_tsdf = input_tsdf.interpolate(
    partition_cols=["partition_c"],
    ts_col="other_event_ts",
    freq="30 seconds",
    func="mean",
    method="linear",
    target_cols=["columnA", "columnB"],
    show_interpolated=True,
)

```

## Project Support
Please note that all projects in the /databrickslabs github account are provided for your exploration only, and are not formally supported by Databricks with Service Level Agreements (SLAs). They are provided AS-IS and we do not make any guarantees of any kind. Please do not submit a support ticket relating to any issues arising from the use of these projects.
1 change: 1 addition & 0 deletions python/requirements.txt
@@ -1,5 +1,6 @@
ipython==7.28.0
numpy==1.19.1
chispa==0.8.2
pandas==1.1.0
py4j==0.10.9
pyarrow==6.0.1
3 changes: 2 additions & 1 deletion python/setup.py
@@ -16,7 +16,8 @@
packages=find_packages(where=".", include=["tempo"]),
install_requires=[
'ipython',
'pandas'
'pandas',
'scipy'
],
extras_require=dict(tests=["pytest"]),
classifiers=[
2 changes: 1 addition & 1 deletion python/tempo/__init__.py
@@ -1,2 +1,2 @@
from tempo.tsdf import TSDF
from tempo.utils import display
from tempo.utils import display
