Skip to content

Commit

Permalink
Fix undefined start value handling in CAgg refresh
Browse files Browse the repository at this point in the history
The CAgg refresh job did not properly handle a NULL start_offset value
for a time_bucket function with a variable width. This problem led to
the creation of invalid invalidation records and 'timestamp out of
range' errors during the next refresh.

Fixes: #5474
  • Loading branch information
jnidzwetzki committed Mar 4, 2024
1 parent 9f2d4e9 commit ef860e9
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 8 deletions.
11 changes: 9 additions & 2 deletions tsl/src/bgw_policy/continuous_aggregate_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,20 @@ get_time_from_config(const Dimension *dim, const Jsonb *config, const char *json
}

int64
policy_refresh_cagg_get_refresh_start(const Dimension *dim, const Jsonb *config, bool *start_isnull)
policy_refresh_cagg_get_refresh_start(const ContinuousAgg *cagg, const Dimension *dim,
const Jsonb *config, bool *start_isnull)
{
int64 res = get_time_from_config(dim, config, POL_REFRESH_CONF_KEY_START_OFFSET, start_isnull);

Oid type = ts_dimension_get_partition_type(dim);

/* interpret NULL as min value for that type */
if (*start_isnull)
return ts_time_get_min(ts_dimension_get_partition_type(dim));
{
return ts_continuous_agg_bucket_width_variable(cagg) ? ts_time_get_nobegin_or_min(type) :
ts_time_get_min(type);
}

return res;
}

Expand Down
4 changes: 2 additions & 2 deletions tsl/src/bgw_policy/continuous_aggregate_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ extern Datum policy_refresh_cagg_check(PG_FUNCTION_ARGS);
extern Datum policy_refresh_cagg_remove(PG_FUNCTION_ARGS);

int32 policy_continuous_aggregate_get_mat_hypertable_id(const Jsonb *config);
int64 policy_refresh_cagg_get_refresh_start(const Dimension *dim, const Jsonb *config,
bool *start_isnull);
int64 policy_refresh_cagg_get_refresh_start(const ContinuousAgg *cagg, const Dimension *dim,
const Jsonb *config, bool *start_isnull);
int64 policy_refresh_cagg_get_refresh_end(const Dimension *dim, const Jsonb *config,
bool *end_isnull);
bool policy_refresh_cagg_refresh_start_lt(int32 materialization_id, Oid cmp_type,
Expand Down
6 changes: 4 additions & 2 deletions tsl/src/bgw_policy/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,11 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD
errmsg("configuration materialization hypertable id %d not found",
materialization_id)));

ContinuousAgg *cagg = ts_continuous_agg_find_by_mat_hypertable_id(materialization_id, false);

open_dim = get_open_dimension_for_hypertable(mat_ht, true);
dim_type = ts_dimension_get_partition_type(open_dim);
refresh_start = policy_refresh_cagg_get_refresh_start(open_dim, config, &start_isnull);
refresh_start = policy_refresh_cagg_get_refresh_start(cagg, open_dim, config, &start_isnull);
refresh_end = policy_refresh_cagg_get_refresh_end(open_dim, config, &end_isnull);

if (refresh_start >= refresh_end)
Expand All @@ -420,7 +422,7 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD
policy_data->refresh_window.type = dim_type;
policy_data->refresh_window.start = refresh_start;
policy_data->refresh_window.end = refresh_end;
policy_data->cagg = ts_continuous_agg_find_by_mat_hypertable_id(materialization_id, false);
policy_data->cagg = cagg;
policy_data->start_is_null = start_isnull;
policy_data->end_is_null = end_isnull;
}
Expand Down
41 changes: 40 additions & 1 deletion tsl/test/expected/cagg_invalidation.out
Original file line number Diff line number Diff line change
Expand Up @@ -1189,7 +1189,7 @@ WHERE cagg_id = :cond_10_id;

-- should trigger two individual refreshes
CALL refresh_continuous_aggregate('cond_10', 0, 200);
-- Allow at most 5 individual invalidations per refreshe
-- Allow at most 5 individual invalidations per refresh
SET timescaledb.materializations_per_refresh_window=5;
-- Insert into every second bucket
INSERT INTO conditions VALUES (20, 1, 1.0);
Expand Down Expand Up @@ -1226,6 +1226,7 @@ CALL refresh_continuous_aggregate('cond_10', 0, 200);
WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window"
DETAIL: Expected an integer but current value is "-".
\set VERBOSITY terse
RESET timescaledb.materializations_per_refresh_window;
-- Test refresh with undefined invalidation threshold and variable sized buckets
CREATE TABLE timestamp_ht (
time timestamptz NOT NULL,
Expand Down Expand Up @@ -1275,3 +1276,41 @@ CREATE MATERIALIZED VIEW temperature_1month_hierarchical_ts
FROM temperature_4h_2
GROUP BY 1 ORDER BY 1;
NOTICE: continuous aggregate "temperature_1month_hierarchical_ts" is already up-to-date
---------------------------------------------------------------------
--- Issue 5474
---------------------------------------------------------------------
CREATE TABLE i5474 (
time timestamptz NOT NULL,
sensor_id integer NOT NULL,
cpu double precision NOT NULL,
temperature double precision NOT NULL);
SELECT create_hypertable('i5474','time');
create_hypertable
---------------------
(16,public,i5474,t)
(1 row)

CREATE MATERIALIZED VIEW i5474_summary_daily
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', time, 'AWST') AS bucket,
sensor_id,
avg(cpu) AS avg_cpu
FROM i5474
GROUP BY bucket, sensor_id;
NOTICE: continuous aggregate "i5474_summary_daily" is already up-to-date
SELECT add_continuous_aggregate_policy('i5474_summary_daily',
start_offset => NULL,
end_offset => INTERVAL '10 MINUTES',
schedule_interval => INTERVAL '1 MINUTE'
) new_job_id \gset
-- Check that start_offset = NULL is handled properly by the refresh job...
CALL run_job(:new_job_id);
-- ...and the CAgg can be refreshed afterward
CALL refresh_continuous_aggregate('i5474_summary_daily', NULL, '2023-03-21 05:00:00+00');
NOTICE: continuous aggregate "i5474_summary_daily" is already up-to-date
INSERT INTO i5474 (time, sensor_id, cpu, temperature) VALUES ('2000-01-01 05:00:00+00', 1, 1.0, 1.0);
CALL refresh_continuous_aggregate('i5474_summary_daily', NULL, '2023-01-01 01:00:00+00');
-- CAgg should be up-to-date now
CALL refresh_continuous_aggregate('i5474_summary_daily', NULL, '2023-01-01 01:00:00+00');
NOTICE: continuous aggregate "i5474_summary_daily" is already up-to-date
40 changes: 39 additions & 1 deletion tsl/test/sql/cagg_invalidation.sql
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ WHERE cagg_id = :cond_10_id;
-- should trigger two individual refreshes
CALL refresh_continuous_aggregate('cond_10', 0, 200);

-- Allow at most 5 individual invalidations per refreshe
-- Allow at most 5 individual invalidations per refresh
SET timescaledb.materializations_per_refresh_window=5;

-- Insert into every second bucket
Expand Down Expand Up @@ -719,6 +719,7 @@ SET timescaledb.materializations_per_refresh_window='-';
INSERT INTO conditions VALUES (140, 1, 1.0);
CALL refresh_continuous_aggregate('cond_10', 0, 200);
\set VERBOSITY terse
RESET timescaledb.materializations_per_refresh_window;

-- Test refresh with undefined invalidation threshold and variable sized buckets
CREATE TABLE timestamp_ht (
Expand Down Expand Up @@ -765,3 +766,40 @@ CREATE MATERIALIZED VIEW temperature_1month_hierarchical_ts
SELECT time_bucket('1 month', bucket_4h, 'Europe/Berlin'), avg(average)
FROM temperature_4h_2
GROUP BY 1 ORDER BY 1;

---------------------------------------------------------------------
--- Issue 5474
---------------------------------------------------------------------
CREATE TABLE i5474 (
time timestamptz NOT NULL,
sensor_id integer NOT NULL,
cpu double precision NOT NULL,
temperature double precision NOT NULL);

SELECT create_hypertable('i5474','time');

CREATE MATERIALIZED VIEW i5474_summary_daily
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', time, 'AWST') AS bucket,
sensor_id,
avg(cpu) AS avg_cpu
FROM i5474
GROUP BY bucket, sensor_id;

SELECT add_continuous_aggregate_policy('i5474_summary_daily',
start_offset => NULL,
end_offset => INTERVAL '10 MINUTES',
schedule_interval => INTERVAL '1 MINUTE'
) new_job_id \gset

-- Check that start_offset = NULL is handled properly by the refresh job...
CALL run_job(:new_job_id);

-- ...and the CAgg can be refreshed afterward
CALL refresh_continuous_aggregate('i5474_summary_daily', NULL, '2023-03-21 05:00:00+00');
INSERT INTO i5474 (time, sensor_id, cpu, temperature) VALUES ('2000-01-01 05:00:00+00', 1, 1.0, 1.0);
CALL refresh_continuous_aggregate('i5474_summary_daily', NULL, '2023-01-01 01:00:00+00');

-- CAgg should be up-to-date now
CALL refresh_continuous_aggregate('i5474_summary_daily', NULL, '2023-01-01 01:00:00+00');

0 comments on commit ef860e9

Please sign in to comment.