From 55ae29cf16541876fd7f7a43cec14d6a5fe88722 Mon Sep 17 00:00:00 2001 From: Konstantina Skovola Date: Tue, 16 Jan 2024 15:10:09 +0200 Subject: [PATCH] Fix if_not_exists behavior for CAgg policy with NULL offsets Fixes #5688 --- .unreleased/bugfix_5688 | 3 + tsl/src/bgw_policy/compression_api.c | 6 +- tsl/src/bgw_policy/continuous_aggregate_api.c | 6 +- tsl/src/bgw_policy/policy_utils.c | 28 +++- tsl/src/bgw_policy/policy_utils.h | 3 +- tsl/src/bgw_policy/retention_api.c | 6 +- tsl/test/expected/cagg_policy.out | 139 ++++++++++++++++-- tsl/test/sql/cagg_policy.sql | 18 +++ 8 files changed, 187 insertions(+), 22 deletions(-) create mode 100644 .unreleased/bugfix_5688 diff --git a/.unreleased/bugfix_5688 b/.unreleased/bugfix_5688 new file mode 100644 index 00000000000..537345f579a --- /dev/null +++ b/.unreleased/bugfix_5688 @@ -0,0 +1,3 @@ +Implements: #6531 Fix if_not_exists behavior for CAgg policy with NULL offsets + +Fixes: #5688 diff --git a/tsl/src/bgw_policy/compression_api.c b/tsl/src/bgw_policy/compression_api.c index f721ad84ed2..7beaf1dcbb8 100644 --- a/tsl/src/bgw_policy/compression_api.c +++ b/tsl/src/bgw_policy/compression_api.c @@ -212,7 +212,8 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum, POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER, partitioning_type, compress_after_type, - compress_after_datum); + compress_after_datum, + false /* isnull */); else { Assert(created_before != NULL); @@ -221,7 +222,8 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum, POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE, partitioning_type, INTERVALOID, - IntervalPGetDatum(created_before)); + IntervalPGetDatum(created_before), + false /* isnull */); } if (is_equal) diff --git a/tsl/src/bgw_policy/continuous_aggregate_api.c b/tsl/src/bgw_policy/continuous_aggregate_api.c index 517229d4d49..19449d959ba 100644 --- a/tsl/src/bgw_policy/continuous_aggregate_api.c +++ b/tsl/src/bgw_policy/continuous_aggregate_api.c @@ -571,12 +571,14 @@ policy_refresh_cagg_add_internal(Oid cagg_oid, Oid start_offset_type, NullableDa POL_REFRESH_CONF_KEY_START_OFFSET, cagg->partition_type, policyconf.offset_start.type, - policyconf.offset_start.value) && + policyconf.offset_start.value, + policyconf.offset_start.isnull) && policy_config_check_hypertable_lag_equality(existing->fd.config, POL_REFRESH_CONF_KEY_END_OFFSET, cagg->partition_type, policyconf.offset_end.type, - policyconf.offset_end.value)) + policyconf.offset_end.value, + policyconf.offset_end.isnull)) { /* If all arguments are the same, do nothing */ ereport(NOTICE, diff --git a/tsl/src/bgw_policy/policy_utils.c b/tsl/src/bgw_policy/policy_utils.c index 1a56ec353d9..62d94e8b036 100644 --- a/tsl/src/bgw_policy/policy_utils.c +++ b/tsl/src/bgw_policy/policy_utils.c @@ -14,6 +14,7 @@ #include "jsonb_utils.h" #include "policy_utils.h" #include "time_utils.h" +#include "policies_v2.h" /* Helper function to compare jsonb label value in the config * with passed in value. @@ -30,18 +31,33 @@ */ bool policy_config_check_hypertable_lag_equality(Jsonb *config, const char *json_label, - Oid partitioning_type, Oid lag_type, Datum lag_datum) + Oid partitioning_type, Oid lag_type, Datum lag_datum, + bool isnull) { + /* + * start_offset and end_offset for CAgg policies are allowed to have NULL values + * In that case, config_value will be NULL but this is not an error + */ + + bool null_ok = (strcmp(json_label, POL_REFRESH_CONF_KEY_END_OFFSET) == 0 || + strcmp(json_label, POL_REFRESH_CONF_KEY_START_OFFSET) == 0); + if (IS_INTEGER_TYPE(partitioning_type) && lag_type != INTERVALOID) { bool found; int64 config_value = ts_jsonb_get_int64_field(config, json_label, &found); - if (!found) + if (!found && !null_ok) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("could not find %s in config for existing job", json_label))); + if (!found && isnull) + return true; + + if ((!found && !isnull) || (found && isnull)) + return false; + switch (lag_type) { case INT2OID: @@ -59,11 +75,17 @@ policy_config_check_hypertable_lag_equality(Jsonb *config, const char *json_labe if (lag_type != INTERVALOID) return false; Interval *config_value = ts_jsonb_get_interval_field(config, json_label); - if (config_value == NULL) + if (config_value == NULL && !null_ok) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("could not find %s in config for job", json_label))); + if (config_value == NULL && isnull) + return true; + + if ((config_value == NULL && !isnull) || (config_value != NULL && isnull)) + return false; + return DatumGetBool( DirectFunctionCall2(interval_eq, IntervalPGetDatum(config_value), lag_datum)); } diff --git a/tsl/src/bgw_policy/policy_utils.h b/tsl/src/bgw_policy/policy_utils.h index 9b973268b27..e668b90b77d 100644 --- a/tsl/src/bgw_policy/policy_utils.h +++ b/tsl/src/bgw_policy/policy_utils.h @@ -8,7 +8,8 @@ #include #include "job.h" bool policy_config_check_hypertable_lag_equality(Jsonb *config, const char *json_label, - Oid dim_type, Oid lag_type, Datum lag_datum); + Oid dim_type, Oid lag_type, Datum lag_datum, + bool isnull); int64 subtract_integer_from_now_internal(int64 interval, Oid time_dim_type, Oid now_func, bool *overflow); Datum subtract_interval_from_now(Interval *lag, Oid time_dim_type); diff --git a/tsl/src/bgw_policy/retention_api.c b/tsl/src/bgw_policy/retention_api.c index 72f92b4322e..34492725bad 100644 --- a/tsl/src/bgw_policy/retention_api.c +++ b/tsl/src/bgw_policy/retention_api.c @@ -218,7 +218,8 @@ policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum, POL_RETENTION_CONF_KEY_DROP_AFTER, partitioning_type, window_type, - window_datum); + window_datum, + false /* isnull */); else { Assert(created_before != NULL); @@ -227,7 +228,8 @@ policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum, POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE, partitioning_type, INTERVALOID, - IntervalPGetDatum(created_before)); + IntervalPGetDatum(created_before), + false /* isnull */); } if (is_equal) diff --git a/tsl/test/expected/cagg_policy.out b/tsl/test/expected/cagg_policy.out index f7ee678964e..d795c9df0b9 100644 --- a/tsl/test/expected/cagg_policy.out +++ b/tsl/test/expected/cagg_policy.out @@ -273,7 +273,14 @@ SELECT config FROM _timescaledb_config.bgw_job where id = :job_id; \set ON_ERROR_STOP 0 \set VERBOSITY default SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval, if_not_exists=>true); -ERROR: could not find start_offset in config for existing job +WARNING: continuous aggregate policy already exists for "mat_m1" +DETAIL: A policy already exists with different arguments. +HINT: Remove the existing policy before adding a new one. + add_continuous_aggregate_policy +--------------------------------- + -1 +(1 row) + SELECT remove_continuous_aggregate_policy('int_tab'); ERROR: "int_tab" is not a continuous aggregate SELECT remove_continuous_aggregate_policy('mat_m1'); @@ -282,6 +289,63 @@ SELECT remove_continuous_aggregate_policy('mat_m1'); (1 row) +-- add with NULL offset, readd with NULL offset +SELECT add_continuous_aggregate_policy('mat_m1', 20, NULL, '1h'::interval, if_not_exists=>true); + add_continuous_aggregate_policy +--------------------------------- + 1010 +(1 row) + +SELECT add_continuous_aggregate_policy('mat_m1', 20, NULL, '1h'::interval, if_not_exists=>true); -- same param values, so we get a NOTICE +NOTICE: continuous aggregate policy already exists for "mat_m1", skipping + add_continuous_aggregate_policy +--------------------------------- + -1 +(1 row) + +SELECT add_continuous_aggregate_policy('mat_m1', NULL, NULL, '1h'::interval, if_not_exists=>true); -- different values, so we get a WARNING +WARNING: continuous aggregate policy already exists for "mat_m1" +DETAIL: A policy already exists with different arguments. +HINT: Remove the existing policy before adding a new one. + add_continuous_aggregate_policy +--------------------------------- + -1 +(1 row) + +SELECT remove_continuous_aggregate_policy('mat_m1'); + remove_continuous_aggregate_policy +------------------------------------ + +(1 row) + +SELECT add_continuous_aggregate_policy('mat_m1', NULL, 20, '1h'::interval, if_not_exists=>true); + add_continuous_aggregate_policy +--------------------------------- + 1011 +(1 row) + +SELECT add_continuous_aggregate_policy('mat_m1', NULL, 20, '1h'::interval, if_not_exists=>true); +NOTICE: continuous aggregate policy already exists for "mat_m1", skipping + add_continuous_aggregate_policy +--------------------------------- + -1 +(1 row) + +SELECT add_continuous_aggregate_policy('mat_m1', NULL, NULL, '1h'::interval, if_not_exists=>true); +WARNING: continuous aggregate policy already exists for "mat_m1" +DETAIL: A policy already exists with different arguments. +HINT: Remove the existing policy before adding a new one. + add_continuous_aggregate_policy +--------------------------------- + -1 +(1 row) + +SELECT remove_continuous_aggregate_policy('mat_m1'); + remove_continuous_aggregate_policy +------------------------------------ + +(1 row) + --this one will fail SELECT remove_continuous_aggregate_policy('mat_m1'); ERROR: continuous aggregate policy not found for "mat_m1" @@ -523,7 +587,7 @@ SELECT timescaledb_experimental.show_policies('max_mat_view_date'); SELECT add_retention_policy('continuous_agg_max_mat_date', '25 days'::interval); add_retention_policy ---------------------- - 1025 + 1027 (1 row) SELECT timescaledb_experimental.alter_policies('max_mat_view_date', refresh_start_offset =>'25 days'::interval); @@ -613,13 +677,13 @@ SELECT add_job('custom_func','1h', config:='{"type":"function"}'::jsonb, initial SELECT _timescaledb_functions.alter_job_set_hypertable_id( :job_id, 'max_mat_view_date'::regclass); alter_job_set_hypertable_id ----------------------------- - 1027 + 1029 (1 row) SELECT * FROM timescaledb_information.jobs WHERE job_id != 1 ORDER BY 1; job_id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | fixed_schedule | config | next_start | initial_start | hypertable_schema | hypertable_name | check_schema | check_name --------+----------------------------+-------------------+-------------+-------------+--------------+-------------+-------------+-------------------+-----------+----------------+----------------------+------------------------------+------------------------------+-----------------------+----------------------------+--------------+------------ - 1027 | User-Defined Action [1027] | @ 1 hour | @ 0 | -1 | @ 5 mins | public | custom_func | default_perm_user | t | t | {"type": "function"} | Fri Dec 31 16:00:00 1999 PST | Fri Dec 31 16:00:00 1999 PST | _timescaledb_internal | _materialized_hypertable_6 | | + 1029 | User-Defined Action [1029] | @ 1 hour | @ 0 | -1 | @ 5 mins | public | custom_func | default_perm_user | t | t | {"type": "function"} | Fri Dec 31 16:00:00 1999 PST | Fri Dec 31 16:00:00 1999 PST | _timescaledb_internal | _materialized_hypertable_6 | | (1 row) SELECT timescaledb_experimental.remove_all_policies('max_mat_view_date', true); -- ignore custom job @@ -665,7 +729,7 @@ DETAIL: The start and end offsets must cover at least two buckets in the valid SELECT add_continuous_aggregate_policy('max_mat_view_date', '13 days', '-1 day', '1 day'::interval); add_continuous_aggregate_policy --------------------------------- - 1028 + 1030 (1 row) SELECT remove_continuous_aggregate_policy('max_mat_view_date'); @@ -678,7 +742,7 @@ SELECT remove_continuous_aggregate_policy('max_mat_view_date'); SELECT add_continuous_aggregate_policy('max_mat_view_date', NULL, NULL, '1 day'::interval); add_continuous_aggregate_policy --------------------------------- - 1029 + 1031 (1 row) SELECT remove_continuous_aggregate_policy('max_mat_view_date'); @@ -721,7 +785,7 @@ CREATE MATERIALIZED VIEW max_mat_view_timestamp SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '1000000 years', '1 day' , '1 h'::interval); add_continuous_aggregate_policy --------------------------------- - 1031 + 1033 (1 row) SELECT remove_continuous_aggregate_policy('max_mat_view_timestamp'); @@ -767,7 +831,12 @@ SELECT config FROM _timescaledb_config.bgw_job where id = :job_id; \set ON_ERROR_STOP 0 SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '15 day', '1 day', '1h'::interval, if_not_exists=>true); -ERROR: could not find start_offset in config for job +WARNING: continuous aggregate policy already exists for "max_mat_view_timestamp" + add_continuous_aggregate_policy +--------------------------------- + -1 +(1 row) + SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', 'xyz', '1 day', '1h'::interval, if_not_exists=>true); ERROR: invalid input syntax for type interval: "xyz" \set ON_ERROR_STOP 1 @@ -1086,13 +1155,13 @@ ERROR: compress_after value for compression policy should be greater than the s SELECT add_compression_policy('mat_smallint', 5::smallint); add_compression_policy ------------------------ - 1051 + 1053 (1 row) SELECT add_compression_policy('mat_bigint', 20::bigint); add_compression_policy ------------------------ - 1052 + 1054 (1 row) -- end of coverage tests @@ -1137,6 +1206,52 @@ SELECT time_bucket('1 day', time) as dayb, device_id, FROM metrics GROUP BY 1, 2 WITH NO DATA; +-- this was previously crashing +SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, NULL, '1 h'::interval, if_not_exists => true); + add_continuous_aggregate_policy +--------------------------------- + 1055 +(1 row) + +SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, '1 day'::interval, '1 h'::interval, if_not_exists => true); +WARNING: continuous aggregate policy already exists for "metrics_cagg" + add_continuous_aggregate_policy +--------------------------------- + -1 +(1 row) + +SELECT remove_continuous_aggregate_policy('metrics_cagg'); + remove_continuous_aggregate_policy +------------------------------------ + +(1 row) + +SELECT add_continuous_aggregate_policy('metrics_cagg', NULL, '1 day'::interval, '1h'::interval, if_not_exists=>true); + add_continuous_aggregate_policy +--------------------------------- + 1056 +(1 row) + +SELECT add_continuous_aggregate_policy('metrics_cagg', NULL, '1 day'::interval, '1h'::interval, if_not_exists=>true); -- same param values, so we get a NOTICE +NOTICE: continuous aggregate policy already exists for "metrics_cagg", skipping + add_continuous_aggregate_policy +--------------------------------- + -1 +(1 row) + +SELECT add_continuous_aggregate_policy('metrics_cagg', NULL, NULL, '1h'::interval, if_not_exists=>true); -- different values, so we get a WARNING +WARNING: continuous aggregate policy already exists for "metrics_cagg" + add_continuous_aggregate_policy +--------------------------------- + -1 +(1 row) + +SELECT remove_continuous_aggregate_policy('metrics_cagg'); + remove_continuous_aggregate_policy +------------------------------------ + +(1 row) + --can set compression policy only after setting up refresh policy -- \set ON_ERROR_STOP 0 SELECT add_compression_policy('metrics_cagg', '1 day'::interval); @@ -1155,7 +1270,7 @@ ERROR: cannot use "compress_created_before" with continuous aggregate "metrics_ SELECT add_compression_policy('metrics_cagg', '8 day'::interval) AS "COMP_JOB" ; COMP_JOB ---------- - 1054 + 1058 (1 row) SELECT remove_compression_policy('metrics_cagg'); @@ -1297,7 +1412,7 @@ SELECT set_integer_now_func('t', 'unix_now'); SELECT add_retention_policy('t', 20); add_retention_policy ---------------------- - 1056 + 1060 (1 row) CREATE MATERIALIZED VIEW cagg(a, sumb) WITH (timescaledb.continuous) diff --git a/tsl/test/sql/cagg_policy.sql b/tsl/test/sql/cagg_policy.sql index c752c422723..42822549ec1 100644 --- a/tsl/test/sql/cagg_policy.sql +++ b/tsl/test/sql/cagg_policy.sql @@ -141,6 +141,16 @@ SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval, if_not_ SELECT remove_continuous_aggregate_policy('int_tab'); SELECT remove_continuous_aggregate_policy('mat_m1'); +-- add with NULL offset, readd with NULL offset +SELECT add_continuous_aggregate_policy('mat_m1', 20, NULL, '1h'::interval, if_not_exists=>true); +SELECT add_continuous_aggregate_policy('mat_m1', 20, NULL, '1h'::interval, if_not_exists=>true); -- same param values, so we get a NOTICE +SELECT add_continuous_aggregate_policy('mat_m1', NULL, NULL, '1h'::interval, if_not_exists=>true); -- different values, so we get a WARNING +SELECT remove_continuous_aggregate_policy('mat_m1'); +SELECT add_continuous_aggregate_policy('mat_m1', NULL, 20, '1h'::interval, if_not_exists=>true); +SELECT add_continuous_aggregate_policy('mat_m1', NULL, 20, '1h'::interval, if_not_exists=>true); +SELECT add_continuous_aggregate_policy('mat_m1', NULL, NULL, '1h'::interval, if_not_exists=>true); +SELECT remove_continuous_aggregate_policy('mat_m1'); + --this one will fail SELECT remove_continuous_aggregate_policy('mat_m1'); SELECT remove_continuous_aggregate_policy('mat_m1', if_not_exists=>true); @@ -533,6 +543,14 @@ FROM metrics GROUP BY 1, 2 WITH NO DATA; +-- this was previously crashing +SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, NULL, '1 h'::interval, if_not_exists => true); +SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, '1 day'::interval, '1 h'::interval, if_not_exists => true); +SELECT remove_continuous_aggregate_policy('metrics_cagg'); +SELECT add_continuous_aggregate_policy('metrics_cagg', NULL, '1 day'::interval, '1h'::interval, if_not_exists=>true); +SELECT add_continuous_aggregate_policy('metrics_cagg', NULL, '1 day'::interval, '1h'::interval, if_not_exists=>true); -- same param values, so we get a NOTICE +SELECT add_continuous_aggregate_policy('metrics_cagg', NULL, NULL, '1h'::interval, if_not_exists=>true); -- different values, so we get a WARNING +SELECT remove_continuous_aggregate_policy('metrics_cagg'); --can set compression policy only after setting up refresh policy -- \set ON_ERROR_STOP 0 SELECT add_compression_policy('metrics_cagg', '1 day'::interval);