feature: add interpolation functionality #109
Changes from 6 commits
tempo/interpol.py
@@ -0,0 +1,321 @@
import sys
from datetime import datetime
from typing import List, Tuple

from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import (
    col,
    first,
    last,
    lit,
    max as spark_max,
    min as spark_min,
    to_timestamp,
    unix_timestamp,
    when,
)
from pyspark.sql.window import Window

# Interpolation fill options
fill_options = ["zero", "null", "back", "forward", "linear"]
supported_target_col_types = ["double", "float"]


class Interpolation:
    def __calc_linear_spark(
        self, df: DataFrame, epoch, epoch_ff, epoch_bf, value_ff, value_bf, value
    ):
        """
        Native Spark function for calculating linear interpolation on a DataFrame.

        :param epoch - Original epoch timestamp of the column to be interpolated.
        :param epoch_ff - Forward-filled epoch timestamp of the column to be interpolated.
        :param epoch_bf - Backfilled epoch timestamp of the column to be interpolated.
        :param value_ff - Forward-filled value of the column to be interpolated.
        :param value_bf - Backfilled value of the column to be interpolated.
        :param value - Original value of the column to be interpolated.
        """
        cols: List[str] = df.columns
        cols.remove(value)
        expr: str = f"""
        case when {value_bf} = {value_ff} then {value}
        else
            ({value_ff} - {value_bf})
            / ({epoch_ff} - {epoch_bf})
            * ({epoch} - {epoch_bf})
            + {value_bf}
        end as {value}
        """
        interpolated: DataFrame = df.selectExpr(*cols, expr)
        # Preserve column order
        return interpolated.select(*df.columns)
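
    # Worked example with hypothetical numbers: given value_bf = 10.0 at
    # epoch_bf = 100 and value_ff = 20.0 at epoch_ff = 200, a missing reading
    # at epoch = 150 is filled as
    # (20.0 - 10.0) / (200 - 100) * (150 - 100) + 10.0 = 15.0.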

    # TODO: Currently not being used, but will be useful for interpolating arbitrary ranges.
    def get_time_range(self, df: DataFrame, ts_col: str) -> Tuple[str, str]:
        """
        Get minimum timestamp and maximum timestamp for DataFrame.

        :param df - Input dataframe (must have timestamp column)
        :param ts_col - Timestamp column name
        """
        start_ts, end_ts = df.select(spark_min(ts_col), spark_max(ts_col)).first()
        self.start_ts = start_ts
        self.end_ts = end_ts
        return start_ts, end_ts

    # TODO: Currently not being used, but will be useful for interpolating arbitrary ranges.
    def generate_series(
        self, start: datetime, stop: datetime, interval: int
    ) -> DataFrame:
        """
        Generate a timeseries for a given range and interval.

        :param start - lower bound, inclusive
        :param stop - upper bound, exclusive
        :param interval - increment interval in seconds
        """
        spark = SparkSession.builder.getOrCreate()
        # Determine start and stop in epoch seconds
        start, stop = (
            spark.createDataFrame([(start, stop)], ("start", "stop"))
            .select([col(c).cast("timestamp").cast("long") for c in ("start", "stop")])
            .first()
        )
        # Create range with increments and cast to timestamp
        return spark.range(start, stop, interval).select(
            col("id").cast("timestamp").alias("generated_ts")
        )
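
    # Usage sketch (hypothetical inputs): generate_series(datetime(2021, 1, 1),
    # datetime(2021, 1, 2), 3600) yields 24 hourly rows in a single
    # "generated_ts" column - the start bound inclusive, the stop bound exclusive.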

    def __validate_fill(self, fill: str):
        """
        Validate that the fill provided is within the allowed list of values.

        :param fill - Fill type, e.g. "zero", "null", "back", "forward", "linear"
        """
        if fill not in fill_options:
            raise ValueError(
                f"Please select one of the following fill options: {fill_options}"
            )

    def __validate_col(
        self,
        df: DataFrame,
        partition_cols: List[str],
        target_cols: List[str],
        ts_col: str,
    ):
        """
        Validate that the target columns exist and are of numeric type, and that
        the partition and timestamp columns exist.

        :param df - DataFrame to be validated
        :param partition_cols - Partition columns to be validated
        :param target_cols - Target columns to be validated
        :param ts_col - Timestamp column to be validated
        """
        for column in partition_cols:
            if column not in df.columns:
                raise ValueError(
                    f"Partition Column: '{column}' does not exist in DataFrame."
                )
        for column in target_cols:
            if column not in df.columns:
                raise ValueError(
                    f"Target Column: '{column}' does not exist in DataFrame."
                )
            if df.select(column).dtypes[0][1] not in supported_target_col_types:
                raise ValueError(
                    f"Target Column needs to be one of the following types: {supported_target_col_types}"
                )

        if ts_col not in df.columns:
            raise ValueError(
                f"Timestamp Column: '{ts_col}' does not exist in DataFrame."
            )

        if df.select(ts_col).dtypes[0][1] != "timestamp":
            raise ValueError("Timestamp Column needs to be of timestamp type.")

    def __fill_series(
        self, df: DataFrame, partition_cols: List[str], ts_col: str, target_col: str
    ) -> DataFrame:
        """
        Apply forward and backward fill (value and timestamp) to the timeseries dataset.

        :param df - input dataframe
        :param partition_cols - partition columns for window (columns must exist in the input dataframe)
        :param ts_col - timeseries column (must exist in the input dataframe)
        :param target_col - column to perform interpolation on (must exist in the input dataframe)
        """
        window_ff = (
            Window.partitionBy(*partition_cols)
            .orderBy(ts_col)
            .rowsBetween(-sys.maxsize, 0)
        )
        window_bf = (
            Window.partitionBy(*partition_cols)
            .orderBy(ts_col)
            .rowsBetween(0, sys.maxsize)
        )
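        # Note: rowsBetween(-sys.maxsize, 0) and rowsBetween(0, sys.maxsize)
        # emulate unbounded frames; Window.unboundedPreceding and
        # Window.unboundedFollowing are the built-in equivalents.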

        read_last = last(df[target_col], ignorenulls=True).over(window_ff)
        read_next = first(df[target_col], ignorenulls=True).over(window_bf)
        readtime_last = last(df["surrogate_ts"], ignorenulls=True).over(window_ff)
        readtime_next = first(df["surrogate_ts"], ignorenulls=True).over(window_bf)

        filled_series: DataFrame = (
            df.withColumn("readvalue_ff", read_last)
            .withColumn("readvalue_bf", read_next)
            .withColumn("readtime_ff", readtime_last)
            .withColumn("readtime_bf", readtime_next)
            .filter(col("readtime_bf").isNotNull())
        )

        return filled_series
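
    # Illustration (hypothetical rows, one partition): for target values
    # [10.0, null, 20.0] at epochs [0, 30, 60], __fill_series yields
    # readvalue_ff = [10.0, 10.0, 20.0] and readvalue_bf = [10.0, 20.0, 20.0];
    # readtime_ff / readtime_bf behave the same way over surrogate_ts.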

    def __interpolate_column(
        self,
        epoch_series: DataFrame,
        partition_cols: List[str],
        ts_col: str,
        target_col: str,
        fill: str,
    ) -> DataFrame:
        """
        Apply interpolation to a column.

        :param epoch_series - input DataFrame in epoch seconds
        :param partition_cols - partition column names
        :param ts_col - timestamp column name
        :param target_col - numeric column to interpolate
        :param fill - interpolation function to fill missing values
        """
        # Generate fill for dataset
        filled_series: DataFrame = self.__fill_series(
            epoch_series, partition_cols, ts_col, target_col
        )

        output_df: DataFrame = filled_series

        # Handle zero fill
        if fill == "zero":
            output_df = filled_series.withColumn(
                target_col,
                when(col(target_col).isNotNull(), col(target_col)).otherwise(lit(0)),
            )

        # Handle forward fill
        if fill == "forward":
            output_df = filled_series.withColumn(target_col, col("readvalue_ff"))

        # Handle backward fill
        if fill == "back":
            output_df = filled_series.withColumn(target_col, col("readvalue_bf"))

        # Handle linear fill
        if fill == "linear":
            output_df = output_df.transform(
                lambda df: self.__calc_linear_spark(
                    df,
                    ts_col,
                    "readtime_ff",
                    "readtime_bf",
                    "readvalue_ff",
                    "readvalue_bf",
                    target_col,
                )
            )

        return output_df
Review comment: @guanjieshen , I'm wondering if there is a way to extend TSDF so that this operation can be applied in the same way that 'resample' is applied. This is not required, but having a consistent way to call functions on TSDF makes things simpler. Something like tsdf.interpolate as opposed to Interpolate.interpolate(tsdf).

Reply: Yup, can definitely make that change!

    def interpolate(
        self,
        tsdf,
        ts_col: str,
        partition_cols: List[str],
        target_cols: List[str],
        sample_freq: str,
        sample_func: str,
        fill: str,
    ) -> DataFrame:
        """
        Apply interpolation to TSDF.

        :param tsdf - input TSDF
        :param ts_col - timestamp column name
        :param partition_cols - partition column names
        :param target_cols - numeric columns to interpolate
        :param sample_freq - frequency at which to sample
        :param sample_func - aggregate function for sampling
        :param fill - interpolation function to fill missing values
        """
        # Validate parameters
        self.__validate_fill(fill)
        self.__validate_col(tsdf.df, partition_cols, target_cols, ts_col)

        # Resample and normalize columns
        # TODO: Resampling twice; a possible optimization is to resample only once.
        no_fill: DataFrame = tsdf.resample(
            freq=sample_freq, func=sample_func, metricCols=target_cols
        ).df

        # Build column list for joining the two sampled series
        join_cols: List[str] = partition_cols + [ts_col]

        # Get zero-filled series
        zero_fill: DataFrame = tsdf.resample(
            freq=sample_freq, func=sample_func, fill=True, metricCols=target_cols
        ).df.select(*join_cols)

        # Join sampled DataFrames - generate complete timeseries
        joined_series: DataFrame = zero_fill.join(
            no_fill,
            join_cols,
            "left",
        )

        # Perform interpolation on each target column
        output_list: List[DataFrame] = []
        for target_col in target_cols:
            # Mark columns that are interpolated
            marked_series: DataFrame = joined_series.withColumn(
                f"is_{target_col}_interpolated",
                when(col(target_col).isNull(), True).otherwise(False),
            )

            # Add surrogate ts_col to get missing values
            with_surrogate_ts: DataFrame = marked_series.withColumn(
                "surrogate_ts",
                when(col(target_col).isNull(), None).otherwise(col(ts_col)),
            )

            # Normalize timestamp to epoch seconds
            epoch_series: DataFrame = with_surrogate_ts.withColumn(
                ts_col, unix_timestamp(ts_col)
            ).withColumn("surrogate_ts", unix_timestamp("surrogate_ts"))

            # Interpolate target column
            interpolated_result: DataFrame = self.__interpolate_column(
                epoch_series, partition_cols, ts_col, target_col, fill
            )

            # Denormalize output
            columns_to_drop: List[str] = [
                "readvalue_ff",
                "readvalue_bf",
                "readtime_ff",
                "readtime_bf",
                "surrogate_ts",
            ] + target_cols
            columns_to_drop.remove(target_col)
            normalized_output: DataFrame = (
                # Drop intermediary columns
                interpolated_result.drop(*columns_to_drop)
                # Convert timestamp column back to timestamp type
                .withColumn(ts_col, to_timestamp(ts_col))
            )
            output_list.append(normalized_output)

        # Join output_list into a single output DataFrame
        output: DataFrame = output_list[0]
        output.cache()
Review comment: A memory cache will be expensive here. Instead of using this, I would rely on Delta Lake - we can make a comment that this step is optimized on NVMe-supported cloud VMs.

Reply: Removed
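A minimal sketch of the reviewer's Delta Lake suggestion (illustrative only, not part of the PR; the staging path is hypothetical and spark is assumed to be an active SparkSession):

# Instead of caching in executor memory, stage the intermediate result in a
# Delta table and read it back; on NVMe-backed cloud VMs the re-read is cheap.
output.write.format("delta").mode("overwrite").save("/tmp/interpolation_staging")
output = spark.read.format("delta").load("/tmp/interpolation_staging")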
        for right_df in output_list[1:]:
            output: DataFrame = output.join(right_df, on=join_cols, how="left")

        return output
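
For orientation, a minimal usage sketch of the class above (a sketch, not part of the PR: it assumes an active SparkSession named spark, the TSDF class from tempo, and illustrative column names):

from pyspark.sql.functions import to_timestamp
from tempo.tsdf import TSDF

# Toy input: one partition key, two double-typed readings one minute apart.
df = spark.createDataFrame(
    [("A", "2021-01-01 00:00:00", 10.0), ("A", "2021-01-01 00:01:00", 20.0)],
    ["partition_a", "event_ts", "value_a"],
).withColumn("event_ts", to_timestamp("event_ts"))

tsdf = TSDF(df, ts_col="event_ts", partition_cols=["partition_a"])

# Resample to second buckets with a mean aggregate, then linearly fill the
# empty buckets between the two readings.
interpolated_df = Interpolation().interpolate(
    tsdf, "event_ts", ["partition_a"], ["value_a"], "sec", "mean", "linear"
)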

tempo/tsdf.py
@@ -1,16 +1,18 @@
import logging
from functools import reduce
from typing import List

import pyspark.sql.functions as f
from IPython.core.display import HTML
from IPython.display import display as ipydisplay
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.window import Window

import tempo.io as tio
import tempo.resample as rs
from tempo.interpol import Interpolation
from tempo.utils import ENV_BOOLEAN, PLATFORM

logger = logging.getLogger(__name__)

@@ -517,6 +519,32 @@ def resample(self, freq, func=None, metricCols=None, prefix=None, fill=None):
        enriched_tsdf = rs.aggregate(self, freq, func, metricCols, prefix, fill)
        return enriched_tsdf

Review comment: @guanjieshen , let's discuss why a frequency or function is required here at all. Users should be able to interpolate without resampling at all - https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.interpolate.html

Review comment: Some suggestions: …
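
For reference, the pandas method linked above fills gaps without any resampling step (a minimal illustration with toy values):

import pandas as pd

s = pd.Series([10.0, None, None, 20.0])
s.interpolate(method="linear")  # -> 10.0, 13.33..., 16.66..., 20.0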

Reply: Made the changes to the naming, and modified the default behaviour for … For …

Review comment: Suggestion - could you call the fill parameter 'method' instead of 'fill'? This is more consistent with pandas and I think is a little clearer.

Reply: Fixed!

Review comment: Suggestions: Make an option for adding the 'is_interpolated' columns. I see this being really valuable, but if there are 90 columns, this could get really cumbersome to maintain in the table.

Reply: Changed this to a boolean …

    def interpolate(self, target_cols: List[str], sample_freq: str, sample_func: str, fill: str, ts_col: str = None, partition_cols: List[str] = None):
""" | ||
function to interpolate based on frequency, aggregation, and fill similar to pandas. Data will first be aggregated using resample, then missing values | ||
will be filled based on the fill calculation. | ||
|
||
:param target_cols: columns that should be interpolated | ||
:param sample_freq: frequency for upsample - valid inputs are "hr", "min", "sec" corresponding to hour, minute, or second | ||
:param sample_func: function used to aggregate input | ||
:param fill: function used to fill missing values e.g. linear, null, zero, back, forward | ||
:param ts_col [optional]: specify other ts_col, by default this uses the ts_col within the TSDF object | ||
:param partition_cols: specify other partition_cols, by default this uses the partition_cols within the TSDF object | ||
:return: TSDF object with interpolated data | ||
""" | ||

        # Set defaults for timestamp column and partition columns
        if ts_col is None:
            ts_col = self.ts_col
        if partition_cols is None:
            partition_cols = self.partitionCols

        interpolate_service: Interpolation = Interpolation()
        tsdf_input = TSDF(self.df, ts_col=ts_col, partition_cols=partition_cols)
        interpolated_df: DataFrame = interpolate_service.interpolate(
            tsdf_input, ts_col, partition_cols, target_cols, sample_freq, sample_func, fill
        )

        return TSDF(interpolated_df, ts_col=ts_col, partition_cols=partition_cols)
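
A usage sketch for the new method (assumptions: a TSDF named tsdf whose DataFrame has a double-typed value_a column; the names are illustrative, not from the PR):

# Resample value_a to one-minute mean buckets, then linearly interpolate
# any buckets that came back empty.
interpolated_tsdf = tsdf.interpolate(
    target_cols=["value_a"],
    sample_freq="min",
    sample_func="mean",
    fill="linear",
)
interpolated_tsdf.df.show()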

def calc_bars(tsdf, freq, func=None, metricCols=None, fill=None):

    resample_open = tsdf.resample(freq=freq, func='floor', metricCols=metricCols, prefix='open', fill=fill)
Review comment: If we can return a list of new columns [last(col(f)) for f in target_list] then expand in the final data frame, we can avoid the joins. I'll work on a prototype for that - this would be really helpful in avoiding the join even though logically we are still going through each column to compute the last non-null value.
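
A sketch of that idea (column and variable names are hypothetical; joined_series stands in for the joined series built inside interpolate above), computing every forward-filled column in one window pass so no per-column join is needed:

from pyspark.sql.functions import col, last
from pyspark.sql.window import Window

# One unbounded-preceding window shared by every target column.
window_ff = (
    Window.partitionBy("partition_a")
    .orderBy("event_ts")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Build the list of forward-filled columns, then expand it in a single select.
target_list = ["value_a", "value_b"]
ff_cols = [
    last(col(f), ignorenulls=True).over(window_ff).alias(f"{f}_ff")
    for f in target_list
]
filled = joined_series.select("*", *ff_cols)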