
feature: add interpolation functionality #109

Merged: 42 commits into databrickslabs:release_Q42021 on Jan 20, 2022

Conversation

@guanjieshen (Contributor) commented Dec 22, 2021

Add the ability to perform interpolation on TSDF objects. Will support the following types of fill for interpolation:

  • zero (missing values will be replaced with 0)
  • null (missing values will be replaced with null)
  • nearest neighbour backfill
  • nearest neighbour forward fill
  • linear

Core configuration options for this interpolation method will be in line with the TSDF class, such as support for multiple partitions and the option to interpolate multiple columns at once (using the same fill type).

This implementation leverages the resample method already present in the TSDF class to normalize the dataset so that it can be interpolated; it can therefore be thought of as a wrapper around the resample feature that adds additional fill capabilities.

Also, if the show_interpolated flag is set to True, the output will include a new column is_{column_name}_interpolated for each interpolated column, indicating at the row level whether the data for that column was interpolated. There will also be a more general is_ts_interpolated column indicating whether the time series row itself was interpolated, regardless of whether data within the columns is null.

This implementation assumes that nulls are treated the same as missing values, which means that users do not have to filter out nulls before interpolation.

Items that still need to be addressed in a later PR:

  • Allow nulls to be handled either as valid values or as missing values
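
For illustration, a usage sketch based on the interface this thread converges on (the DataFrame df, the TSDF construction, and the column names are illustrative assumptions, not part of this PR):

    from tempo import TSDF

    # Illustrative sketch; `df`, the ts/partition columns, and 'value' are assumptions.
    input_tsdf = TSDF(df, ts_col="event_ts", partition_cols=["symbol"])

    interpolated_df = input_tsdf.interpolate(
        freq="30 seconds",       # interval to normalize the series to
        func="mean",             # aggregation applied while resampling
        method="linear",         # one of: zero, null, bfill, ffill, linear
        target_cols=["value"],   # columns to interpolate
        show_interpolated=True,  # adds is_value_interpolated and is_ts_interpolated
    ).df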

lgtm-com bot commented Dec 22, 2021

This pull request introduces 1 alert when merging 012c7f5 into 88c4d05 - view on LGTM.com

new alerts:

  • 1 for 'import *' may pollute namespace

lgtm-com bot commented Dec 22, 2021

This pull request introduces 1 alert when merging 862ae78 into 88c4d05 - view on LGTM.com

new alerts:

  • 1 for 'import *' may pollute namespace

@rportilla-databricks (Contributor) left a comment

@guanjieshen, this looks like a really great start to a highly requested feature that delivers a lot of value. One thing that is needed is unit tests. Ideally, most of the functions in the code below would have an associated test. Could you add these so we can take a look at the interface? I'm happy to meet in person to sort this out next week as well.


:linear_udf register linear calculation UDF
"""
self.linear_udf = udf(Interpolation.__calc_linear, FloatType())
Contributor

@guanjieshen, is there any reason this needs to be a Python UDF vs pandas UDF? We should test this on a billion-record dataset to measure performance and I'm happy to help with that.
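
For reference, a scalar pandas UDF version of the linear calculation might look like the following sketch (the function and column names are illustrative, not the PR's code):

    import pandas as pd
    from pyspark.sql.functions import pandas_udf

    # Illustrative sketch of a vectorized pandas UDF for linear interpolation;
    # it operates on whole batches rather than one row at a time like a Python UDF.
    @pandas_udf("double")
    def linear_interp(x_prev: pd.Series, x_next: pd.Series,
                      y_prev: pd.Series, y_next: pd.Series,
                      x: pd.Series) -> pd.Series:
        # y = y_prev + slope * (x - x_prev), computed element-wise
        return y_prev + (y_next - y_prev) * (x - x_prev) / (x_next - x_prev)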

@guanjieshen (Contributor Author) commented Dec 23, 2021

Good catch. I'm thinking of reimplementing this either in native Spark through selectExpr or converting it into a pandas UDF.

Contributor Author

I have switched this to use native Spark functionality rather than UDFs.
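
For illustration, a native-Spark linear fill can be expressed with window functions along these lines (a sketch; 'symbol', 'event_ts', and 'value' are assumed names, not the PR's exact code):

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    # Carry the last/next known value and its timestamp across null gaps,
    # then fill each null proportionally to the elapsed time between them.
    w_prev = Window.partitionBy("symbol").orderBy("event_ts").rowsBetween(Window.unboundedPreceding, 0)
    w_next = Window.partitionBy("symbol").orderBy("event_ts").rowsBetween(0, Window.unboundedFollowing)

    known_ts = F.when(F.col("value").isNotNull(), F.col("event_ts"))
    df = (
        df.withColumn("prev_val", F.last("value", ignorenulls=True).over(w_prev))
          .withColumn("next_val", F.first("value", ignorenulls=True).over(w_next))
          .withColumn("prev_ts", F.last(known_ts, ignorenulls=True).over(w_prev))
          .withColumn("next_ts", F.first(known_ts, ignorenulls=True).over(w_next))
          .withColumn(
              "value",
              F.coalesce(
                  F.col("value"),
                  F.col("prev_val")
                  + (F.col("next_val") - F.col("prev_val"))
                  * (F.col("event_ts").cast("long") - F.col("prev_ts").cast("long"))
                  / (F.col("next_ts").cast("long") - F.col("prev_ts").cast("long")),
              ),
          )
          .drop("prev_val", "next_val", "prev_ts", "next_ts")
    )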


return output_df

def interpolate(
Contributor

@guanjieshen, I'm wondering if there is a way to extend TSDF so that this operation can be applied in the same way that 'resample' is applied. This is not required, but having a consistent way to call functions on TSDF makes things simpler: something like tsdf.interpolate as opposed to Interpolate.interpolate(tsdf).

Contributor Author

Yup, can definitely make that change!

lgtm-com bot commented Dec 23, 2021

This pull request introduces 1 alert when merging 35cdd54 into 88c4d05 - view on LGTM.com

new alerts:

  • 1 for 'import *' may pollute namespace

lgtm-com bot commented Dec 23, 2021

This pull request introduces 2 alerts when merging 32220b0 into 88c4d05 - view on LGTM.com

new alerts:

  • 1 for Unused import
  • 1 for 'import *' may pollute namespace

@guanjieshen (Contributor Author) commented:

> @guanjieshen, this looks like a really great start to a highly requested feature that delivers a lot of value. One thing that is needed is unit tests. Ideally, most of the functions in the code below would have an associated test. Could you add these so we can take a look at the interface? I'm happy to meet in person to sort this out next week as well.

Great, that sounds good! This is still a WIP right now, but hopefully we can run some benchmarks on a larger dataset (than the one I've currently been working with) to get a sense of how it scales.

@guanjieshen (Contributor Author) commented:

@rportilla-databricks, also, any concerns with creating a separate test file for interpolation? The main tests.py seems fairly large right now.

@guanjieshen guanjieshen changed the title feat: add interpolation Feature: Add Interpolation Dec 23, 2021
@guanjieshen guanjieshen changed the title Feature: Add Interpolation Feature: Add Interpolation functionality Dec 23, 2021
@guanjieshen guanjieshen changed the title Feature: Add Interpolation functionality Feature: add interpolation functionality Dec 23, 2021
@guanjieshen guanjieshen changed the title Feature: add interpolation functionality feature: add interpolation functionality Dec 23, 2021
lgtm-com bot commented Dec 23, 2021

This pull request introduces 1 alert when merging 2fc47fc into 88c4d05 - view on LGTM.com

new alerts:

  • 1 for 'import *' may pollute namespace

lgtm-com bot commented Dec 23, 2021

This pull request introduces 2 alerts when merging 8638b0e into 88c4d05 - view on LGTM.com

new alerts:

  • 2 for Testing equality to None

@rportilla-databricks (Contributor) commented Dec 27, 2021 via email

@@ -517,6 +519,32 @@ def resample(self, freq, func=None, metricCols = None, prefix=None, fill = None)
enriched_tsdf = rs.aggregate(self, freq, func, metricCols, prefix, fill)
return(enriched_tsdf)

def interpolate(self, target_cols: List[str], sample_freq: str, sample_func: str, fill: str, ts_col: str = None, partition_cols: List[str]=None):
@rportilla-databricks (Contributor) commented Dec 29, 2021

@guanjieshen , let's discuss why a frequency or function is required here at all. Users should be able to interpolate without resampling at all - https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.interpolate.html

@rportilla-databricks (Contributor) commented Dec 29, 2021

Some suggestions:

  • Change sample_freq -> freq (to be consistent with resample)
  • Change sample_func -> func (to be consistent with resample)
  • Make freq/func and target_cols optional (you can use the code block below to populate target_cols in case the user forgets or really just wants everything interpolated):

    if target_cols is None:
        prohibited_cols = self.partitionCols + [self.ts_col]
        summarizable_types = ['int', 'bigint', 'float', 'double']
        # filter columns to find summarizable columns
        target_cols = [datatype[0] for datatype in self.df.dtypes if
                       ((datatype[1] in summarizable_types) and
                        (datatype[0].lower() not in prohibited_cols))]

Contributor Author

Made the changes to the naming, and modified the default behaviour for target_cols based on the code snippet.

For freq and func, wouldn't these need to be required? Unless we are thinking that a user needs to call resample first before interpolate?

@@ -517,6 +519,32 @@ def resample(self, freq, func=None, metricCols = None, prefix=None, fill = None)
enriched_tsdf = rs.aggregate(self, freq, func, metricCols, prefix, fill)
return(enriched_tsdf)

def interpolate(self, target_cols: List[str], sample_freq: str, sample_func: str, fill: str, ts_col: str = None, partition_cols: List[str]=None):
Contributor

Suggestion - could you call the fill parameter 'method' instead of 'fill'? This is more consistent with pandas, and I think it's a little clearer.

Contributor Author

Fixed!

@@ -517,6 +519,32 @@ def resample(self, freq, func=None, metricCols = None, prefix=None, fill = None)
enriched_tsdf = rs.aggregate(self, freq, func, metricCols, prefix, fill)
return(enriched_tsdf)

def interpolate(self, target_cols: List[str], sample_freq: str, sample_func: str, fill: str, ts_col: str = None, partition_cols: List[str]=None):
Contributor

Suggestions:

  • Make an option for adding the 'is_interpolated' columns. I see this being really valuable, but if there are 90 columns, this could get really cumbersome to maintain in the table.
  • Make an option for forcing a resample - in this case, the user can supply an argument for freq/func. Otherwise, we assume it's done and interpolate won't do much.

@guanjieshen (Contributor Author) commented Jan 5, 2022

Changed this to a boolean show_interpolated that can be set when calling interpolate; by default it's set to not show the flag columns.


# Join output_list into single DataFrame to output
output: DataFrame = output_list[0]
output.cache()
Contributor

A memory cache will be expensive here. Instead of using this, I would rely on Delta Lake - we can make a comment that this step is optimized on NVMe-supported cloud VMs.

Contributor Author

Removed output.cache()

:param fill - interpolation function to fill missing values
"""
# Generate Fill for Dataset
filled_series: DataFrame = self.__fill_series(
Contributor

If we can return a list of new columns [last(col(f)) for f in target_list] and then expand them in the final data frame, we can avoid the joins. I'll work on a prototype for that - this would be really helpful in avoiding the join, even though logically we are still going through each column to compute the last non-null value.
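
For illustration, that join-free shape could look like this (a sketch; df, target_cols, partition_cols, and ts_col are assumed names from the discussion, and last-non-null fill stands in for the general case):

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    # Build one fill expression per target column and select them all
    # in a single pass, instead of joining per-column filled DataFrames.
    w = (
        Window.partitionBy(*partition_cols)
        .orderBy(ts_col)
        .rowsBetween(Window.unboundedPreceding, 0)
    )
    fill_exprs = [F.last(F.col(c), ignorenulls=True).over(w).alias(c) for c in target_cols]
    filled = df.select(ts_col, *partition_cols, *fill_exprs)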

lgtm-com bot commented Jan 12, 2022

This pull request introduces 2 alerts when merging 4a10e1d into 4eb3e8a - view on LGTM.com

new alerts:

  • 2 for Module is imported more than once

@@ -20,13 +20,13 @@ jobs:
- name: Set Spark env
run: |
export SPARK_LOCAL_IP=127.0.0.1
export SPARK_SUBMIT_OPTS="--illegal-access=permit -Dio.netty.tryReflectionSetAccessible=true"
@guanjieshen (Contributor Author) commented Jan 12, 2022

Added to mitigate the issue identified in SPARK-29923.

Comment on lines +21 to +22
.config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true") \
.config("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true") \
Contributor Author

Added to mitigate the issue identified in SPARK-29923.

from pyspark.sql.window import Window

# Interpolation fill options
method_options = ["zero", "null", "back", "forward", "linear"]
Contributor

I'd like to suggest we use 'bfill', 'ffill' for the back/forward options.

Contributor Author

Changed in this commit!

guanjieshen@015879b

@@ -585,6 +586,41 @@ def resample(self, freq, func=None, metricCols = None, prefix=None, fill = None)
enriched_tsdf = rs.aggregate(self, freq, func, metricCols, prefix, fill)
return(enriched_tsdf)

def interpolate(self, freq: str, func: str, method: str, target_cols: List[str] = None,ts_col: str = None, partition_cols: List[str]=None, show_interpolated:bool = False):
Contributor

Can we make the freq/func optional in the case that the user supplies a resampled data frame? So, I'd like to be able to do this:

    actual_df: DataFrame = simple_input_tsdf.resample(freq='1 minute', func='min').interpolate(
        method="linear"
    ).df

@guanjieshen (Contributor Author) commented Jan 14, 2022

I can get rid of func, but freq is required so that the interpolation function knows what interval to fill the gaps with. Alternatively, we could store freq and func as private variables in the TSDF class if a resample has been done on the dataset?

@rportilla-databricks (Contributor) commented Jan 15, 2022 via email

@guanjieshen (Contributor Author) commented Jan 18, 2022

> Oh, this makes sense. When this is done using pandas, it's typically another subclass of data frame (a resampled data frame). We could add a subclass of TSDF in a similar way (ResampledTSDF) - this would have the resampled frequency as a data attribute. Then the interpolate method always knows it can safely skip resampling. What do you think?

Good idea, made the change in this commit: guanjieshen@f871c2f

So now something like this will work:

interpolated_df: DataFrame = (
   input_tsdf.resample(freq="30 seconds", func="mean").interpolate(method="linear").df
)
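
For reference, a minimal sketch of the ResampledTSDF idea (simplified; the class in the linked commit may differ in its details):

    # Simplified sketch, not the committed code: resample() returns a
    # ResampledTSDF that remembers the frequency and function, so
    # interpolate() no longer needs them passed in explicitly.
    class ResampledTSDF(TSDF):
        def __init__(self, df, ts_col, partition_cols, freq, func):
            super().__init__(df, ts_col=ts_col, partition_cols=partition_cols)
            self.__freq = freq
            self.__func = func

        def interpolate(self, method: str, target_cols=None, show_interpolated=False):
            # Data is already normalized to self.__freq; skip resampling.
            return super().interpolate(
                freq=self.__freq,
                func=self.__func,
                method=method,
                target_cols=target_cols,
                show_interpolated=show_interpolated,
            )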

@rportilla-databricks Thoughts on deprecating the fill parameter within the resample method? With the latest change, this:

input_tsdf.resample(freq="30 seconds", func="mean", fill=True)

should now be equivalent to:

input_tsdf.resample(freq="30 seconds", func="mean").interpolate(method="null")

I think leaving it in may create unexpected results, especially if users try something like this:

input_tsdf.resample(freq="30 seconds", func="mean", fill=True).interpolate(method="null")

@rportilla-databricks (Contributor) commented:

Just added one unit test. This looks great!

@rportilla-databricks rportilla-databricks merged commit c43888f into databrickslabs:release_Q42021 Jan 20, 2022
rportilla-databricks added a commit that referenced this pull request Jan 28, 2022
* new changes

* updated upsample

* updated upsample

* updated upsample

* committing read_yaml

* adding class1 with stacking

* adding class1 with stacking

* removing streams

* removing streams

* adding anomaly detection yaml support

* making database configurable

* making database configurable

* making database configurable

* added option for empty string prefix

* added option for empty string prefix

* added option for empty string prefix

* removing anomaly detection in branch

* remove anomaly detection code test file

* merging resample

* removing dbl tempo egg files

* removing dbl tempo egg files

* removing dbl tempo egg files

* removing dbl tempo egg files

* removing dbl tempo egg files

* Fourier transform functionality release Q42021 (#111)

* fourier transformation functionality in tempo

* fourier transform method docstrings added

* fourier transform unit test added

* updating readme with the fourier transform usage and the fourier function's variable naming pattern standard

* Updating requirements

* minor logic correction of naming the data column as 'val'

* adding the corrected buildTestDF and also adding pyarrow in requirements.txt

* Fourier unit test fixed and contributions information updated

* data column in tests and logic is corrected with the name changed to tdval

* original contribution restoration

* bringing the pandas_udf method inside the calling method to ensure the reference is not lost in the executors

* committing the correct timestep variable position

* adding self to timestep

* inherit timestep directly from parameter

* tidying up the codebase

* removed the set_timestep method as it is no longer required

* removing the unnecessary orderby

* adding order by inside the pandas function

* removed the redundant imports

* Update README.md

* fixing workflows

* feature: add interpolation functionality (#109)

* feat: add interpolation

* feat(interpolation): add support for multuple partitions, and target columns

* test: add interpolation zero fill test

* test: add additional interpolation tests

* chore: convert linear interpolation to use spark native functions

* chore: allow for interpolation to be called directly from the TSDF object

* chore: update series fill logic

* chore: change default behaviour for target_cols

* chore: rename to be more consistent with pandas and the tsdf class

* chore(interpolation): make show if interpolated column optional

* chore(interpolation): remove caching

* Troubleshooting (#2)

* Refactor interpolation code to remove joins, and double` resample`
* Added additional test coverage to interpolation code
* Updated `test` folder structure

Co-authored-by: Guanjie Shen <guanjie.shen@databricks.com>

* chore: add additional comments

* chore: update branches in test.yml

* fix: update interpolate_column params

* chore: add interpolation details in readme.md

* chore: update main readme.md

* chore: update main readme.md

* Merge branch 'master' of github.com:guanjieshen/tempo

* chore: make readme more consistent

* chore: add build and downloads badge to readme

* changes

* fix: fourier test java error

* fix: try to configure netty changes so tests for fourier will work

* change

* housekeeping: organize imports on tsdf.py

* chore(interpolation): change back to bfill, change forward to ffill

* interpolation: add the ability to call interpolate after resample

* housekeeping: add missing type hint

* chore(interpolate): update readme

* chore: update interpolation documentation to be more clear

* adding one unit test

Co-authored-by: Guanjie Shen <guanjie.shen@databricks.com>
Co-authored-by: Ricardo Portilla <ricardo.portilla@databricks.com>

* commiting release file

* removed unused code

* make the sql opt optional

* pushing prefix change

* pushing prefix change

* pushing prefix change

* pushing prefix change

* adding files

* adding files

* adding files

* updating asof prefix logic for sql optimization

Co-authored-by: Souvik Pratiher <70095944+Spratiher9@users.noreply.github.com>
Co-authored-by: Guanjie Shen <75445106+guanjieshen@users.noreply.github.com>
Co-authored-by: Guanjie Shen <guanjie.shen@databricks.com>