Skip to content

Commit

Permalink
Add recompress optional argument to compress_chunk
Browse files Browse the repository at this point in the history
This patch deprecates the recompress_chunk procedure as all that
functionality is covered by compress_chunk now. This patch also adds a
new optional boolean argument to compress_chunk to force applying
changed compression settings to existing compressed chunks.
  • Loading branch information
svenklemm committed Feb 7, 2024
1 parent c5fd41c commit 101e4c5
Show file tree
Hide file tree
Showing 35 changed files with 704 additions and 453 deletions.
2 changes: 2 additions & 0 deletions .unreleased/pr_6609
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Implements: #6609 Deprecate recompress_chunk
Implements: #6609 Add optional recompress argument to compress_chunk
64 changes: 9 additions & 55 deletions sql/maintenance_utils.sql
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ CREATE OR REPLACE FUNCTION _timescaledb_functions.create_compressed_chunk(

CREATE OR REPLACE FUNCTION @extschema@.compress_chunk(
uncompressed_chunk REGCLASS,
if_not_compressed BOOLEAN = true
if_not_compressed BOOLEAN = true,
recompress BOOLEAN = false
) RETURNS REGCLASS AS '@MODULE_PATHNAME@', 'ts_compress_chunk' LANGUAGE C STRICT VOLATILE;

CREATE OR REPLACE FUNCTION @extschema@.decompress_chunk(
Expand Down Expand Up @@ -62,61 +63,14 @@ CREATE OR REPLACE FUNCTION _timescaledb_functions.get_compressed_chunk_index_for
-- Parameters:
-- chunk: Chunk to recompress.
-- if_not_compressed: Print notice instead of error if chunk is already compressed.
CREATE OR REPLACE PROCEDURE @extschema@.recompress_chunk(chunk REGCLASS,
if_not_compressed BOOLEAN = true)
AS $$
DECLARE
status INT;
chunk_name TEXT[];
compressed_chunk_index REGCLASS;
BEGIN

-- procedures with SET clause cannot execute transaction
-- control so we adjust search_path in procedure body
SET LOCAL search_path TO pg_catalog, pg_temp;

status := _timescaledb_functions.chunk_status(chunk);

-- Chunk names are in the internal catalog, but we only care about
-- the chunk name here.
-- status bits:
-- 1: compressed
-- 2: compressed unordered
-- 4: frozen
-- 8: compressed partial

chunk_name := parse_ident(chunk::text);
CASE
WHEN status = 0 THEN
RAISE EXCEPTION 'call compress_chunk instead of recompress_chunk';
WHEN status = 1 THEN
IF if_not_compressed THEN
RAISE NOTICE 'nothing to recompress in chunk "%"', chunk_name[array_upper(chunk_name,1)];
RETURN;
ELSE
RAISE EXCEPTION 'nothing to recompress in chunk "%"', chunk_name[array_upper(chunk_name,1)];
END IF;
WHEN status = 3 OR status = 9 OR status = 11 THEN
-- first check if there's an index. Might have to use a heuristic to determine if index usage would be efficient,
-- or if we'd better fall back to decompressing & recompressing entire chunk
SELECT _timescaledb_functions.get_compressed_chunk_index_for_recompression(chunk) INTO STRICT compressed_chunk_index;
IF compressed_chunk_index IS NOT NULL THEN
PERFORM _timescaledb_functions.recompress_chunk_segmentwise(chunk, if_not_compressed);
ELSE
PERFORM @extschema@.decompress_chunk(chunk);
COMMIT;
-- SET LOCAL is only active until end of transaction.
-- While we could use SET at the start of the function we do not
-- want to bleed out search_path to caller, so we do SET LOCAL
-- again after COMMIT
SET LOCAL search_path TO pg_catalog, pg_temp;
PERFORM @extschema@.compress_chunk(chunk, if_not_compressed);
END IF;
ELSE
RAISE EXCEPTION 'unexpected chunk status % in chunk "%"', status, chunk_name[array_upper(chunk_name,1)];
END CASE;
END
$$ LANGUAGE plpgsql;
CREATE OR REPLACE PROCEDURE @extschema@.recompress_chunk(chunk REGCLASS, if_not_compressed BOOLEAN = true) LANGUAGE PLPGSQL AS $$
BEGIN
IF current_setting('timescaledb.enable_deprecation_warnings', true)::bool THEN
RAISE WARNING 'procedure @extschema@.recompress_chunk(regclass,boolean) is deprecated and the functionality is now included in @extschema@.compress_chunk. this compatibility function will be removed in a future version.';
END IF;
PERFORM @extschema@.compress_chunk(chunk, if_not_compressed);
END$$ SET search_path TO pg_catalog,pg_temp;

-- A version of makeaclitem that accepts a comma-separated list of
-- privileges rather than just a single privilege. This is copied from
Expand Down
4 changes: 4 additions & 0 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -445,3 +445,7 @@ $BODY$
SELECT sum(total_bytes)::bigint
FROM @extschema@.hypertable_approximate_detailed_size(hypertable);
$BODY$ SET search_path TO pg_catalog, pg_temp;

DROP FUNCTION IF EXISTS @extschema@.compress_chunk;
CREATE FUNCTION @extschema@.compress_chunk(uncompressed_chunk REGCLASS, if_not_compressed BOOLEAN = true, recompress BOOLEAN = false) RETURNS REGCLASS AS '' LANGUAGE SQL SET search_path TO pg_catalog, pg_temp;

4 changes: 4 additions & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -791,3 +791,7 @@ SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.hypertable', ''
DROP FUNCTION IF EXISTS _timescaledb_functions.relation_approximate_size(relation REGCLASS);
DROP FUNCTION IF EXISTS @extschema@.hypertable_approximate_detailed_size(relation REGCLASS);
DROP FUNCTION IF EXISTS @extschema@.hypertable_approximate_size(hypertable REGCLASS);

DROP FUNCTION IF EXISTS @extschema@.compress_chunk;
CREATE FUNCTION @extschema@.compress_chunk(uncompressed_chunk REGCLASS, if_not_compressed BOOLEAN = true) RETURNS REGCLASS AS '' LANGUAGE SQL SET search_path TO pg_catalog,pg_temp;

2 changes: 1 addition & 1 deletion tsl/src/bgw_policy/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ policy_recompression_execute(int32 job_id, Jsonb *config)
if (!ts_chunk_needs_recompression(chunk))
continue;

tsl_recompress_chunk_wrapper(chunk);
tsl_compress_chunk_wrapper(chunk, true, false);

elog(LOG,
"completed recompressing chunk \"%s.%s\"",
Expand Down
54 changes: 28 additions & 26 deletions tsl/src/compression/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
if (chunk_unordered)
{
ts_chunk_set_unordered(mergable_chunk);
tsl_recompress_chunk_wrapper(mergable_chunk);
tsl_compress_chunk_wrapper(mergable_chunk, true, false);
}
}

Expand Down Expand Up @@ -779,20 +779,45 @@ tsl_compress_chunk(PG_FUNCTION_ARGS)
{
Oid uncompressed_chunk_id = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
bool if_not_compressed = PG_ARGISNULL(1) ? true : PG_GETARG_BOOL(1);
bool recompress = PG_ARGISNULL(2) ? false : PG_GETARG_BOOL(2);

ts_feature_flag_check(FEATURE_HYPERTABLE_COMPRESSION);

TS_PREVENT_FUNC_IF_READ_ONLY();
Chunk *chunk = ts_chunk_get_by_relid(uncompressed_chunk_id, true);

uncompressed_chunk_id = tsl_compress_chunk_wrapper(chunk, if_not_compressed, recompress);

PG_RETURN_OID(uncompressed_chunk_id);
}

Oid
tsl_compress_chunk_wrapper(Chunk *chunk, bool if_not_compressed, bool recompress)
{
Oid uncompressed_chunk_id = chunk->table_id;

if (ts_chunk_is_compressed(chunk))
{
if (recompress)
{
CompressionSettings *ht_settings = ts_compression_settings_get(chunk->hypertable_relid);
Oid compressed_chunk_relid = ts_chunk_get_relid(chunk->fd.compressed_chunk_id, true);
CompressionSettings *chunk_settings =
ts_compression_settings_get(compressed_chunk_relid);

if (!ts_compression_settings_equal(ht_settings, chunk_settings))
{
decompress_chunk_impl(chunk, false);
compress_chunk_impl(chunk->hypertable_relid, chunk->table_id);
return uncompressed_chunk_id;
}
}
if (!ts_chunk_needs_recompression(chunk))
{
ereport((if_not_compressed ? NOTICE : ERROR),
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("chunk \"%s\" is already compressed", get_rel_name(chunk->table_id))));
PG_RETURN_OID(uncompressed_chunk_id);
return uncompressed_chunk_id;
}

if (get_compressed_chunk_index_for_recompression(chunk))
Expand All @@ -810,7 +835,7 @@ tsl_compress_chunk(PG_FUNCTION_ARGS)
uncompressed_chunk_id = compress_chunk_impl(chunk->hypertable_relid, chunk->table_id);
}

PG_RETURN_OID(uncompressed_chunk_id);
return uncompressed_chunk_id;
}

Datum
Expand Down Expand Up @@ -844,29 +869,6 @@ tsl_decompress_chunk(PG_FUNCTION_ARGS)
PG_RETURN_OID(uncompressed_chunk_id);
}

bool
tsl_recompress_chunk_wrapper(Chunk *uncompressed_chunk)
{
Oid uncompressed_chunk_relid = uncompressed_chunk->table_id;

Hypertable *ht = ts_hypertable_get_by_id(uncompressed_chunk->fd.hypertable_id);
ts_hypertable_permissions_check(ht->main_table_relid, GetUserId());

if (!ht->fd.compressed_hypertable_id)
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("missing compressed hypertable")));

if (ts_chunk_is_compressed(uncompressed_chunk))
{
decompress_chunk_impl(uncompressed_chunk, false);
}

Chunk *chunk = ts_chunk_get_by_relid(uncompressed_chunk_relid, true);
Assert(!ts_chunk_is_compressed(chunk));
compress_chunk_impl(chunk->hypertable_relid, chunk->table_id);

return true;
}

/* Sort the tuples and recompress them */
static void
recompress_segment(Tuplesortstate *tuplesortstate, Relation compressed_chunk_rel,
Expand Down
5 changes: 3 additions & 2 deletions tsl/src/compression/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
#include <postgres.h>
#include <fmgr.h>

#include "chunk.h"

extern Datum tsl_create_compressed_chunk(PG_FUNCTION_ARGS);
extern Datum tsl_compress_chunk(PG_FUNCTION_ARGS);
extern Datum tsl_decompress_chunk(PG_FUNCTION_ARGS);
extern Datum tsl_recompress_chunk(PG_FUNCTION_ARGS);
extern bool tsl_recompress_chunk_wrapper(Chunk *chunk);
extern Oid tsl_compress_chunk_wrapper(Chunk *chunk, bool if_not_compressed, bool recompress);
extern Datum tsl_recompress_chunk_segmentwise(PG_FUNCTION_ARGS);

extern Datum tsl_get_compressed_chunk_index_for_recompression(
Expand Down
106 changes: 106 additions & 0 deletions tsl/test/expected/compression.out
Original file line number Diff line number Diff line change
Expand Up @@ -2667,3 +2667,109 @@ SELECT approximate_row_count('stattest');
(1 row)

DROP TABLE stattest;
-- test that all variants of compress_chunk produce a fully compressed chunk
CREATE TABLE compress_chunk_test(time TIMESTAMPTZ NOT NULL, device text, value float);
SELECT create_hypertable('compress_chunk_test', 'time');
create_hypertable
-----------------------------------
(47,public,compress_chunk_test,t)
(1 row)

INSERT INTO compress_chunk_test SELECT '2020-01-01', 'r2d2', 3.14;
ALTER TABLE compress_chunk_test SET (timescaledb.compress);
SELECT show_chunks('compress_chunk_test') AS "CHUNK" \gset
-- initial call will compress the chunk
SELECT compress_chunk(:'CHUNK');
compress_chunk
-------------------------------------------
_timescaledb_internal._hyper_47_100_chunk
(1 row)

-- subsequent calls will be noop
SELECT compress_chunk(:'CHUNK');
NOTICE: chunk "_hyper_47_100_chunk" is already compressed
compress_chunk
-------------------------------------------
_timescaledb_internal._hyper_47_100_chunk
(1 row)

-- unless if_not_compressed is set to false
\set ON_ERROR_STOP 0
SELECT compress_chunk(:'CHUNK', false);
ERROR: chunk "_hyper_47_100_chunk" is already compressed
\set ON_ERROR_STOP 1
ALTER TABLE compress_chunk_test SET (timescaledb.compress_segmentby='device');
SELECT compressed_chunk_id from _timescaledb_catalog.chunk ch INNER JOIN _timescaledb_catalog.hypertable ht ON ht.id = ch.hypertable_id AND ht.table_name='compress_chunk_test';
compressed_chunk_id
---------------------
101
(1 row)

-- changing compression settings will not recompress the chunk by default
SELECT compress_chunk(:'CHUNK');
NOTICE: chunk "_hyper_47_100_chunk" is already compressed
compress_chunk
-------------------------------------------
_timescaledb_internal._hyper_47_100_chunk
(1 row)

-- unless we specify recompress := true
SELECT compress_chunk(:'CHUNK', recompress := true);
compress_chunk
-------------------------------------------
_timescaledb_internal._hyper_47_100_chunk
(1 row)

-- compressed_chunk_id should be different now
SELECT compressed_chunk_id from _timescaledb_catalog.chunk ch INNER JOIN _timescaledb_catalog.hypertable ht ON ht.id = ch.hypertable_id AND ht.table_name='compress_chunk_test';
compressed_chunk_id
---------------------
102
(1 row)

--test partial handling
INSERT INTO compress_chunk_test SELECT '2020-01-01', 'c3po', 3.14;
-- should result in merging uncompressed data into compressed chunk
SELECT compress_chunk(:'CHUNK');
compress_chunk
-------------------------------------------
_timescaledb_internal._hyper_47_100_chunk
(1 row)

-- compressed_chunk_id should not have changed
SELECT compressed_chunk_id from _timescaledb_catalog.chunk ch INNER JOIN _timescaledb_catalog.hypertable ht ON ht.id = ch.hypertable_id AND ht.table_name='compress_chunk_test';
compressed_chunk_id
---------------------
102
(1 row)

-- should return no rows
SELECT * FROM ONLY :CHUNK;
time | device | value
------+--------+-------
(0 rows)

ALTER TABLE compress_chunk_test SET (timescaledb.compress_segmentby='');
-- create another chunk
INSERT INTO compress_chunk_test SELECT '2021-01-01', 'c3po', 3.14;
SELECT show_chunks('compress_chunk_test') AS "CHUNK2" LIMIT 1 OFFSET 1 \gset
SELECT compress_chunk(:'CHUNK2');
compress_chunk
-------------------------------------------
_timescaledb_internal._hyper_47_103_chunk
(1 row)

-- make it partial and compress again
INSERT INTO compress_chunk_test SELECT '2021-01-01', 'r2d2', 3.14;
SELECT compress_chunk(:'CHUNK2');
compress_chunk
-------------------------------------------
_timescaledb_internal._hyper_47_103_chunk
(1 row)

-- should return no rows
SELECT * FROM ONLY :CHUNK2;
time | device | value
------+--------+-------
(0 rows)

44 changes: 28 additions & 16 deletions tsl/test/expected/compression_bgw-13.out
Original file line number Diff line number Diff line change
Expand Up @@ -434,14 +434,12 @@ SELECT count(*) from test2;
29
(1 row)

-- call recompress_chunk inside a transaction. This should fails since
-- it contains transaction-terminating commands.
\set ON_ERROR_STOP 0
START TRANSACTION;
CALL recompress_chunk(:'CHUNK_NAME'::regclass);
ROLLBACK;
\set ON_ERROR_STOP 1
CALL recompress_chunk(:'CHUNK_NAME'::regclass);
SELECT compress_chunk(:'CHUNK_NAME'::regclass);
compress_chunk
------------------------------------------
_timescaledb_internal._hyper_14_62_chunk
(1 row)

-- Demonstrate that no locks are held on the hypertable, chunk, or the
-- compressed chunk after recompress_chunk has executed.
SELECT pid, locktype, relation, relation::regclass, mode, granted
Expand Down Expand Up @@ -510,21 +508,35 @@ WHERE hypertable_name = 'test2' ORDER BY chunk_name;
(2 rows)

\set ON_ERROR_STOP 0
-- call recompress_chunk when status is not unordered
CALL recompress_chunk(:'CHUNK_NAME'::regclass, true);
psql:include/recompress_basic.sql:115: NOTICE: nothing to recompress in chunk "_hyper_14_62_chunk"
-- call compress_chunk when status is not unordered
SELECT compress_chunk(:'CHUNK_NAME'::regclass);
psql:include/recompress_basic.sql:107: NOTICE: chunk "_hyper_14_62_chunk" is already compressed
compress_chunk
------------------------------------------
_timescaledb_internal._hyper_14_62_chunk
(1 row)

-- This will succeed and compress the chunk for the test below.
CALL recompress_chunk(:'CHUNK_NAME'::regclass, false);
psql:include/recompress_basic.sql:118: ERROR: nothing to recompress in chunk "_hyper_14_62_chunk"
SELECT compress_chunk(:'CHUNK_NAME'::regclass);
psql:include/recompress_basic.sql:110: NOTICE: chunk "_hyper_14_62_chunk" is already compressed
compress_chunk
------------------------------------------
_timescaledb_internal._hyper_14_62_chunk
(1 row)

--now decompress it , then try and recompress
SELECT decompress_chunk(:'CHUNK_NAME'::regclass);
decompress_chunk
------------------------------------------
_timescaledb_internal._hyper_14_62_chunk
(1 row)

CALL recompress_chunk(:'CHUNK_NAME'::regclass);
psql:include/recompress_basic.sql:122: ERROR: call compress_chunk instead of recompress_chunk
SELECT compress_chunk(:'CHUNK_NAME'::regclass);
compress_chunk
------------------------------------------
_timescaledb_internal._hyper_14_62_chunk
(1 row)

\set ON_ERROR_STOP 1
-- test recompress policy
CREATE TABLE metrics(time timestamptz NOT NULL);
Expand Down Expand Up @@ -629,7 +641,7 @@ SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'met

---- nothing to do yet
CALL run_job(:JOB_RECOMPRESS);
psql:include/recompress_basic.sql:194: NOTICE: no chunks for hypertable "public.metrics" that satisfy recompress chunk policy
psql:include/recompress_basic.sql:186: NOTICE: no chunks for hypertable "public.metrics" that satisfy recompress chunk policy
---- status should be 1
SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'metrics';
chunk_status
Expand Down
Loading

0 comments on commit 101e4c5

Please sign in to comment.