Skip to content

Commit

Permalink
Change hypertable foreign key handling
Browse files Browse the repository at this point in the history
Don't copy foreign key constraints to the individual chunks and
instead modify the lookup query to propagate to individual chunks
to mimic how postgres does this for partitioned tables.
  • Loading branch information
svenklemm committed Jul 22, 2024
1 parent 6d83e05 commit c8ec547
Show file tree
Hide file tree
Showing 12 changed files with 262 additions and 144 deletions.
19 changes: 19 additions & 0 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,22 @@ SELECT pg_catalog.pg_extension_config_dump(pg_get_serial_sequence('_timescaledb_

GRANT SELECT ON _timescaledb_catalog.chunk_column_stats TO PUBLIC;
GRANT SELECT ON _timescaledb_catalog.chunk_column_stats_id_seq TO PUBLIC;

-- Remove foreign key constraints from compressed chunks
DO $$
DECLARE
conrelid regclass;
conname name;
BEGIN
FOR conrelid, conname IN
SELECT
con.conrelid::regclass,
con.conname
FROM _timescaledb_catalog.chunk ch
JOIN pg_constraint con ON con.conrelid = format('%I.%I',schema_name,table_name)::regclass AND con.contype='f'
WHERE NOT ch.dropped AND EXISTS(SELECT FROM _timescaledb_catalog.chunk ch2 WHERE NOT ch2.dropped AND ch2.compressed_chunk_id=ch.id)
LOOP
EXECUTE format('ALTER TABLE %s DROP CONSTRAINT %I', conrelid, conname);
END LOOP;
END $$;

18 changes: 18 additions & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,21 @@ DROP FUNCTION IF EXISTS @extschema@.disable_column_stats(REGCLASS, NAME, BOOLEAN
ALTER EXTENSION timescaledb DROP TABLE _timescaledb_catalog.chunk_column_stats;
ALTER EXTENSION timescaledb DROP SEQUENCE _timescaledb_catalog.chunk_column_stats_id_seq;
DROP TABLE IF EXISTS _timescaledb_catalog.chunk_column_stats;

-- Remove foreign key constraints from compressed chunks
DO $$
DECLARE
chunkrelid regclass;
conname name;
conoid oid;
BEGIN
FOR chunkrelid, conname, conoid IN
SELECT format('%I.%I',ch.schema_name,ch.table_name)::regclass, con.conname, con.oid
FROM _timescaledb_catalog.hypertable ht
JOIN pg_constraint con ON con.contype = 'f' AND con.conrelid=format('%I.%I',ht.schema_name,ht.table_name)::regclass
JOIN _timescaledb_catalog.chunk ch on ch.hypertable_id=ht.compressed_hypertable_id and not ch.dropped
LOOP
EXECUTE format('ALTER TABLE %s ADD CONSTRAINT %I %s', conrelid, conname, pg_get_constraintdef(conoid));
END LOOP;
END $$;

25 changes: 4 additions & 21 deletions src/chunk_constraint.c
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ ts_chunk_constraint_scan_by_dimension_slice_id(int32 dimension_slice_id, ChunkCo
}

static bool
chunk_constraint_need_on_chunk(const char chunk_relkind, Form_pg_constraint conform)
chunk_constraint_need_on_chunk(Form_pg_constraint conform)
{
if (conform->contype == CONSTRAINT_CHECK)
{
Expand All @@ -753,26 +753,9 @@ chunk_constraint_need_on_chunk(const char chunk_relkind, Form_pg_constraint conf
if (conform->contype == CONSTRAINT_FOREIGN && OidIsValid(conform->conparentid))
return false;

/* Foreign tables do not support non-check constraints, so skip them */
if (chunk_relkind == RELKIND_FOREIGN_TABLE)
return false;

return true;
}

static bool
chunk_constraint_is_check(const char chunk_relkind, Form_pg_constraint conform)
{
if (conform->contype == CONSTRAINT_CHECK)
{
/*
* check constraints supported on foreign tables (like OSM chunks)
*/
return true;
}
return false;
}

int
ts_chunk_constraints_add_dimension_constraints(ChunkConstraints *ccs, int32 chunk_id,
const Hypercube *cube)
Expand All @@ -799,7 +782,7 @@ chunk_constraint_add(HeapTuple constraint_tuple, void *arg)
ConstraintContext *cc = arg;
Form_pg_constraint constraint = (Form_pg_constraint) GETSTRUCT(constraint_tuple);

if (chunk_constraint_need_on_chunk(cc->chunk_relkind, constraint))
if (cc->chunk_relkind != RELKIND_FOREIGN_TABLE && chunk_constraint_need_on_chunk(constraint))
{
ts_chunk_constraints_add(cc->ccs, cc->chunk_id, 0, NULL, NameStr(constraint->conname));
return CONSTR_PROCESSED;
Expand Down Expand Up @@ -828,7 +811,7 @@ chunk_constraint_add_check(HeapTuple constraint_tuple, void *arg)
ConstraintContext *cc = arg;
Form_pg_constraint constraint = (Form_pg_constraint) GETSTRUCT(constraint_tuple);

if (chunk_constraint_is_check(cc->chunk_relkind, constraint))
if (constraint->contype == CONSTRAINT_CHECK)
{
ts_chunk_constraints_add(cc->ccs,
cc->chunk_id,
Expand Down Expand Up @@ -867,7 +850,7 @@ ts_chunk_constraint_create_on_chunk(const Hypertable *ht, const Chunk *chunk, Oi

con = (Form_pg_constraint) GETSTRUCT(tuple);

if (chunk_constraint_need_on_chunk(chunk->relkind, con))
if (chunk->relkind != RELKIND_FOREIGN_TABLE && chunk_constraint_need_on_chunk(con))
{
ChunkConstraint *cc = ts_chunk_constraints_add(chunk->constraints,
chunk->fd.id,
Expand Down
43 changes: 43 additions & 0 deletions src/planner/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,49 @@ preprocess_query(Node *node, PreprocessQueryContext *context)
* src/backend/utils/adt/ri_triggers.c
*/

/*
* RI_FKey_cascade_del
*
* DELETE FROM [ONLY] <fktable> WHERE $1 = fkatt1 [AND ...]
*/
if (query->commandType == CMD_DELETE && list_length(query->rtable) == 1 &&
context->root->glob->boundParams && query->jointree->quals &&
IsA(query->jointree->quals, OpExpr))
{
RangeTblEntry *rte = linitial_node(RangeTblEntry, query->rtable);
if (!rte->inh && rte->rtekind == RTE_RELATION)
{
Hypertable *ht =
ts_hypertable_cache_get_entry(hcache, rte->relid, CACHE_FLAG_MISSING_OK);
if (ht)
{
rte->inh = true;
}
}
}

/*
* RI_FKey_cascade_upd
*
* UPDATE [ONLY] <fktable> SET fkatt1 = $1 [, ...]
* WHERE $n = fkatt1 [AND ...]
*/
if (query->commandType == CMD_UPDATE && list_length(query->rtable) == 1 &&
context->root->glob->boundParams && query->jointree->quals &&
IsA(query->jointree->quals, OpExpr))
{
RangeTblEntry *rte = linitial_node(RangeTblEntry, query->rtable);
if (!rte->inh && rte->rtekind == RTE_RELATION)
{
Hypertable *ht =
ts_hypertable_cache_get_entry(hcache, rte->relid, CACHE_FLAG_MISSING_OK);
if (ht)
{
rte->inh = true;
}
}
}

/*
* RI_FKey_check
*
Expand Down
12 changes: 12 additions & 0 deletions src/process_utility.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,18 @@ check_chunk_alter_table_operation_allowed(Oid relid, AlterTableStmt *stmt)
}
break;
}
case AT_DropConstraint:
{
/* if this is an OSM chunk, block the operation */
Chunk *chunk = ts_chunk_get_by_relid(relid, false /* fail_if_not_found */);
if (chunk && chunk->fd.osm_chunk)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("operation not supported on OSM chunk tables")));
}
break;
}
default:
/* disable by default */
all_allowed = false;
Expand Down
2 changes: 1 addition & 1 deletion test/expected/rowsecurity-15.out
Original file line number Diff line number Diff line change
Expand Up @@ -4796,7 +4796,7 @@ ALTER TABLE r2 FORCE ROW LEVEL SECURITY;
UPDATE r1 SET a = a+5;
ERROR: new row for relation "_hyper_28_117_chunk" violates check constraint "constraint_117"
DETAIL: Failing row contains (15).
CONTEXT: SQL statement "UPDATE ONLY "_timescaledb_internal"."_hyper_28_117_chunk" SET "a" = $1 WHERE $2 OPERATOR(pg_catalog.=) "a""
CONTEXT: SQL statement "UPDATE ONLY "regress_rls_schema"."r2" SET "a" = $1 WHERE $2 OPERATOR(pg_catalog.=) "a""
-- Remove FORCE from r2
ALTER TABLE r2 NO FORCE ROW LEVEL SECURITY;
-- As owner, we now bypass RLS
Expand Down
15 changes: 15 additions & 0 deletions test/sql/updates/setup.compression.sql
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,18 @@ $$;
\if :WITH_ROLES
GRANT SELECT ON compress TO tsdbadmin;
\endif

-- test foreign key constraint handling with compression
CREATE TABLE pk_table(time timestamptz PRIMARY KEY);
INSERT INTO pk_table VALUES ('2024-01-01 00:00:00');

CREATE TABLE fk_table ( time timestamptz, time_fk timestamptz REFERENCES pk_table(time));

SELECT table_name FROM create_hypertable('fk_table', 'time');

ALTER TABLE fk_table SET (timescaledb.compress, timescaledb.compress_segmentby='time_fk');
INSERT INTO fk_table SELECT '2024-01-01 00:00:00', '2024-01-01 00:00:00';
INSERT INTO fk_table SELECT '2024-02-01 00:00:00', '2024-01-01 00:00:00';

SELECT compress_chunk(ch, true) FROM show_chunks('fk_table') ch;

55 changes: 0 additions & 55 deletions tsl/src/compression/compression_storage.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@
static void set_toast_tuple_target_on_chunk(Oid compressed_table_id);
static void set_statistics_on_compressed_chunk(Oid compressed_table_id);
static void create_compressed_chunk_indexes(Chunk *chunk, CompressionSettings *settings);
static void clone_constraints_to_chunk(Oid ht_reloid, const Chunk *compressed_chunk);
static List *get_fk_constraints(Oid reloid);

int32
compression_hypertable_create(Hypertable *ht, Oid owner, Oid tablespace_oid)
Expand Down Expand Up @@ -149,8 +147,6 @@ compression_chunk_create(Chunk *src_chunk, Chunk *chunk, List *column_defs, Oid

create_compressed_chunk_indexes(chunk, settings);

clone_constraints_to_chunk(src_chunk->hypertable_relid, chunk);

return chunk->table_id;
}

Expand Down Expand Up @@ -346,54 +342,3 @@ create_compressed_chunk_indexes(Chunk *chunk, CompressionSettings *settings)

ReleaseSysCache(index_tuple);
}

static void
clone_constraints_to_chunk(Oid ht_reloid, const Chunk *compressed_chunk)
{
CatalogSecurityContext sec_ctx;
List *constraint_list = get_fk_constraints(ht_reloid);

ListCell *lc;
ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
foreach (lc, constraint_list)
{
Oid conoid = lfirst_oid(lc);
CatalogInternalCall2(DDL_CONSTRAINT_CLONE,
Int32GetDatum(conoid),
Int32GetDatum(compressed_chunk->table_id));
}
ts_catalog_restore_user(&sec_ctx);
}

static List *
get_fk_constraints(Oid reloid)
{
SysScanDesc scan;
ScanKeyData scankey;
HeapTuple tuple;
List *conlist = NIL;

Relation pg_constr = table_open(ConstraintRelationId, AccessShareLock);

ScanKeyInit(&scankey,
Anum_pg_constraint_conrelid,
BTEqualStrategyNumber,
F_OIDEQ,
ObjectIdGetDatum(reloid));

scan = systable_beginscan(pg_constr, ConstraintRelidTypidNameIndexId, true, NULL, 1, &scankey);
while (HeapTupleIsValid(tuple = systable_getnext(scan)))
{
Form_pg_constraint form = (Form_pg_constraint) GETSTRUCT(tuple);

if (form->contype == CONSTRAINT_FOREIGN)
{
conlist = lappend_oid(conlist, form->oid);
}
}

systable_endscan(scan);
table_close(pg_constr, AccessShareLock);

return conlist;
}
49 changes: 12 additions & 37 deletions tsl/src/compression/create.c
Original file line number Diff line number Diff line change
Expand Up @@ -512,14 +512,13 @@ add_time_to_order_by_if_not_included(OrderBySettings obs, ArrayType *segmentby,
/* returns list of constraints that need to be cloned on the compressed hypertable
* This is limited to foreign key constraints now
*/
static List *
static void
validate_existing_constraints(Hypertable *ht, CompressionSettings *settings)
{
Relation pg_constr;
SysScanDesc scan;
ScanKeyData scankey;
HeapTuple tuple;
List *conlist = NIL;

ArrayType *arr;

Expand All @@ -537,11 +536,17 @@ validate_existing_constraints(Hypertable *ht, CompressionSettings *settings)
Form_pg_constraint form = (Form_pg_constraint) GETSTRUCT(tuple);

/*
* We check primary, unique, and exclusion constraints. Move foreign
* key constraints over to compression table ignore triggers
* We check primary, unique, and exclusion constraints.
*/
if (form->contype == CONSTRAINT_CHECK || form->contype == CONSTRAINT_TRIGGER)
if (form->contype == CONSTRAINT_CHECK || form->contype == CONSTRAINT_TRIGGER
#if PG17_GE
|| form->contype == CONSTRAINT_NOTNULL
/* CONSTRAINT_NOTNULL introduced in PG17, see b0e96f311985 */
#endif
)
{
continue;
}
else if (form->contype == CONSTRAINT_EXCLUSION)
{
ereport(ERROR,
Expand Down Expand Up @@ -573,53 +578,23 @@ validate_existing_constraints(Hypertable *ht, CompressionSettings *settings)

arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
numkeys = ts_array_length(arr);
if (ARR_NDIM(arr) != 1 || numkeys < 0 || ARR_HASNULL(arr) ||
ARR_ELEMTYPE(arr) != INT2OID)
elog(ERROR, "conkey is not a 1-D smallint array");
attnums = (int16 *) ARR_DATA_PTR(arr);
for (j = 0; j < numkeys; j++)
{
const char *attname = get_attname(settings->fd.relid, attnums[j], false);

if (form->contype == CONSTRAINT_FOREIGN)
{
/* is this a segment-by column */
if (!ts_array_is_member(settings->fd.segmentby, attname))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("column \"%s\" must be used for segmenting", attname),
errdetail("The foreign key constraint \"%s\" cannot be"
" enforced with the given compression configuration.",
NameStr(form->conname))));
}
#if PG17_GE
else if (form->contype == CONSTRAINT_NOTNULL)
{
/* CONSTRAINT_NOTNULL introduced in PG17, see b0e96f311985 */
continue;
}
#endif
/* is colno a segment-by or order_by column */
else if (!form->conindid && !ts_array_is_member(settings->fd.segmentby, attname) &&
!ts_array_is_member(settings->fd.orderby, attname))
if (!form->conindid && !ts_array_is_member(settings->fd.segmentby, attname) &&
!ts_array_is_member(settings->fd.orderby, attname))
ereport(WARNING,
(errmsg("column \"%s\" should be used for segmenting or ordering",
attname)));
}

if (form->contype == CONSTRAINT_FOREIGN)
{
Name conname = palloc0(NAMEDATALEN);
namestrcpy(conname, NameStr(form->conname));
conlist = lappend(conlist, conname);
}
}
}

systable_endscan(scan);
table_close(pg_constr, AccessShareLock);

return conlist;
}

/*
Expand Down
11 changes: 5 additions & 6 deletions tsl/test/expected/compression_fks.out
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ INSERT INTO ht_with_fk SELECT '2000-01-01 0:00:01';
ERROR: insert or update on table "_hyper_1_2_chunk" violates foreign key constraint "2_2_keys"
\set ON_ERROR_STOP 1
SELECT conrelid::regclass,conname,confrelid::regclass FROM pg_constraint WHERE contype = 'f' AND confrelid = 'keys'::regclass ORDER BY conrelid::regclass::text COLLATE "C",conname;
conrelid | conname | confrelid
------------------------------------------------+----------+-----------
_timescaledb_internal._hyper_1_2_chunk | 2_2_keys | keys
_timescaledb_internal.compress_hyper_2_3_chunk | keys | keys
ht_with_fk | keys | keys
(3 rows)
conrelid | conname | confrelid
----------------------------------------+----------+-----------
_timescaledb_internal._hyper_1_2_chunk | 2_2_keys | keys
ht_with_fk | keys | keys
(2 rows)

Loading

0 comments on commit c8ec547

Please sign in to comment.