Skip to content

Commit

Permalink
Fix if_not_exists behavior for CAgg policy with NULL offsets
Browse files Browse the repository at this point in the history
Fixes #5688
  • Loading branch information
konskov committed Jan 16, 2024
1 parent 45bc8a0 commit 23dbe23
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 14 deletions.
3 changes: 3 additions & 0 deletions .unreleased/bugfix_5688
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Implements: #6531 Fix if_not_exists behavior for CAgg policy with NULL offsets

Fixes: #5688
6 changes: 4 additions & 2 deletions tsl/src/bgw_policy/compression_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER,
partitioning_type,
compress_after_type,
compress_after_datum);
compress_after_datum,
false /* isnull */);
else
{
Assert(created_before != NULL);
Expand All @@ -221,7 +222,8 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE,
partitioning_type,
INTERVALOID,
IntervalPGetDatum(created_before));
IntervalPGetDatum(created_before),
false /* isnull */);
}

if (is_equal)
Expand Down
6 changes: 4 additions & 2 deletions tsl/src/bgw_policy/continuous_aggregate_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -571,12 +571,14 @@ policy_refresh_cagg_add_internal(Oid cagg_oid, Oid start_offset_type, NullableDa
POL_REFRESH_CONF_KEY_START_OFFSET,
cagg->partition_type,
policyconf.offset_start.type,
policyconf.offset_start.value) &&
policyconf.offset_start.value,
policyconf.offset_start.isnull) &&
policy_config_check_hypertable_lag_equality(existing->fd.config,
POL_REFRESH_CONF_KEY_END_OFFSET,
cagg->partition_type,
policyconf.offset_end.type,
policyconf.offset_end.value))
policyconf.offset_end.value,
policyconf.offset_end.isnull))
{
/* If all arguments are the same, do nothing */
ereport(NOTICE,
Expand Down
25 changes: 22 additions & 3 deletions tsl/src/bgw_policy/policy_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "jsonb_utils.h"
#include "policy_utils.h"
#include "time_utils.h"
#include "policies_v2.h"

/* Helper function to compare jsonb label value in the config
* with passed in value.
Expand All @@ -30,18 +31,30 @@
*/
bool
policy_config_check_hypertable_lag_equality(Jsonb *config, const char *json_label,
Oid partitioning_type, Oid lag_type, Datum lag_datum)
Oid partitioning_type, Oid lag_type, Datum lag_datum,
bool isnull)
{
/*
* start_offset and end_offset for CAgg policies are allowed to have NULL values
* In that case, config_value will be NULL but this is not an error
*/

bool null_ok = (strcmp(json_label, POL_REFRESH_CONF_KEY_END_OFFSET) == 0 ||
strcmp(json_label, POL_REFRESH_CONF_KEY_START_OFFSET) == 0);

if (IS_INTEGER_TYPE(partitioning_type) && lag_type != INTERVALOID)
{
bool found;
int64 config_value = ts_jsonb_get_int64_field(config, json_label, &found);

if (!found)
if (!found && !null_ok)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("could not find %s in config for existing job", json_label)));

if (!found && isnull)
return true;

Check warning on line 56 in tsl/src/bgw_policy/policy_utils.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/bgw_policy/policy_utils.c#L56

Added line #L56 was not covered by tests

switch (lag_type)
{
case INT2OID:
Expand All @@ -59,11 +72,17 @@ policy_config_check_hypertable_lag_equality(Jsonb *config, const char *json_labe
if (lag_type != INTERVALOID)
return false;
Interval *config_value = ts_jsonb_get_interval_field(config, json_label);
if (config_value == NULL)
if (config_value == NULL && !null_ok)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("could not find %s in config for job", json_label)));

if (config_value == NULL && isnull)
return true;

Check warning on line 81 in tsl/src/bgw_policy/policy_utils.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/bgw_policy/policy_utils.c#L81

Added line #L81 was not covered by tests

if (config_value == NULL && !isnull)
return false;

return DatumGetBool(
DirectFunctionCall2(interval_eq, IntervalPGetDatum(config_value), lag_datum));
}
Expand Down
3 changes: 2 additions & 1 deletion tsl/src/bgw_policy/policy_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
#include <postgres.h>
#include "job.h"
bool policy_config_check_hypertable_lag_equality(Jsonb *config, const char *json_label,
Oid dim_type, Oid lag_type, Datum lag_datum);
Oid dim_type, Oid lag_type, Datum lag_datum,
bool isnull);
int64 subtract_integer_from_now_internal(int64 interval, Oid time_dim_type, Oid now_func,
bool *overflow);
Datum subtract_interval_from_now(Interval *lag, Oid time_dim_type);
Expand Down
6 changes: 4 additions & 2 deletions tsl/src/bgw_policy/retention_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum,
POL_RETENTION_CONF_KEY_DROP_AFTER,
partitioning_type,
window_type,
window_datum);
window_datum,
false /* isnull */);
else
{
Assert(created_before != NULL);
Expand All @@ -227,7 +228,8 @@ policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum,
POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE,
partitioning_type,
INTERVALOID,
IntervalPGetDatum(created_before));
IntervalPGetDatum(created_before),
false /* isnull */);
}

if (is_equal)
Expand Down
35 changes: 31 additions & 4 deletions tsl/test/expected/cagg_policy.out
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,14 @@ SELECT config FROM _timescaledb_config.bgw_job where id = :job_id;
\set ON_ERROR_STOP 0
\set VERBOSITY default
SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval, if_not_exists=>true);
ERROR: could not find start_offset in config for existing job
WARNING: continuous aggregate policy already exists for "mat_m1"
DETAIL: A policy already exists with different arguments.
HINT: Remove the existing policy before adding a new one.
add_continuous_aggregate_policy
---------------------------------
-1
(1 row)

SELECT remove_continuous_aggregate_policy('int_tab');
ERROR: "int_tab" is not a continuous aggregate
SELECT remove_continuous_aggregate_policy('mat_m1');
Expand Down Expand Up @@ -767,7 +774,12 @@ SELECT config FROM _timescaledb_config.bgw_job where id = :job_id;

\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '15 day', '1 day', '1h'::interval, if_not_exists=>true);
ERROR: could not find start_offset in config for job
WARNING: continuous aggregate policy already exists for "max_mat_view_timestamp"
add_continuous_aggregate_policy
---------------------------------
-1
(1 row)

SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', 'xyz', '1 day', '1h'::interval, if_not_exists=>true);
ERROR: invalid input syntax for type interval: "xyz"
\set ON_ERROR_STOP 1
Expand Down Expand Up @@ -1137,6 +1149,21 @@ SELECT time_bucket('1 day', time) as dayb, device_id,
FROM metrics
GROUP BY 1, 2
WITH NO DATA;
-- this was previously crashing
SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, NULL, '1 h'::interval, if_not_exists => true) as remove_this_policy \gset
SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, '1 day'::interval, '1 h'::interval, if_not_exists => true);
WARNING: continuous aggregate policy already exists for "metrics_cagg"
add_continuous_aggregate_policy
---------------------------------
-1
(1 row)

SELECT delete_job(:remove_this_policy);
delete_job
------------

(1 row)

--can set compression policy only after setting up refresh policy --
\set ON_ERROR_STOP 0
SELECT add_compression_policy('metrics_cagg', '1 day'::interval);
Expand All @@ -1155,7 +1182,7 @@ ERROR: cannot use "compress_created_before" with continuous aggregate "metrics_
SELECT add_compression_policy('metrics_cagg', '8 day'::interval) AS "COMP_JOB" ;
COMP_JOB
----------
1054
1055
(1 row)

SELECT remove_compression_policy('metrics_cagg');
Expand Down Expand Up @@ -1297,7 +1324,7 @@ SELECT set_integer_now_func('t', 'unix_now');
SELECT add_retention_policy('t', 20);
add_retention_policy
----------------------
1056
1057
(1 row)

CREATE MATERIALIZED VIEW cagg(a, sumb) WITH (timescaledb.continuous)
Expand Down
5 changes: 5 additions & 0 deletions tsl/test/sql/cagg_policy.sql
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,11 @@ FROM metrics
GROUP BY 1, 2
WITH NO DATA;

-- this was previously crashing
SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, NULL, '1 h'::interval, if_not_exists => true) as remove_this_policy \gset
SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, '1 day'::interval, '1 h'::interval, if_not_exists => true);
SELECT delete_job(:remove_this_policy);

--can set compression policy only after setting up refresh policy --
\set ON_ERROR_STOP 0
SELECT add_compression_policy('metrics_cagg', '1 day'::interval);
Expand Down

0 comments on commit 23dbe23

Please sign in to comment.