feature: add interpolation functionality #109

Merged
merged 42 commits into from
Jan 20, 2022
Commits
012c7f5
feat: add interpolation
guanjieshen Dec 22, 2021
862ae78
feat(interpolation): add support for multiple partitions, and target …
guanjieshen Dec 22, 2021
35cdd54
test: add interpolation zero fill test
guanjieshen Dec 23, 2021
32220b0
test: add additional interpolation tests
guanjieshen Dec 23, 2021
2fc47fc
chore: convert linear interpolation to use spark native functions
guanjieshen Dec 23, 2021
8638b0e
chore: allow for interpolation to be called directly from the TSDF ob…
guanjieshen Dec 23, 2021
1ae7f75
chore: update series fill logic
guanjieshen Jan 5, 2022
734a5cc
chore: change default behaviour for target_cols
guanjieshen Jan 5, 2022
9e2ff70
Merge pull request #1 from guanjieshen/poc/sample
guanjieshen Jan 5, 2022
70b6efd
chore: rename to be more consistent with pandas and the tsdf class
guanjieshen Jan 5, 2022
697dfad
chore(interpolation): make show if interpolated column optional
guanjieshen Jan 5, 2022
342c484
chore(interpolation): remove caching
guanjieshen Jan 5, 2022
29cfd56
Troubleshooting (#2)
guanjieshen Jan 11, 2022
45f536e
chore: add additional comments
guanjieshen Jan 11, 2022
706bd42
chore: update branches in test.yml
guanjieshen Jan 11, 2022
76b8b97
fix: update interpolate_column params
guanjieshen Jan 11, 2022
4be3cb9
chore: add interpolation details in readme.md
guanjieshen Jan 11, 2022
53f2181
chore: update main readme.md
guanjieshen Jan 11, 2022
e9282cb
chore: update main readme.md
guanjieshen Jan 11, 2022
ef11013
Merge branch 'master' of github.com:guanjieshen/tempo
guanjieshen Jan 11, 2022
2c9ac6f
Merge branch 'master' of github.com:guanjieshen/tempo
guanjieshen Jan 11, 2022
7fcbaac
chore: make readme more consistent
guanjieshen Jan 11, 2022
69777d6
chore: add build and downloads badge to readme
guanjieshen Jan 11, 2022
33f6efa
Merge remote-tracking branch 'upstream/master'
guanjieshen Jan 11, 2022
42f73d5
Merge remote-tracking branch 'upstream/release_Q42021'
guanjieshen Jan 11, 2022
4f460a0
changes
rportilla-databricks Jan 12, 2022
4a10e1d
Merge remote-tracking branch 'upstream/release_Q42021'
guanjieshen Jan 12, 2022
bcc5837
merge
rportilla-databricks Jan 12, 2022
968eb51
commit
rportilla-databricks Jan 12, 2022
0cdcae5
fix: fourier test java error
guanjieshen Jan 12, 2022
828eda7
fix: try to configure netty changes so tests for fourier will work
guanjieshen Jan 12, 2022
4a88cb0
change
rportilla-databricks Jan 12, 2022
20b8605
Merge branch 'master' of https://github.com/guanjieshen/tempo
rportilla-databricks Jan 12, 2022
45f6087
housekeeping: organize imports on tsdf.py
guanjieshen Jan 12, 2022
015879b
chore(interpolation): change back to bfill, change forward to ffill
guanjieshen Jan 14, 2022
f871c2f
interpolation: add the ability to call interpolate after resample
guanjieshen Jan 18, 2022
931a183
housekeeping: add missing type hint
guanjieshen Jan 18, 2022
592afb6
Merge branch 'master' of https://github.com/guanjieshen/tempo
rportilla-databricks Jan 18, 2022
e8dada2
chore(interpolate): update readme
guanjieshen Jan 20, 2022
12c3ce2
chore: update interpolation documentation to be more clear
guanjieshen Jan 20, 2022
49637d3
adding one unit test
rportilla-databricks Jan 20, 2022
65fb3f6
unit test with default
rportilla-databricks Jan 20, 2022
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
@@ -20,13 +20,13 @@ jobs:
- name: Set Spark env
run: |
export SPARK_LOCAL_IP=127.0.0.1
export SPARK_SUBMIT_OPTS="--illegal-access=permit -Dio.netty.tryReflectionSetAccessible=true"
@guanjieshen (Contributor, Author) commented on Jan 12, 2022:
Added to mitigate issue identified in SPARK-29923

- name: Generate coverage report
working-directory: ./python
run: |
pip install -r requirements.txt
pip install coverage
coverage run -m unittest
coverage run -m unittest discover -s tests -p '*_tests.py'
coverage xml
- name: Publish test coverage
uses: codecov/codecov-action@v1
4 changes: 3 additions & 1 deletion .gitignore
@@ -1,6 +1,8 @@
# ignore IntelliJ/PyCharm IDE files
# ignore IntelliJ/PyCharm/VSCode IDE files
.idea
*.iml
.vscode


# coverage files
.coverage
74 changes: 74 additions & 0 deletions README.md
@@ -8,7 +8,9 @@
## Project Description
The purpose of this project is to make time series manipulation with Spark simpler. Operations covered under this package include AS OF joins, rolling statistics with user-specified window lengths, featurization of time series using lagged values, and Delta Lake optimization on time and partition fields.

[![image](https://github.com/databrickslabs/tempo/workflows/build/badge.svg)](https://github.com/databrickslabs/tempo/actions?query=workflow%3Abuild)
[![codecov](https://codecov.io/gh/databrickslabs/tempo/branch/master/graph/badge.svg)](https://codecov.io/gh/databrickslabs/tempo)
[![Downloads](https://pepy.tech/badge/dbl-tempo/month)](https://pepy.tech/project/dbl-tempo)
[![PyPI version](https://badge.fury.io/py/dbl-tempo.svg)](https://badge.fury.io/py/dbl-tempo)

## Using the Project
@@ -164,6 +166,7 @@ moving_avg = watch_accel_tsdf.withRangeStats("y", rangeBackWindowSecs=600)
moving_avg.select('event_ts', 'x', 'y', 'z', 'mean_y').show(10, False)
```


#### 6 - Fourier Transform

Method for transforming the time series to frequency domain based on the distinguished data column
@@ -178,7 +181,78 @@ valueCol = name of the time domain data column which will be transformed
ft_df = tsdf.fourier_transform(timestep=1, valueCol="data_col")
display(ft_df)
```
#### 7 - Interpolation

Interpolate a series to fill in missing values using a specified function. The following interpolation methods are supported:

- Zero Fill : `zero`
- Null Fill: `null`
- Backwards Fill: `bfill`
- Forwards Fill: `ffill`
- Linear Fill: `linear`
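
As an illustration, the five strategies would fill a gap in a series like the plain-Python sketch below. This is illustrative only and not how tempo implements the fills (the library uses native Spark expressions over partitioned windows):

```python
def fill_series(values, method):
    """Fill None gaps in a list of floats using one of the five fill strategies.

    Pure-Python sketch for illustration; the library operates on Spark columns.
    """
    result = list(values)
    for i, v in enumerate(result):
        if v is not None:
            continue
        if method == "zero":
            result[i] = 0.0
        elif method == "null":
            result[i] = None  # keep the gap, treating null as the filled value
        elif method == "bfill":
            # backwards fill: take the next known value
            result[i] = next((x for x in result[i + 1:] if x is not None), None)
        elif method == "ffill":
            # forwards fill: carry the previous value (already filled if it was a gap)
            result[i] = result[i - 1] if i > 0 else None
        elif method == "linear":
            # interpolate between the nearest known values on either side
            prev_i = max((j for j in range(i) if result[j] is not None), default=None)
            next_i = next((j for j in range(i + 1, len(result)) if result[j] is not None), None)
            if prev_i is not None and next_i is not None:
                frac = (i - prev_i) / (next_i - prev_i)
                result[i] = result[prev_i] + frac * (result[next_i] - result[prev_i])
    return result
```

For example, `fill_series([1.0, None, None, 4.0], "linear")` produces `[1.0, 2.0, 3.0, 4.0]`, while `"bfill"` produces `[1.0, 4.0, 4.0, 4.0]`.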

The `interpolate` method can be used either in conjunction with `resample` or independently.

If `interpolate` is not chained after a `resample` operation, the method first re-samples the input dataset at the given frequency, then performs interpolation on the re-sampled time series.

Possible values for frequency include patterns such as `1 minute`, `4 hours`, `2 days`, or simply `sec`, `min`, `day`. Accepted aggregation functions are `floor`, `ceil`, `min`, `max`, and `mean`.

`NULL` values after re-sampling are treated the same as missing values. Specifying `NULL` as a valid value is not currently supported.

Valid column data types for interpolation are `int`, `bigint`, `float`, and `double`.

```python
# Create instance of the TSDF class
input_tsdf = TSDF(
input_df,
partition_cols=["partition_a", "partition_b"],
ts_col="event_ts",
)


# This chain of operations:
# 1. Aggregates all valid numeric columns using mean into 30-second intervals
# 2. Interpolates any missing intervals or null values using linear fill
# Note: when chaining interpolate after a resample, there is no need to provide
# a freq or func parameter; only method is required.
interpolated_tsdf = input_tsdf.resample(freq="30 seconds", func="mean").interpolate(
method="linear"
)

# This interpolation call:
# 1. Aggregates columnA and columnB using mean into 30-second intervals
# 2. Interpolates any missing intervals or null values using linear fill
interpolated_tsdf = input_tsdf.interpolate(
    freq="30 seconds",
    func="mean",
    target_cols=["columnA", "columnB"],
    method="linear",
)

# Alternatively, it is also possible to override default TSDF parameters,
# e.g. partition_cols and ts_col.
interpolated_tsdf = input_tsdf.interpolate(
    partition_cols=["partition_c"],
    ts_col="other_event_ts",
    freq="30 seconds",
    func="mean",
    target_cols=["columnA", "columnB"],
    method="linear",
)

# The show_interpolated flag can be set to True to add, for each target column,
# a boolean column indicating whether that row's value was interpolated.
interpolated_tsdf = input_tsdf.interpolate(
    partition_cols=["partition_c"],
    ts_col="other_event_ts",
    freq="30 seconds",
    func="mean",
    method="linear",
    target_cols=["columnA", "columnB"],
    show_interpolated=True,
)

```
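
For readers coming from pandas, the resample-then-interpolate chain above behaves much like pandas' own `resample`/`interpolate` pair. This is an analogy only, not the tempo API: tempo runs on Spark DataFrames and handles partition columns, which the single-series pandas example below does not:

```python
import pandas as pd

# A sparse series: values at 00:00:00 and 00:01:00, nothing at 00:00:30
ts = pd.to_datetime(["2022-01-01 00:00:00", "2022-01-01 00:01:00"])
pdf = pd.DataFrame({"columnA": [1.0, 3.0]}, index=ts)

# Mean-aggregate into 30-second buckets, then linearly fill the empty bucket
result = pdf.resample("30s").mean().interpolate(method="linear")
```

The empty 00:00:30 bucket comes out of `resample` as `NaN` and is then linearly filled to `2.0`, mirroring how tempo treats missing intervals after re-sampling.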

## Project Support
Please note that all projects in the /databrickslabs github account are provided for your exploration only, and are not formally supported by Databricks with Service Level Agreements (SLAs). They are provided AS-IS and we do not make any guarantees of any kind. Please do not submit a support ticket relating to any issues arising from the use of these projects.
73 changes: 73 additions & 0 deletions python/README.md
@@ -8,7 +8,9 @@
## Project Description
The purpose of this project is to make time series manipulation with Spark simpler. Operations covered under this package include AS OF joins, rolling statistics with user-specified window lengths, featurization of time series using lagged values, and Delta Lake optimization on time and partition fields.

[![image](https://github.com/databrickslabs/tempo/workflows/build/badge.svg)](https://github.com/databrickslabs/tempo/actions?query=workflow%3Abuild)
[![codecov](https://codecov.io/gh/databrickslabs/tempo/branch/master/graph/badge.svg)](https://codecov.io/gh/databrickslabs/tempo)
[![Downloads](https://pepy.tech/badge/dbl-tempo/month)](https://pepy.tech/project/dbl-tempo)

## Using the Project

@@ -181,6 +183,77 @@ ft_df = tsdf.fourier_transform(timestep=1, valueCol="data_col")
display(ft_df)
```

#### 8 - Interpolation

Interpolate a series to fill in missing values using a specified function. The following interpolation methods are supported:

- Zero Fill : `zero`
- Null Fill: `null`
- Backwards Fill: `bfill`
- Forwards Fill: `ffill`
- Linear Fill: `linear`

The `interpolate` method can be used either in conjunction with `resample` or independently.

If `interpolate` is not chained after a `resample` operation, the method first re-samples the input dataset at the given frequency, then performs interpolation on the re-sampled time series.

Possible values for frequency include patterns such as `1 minute`, `4 hours`, `2 days`, or simply `sec`, `min`, `day`. Accepted aggregation functions are `floor`, `ceil`, `min`, `max`, and `mean`.

`NULL` values after re-sampling are treated the same as missing values. Specifying `NULL` as a valid value is not currently supported.

Valid column data types for interpolation are `int`, `bigint`, `float`, and `double`.

```python
# Create instance of the TSDF class
input_tsdf = TSDF(
input_df,
partition_cols=["partition_a", "partition_b"],
ts_col="event_ts",
)


# This chain of operations:
# 1. Aggregates all valid numeric columns using mean into 30-second intervals
# 2. Interpolates any missing intervals or null values using linear fill
# Note: when chaining interpolate after a resample, there is no need to provide
# a freq or func parameter; only method is required.
interpolated_tsdf = input_tsdf.resample(freq="30 seconds", func="mean").interpolate(
method="linear"
)

# This interpolation call:
# 1. Aggregates columnA and columnB using mean into 30-second intervals
# 2. Interpolates any missing intervals or null values using linear fill
interpolated_tsdf = input_tsdf.interpolate(
    freq="30 seconds",
    func="mean",
    target_cols=["columnA", "columnB"],
    method="linear",
)

# Alternatively, it is also possible to override default TSDF parameters,
# e.g. partition_cols and ts_col.
interpolated_tsdf = input_tsdf.interpolate(
    partition_cols=["partition_c"],
    ts_col="other_event_ts",
    freq="30 seconds",
    func="mean",
    target_cols=["columnA", "columnB"],
    method="linear",
)

# The show_interpolated flag can be set to True to add, for each target column,
# a boolean column indicating whether that row's value was interpolated.
interpolated_tsdf = input_tsdf.interpolate(
    partition_cols=["partition_c"],
    ts_col="other_event_ts",
    freq="30 seconds",
    func="mean",
    method="linear",
    target_cols=["columnA", "columnB"],
    show_interpolated=True,
)

```
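
The re-sampling step that `interpolate` performs when given `freq` and `func` amounts to bucketing timestamps into fixed-width intervals, aggregating each bucket, and emitting empty buckets as missing so a fill step can handle them. A plain-Python sketch for a mean aggregation (illustrative only; the library does this with Spark window and aggregation functions):

```python
from collections import defaultdict
from datetime import datetime, timedelta

def resample_mean(rows, freq_seconds):
    """Bucket (timestamp, value) rows into fixed intervals and mean-aggregate each.

    Empty buckets are emitted as None so a later fill step can interpolate them.
    Sketch only: assumes freq_seconds evenly divides one minute.
    """
    buckets = defaultdict(list)
    for ts, val in rows:
        # Floor each timestamp to the start of its interval
        floored = ts.replace(second=(ts.second // freq_seconds) * freq_seconds,
                             microsecond=0)
        buckets[floored].append(val)
    out = []
    t, end = min(buckets), max(buckets)
    while t <= end:
        vals = buckets.get(t)
        out.append((t, sum(vals) / len(vals) if vals else None))
        t += timedelta(seconds=freq_seconds)
    return out

rows = [
    (datetime(2022, 1, 1, 0, 0, 5), 1.0),
    (datetime(2022, 1, 1, 0, 0, 10), 3.0),
    (datetime(2022, 1, 1, 0, 1, 0), 5.0),
]
```

Here `resample_mean(rows, 30)` averages the first two rows into the 00:00:00 bucket, leaves the 00:00:30 bucket as `None`, and keeps 00:01:00 as-is; the `None` is exactly what the chosen fill method (`zero`, `bfill`, `ffill`, `linear`, or `null`) then resolves.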

## Project Support
Please note that all projects in the /databrickslabs github account are provided for your exploration only, and are not formally supported by Databricks with Service Level Agreements (SLAs). They are provided AS-IS and we do not make any guarantees of any kind. Please do not submit a support ticket relating to any issues arising from the use of these projects.
1 change: 1 addition & 0 deletions python/requirements.txt
@@ -1,5 +1,6 @@
ipython==7.28.0
numpy==1.19.1
chispa==0.8.2
pandas==1.1.0
py4j==0.10.9
pyarrow==6.0.1
3 changes: 2 additions & 1 deletion python/setup.py
@@ -16,7 +16,8 @@
packages=find_packages(where=".", include=["tempo"]),
install_requires=[
'ipython',
'pandas'
'pandas',
'scipy'
],
extras_require=dict(tests=["pytest"]),
classifiers=[
2 changes: 1 addition & 1 deletion python/tempo/__init__.py
@@ -1,2 +1,2 @@
from tempo.tsdf import TSDF
from tempo.utils import display
from tempo.utils import display