Fix issues with Hypercore TAM recompression
A truncate on a Hypercore TAM table is executed across both compressed
and non-compressed data. This caused an issue when recompressing
because the operation also truncated the compressed data. Fix this by
introducing a flag that allows truncating only the non-compressed
data.
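
A minimal sketch of the resulting call pattern (it mirrors the api.c
hunk below, which saves and restores the flag around the compression
call):

    /* Disable truncation of compressed data for the duration of the
     * (re-)compression, then restore the previous setting. */
    bool truncate_compressed = hypercore_set_truncate_compressed(false);
    relid = tsl_compress_chunk_wrapper(chunk, if_not_compressed, recompress);
    hypercore_set_truncate_compressed(truncate_compressed);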

Another issue, related to cache invalidation, is also fixed. A
recompression sometimes creates a new compressed relation, and since
the compressed relid is cached in the Hypercore TAM's relcache entry,
the cache needs to be invalidated during recompression. Previously
this wasn't done, leading to an error. Fix this by adding a relcache
invalidation during recompression.
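
The invalidation follows PostgreSQL's usual two-step pattern, sketched
here as in the api.c hunk below (chunk_amoid and chunk_relid are
illustrative names for the chunk's access method and table OIDs):

    if (ts_is_hypercore_am(chunk_amoid))
    {
        /* Tell other backends their relcache entry is stale */
        CacheInvalidateRelcacheByRelid(chunk_relid);
        /* Immediately invalidate our own backend's cache */
        RelationCacheInvalidateEntry(chunk_relid);
    }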

Finally, compression using an index scan is disabled for Hypercore TAM
since the index also covers compressed data (in the recompression
case). While the index could be used the first time a chunk is
compressed (when only non-compressed data is indexed), it is disabled
completely for Hypercore TAM given that index scans are not used by
default anyway.
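
Concretely, the index-scan path in compress_chunk() is now behind a
single guard (a sketch of the check in the compression.c hunk below;
REL_IS_HYPERCORE is the new macro added to hypercore_handler.h):

    if (ts_guc_enable_compression_indexscan && !REL_IS_HYPERCORE(in_rel))
    {
        /* ... try to find a matching index to skip the tuplesort ... */
    }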

Tests are added to cover all of the issues described above.
erimatnor committed Oct 18, 2024
1 parent b65083e commit fb6af6d
Showing 7 changed files with 254 additions and 17 deletions.
22 changes: 22 additions & 0 deletions tsl/src/compression/api.c
@@ -457,6 +457,20 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
(errmsg("new compressed chunk \"%s.%s\" created",
NameStr(compress_ht_chunk->fd.schema_name),
NameStr(compress_ht_chunk->fd.table_name))));

/* Since a new compressed relation was created it is necessary to
* invalidate the relcache entry for the chunk because Hypercore TAM
* caches information about the compressed relation in the
* relcache. */
if (ts_is_hypercore_am(cxt.srcht_chunk->amoid))
{
/* Tell other backends */
CacheInvalidateRelcacheByRelid(cxt.srcht_chunk->table_id);

/* Immediately invalidate our own cache */
RelationCacheInvalidateEntry(cxt.srcht_chunk->table_id);
}

EventTriggerAlterTableEnd();
}
else
@@ -844,11 +858,19 @@ compress_hypercore(Chunk *chunk, bool rel_is_hypercore, enum UseAccessMethod use
TS_FALLTHROUGH;
case USE_AM_NULL:
Assert(rel_is_hypercore);
/* Don't forward the truncate to the compressed data during recompression */
bool truncate_compressed = hypercore_set_truncate_compressed(false);
relid = tsl_compress_chunk_wrapper(chunk, if_not_compressed, recompress);
hypercore_set_truncate_compressed(truncate_compressed);
break;
case USE_AM_TRUE:
if (rel_is_hypercore)
{
/* Don't forward the truncate to the compressed data during recompression */
bool truncate_compressed = hypercore_set_truncate_compressed(false);
relid = tsl_compress_chunk_wrapper(chunk, if_not_compressed, recompress);
hypercore_set_truncate_compressed(truncate_compressed);
}
else
{
/* Convert to a compressed hypercore by simply calling ALTER TABLE
26 changes: 22 additions & 4 deletions tsl/src/compression/compression.c
@@ -4,6 +4,7 @@
* LICENSE-TIMESCALE for a copy of the license.
*/
#include <postgres.h>
#include <access/skey.h>
#include <catalog/heap.h>
#include <catalog/pg_am.h>
#include <common/base64.h>
@@ -27,6 +28,7 @@
#include "debug_assert.h"
#include "debug_point.h"
#include "guc.h"
#include "hypercore/hypercore_handler.h"
#include "nodes/chunk_dispatch/chunk_insert_state.h"
#include "segment_meta.h"
#include "ts_catalog/array_utils.h"
@@ -180,7 +182,12 @@ static void
RelationDeleteAllRows(Relation rel, Snapshot snap)
{
TupleTableSlot *slot = table_slot_create(rel, NULL);
- TableScanDesc scan = table_beginscan(rel, snap, 0, (ScanKey) NULL);
+ ScanKeyData scankey = {
+ /* Let compression TAM know it should only return tuples from the
+ * non-compressed relation. No actual scankey necessary */
+ .sk_flags = SK_NO_COMPRESSED,
+ };
+ TableScanDesc scan = table_beginscan(rel, snap, 0, &scankey);

while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
{
@@ -291,8 +298,14 @@ compress_chunk(Oid in_table, Oid out_table, int insert_options)
* The following code is trying to find an existing index that
* matches the configuration so that we can skip sequential scan and
* tuplesort.
+ *
+ * Note that Hypercore TAM doesn't support (re-)compression via index at
+ * this point because the index covers also existing compressed tuples. It
+ * could be supported for initial compression when there is no compressed
+ * data, but for now just avoid it altogether since compression indexscan
+ * isn't enabled by default anyway.
*/
- if (ts_guc_enable_compression_indexscan)
+ if (ts_guc_enable_compression_indexscan && !REL_IS_HYPERCORE(in_rel))
{
List *in_rel_index_oids = RelationGetIndexList(in_rel);
foreach (lc, in_rel_index_oids)
@@ -442,6 +455,7 @@ compress_chunk(Oid in_table, Oid out_table, int insert_options)
{
int64 nrows_processed = 0;

Assert(!REL_IS_HYPERCORE(in_rel));
elog(ts_guc_debug_compression_path_info ? INFO : DEBUG1,
"using index \"%s\" to scan rows for compression",
get_rel_name(matched_index_rel->rd_id));
@@ -562,9 +576,13 @@ compress_chunk_sort_relation(CompressionSettings *settings, Relation in_rel)
Tuplesortstate *tuplesortstate;
TableScanDesc scan;
TupleTableSlot *slot;

+ ScanKeyData scankey = {
+ /* Let compression TAM know it should only return tuples from the
+ * non-compressed relation. No actual scankey necessary */
+ .sk_flags = SK_NO_COMPRESSED,
+ };
tuplesortstate = compression_create_tuplesort_state(settings, in_rel);
- scan = table_beginscan(in_rel, GetLatestSnapshot(), 0, (ScanKey) NULL);
+ scan = table_beginscan(in_rel, GetLatestSnapshot(), 0, &scankey);
slot = table_slot_create(in_rel, NULL);

while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
36 changes: 25 additions & 11 deletions tsl/src/hypercore/hypercore_handler.c
@@ -64,7 +64,6 @@
#include "debug_assert.h"
#include "guc.h"
#include "hypercore_handler.h"
#include "process_utility.h"
#include "relstats.h"
#include "trigger.h"
#include "ts_catalog/array_utils.h"
@@ -82,6 +81,16 @@ static List *partially_compressed_relids = NIL; /* Relids that needs to have
* updated status set at end of
* transaction */

static bool hypercore_truncate_compressed = true;

bool
hypercore_set_truncate_compressed(bool onoff)
{
bool old_value = hypercore_truncate_compressed;
hypercore_truncate_compressed = onoff;
return old_value;
}

#define HYPERCORE_AM_INFO_SIZE(natts) \
(sizeof(HypercoreInfo) + (sizeof(ColumnCompressionSettings) * (natts)))

@@ -540,12 +549,13 @@ hypercore_beginscan(Relation relation, Snapshot snapshot, int nkeys, ScanKey key
ParallelTableScanDesc cptscan =
parallel_scan ? (ParallelTableScanDesc) &cpscan->cpscandesc : NULL;

- scan->cscan_desc = scan->compressed_rel->rd_tableam->scan_begin(scan->compressed_rel,
- snapshot,
- scan->rs_base.rs_nkeys,
- scan->rs_base.rs_key,
- cptscan,
- flags);
+ if (scan->compressed_rel)
+ scan->cscan_desc = scan->compressed_rel->rd_tableam->scan_begin(scan->compressed_rel,
+ snapshot,
+ scan->rs_base.rs_nkeys,
+ scan->rs_base.rs_key,
+ cptscan,
+ flags);

return &scan->rs_base;
}
@@ -560,7 +570,11 @@ hypercore_rescan(TableScanDesc sscan, ScanKey key, bool set_params, bool allow_s
scan->reset = true;
scan->hs_scan_state = HYPERCORE_SCAN_START;

- table_rescan(scan->cscan_desc, key);
+ if (key && key->sk_flags & SK_NO_COMPRESSED)
+ scan->hs_scan_state = HYPERCORE_SCAN_NON_COMPRESSED;
+
+ if (scan->cscan_desc)
+ table_rescan(scan->cscan_desc, key);

Relation relation = scan->uscan_desc->rs_rd;
const TableAmRoutine *oldtam = switch_to_heapam(relation);
@@ -1643,7 +1657,7 @@ hypercore_tuple_delete(Relation relation, ItemPointer tid, CommandId cid, Snapsh
{
TM_Result result = TM_Ok;

- if (is_compressed_tid(tid))
+ if (is_compressed_tid(tid) && hypercore_truncate_compressed)
{
HypercoreInfo *caminfo = RelationGetHypercoreInfo(relation);
Relation crel = table_open(caminfo->compressed_relid, RowExclusiveLock);
@@ -1827,7 +1841,7 @@ hypercore_relation_set_new_filelocator(Relation rel, const RelFileLocator *newrl
* change the rel file number for it as well. This can happen if you, for
* example, execute a transactional TRUNCATE. */
Oid compressed_relid = chunk_get_compressed_chunk_relid(RelationGetRelid(rel));
- if (OidIsValid(compressed_relid))
+ if (OidIsValid(compressed_relid) && hypercore_truncate_compressed)
{
Relation compressed_rel = table_open(compressed_relid, AccessExclusiveLock);
#if PG16_GE
@@ -1847,7 +1861,7 @@ hypercore_relation_nontransactional_truncate(Relation rel)
rel->rd_tableam = oldtam;

Oid compressed_relid = chunk_get_compressed_chunk_relid(RelationGetRelid(rel));
- if (OidIsValid(compressed_relid))
+ if (OidIsValid(compressed_relid) && hypercore_truncate_compressed)
{
Relation crel = table_open(compressed_relid, AccessShareLock);
crel->rd_tableam->relation_nontransactional_truncate(crel);
3 changes: 3 additions & 0 deletions tsl/src/hypercore/hypercore_handler.h
@@ -25,6 +25,7 @@ extern void hypercore_alter_access_method_begin(Oid relid, bool to_other_am);
extern void hypercore_alter_access_method_finish(Oid relid, bool to_other_am);
extern Datum hypercore_handler(PG_FUNCTION_ARGS);
extern void hypercore_xact_event(XactEvent event, void *arg);
extern bool hypercore_set_truncate_compressed(bool onoff);

typedef struct ColumnCompressionSettings
{
@@ -56,4 +57,6 @@ typedef struct HypercoreInfo
ColumnCompressionSettings columns[FLEXIBLE_ARRAY_MEMBER];
} HypercoreInfo;

#define REL_IS_HYPERCORE(rel) ((rel)->rd_tableam == hypercore_routine())

extern HypercoreInfo *RelationGetHypercoreInfo(Relation rel);
119 changes: 119 additions & 0 deletions tsl/test/expected/hypercore.out
@@ -592,3 +592,122 @@ SELECT count(*) FROM :chunk;
(1 row)

drop table readings;
---------------------------------------------
-- Test recompression via compress_chunk() --
---------------------------------------------
show timescaledb.enable_transparent_decompression;
timescaledb.enable_transparent_decompression
----------------------------------------------
off
(1 row)

create table recompress (time timestamptz, value int);
select create_hypertable('recompress', 'time', create_default_indexes => false);
NOTICE: adding not-null constraint to column "time"
create_hypertable
-------------------------
(3,public,recompress,t)
(1 row)

insert into recompress values ('2024-01-01 01:00', 1), ('2024-01-01 02:00', 2);
select format('%I.%I', chunk_schema, chunk_name)::regclass as unique_chunk
from timescaledb_information.chunks
where format('%I.%I', hypertable_schema, hypertable_name)::regclass = 'recompress'::regclass
order by unique_chunk asc
limit 1 \gset
alter table recompress set (timescaledb.compress_orderby='time');
WARNING: there was some uncertainty picking the default segment by for the hypertable: You do not have any indexes on columns that can be used for segment_by and thus we are not using segment_by for compression. Please make sure you are not missing any indexes
NOTICE: default segment by for hypertable "recompress" is set to ""
alter table :unique_chunk set access method hypercore;
-- Should already be compressed
select compress_chunk(:'unique_chunk');
NOTICE: chunk "_hyper_3_34_chunk" is already compressed
compress_chunk
-----------------------------------------
_timescaledb_internal._hyper_3_34_chunk
(1 row)

-- Insert something to compress
insert into recompress values ('2024-01-01 03:00', 3);
select compress_chunk(:'unique_chunk');
compress_chunk
-----------------------------------------
_timescaledb_internal._hyper_3_34_chunk
(1 row)

-- Make sure we see the data after recompression and everything is
-- compressed
select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time;
is_compressed_tid | time | value
-------------------+------------------------------+-------
t | Mon Jan 01 01:00:00 2024 PST | 1
t | Mon Jan 01 02:00:00 2024 PST | 2
t | Mon Jan 01 03:00:00 2024 PST | 3
(3 rows)

-- Add a time index to test recompression with index scan. Index scans
-- during compression is actually disabled for Hypercore TAM since the
-- index covers also compressed data, so this is only a check that the
-- GUC can be set without negative consequences.
create index on recompress (time);
set timescaledb.enable_compression_indexscan=true;
-- Insert another value to compress
insert into recompress values ('2024-01-02 04:00', 4);
select compress_chunk(:'unique_chunk');
compress_chunk
-----------------------------------------
_timescaledb_internal._hyper_3_34_chunk
(1 row)

select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time;
is_compressed_tid | time | value
-------------------+------------------------------+-------
t | Mon Jan 01 01:00:00 2024 PST | 1
t | Mon Jan 01 02:00:00 2024 PST | 2
t | Mon Jan 01 03:00:00 2024 PST | 3
t | Tue Jan 02 04:00:00 2024 PST | 4
(4 rows)

-- Test using delete instead of truncate when compressing
set timescaledb.enable_delete_after_compression=true;
-- Insert another value to compress
insert into recompress values ('2024-01-02 05:00', 5);
select compress_chunk(:'unique_chunk');
compress_chunk
-----------------------------------------
_timescaledb_internal._hyper_3_34_chunk
(1 row)

select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time;
is_compressed_tid | time | value
-------------------+------------------------------+-------
t | Mon Jan 01 01:00:00 2024 PST | 1
t | Mon Jan 01 02:00:00 2024 PST | 2
t | Mon Jan 01 03:00:00 2024 PST | 3
t | Tue Jan 02 04:00:00 2024 PST | 4
t | Tue Jan 02 05:00:00 2024 PST | 5
(5 rows)

-- Add a segmentby key to test segmentwise recompression
-- Insert another value to compress that goes into same segment
alter table :unique_chunk set access method heap;
alter table recompress set (timescaledb.compress_orderby='time', timescaledb.compress_segmentby='value');
alter table :unique_chunk set access method hypercore;
insert into recompress values ('2024-01-02 06:00', 5);
select compress_chunk(:'unique_chunk');
compress_chunk
-----------------------------------------
_timescaledb_internal._hyper_3_34_chunk
(1 row)

select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time;
is_compressed_tid | time | value
-------------------+------------------------------+-------
t | Mon Jan 01 01:00:00 2024 PST | 1
t | Mon Jan 01 02:00:00 2024 PST | 2
t | Mon Jan 01 03:00:00 2024 PST | 3
t | Tue Jan 02 04:00:00 2024 PST | 4
t | Tue Jan 02 05:00:00 2024 PST | 5
t | Tue Jan 02 06:00:00 2024 PST | 5
(6 rows)

4 changes: 2 additions & 2 deletions tsl/test/expected/hypercore_create.out
@@ -438,7 +438,7 @@ select * from compressed_rel_size_stats order by rel;
_timescaledb_internal._hyper_1_7_chunk | hypercore | test2 | 2016 | 10 | 10
_timescaledb_internal._hyper_1_9_chunk | hypercore | test2 | 2016 | 10 | 10
_timescaledb_internal._hyper_1_11_chunk | hypercore | test2 | 373 | 10 | 10
- _timescaledb_internal._hyper_4_13_chunk | hypercore | test3 | 0 | 0 | 0
+ _timescaledb_internal._hyper_4_13_chunk | hypercore | test3 | 1 | 1 | 1
_timescaledb_internal._hyper_4_17_chunk | hypercore | test3 | 1 | 1 | 1
_timescaledb_internal._hyper_4_18_chunk | hypercore | test3 | 1 | 1 | 1
(9 rows)
@@ -476,7 +476,7 @@ select * from compressed_rel_size_stats order by rel;
_timescaledb_internal._hyper_1_7_chunk | heap | test2 | 2016 | 10 | 10
_timescaledb_internal._hyper_1_9_chunk | heap | test2 | 2016 | 10 | 10
_timescaledb_internal._hyper_1_11_chunk | heap | test2 | 373 | 10 | 10
- _timescaledb_internal._hyper_4_13_chunk | heap | test3 | 0 | 0 | 0
+ _timescaledb_internal._hyper_4_13_chunk | heap | test3 | 1 | 1 | 1
_timescaledb_internal._hyper_4_17_chunk | heap | test3 | 1 | 1 | 1
_timescaledb_internal._hyper_4_18_chunk | heap | test3 | 1 | 1 | 1
(9 rows)