feature: add interpolation functionality #109

Merged: 42 commits, merged Jan 20, 2022
Changes from 6 commits

Commits (42):
012c7f5
feat: add interpolation
guanjieshen Dec 22, 2021
862ae78
feat(interpolation): add support for multiple partitions, and target …
guanjieshen Dec 22, 2021
35cdd54
test: add interpolation zero fill test
guanjieshen Dec 23, 2021
32220b0
test: add additional interpolation tests
guanjieshen Dec 23, 2021
2fc47fc
chore: convert linear interpolation to use spark native functions
guanjieshen Dec 23, 2021
8638b0e
chore: allow for interpolation to be called directly from the TSDF ob…
guanjieshen Dec 23, 2021
1ae7f75
chore: update series fill logic
guanjieshen Jan 5, 2022
734a5cc
chore: change default behaviour for target_cols
guanjieshen Jan 5, 2022
9e2ff70
Merge pull request #1 from guanjieshen/poc/sample
guanjieshen Jan 5, 2022
70b6efd
chore: rename to be more consistent with pandas and the tsdf class
guanjieshen Jan 5, 2022
697dfad
chore(interpolation): make show if interpolated column optional
guanjieshen Jan 5, 2022
342c484
chore(interpolation): remove caching
guanjieshen Jan 5, 2022
29cfd56
Troubleshooting (#2)
guanjieshen Jan 11, 2022
45f536e
chore: add additional comments
guanjieshen Jan 11, 2022
706bd42
chore: update branches in test.yml
guanjieshen Jan 11, 2022
76b8b97
fix: update interpolate_column params
guanjieshen Jan 11, 2022
4be3cb9
chore: add interpolation details in readme.md
guanjieshen Jan 11, 2022
53f2181
chore: update main readme.md
guanjieshen Jan 11, 2022
e9282cb
chore: update main readme.md
guanjieshen Jan 11, 2022
ef11013
Merge branch 'master' of github.com:guanjieshen/tempo
guanjieshen Jan 11, 2022
2c9ac6f
Merge branch 'master' of github.com:guanjieshen/tempo
guanjieshen Jan 11, 2022
7fcbaac
chore: make readme more consistent
guanjieshen Jan 11, 2022
69777d6
chore: add build and downloads badge to readme
guanjieshen Jan 11, 2022
33f6efa
Merge remote-tracking branch 'upstream/master'
guanjieshen Jan 11, 2022
42f73d5
Merge remote-tracking branch 'upstream/release_Q42021'
guanjieshen Jan 11, 2022
4f460a0
changes
rportilla-databricks Jan 12, 2022
4a10e1d
Merge remote-tracking branch 'upstream/release_Q42021'
guanjieshen Jan 12, 2022
bcc5837
merge
rportilla-databricks Jan 12, 2022
968eb51
commit
rportilla-databricks Jan 12, 2022
0cdcae5
fix: fourier test java error
guanjieshen Jan 12, 2022
828eda7
fix: try to configure netty changes so tests for fourier will work
guanjieshen Jan 12, 2022
4a88cb0
change
rportilla-databricks Jan 12, 2022
20b8605
Merge branch 'master' of https://github.com/guanjieshen/tempo
rportilla-databricks Jan 12, 2022
45f6087
housekeeping: organize imports on tsdf.py
guanjieshen Jan 12, 2022
015879b
chore(interpolation): change back to bfill, change forward to ffill
guanjieshen Jan 14, 2022
f871c2f
interpolation: add the ability to call interpolate after resample
guanjieshen Jan 18, 2022
931a183
housekeeping: add missing type hint
guanjieshen Jan 18, 2022
592afb6
Merge branch 'master' of https://github.com/guanjieshen/tempo
rportilla-databricks Jan 18, 2022
e8dada2
chore(interpolate): update readme
guanjieshen Jan 20, 2022
12c3ce2
chore: update interpolation documentation to be more clear
guanjieshen Jan 20, 2022
49637d3
adding one unit test
rportilla-databricks Jan 20, 2022
65fb3f6
unit test with default
rportilla-databricks Jan 20, 2022
4 changes: 3 additions & 1 deletion .gitignore
@@ -1,6 +1,8 @@
# ignore IntelliJ/PyCharm IDE files
# ignore IntelliJ/PyCharm/VSCode IDE files
.idea
*.iml
.vscode


# coverage files
.coverage
321 changes: 321 additions & 0 deletions python/tempo/interpol.py
@@ -0,0 +1,321 @@
import sys
from datetime import datetime
from typing import List, Tuple

from pyspark.sql import SparkSession as spark
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import (
col,
first,
last,
lit,
to_timestamp,
unix_timestamp,
when,
)
from pyspark.sql.window import Window

# Interpolation fill options
fill_options = ["zero", "null", "back", "forward", "linear"]
supported_target_col_types = ["double", "float"]


class Interpolation:
def __calc_linear_spark(
self, df: DataFrame, epoch, epoch_ff, epoch_bf, value_ff, value_bf, value
):
"""
Native Spark function for calculating linear interpolation on a DataFrame.

:param df - input DataFrame
:param epoch - Original epoch timestamp of the column to be interpolated.
:param epoch_ff - Forward filled epoch timestamp of the column to be interpolated.
:param epoch_bf - Backfilled epoch timestamp of the column to be interpolated.
:param value_ff - Forward filled value of the column to be interpolated.
:param value_bf - Backfilled value of the column to be interpolated.
:param value - Original value of the column to be interpolated.
"""
cols: List[str] = df.columns
cols.remove(value)
expr: str = f"""
case when {value_bf} = {value_ff} then {value}
else
({value_ff}-{value_bf})
/({epoch_ff}-{epoch_bf})
*({epoch}-{epoch_bf})
+ {value_bf}
end as {value}
"""
interpolated: DataFrame = df.selectExpr(*cols, expr)
# Preserve column order
return interpolated.select(*df.columns)

# TODO: Currently not being used, but will be useful for interpolating arbitrary ranges.
def get_time_range(self, df: DataFrame, ts_col: str) -> Tuple[str]:
"""
Get minimum timestamp and maximum timestamp for DataFrame.

:param df - Input dataframe (must have timestamp column)
:param ts_col - Timestamp column name
"""
start_ts, end_ts = df.select(min(ts_col), max(ts_col)).first()
self.start_ts = start_ts
self.end_ts = end_ts
return start_ts, end_ts

# TODO: Currently not being used, but will be useful for interpolating arbitrary ranges.
def generate_series(
self, start: datetime, stop: datetime, interval: int
) -> DataFrame:
"""
Generate timeseries for a given range and interval.

:param start - lower bound, inclusive
:param stop - upper bound, exclusive
:param interval - increment interval in seconds
"""
# Determine start and stops in epoch seconds
start, stop = (
spark.createDataFrame([(start, stop)], ("start", "stop"))
.select([col(c).cast("timestamp").cast("long") for c in ("start", "stop")])
.first()
)
# Create range with increments and cast to timestamp
return spark.range(start, stop, interval).select(
col("id").cast("timestamp").alias("generated_ts")
)

def __validate_fill(self, fill: str):
"""
Validate if the fill provided is within the allowed list of values.

:param fill - Fill type e.g. "zero", "null", "back", "forward", "linear"
"""
if fill not in fill_options:
raise ValueError(
f"Please select from one of the following fill options: {fill_options}"
)

def __validate_col(
self,
df: DataFrame,
partition_cols: List[str],
target_cols: List[str],
ts_col: str,
):
"""
Validate if target column exists and is of numeric type, and validates if partition column exists.

:param df - DataFrame to be validated
:param partition_cols - Partition columns to be validated
:param target_col - Target column to be validated
:param ts_col - Timestamp column to be validated
"""
for column in partition_cols:
if column not in str(df.columns):
raise ValueError(
f"Partition Column: '{column}' does not exist in DataFrame."
)
for column in target_cols:
if column not in str(df.columns):
raise ValueError(
f"Target Column: '{column}' does not exist in DataFrame."
)
if df.select(column).dtypes[0][1] not in supported_target_col_types:
raise ValueError(
f"Target Column needs to be one of the following types: {supported_target_col_types}"
)

if ts_col not in str(df.columns):
raise ValueError(
f"Timestamp Column: '{ts_col}' does not exist in DataFrame."
)

if df.select(ts_col).dtypes[0][1] != "timestamp":
raise ValueError(f"Timestamp Column needs to be of timestamp type.")

def __fill_series(
self, df: DataFrame, partition_cols: List[str], ts_col: str, target_col: str
) -> DataFrame:
"""
Apply forward and backward fill (value and timestamp) to the timeseries dataset.

:param df - input dataframe
:param partition_cols - partition columns for window (columns must exist in the input dataframe)
:param ts_col - timeseries column (must exist in the input dataframe)
:param target_col - column to perform interpolation on (must exist in the input dataframe)
"""
window_ff = (
Window.partitionBy(*partition_cols)
.orderBy(ts_col)
.rowsBetween(-sys.maxsize, 0)
)
window_bf = (
Window.partitionBy(*partition_cols)
.orderBy(ts_col)
.rowsBetween(0, sys.maxsize)
)
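# last()/first() with ignorenulls=True pick the nearest non-null value and its
# surrogate timestamp looking backwards (forward fill) and forwards (backfill)
# within each partition, ordered by the timestamp column.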

read_last = last(df[target_col], ignorenulls=True).over(window_ff)
read_next = first(df[target_col], ignorenulls=True).over(window_bf)
readtime_last = last(df["surrogate_ts"], ignorenulls=True).over(window_ff)
readtime_next = first(df["surrogate_ts"], ignorenulls=True).over(window_bf)

filled_series: DataFrame = (
df.withColumn("readvalue_ff", read_last)
.withColumn("readvalue_bf", read_next)
.withColumn("readtime_ff", readtime_last)
.withColumn("readtime_bf", readtime_next)
.filter(col("readtime_bf").isNotNull())
)

return filled_series

def __interpolate_column(
self,
epoch_series: DataFrame,
partition_cols: List[str],
ts_col: str,
target_col: str,
fill: str,
) -> DataFrame:
"""
Apply interpolation to column.

:param epoch_series - input DataFrame in epoch seconds
:param partition_cols - partition column names
:param ts_col - timestamp column name
:param target_col - numeric column to interpolate
:param fill - interpolation function to fill missing values
"""
# Generate Fill for Dataset
filled_series: DataFrame = self.__fill_series(
Contributor comment: If we can return a list of new columns ([last(col(f)) for f in target_list]) and then expand them in the final DataFrame, we can avoid the joins. I'll work on a prototype for that - this would be really helpful in avoiding the join, even though logically we are still going through each column to compute the last non-null value. (A sketch of this idea appears after this file's diff, below.)

epoch_series, partition_cols, ts_col, target_col
)

output_df: DataFrame = filled_series

# Handle zero fill
if fill == "zero":
output_df = filled_series.withColumn(
target_col,
when(col(target_col).isNotNull(), col(target_col)).otherwise(lit(0)),
)

# Handle forward fill
if fill == "forward":
output_df = filled_series.withColumn(target_col, col("readvalue_ff"))

# Handle backwards fill
if fill == "back":
output_df = filled_series.withColumn(target_col, col("readvalue_bf"))

# Handle linear fill
if fill == "linear":
output_df = output_df.transform(
lambda df: self.__calc_linear_spark(
df,
ts_col,
"readtime_ff",
"readtime_bf",
"readvalue_ff",
"readvalue_bf",
target_col,
)
)

return output_df

def interpolate(
Contributor comment: @guanjieshen, I'm wondering if there is a way to extend TSDF so that this operation can be applied in the same way that 'resample' is applied. This is not required, but having a consistent way to call functions on TSDF makes things simpler - something like tsdf.interpolate as opposed to Interpolation.interpolate(tsdf).

Author reply: Yup, can definitely make that change!

self,
tsdf,
ts_col: str,
partition_cols: List[str],
target_cols: List[str],
sample_freq: str,
sample_func: str,
fill: str,
) -> DataFrame:
"""
Apply interpolation to TSDF.

:param tsdf - input TSDF
:param target_cols - numeric columns to interpolate
:param sample_freq - frequency at which to sample
:param sample_func - aggregate function for sampling
:param fill - interpolation function to fill missing values
:param ts_col - timestamp column name
:param partition_cols - partition columns names
"""
# Validate parameters
self.__validate_fill(fill)
self.__validate_col(tsdf.df, partition_cols, target_cols, ts_col)

# Resample and Normalize Columns
# TODO: Resampling twice; a possible optimization is to see if we can resample only once
no_fill: DataFrame = tsdf.resample(
freq=sample_freq, func=sample_func, metricCols=target_cols
).df

# Build column list for joining the two sampled series
join_cols: List[str] = partition_cols + [ts_col]

# Get series zero filled
zero_fill: DataFrame = tsdf.resample(
freq=sample_freq, func=sample_func, fill=True, metricCols=target_cols
).df.select(*join_cols)

# Join sampled DataFrames - generate complete timeseries
joined_series: DataFrame = zero_fill.join(
no_fill,
join_cols,
"left",
)

# Perform interpolation on each target column
output_list: List[DataFrame] = []
for target_col in target_cols:
# Mark columns that are interpolated
marked_series: DataFrame = joined_series.withColumn(
f"is_{target_col}_interpolated",
when(col(target_col).isNull(), True).otherwise(False),
)

# Add surrogate ts_col to get missing values
with_surrogate_ts: DataFrame = marked_series.withColumn(
"surrogate_ts",
when(col(target_col).isNull(), None).otherwise(col(ts_col)),
)

# Normalize Timestamp to Epoch Seconds
epoch_series: DataFrame = with_surrogate_ts.withColumn(
ts_col, unix_timestamp(ts_col)
).withColumn("surrogate_ts", unix_timestamp("surrogate_ts"))

# Interpolate target columns
interpolated_result: DataFrame = self.__interpolate_column(
epoch_series, partition_cols, ts_col, target_col, fill
)

# Denormalize output
columns_to_drop: List[str] = [
"readvalue_ff",
"readvalue_bf",
"readtime_ff",
"readtime_bf",
"surrogate_ts",
] + target_cols
columns_to_drop.remove(target_col)
normalized_output: DataFrame = (
# Drop intermediary columns
interpolated_result.drop(*columns_to_drop)
# Convert timestamp column back to timestamp
.withColumn(ts_col, to_timestamp(ts_col))
)
output_list.append(normalized_output)

# Join output_list into single DataFrame to output
output: DataFrame = output_list[0]
output.cache()
Contributor comment: A memory cache will be expensive here. Instead of using this, I would rely on Delta Lake - we can make a comment that this step is optimized on NVMe-supported cloud VMs.

Author reply: Removed output.cache().

for right_df in output_list[1:]:
output: DataFrame = output.join(right_df, on=join_cols, how="left")

return output
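
To illustrate the join-avoidance idea raised in the review comment on __interpolate_column above, here is a rough sketch - an assumption-based illustration, not code from this PR - of computing the forward/backward fills for every target column in a single pass over the resampled DataFrame, using the same window specs, so the per-column join in interpolate() would no longer be needed. The function name fill_all_columns and the helper column names are hypothetical.

    # Hypothetical sketch (not part of this PR): fill all target columns in one pass,
    # avoiding the per-column join performed in interpolate().
    import sys
    from typing import List

    from pyspark.sql import DataFrame
    from pyspark.sql.functions import col, first, last
    from pyspark.sql.window import Window

    def fill_all_columns(df: DataFrame, partition_cols: List[str],
                         ts_col: str, target_cols: List[str]) -> DataFrame:
        window_ff = (Window.partitionBy(*partition_cols)
                     .orderBy(ts_col)
                     .rowsBetween(-sys.maxsize, 0))
        window_bf = (Window.partitionBy(*partition_cols)
                     .orderBy(ts_col)
                     .rowsBetween(0, sys.maxsize))
        for target in target_cols:
            df = (
                # Flag rows where the original value is missing, then attach the
                # forward-filled and backfilled readings as helper columns.
                df.withColumn(f"is_{target}_interpolated", col(target).isNull())
                  .withColumn(f"{target}_ff",
                              last(col(target), ignorenulls=True).over(window_ff))
                  .withColumn(f"{target}_bf",
                              first(col(target), ignorenulls=True).over(window_bf))
            )
        return df

Because every column's _ff/_bf helpers would live in the same DataFrame, the final result would not need the left joins across output_list that the current implementation performs.
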
42 changes: 35 additions & 7 deletions python/tempo/tsdf.py
@@ -1,16 +1,18 @@
import tempo.resample as rs
import tempo.io as tio

from tempo.utils import ENV_BOOLEAN, PLATFORM

from IPython.display import display as ipydisplay
from IPython.core.display import HTML
import logging
from functools import reduce
from typing import List

import pyspark.sql.functions as f
from IPython.core.display import HTML
from IPython.display import display as ipydisplay
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.window import Window

import tempo.io as tio
import tempo.resample as rs
from tempo.interpol import Interpolation
from tempo.utils import ENV_BOOLEAN, PLATFORM

logger = logging.getLogger(__name__)


@@ -517,6 +519,32 @@ def resample(self, freq, func=None, metricCols = None, prefix=None, fill = None)
enriched_tsdf = rs.aggregate(self, freq, func, metricCols, prefix, fill)
return(enriched_tsdf)

def interpolate(self, target_cols: List[str], sample_freq: str, sample_func: str, fill: str, ts_col: str = None, partition_cols: List[str]=None):
Contributor comment (rportilla-databricks, Dec 29, 2021): @guanjieshen, let's discuss why a frequency or function is required here at all. Users should be able to interpolate without resampling at all - https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.interpolate.html

Contributor comment (rportilla-databricks, Dec 29, 2021): Some suggestions:

  • Change sample_freq -> freq (to be consistent with resample)
  • Change sample_func -> func (to be consistent with resample)
  • Make freq/func and target_cols optional (you can use the code block below to populate target_cols in case the user forgets, or really just wants everything interpolated):

    if target_cols is None:
        prohibited_cols = self.partitionCols + [self.ts_col]
        summarizable_types = ['int', 'bigint', 'float', 'double']
        # filter columns to find summarizable columns
        target_cols = [datatype[0] for datatype in self.df.dtypes if
                       ((datatype[1] in summarizable_types) and
                        (datatype[0].lower() not in prohibited_cols))]

Author reply: Made the changes to the naming, and modified the default behaviour for target_cols based on the code snippet.

For freq and func, wouldn't these need to be required? Unless we are thinking that a user needs to call resample first before interpolate?

Contributor comment: Suggestion - could you call the fill parameter 'method' instead of 'fill'? This is more consistent with pandas and I think is a little clearer.

Author reply: Fixed!

Contributor comment: Suggestions:

  • Make an option for adding the 'is_interpolated' columns. I see this being really valuable, but if there are 90 columns this could get really cumbersome to maintain in the table.
  • Make an option for forcing a resample - in this case, the user can supply an argument for freq/func. Otherwise, we assume resampling is already done and interpolate won't do much.

Author reply (guanjieshen, Jan 5, 2022): Changed this to a boolean show_interpolated that can be set when calling interpolate; by default it is set to not show the columns.

"""
Function to interpolate based on frequency, aggregation, and fill, similar to pandas. Data will first be aggregated using resample, then missing values
will be filled based on the fill calculation.

:param target_cols: columns that should be interpolated
:param sample_freq: frequency for upsample - valid inputs are "hr", "min", "sec" corresponding to hour, minute, or second
:param sample_func: function used to aggregate input
:param fill: function used to fill missing values e.g. linear, null, zero, back, forward
:param ts_col [optional]: specify other ts_col, by default this uses the ts_col within the TSDF object
:param partition_cols: specify other partition_cols, by default this uses the partition_cols within the TSDF object
:return: TSDF object with interpolated data
"""

# Set defaults for timestamp column and partition columns
if ts_col is None:
ts_col = self.ts_col
if partition_cols is None:
partition_cols = self.partitionCols

interpolate_service: Interpolation = Interpolation()
tsdf_input = TSDF(self.df, ts_col=ts_col, partition_cols=partition_cols)
interpolated_df: DataFrame = interpolate_service.interpolate(tsdf_input, ts_col, partition_cols, target_cols, sample_freq, sample_func, fill)

return TSDF(interpolated_df, ts_col=ts_col, partition_cols=partition_cols)

def calc_bars(tsdf, freq, func = None, metricCols = None, fill = None):

resample_open = tsdf.resample(freq=freq, func='floor', metricCols = metricCols, prefix='open', fill = fill)
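
For orientation, a minimal usage sketch of the new TSDF.interpolate method as it stands at this point in the PR (later commits rename sample_freq/sample_func to freq/func and add a show_interpolated flag, per the review discussion above). The sample data, column names, and the mean aggregation are illustrative assumptions, and it is assumed TSDF is importable from the tempo package.

    # Minimal usage sketch of TSDF.interpolate as defined in this diff.
    # Input data and column names are made up for illustration.
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    from tempo import TSDF

    spark = SparkSession.builder.getOrCreate()

    df = spark.createDataFrame(
        [("A", "2022-01-01 00:01:00", 1.0),
         ("A", "2022-01-01 00:02:00", None),
         ("A", "2022-01-01 00:05:00", 5.0)],
        ["partition_col", "event_ts", "value"],
    ).withColumn("event_ts", F.to_timestamp("event_ts"))

    tsdf = TSDF(df, ts_col="event_ts", partition_cols=["partition_col"])

    # Resample to one-minute buckets using the mean, then linearly interpolate the gaps.
    interpolated = tsdf.interpolate(
        target_cols=["value"],
        sample_freq="min",
        sample_func="mean",
        fill="linear",
    )
    interpolated.df.show()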