Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix if_not_exists behavior for CAgg policy with NULL offsets #6531

Merged
merged 1 commit into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .unreleased/bugfix_5688
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Implements: #6531 Fix if_not_exists behavior for CAgg policy with NULL offsets

Fixes: #5688
6 changes: 4 additions & 2 deletions tsl/src/bgw_policy/compression_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER,
partitioning_type,
compress_after_type,
compress_after_datum);
compress_after_datum,
false /* isnull */);
else
{
Assert(created_before != NULL);
Expand All @@ -221,7 +222,8 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE,
partitioning_type,
INTERVALOID,
IntervalPGetDatum(created_before));
IntervalPGetDatum(created_before),
false /* isnull */);
}

if (is_equal)
Expand Down
6 changes: 4 additions & 2 deletions tsl/src/bgw_policy/continuous_aggregate_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -571,12 +571,14 @@ policy_refresh_cagg_add_internal(Oid cagg_oid, Oid start_offset_type, NullableDa
POL_REFRESH_CONF_KEY_START_OFFSET,
cagg->partition_type,
policyconf.offset_start.type,
policyconf.offset_start.value) &&
policyconf.offset_start.value,
policyconf.offset_start.isnull) &&
policy_config_check_hypertable_lag_equality(existing->fd.config,
POL_REFRESH_CONF_KEY_END_OFFSET,
cagg->partition_type,
policyconf.offset_end.type,
policyconf.offset_end.value))
policyconf.offset_end.value,
policyconf.offset_end.isnull))
{
/* If all arguments are the same, do nothing */
ereport(NOTICE,
Expand Down
28 changes: 25 additions & 3 deletions tsl/src/bgw_policy/policy_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "jsonb_utils.h"
#include "policy_utils.h"
#include "time_utils.h"
#include "policies_v2.h"

/* Helper function to compare jsonb label value in the config
* with passed in value.
Expand All @@ -30,18 +31,33 @@
*/
bool
policy_config_check_hypertable_lag_equality(Jsonb *config, const char *json_label,
Oid partitioning_type, Oid lag_type, Datum lag_datum)
Oid partitioning_type, Oid lag_type, Datum lag_datum,
bool isnull)
{
/*
* start_offset and end_offset for CAgg policies are allowed to have NULL values
* In that case, config_value will be NULL but this is not an error
*/

bool null_ok = (strcmp(json_label, POL_REFRESH_CONF_KEY_END_OFFSET) == 0 ||
strcmp(json_label, POL_REFRESH_CONF_KEY_START_OFFSET) == 0);

if (IS_INTEGER_TYPE(partitioning_type) && lag_type != INTERVALOID)
{
bool found;
int64 config_value = ts_jsonb_get_int64_field(config, json_label, &found);

if (!found)
if (!found && !null_ok)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("could not find %s in config for existing job", json_label)));

if (!found && isnull)
return true;

if ((!found && !isnull) || (found && isnull))
return false;

switch (lag_type)
{
case INT2OID:
Expand All @@ -59,11 +75,17 @@ policy_config_check_hypertable_lag_equality(Jsonb *config, const char *json_labe
if (lag_type != INTERVALOID)
return false;
Interval *config_value = ts_jsonb_get_interval_field(config, json_label);
if (config_value == NULL)
if (config_value == NULL && !null_ok)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("could not find %s in config for job", json_label)));

if (config_value == NULL && isnull)
return true;

if ((config_value == NULL && !isnull) || (config_value != NULL && isnull))
return false;

return DatumGetBool(
DirectFunctionCall2(interval_eq, IntervalPGetDatum(config_value), lag_datum));
}
Expand Down
3 changes: 2 additions & 1 deletion tsl/src/bgw_policy/policy_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
#include <postgres.h>
#include "job.h"
bool policy_config_check_hypertable_lag_equality(Jsonb *config, const char *json_label,
Oid dim_type, Oid lag_type, Datum lag_datum);
Oid dim_type, Oid lag_type, Datum lag_datum,
bool isnull);
int64 subtract_integer_from_now_internal(int64 interval, Oid time_dim_type, Oid now_func,
bool *overflow);
Datum subtract_interval_from_now(Interval *lag, Oid time_dim_type);
Expand Down
6 changes: 4 additions & 2 deletions tsl/src/bgw_policy/retention_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum,
POL_RETENTION_CONF_KEY_DROP_AFTER,
partitioning_type,
window_type,
window_datum);
window_datum,
false /* isnull */);
else
{
Assert(created_before != NULL);
Expand All @@ -227,7 +228,8 @@ policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum,
POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE,
partitioning_type,
INTERVALOID,
IntervalPGetDatum(created_before));
IntervalPGetDatum(created_before),
false /* isnull */);
}

if (is_equal)
Expand Down
139 changes: 127 additions & 12 deletions tsl/test/expected/cagg_policy.out
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,14 @@ SELECT config FROM _timescaledb_config.bgw_job where id = :job_id;
\set ON_ERROR_STOP 0
\set VERBOSITY default
SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval, if_not_exists=>true);
ERROR: could not find start_offset in config for existing job
WARNING: continuous aggregate policy already exists for "mat_m1"
DETAIL: A policy already exists with different arguments.
HINT: Remove the existing policy before adding a new one.
add_continuous_aggregate_policy
---------------------------------
-1
(1 row)

SELECT remove_continuous_aggregate_policy('int_tab');
ERROR: "int_tab" is not a continuous aggregate
SELECT remove_continuous_aggregate_policy('mat_m1');
Expand All @@ -282,6 +289,63 @@ SELECT remove_continuous_aggregate_policy('mat_m1');

(1 row)

-- add with NULL offset, readd with NULL offset
SELECT add_continuous_aggregate_policy('mat_m1', 20, NULL, '1h'::interval, if_not_exists=>true);
add_continuous_aggregate_policy
---------------------------------
1010
(1 row)

SELECT add_continuous_aggregate_policy('mat_m1', 20, NULL, '1h'::interval, if_not_exists=>true); -- same param values, so we get a NOTICE
NOTICE: continuous aggregate policy already exists for "mat_m1", skipping
add_continuous_aggregate_policy
---------------------------------
-1
(1 row)

SELECT add_continuous_aggregate_policy('mat_m1', NULL, NULL, '1h'::interval, if_not_exists=>true); -- different values, so we get a WARNING
WARNING: continuous aggregate policy already exists for "mat_m1"
DETAIL: A policy already exists with different arguments.
HINT: Remove the existing policy before adding a new one.
add_continuous_aggregate_policy
---------------------------------
-1
(1 row)

SELECT remove_continuous_aggregate_policy('mat_m1');
remove_continuous_aggregate_policy
------------------------------------

(1 row)

SELECT add_continuous_aggregate_policy('mat_m1', NULL, 20, '1h'::interval, if_not_exists=>true);
add_continuous_aggregate_policy
---------------------------------
1011
(1 row)

SELECT add_continuous_aggregate_policy('mat_m1', NULL, 20, '1h'::interval, if_not_exists=>true);
NOTICE: continuous aggregate policy already exists for "mat_m1", skipping
add_continuous_aggregate_policy
---------------------------------
-1
(1 row)

SELECT add_continuous_aggregate_policy('mat_m1', NULL, NULL, '1h'::interval, if_not_exists=>true);
WARNING: continuous aggregate policy already exists for "mat_m1"
DETAIL: A policy already exists with different arguments.
HINT: Remove the existing policy before adding a new one.
add_continuous_aggregate_policy
---------------------------------
-1
(1 row)

SELECT remove_continuous_aggregate_policy('mat_m1');
remove_continuous_aggregate_policy
------------------------------------

(1 row)

--this one will fail
SELECT remove_continuous_aggregate_policy('mat_m1');
ERROR: continuous aggregate policy not found for "mat_m1"
Expand Down Expand Up @@ -523,7 +587,7 @@ SELECT timescaledb_experimental.show_policies('max_mat_view_date');
SELECT add_retention_policy('continuous_agg_max_mat_date', '25 days'::interval);
add_retention_policy
----------------------
1025
1027
(1 row)

SELECT timescaledb_experimental.alter_policies('max_mat_view_date', refresh_start_offset =>'25 days'::interval);
Expand Down Expand Up @@ -613,13 +677,13 @@ SELECT add_job('custom_func','1h', config:='{"type":"function"}'::jsonb, initial
SELECT _timescaledb_functions.alter_job_set_hypertable_id( :job_id, 'max_mat_view_date'::regclass);
alter_job_set_hypertable_id
-----------------------------
1027
1029
(1 row)

SELECT * FROM timescaledb_information.jobs WHERE job_id != 1 ORDER BY 1;
job_id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | fixed_schedule | config | next_start | initial_start | hypertable_schema | hypertable_name | check_schema | check_name
--------+----------------------------+-------------------+-------------+-------------+--------------+-------------+-------------+-------------------+-----------+----------------+----------------------+------------------------------+------------------------------+-----------------------+----------------------------+--------------+------------
1027 | User-Defined Action [1027] | @ 1 hour | @ 0 | -1 | @ 5 mins | public | custom_func | default_perm_user | t | t | {"type": "function"} | Fri Dec 31 16:00:00 1999 PST | Fri Dec 31 16:00:00 1999 PST | _timescaledb_internal | _materialized_hypertable_6 | |
1029 | User-Defined Action [1029] | @ 1 hour | @ 0 | -1 | @ 5 mins | public | custom_func | default_perm_user | t | t | {"type": "function"} | Fri Dec 31 16:00:00 1999 PST | Fri Dec 31 16:00:00 1999 PST | _timescaledb_internal | _materialized_hypertable_6 | |
(1 row)

SELECT timescaledb_experimental.remove_all_policies('max_mat_view_date', true); -- ignore custom job
Expand Down Expand Up @@ -665,7 +729,7 @@ DETAIL: The start and end offsets must cover at least two buckets in the valid
SELECT add_continuous_aggregate_policy('max_mat_view_date', '13 days', '-1 day', '1 day'::interval);
add_continuous_aggregate_policy
---------------------------------
1028
1030
(1 row)

SELECT remove_continuous_aggregate_policy('max_mat_view_date');
Expand All @@ -678,7 +742,7 @@ SELECT remove_continuous_aggregate_policy('max_mat_view_date');
SELECT add_continuous_aggregate_policy('max_mat_view_date', NULL, NULL, '1 day'::interval);
add_continuous_aggregate_policy
---------------------------------
1029
1031
(1 row)

SELECT remove_continuous_aggregate_policy('max_mat_view_date');
Expand Down Expand Up @@ -721,7 +785,7 @@ CREATE MATERIALIZED VIEW max_mat_view_timestamp
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '1000000 years', '1 day' , '1 h'::interval);
add_continuous_aggregate_policy
---------------------------------
1031
1033
(1 row)

SELECT remove_continuous_aggregate_policy('max_mat_view_timestamp');
Expand Down Expand Up @@ -767,7 +831,12 @@ SELECT config FROM _timescaledb_config.bgw_job where id = :job_id;

\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '15 day', '1 day', '1h'::interval, if_not_exists=>true);
ERROR: could not find start_offset in config for job
WARNING: continuous aggregate policy already exists for "max_mat_view_timestamp"
add_continuous_aggregate_policy
---------------------------------
-1
(1 row)

SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', 'xyz', '1 day', '1h'::interval, if_not_exists=>true);
ERROR: invalid input syntax for type interval: "xyz"
\set ON_ERROR_STOP 1
Expand Down Expand Up @@ -1086,13 +1155,13 @@ ERROR: compress_after value for compression policy should be greater than the s
SELECT add_compression_policy('mat_smallint', 5::smallint);
add_compression_policy
------------------------
1051
1053
(1 row)

SELECT add_compression_policy('mat_bigint', 20::bigint);
add_compression_policy
------------------------
1052
1054
(1 row)

-- end of coverage tests
Expand Down Expand Up @@ -1137,6 +1206,52 @@ SELECT time_bucket('1 day', time) as dayb, device_id,
FROM metrics
GROUP BY 1, 2
WITH NO DATA;
-- this was previously crashing
SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, NULL, '1 h'::interval, if_not_exists => true);
add_continuous_aggregate_policy
---------------------------------
1055
(1 row)

SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, '1 day'::interval, '1 h'::interval, if_not_exists => true);
WARNING: continuous aggregate policy already exists for "metrics_cagg"
add_continuous_aggregate_policy
---------------------------------
-1
(1 row)

SELECT remove_continuous_aggregate_policy('metrics_cagg');
remove_continuous_aggregate_policy
------------------------------------

(1 row)

SELECT add_continuous_aggregate_policy('metrics_cagg', NULL, '1 day'::interval, '1h'::interval, if_not_exists=>true);
add_continuous_aggregate_policy
---------------------------------
1056
(1 row)

SELECT add_continuous_aggregate_policy('metrics_cagg', NULL, '1 day'::interval, '1h'::interval, if_not_exists=>true); -- same param values, so we get a NOTICE
NOTICE: continuous aggregate policy already exists for "metrics_cagg", skipping
add_continuous_aggregate_policy
---------------------------------
-1
(1 row)

SELECT add_continuous_aggregate_policy('metrics_cagg', NULL, NULL, '1h'::interval, if_not_exists=>true); -- different values, so we get a WARNING
WARNING: continuous aggregate policy already exists for "metrics_cagg"
add_continuous_aggregate_policy
---------------------------------
-1
(1 row)

SELECT remove_continuous_aggregate_policy('metrics_cagg');
remove_continuous_aggregate_policy
------------------------------------

(1 row)

--can set compression policy only after setting up refresh policy --
\set ON_ERROR_STOP 0
SELECT add_compression_policy('metrics_cagg', '1 day'::interval);
Expand All @@ -1155,7 +1270,7 @@ ERROR: cannot use "compress_created_before" with continuous aggregate "metrics_
SELECT add_compression_policy('metrics_cagg', '8 day'::interval) AS "COMP_JOB" ;
COMP_JOB
----------
1054
1058
(1 row)

SELECT remove_compression_policy('metrics_cagg');
Expand Down Expand Up @@ -1297,7 +1412,7 @@ SELECT set_integer_now_func('t', 'unix_now');
SELECT add_retention_policy('t', 20);
add_retention_policy
----------------------
1056
1060
(1 row)

CREATE MATERIALIZED VIEW cagg(a, sumb) WITH (timescaledb.continuous)
Expand Down
18 changes: 18 additions & 0 deletions tsl/test/sql/cagg_policy.sql
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,16 @@ SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval, if_not_

SELECT remove_continuous_aggregate_policy('int_tab');
SELECT remove_continuous_aggregate_policy('mat_m1');
-- add with NULL offset, readd with NULL offset
SELECT add_continuous_aggregate_policy('mat_m1', 20, NULL, '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('mat_m1', 20, NULL, '1h'::interval, if_not_exists=>true); -- same param values, so we get a NOTICE
SELECT add_continuous_aggregate_policy('mat_m1', NULL, NULL, '1h'::interval, if_not_exists=>true); -- different values, so we get a WARNING
SELECT remove_continuous_aggregate_policy('mat_m1');
SELECT add_continuous_aggregate_policy('mat_m1', NULL, 20, '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('mat_m1', NULL, 20, '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('mat_m1', NULL, NULL, '1h'::interval, if_not_exists=>true);
SELECT remove_continuous_aggregate_policy('mat_m1');

--this one will fail
SELECT remove_continuous_aggregate_policy('mat_m1');
SELECT remove_continuous_aggregate_policy('mat_m1', if_not_exists=>true);
Expand Down Expand Up @@ -533,6 +543,14 @@ FROM metrics
GROUP BY 1, 2
WITH NO DATA;

-- this was previously crashing
SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, NULL, '1 h'::interval, if_not_exists => true);
SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, '1 day'::interval, '1 h'::interval, if_not_exists => true);
SELECT remove_continuous_aggregate_policy('metrics_cagg');
SELECT add_continuous_aggregate_policy('metrics_cagg', NULL, '1 day'::interval, '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('metrics_cagg', NULL, '1 day'::interval, '1h'::interval, if_not_exists=>true); -- same param values, so we get a NOTICE
SELECT add_continuous_aggregate_policy('metrics_cagg', NULL, NULL, '1h'::interval, if_not_exists=>true); -- different values, so we get a WARNING
SELECT remove_continuous_aggregate_policy('metrics_cagg');
--can set compression policy only after setting up refresh policy --
\set ON_ERROR_STOP 0
SELECT add_compression_policy('metrics_cagg', '1 day'::interval);
Expand Down
Loading