Skip to content

Commit

Permalink
test: add interpolation zero fill test
Browse files Browse the repository at this point in the history
  • Loading branch information
guanjieshen committed Dec 23, 2021
1 parent 862ae78 commit 35cdd54
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 5 deletions.
11 changes: 6 additions & 5 deletions python/tempo/interpol.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ def __init__(self):
:linear_udf register linear calculation UDF
"""
self.linear_udf = udf(Interpolation.__calc_linear, FloatType())
self.__linear_udf = udf(Interpolation.__calc_linear, FloatType())

@staticmethod
def __calc_linear(epoch, epoch_ff, epoch_bf, value_ff, value_bf, value):
"""
UDF for calculating linear interpolation on a DataFrame.
User defined function for calculating linear interpolation on a DataFrame.
:param epoch - Original epoch timestamp of the column to be interpolated.
:param epoch_ff - Forward filled epoch timestamp of the column to be interpolated.
Expand Down Expand Up @@ -215,7 +215,7 @@ def __interpolate_column(
if fill == "linear":
output_df = output_df.withColumn(
target_col,
self.linear_udf(
self.__linear_udf(
ts_col,
"readtime_ff",
"readtime_bf",
Expand Down Expand Up @@ -257,14 +257,15 @@ def interpolate(
freq=sample_freq, func=sample_func, metricCols=target_cols
).df

# Columns for joining series
# Build columns list for joining the two sampled series
join_cols: List[str] = partition_cols + [ts_col]

# Get series zero filled
zero_fill: DataFrame = tsdf.resample(
freq=sample_freq, func=sample_func, fill=True, metricCols=target_cols
).df.select(*join_cols)

# Join Sampled DataFrames - Generate Complete Timeseries
# Join sampled DataFrames - generate complete timeseries
joined_series: DataFrame = zero_fill.join(
no_fill,
join_cols,
Expand Down
175 changes: 175 additions & 0 deletions python/tests/interpol_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from python.tempo.interpol import Interpolation
from python.tests.tests import SparkTest
from chispa.dataframe_comparer import *
from tempo.tsdf import TSDF
from tempo.utils import *


class InterpolationTest(SparkTest):
    """Tests for the Interpolation helper's fill strategies."""

    def buildTestDataFrame(self):
        """Build the shared input fixture: a small two-partition timeseries
        DataFrame, its TSDF wrapper, and an Interpolation helper instance.

        Sets:
            self.input_df: raw input DataFrame.
            self.input_tsdf: TSDF over the input, partitioned by
                (partition_a, partition_b) with event_ts as the time column.
            self.interpolate_helper: Interpolation instance under test.
        """
        schema = StructType(
            [
                StructField("partition_a", StringType()),
                StructField("partition_b", StringType()),
                StructField("event_ts", StringType()),
                StructField("value_a", FloatType()),
                StructField("value_b", FloatType()),
            ]
        )

        # Rows are deliberately unordered in time and include gaps within
        # partitions (e.g. A/A-1 has no sample in the 00:01:30 bucket) so
        # that resampling produces holes for interpolation to fill.
        data = [
            ["A", "A-1", "2020-01-01 00:01:10", 349.21, 10.0],
            ["A", "A-2", "2020-01-01 00:01:15", 340.21, 9.0],
            ["B", "B-1", "2020-01-01 00:01:15", 362.1, 4.0],
            ["A", "A-2", "2020-01-01 00:01:17", 353.32, 8.0],
            ["B", "B-2", "2020-01-01 00:02:14", 350.32, 6.0],
            ["A", "A-1", "2020-01-01 00:02:13", 351.32, 7.0],
            ["B", "B-2", "2020-01-01 00:01:12", 361.1, 5.0],
        ]

        # construct dataframes
        self.input_df = self.buildTestDF(schema, data)

        # generate TSDF
        self.input_tsdf = TSDF(
            self.input_df,
            partition_cols=["partition_a", "partition_b"],
            ts_col="event_ts",
        )

        # register interpolation helper
        self.interpolate_helper = Interpolation()

    # NOTE(review): the original file defined this method three times with
    # identical bodies; Python keeps only the last definition, so the
    # duplicates were dead code. Deduplicated to a single test.
    def test_zero_fill_interpolation(self):
        """Interpolating with fill="zero" at a 30-second mean resample should
        fill missing buckets with 0.0 and flag them via is_*_interpolated."""
        self.buildTestDataFrame()

        expected_schema = StructType(
            [
                StructField("partition_a", StringType()),
                StructField("partition_b", StringType()),
                StructField("event_ts", StringType()),
                StructField("value_a", DoubleType()),
                StructField("is_value_a_interpolated", BooleanType(), False),
                StructField("value_b", DoubleType()),
                StructField("is_value_b_interpolated", BooleanType()),
            ]
        )

        # Missing 30s buckets (A/A-1 @ 00:01:30, B/B-2 @ 00:01:30) are
        # zero-filled and flagged True; observed buckets keep the mean of
        # their samples (e.g. A/A-2 @ 00:01:00 averages 340.21 and 353.32).
        expected_data = [
            ["A", "A-1", "2020-01-01 00:01:00", 349.2099914550781, False, 10.0, False],
            ["A", "A-1", "2020-01-01 00:01:30", 0.0, True, 0.0, True],
            ["A", "A-1", "2020-01-01 00:02:00", 351.32000732421875, False, 7.0, False],
            ["A", "A-2", "2020-01-01 00:01:00", 346.76499938964844, False, 8.5, False],
            ["B", "B-1", "2020-01-01 00:01:00", 362.1000061035156, False, 4.0, False],
            ["B", "B-2", "2020-01-01 00:01:00", 361.1000061035156, False, 5.0, False],
            ["B", "B-2", "2020-01-01 00:01:30", 0.0, True, 0.0, True],
            ["B", "B-2", "2020-01-01 00:02:00", 350.32000732421875, False, 6.0, False],
        ]

        expected_df: DataFrame = self.buildTestDF(expected_schema, expected_data)

        actual_df: DataFrame = self.interpolate_helper.interpolate(
            tsdf=self.input_tsdf,
            partition_cols=["partition_a", "partition_b"],
            target_cols=["value_a", "value_b"],
            sample_freq="30 seconds",
            ts_col="event_ts",
            sample_func="mean",
            fill="zero",
        )

        assert_df_equality(expected_df, actual_df)

0 comments on commit 35cdd54

Please sign in to comment.