Skip to content

Commit

Permalink
Improve cagg_watermark function performance
Browse files Browse the repository at this point in the history
  • Loading branch information
fabriziomello committed Mar 8, 2023
1 parent a854b27 commit 9deb2e7
Show file tree
Hide file tree
Showing 18 changed files with 514 additions and 211 deletions.
12 changes: 12 additions & 0 deletions sql/pre_install/tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,18 @@ CREATE TABLE _timescaledb_catalog.continuous_aggs_invalidation_threshold (

SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.continuous_aggs_invalidation_threshold', '');

CREATE TABLE _timescaledb_catalog.continuous_aggs_watermark (
mat_hypertable_id integer NOT NULL,
watermark bigint NOT NULL,
-- table constraints
CONSTRAINT continuous_aggs_watermark_pkey PRIMARY KEY (mat_hypertable_id),
CONSTRAINT continuous_aggs_watermark_mat_hypertable_id_fkey FOREIGN KEY (mat_hypertable_id) REFERENCES _timescaledb_catalog.continuous_agg (mat_hypertable_id) ON DELETE CASCADE
);

SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.continuous_aggs_watermark', '');



-- this does not have an FK on the materialization table since INSERTs to this
-- table are performance critical
CREATE TABLE _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log (
Expand Down
13 changes: 13 additions & 0 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,16 @@ DROP FUNCTION _timescaledb_internal.ping_data_node(NAME);

CREATE FUNCTION _timescaledb_internal.ping_data_node(node_name NAME, timeout INTERVAL = NULL) RETURNS BOOLEAN
AS '@MODULE_PATHNAME@', 'ts_data_node_ping' LANGUAGE C VOLATILE;

CREATE TABLE _timescaledb_catalog.continuous_aggs_watermark (
mat_hypertable_id integer NOT NULL,
watermark bigint NOT NULL,
-- table constraints
CONSTRAINT continuous_aggs_watermark_pkey PRIMARY KEY (mat_hypertable_id),
CONSTRAINT continuous_aggs_watermark_mat_hypertable_id_fkey FOREIGN KEY (mat_hypertable_id) REFERENCES _timescaledb_catalog.continuous_agg (mat_hypertable_id) ON DELETE CASCADE
);

GRANT SELECT ON _timescaledb_catalog.continuous_aggs_watermark TO PUBLIC;

SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.continuous_aggs_watermark', '');

5 changes: 5 additions & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,8 @@ DROP FUNCTION _timescaledb_internal.ping_data_node(NAME, INTERVAL);

CREATE OR REPLACE FUNCTION _timescaledb_internal.ping_data_node(node_name NAME) RETURNS BOOLEAN
AS '@MODULE_PATHNAME@', 'ts_data_node_ping' LANGUAGE C VOLATILE;

ALTER EXTENSION timescaledb DROP TABLE _timescaledb_catalog.continuous_aggs_watermark;

DROP TABLE IF EXISTS _timescaledb_catalog.continuous_aggs_watermark;

19 changes: 18 additions & 1 deletion src/chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
#include "ts_catalog/chunk_data_node.h"
#include "ts_catalog/compression_chunk_size.h"
#include "ts_catalog/continuous_agg.h"
#include "ts_catalog/continuous_aggs_watermark.h"
#include "ts_catalog/hypertable_data_node.h"
#include "utils.h"

Expand Down Expand Up @@ -3891,7 +3892,7 @@ ts_chunk_do_drop_chunks(Hypertable *ht, int64 older_than, int64 newer_than, int3
Chunk *chunks;
const char *schema_name, *table_name;
const int32 hypertable_id = ht->fd.id;
bool has_continuous_aggs;
bool has_continuous_aggs, is_materialization_hypertable;
const MemoryContext oldcontext = CurrentMemoryContext;
ScanTupLock tuplock = {
.waitpolicy = LockWaitBlock,
Expand All @@ -3911,14 +3912,22 @@ ts_chunk_do_drop_chunks(Hypertable *ht, int64 older_than, int64 newer_than, int3
* well. Do not unlock - let the transaction semantics take care of it. */
lock_referenced_tables(ht->main_table_relid);

is_materialization_hypertable = false;

switch (ts_continuous_agg_hypertable_status(hypertable_id))
{
case HypertableIsMaterialization:
{
has_continuous_aggs = false;
is_materialization_hypertable = true;
break;
}
case HypertableIsMaterializationAndRaw:
{
has_continuous_aggs = true;
is_materialization_hypertable = true;
break;
}
case HypertableIsRawTable:
has_continuous_aggs = true;
break;
Expand Down Expand Up @@ -4027,6 +4036,14 @@ ts_chunk_do_drop_chunks(Hypertable *ht, int64 older_than, int64 newer_than, int3
}
}

/* When dropping chunks for a given CAgg then force set the watermark */
if (is_materialization_hypertable)
{
bool isnull;
int64 watermark = ts_hypertable_get_open_dim_max_value(ht, 0, &isnull, true);
ts_cagg_watermark_set(ht, watermark, isnull);
}

if (affected_data_nodes)
*affected_data_nodes = data_nodes;

Expand Down
20 changes: 14 additions & 6 deletions src/hypertable.c
Original file line number Diff line number Diff line change
Expand Up @@ -2902,14 +2902,16 @@ ts_hypertable_func_call_on_data_nodes(const Hypertable *ht, FunctionCallInfo fci
/*
* Get the max value of an open dimension.
*/
Datum
ts_hypertable_get_open_dim_max_value(const Hypertable *ht, int dimension_index, bool *isnull)
int64
ts_hypertable_get_open_dim_max_value(const Hypertable *ht, int dimension_index, bool *isnull,
bool connect_to_spi)
{
StringInfo command;
const Dimension *dim;
int res;
bool max_isnull;
Datum maxdat;
int64 max_value = 0;

dim = hyperspace_get_open_dimension(ht->space, dimension_index);

Expand All @@ -2930,10 +2932,10 @@ ts_hypertable_get_open_dim_max_value(const Hypertable *ht, int dimension_index,
quote_identifier(NameStr(ht->fd.schema_name)),
quote_identifier(NameStr(ht->fd.table_name)));

if (SPI_connect() != SPI_OK_CONNECT)
if (connect_to_spi && SPI_connect() != SPI_OK_CONNECT)
elog(ERROR, "could not connect to SPI");

res = SPI_execute(command->data, true /* read_only */, 0 /*count*/);
res = SPI_execute(command->data, connect_to_spi /* read_only */, 0 /*count*/);

if (res < 0)
ereport(ERROR,
Expand All @@ -2950,10 +2952,16 @@ ts_hypertable_get_open_dim_max_value(const Hypertable *ht, int dimension_index,
if (isnull)
*isnull = max_isnull;

if ((res = SPI_finish()) != SPI_OK_FINISH)
if (connect_to_spi && (res = SPI_finish()) != SPI_OK_FINISH)
elog(ERROR, "SPI_finish failed: %s", SPI_result_code_string(res));

return maxdat;
if (!max_isnull)
{
Oid timetype = ts_dimension_get_partition_type(dim);
max_value = ts_time_value_to_internal(maxdat, timetype);
}

return max_value;
}

bool
Expand Down
5 changes: 3 additions & 2 deletions src/hypertable.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,9 @@ extern TSDLLEXPORT List *ts_hypertable_get_available_data_node_server_oids(const
extern TSDLLEXPORT HypertableType ts_hypertable_get_type(const Hypertable *ht);
extern TSDLLEXPORT void ts_hypertable_func_call_on_data_nodes(const Hypertable *ht,
FunctionCallInfo fcinfo);
extern TSDLLEXPORT Datum ts_hypertable_get_open_dim_max_value(const Hypertable *ht,
int dimension_index, bool *isnull);
extern TSDLLEXPORT int64 ts_hypertable_get_open_dim_max_value(const Hypertable *ht,
int dimension_index, bool *isnull,
bool connect_to_spi);

extern TSDLLEXPORT bool ts_hypertable_has_compression_table(const Hypertable *ht);
extern TSDLLEXPORT void ts_hypertable_formdata_fill(FormData_hypertable *fd, const TupleInfo *ti);
Expand Down
1 change: 1 addition & 0 deletions src/ts_catalog/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/chunk_data_node.c
${CMAKE_CURRENT_SOURCE_DIR}/compression_chunk_size.c
${CMAKE_CURRENT_SOURCE_DIR}/continuous_agg.c
${CMAKE_CURRENT_SOURCE_DIR}/continuous_aggs_watermark.c
${CMAKE_CURRENT_SOURCE_DIR}/dimension_partition.c
${CMAKE_CURRENT_SOURCE_DIR}/hypertable_compression.c
${CMAKE_CURRENT_SOURCE_DIR}/hypertable_data_node.c
Expand Down
10 changes: 10 additions & 0 deletions src/ts_catalog/catalog.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ static const TableInfoDef catalog_table_names[_MAX_CATALOG_TABLES + 1] = {
.schema_name = INTERNAL_SCHEMA_NAME,
.table_name = JOB_ERRORS_TABLE_NAME,
},
[CONTINUOUS_AGGS_WATERMARK] = {
.schema_name = CATALOG_SCHEMA_NAME,
.table_name = CONTINUOUS_AGGS_WATERMARK_TABLE_NAME,
},
[_MAX_CATALOG_TABLES] = {
.schema_name = "invalid schema",
.table_name = "invalid table",
Expand Down Expand Up @@ -252,6 +256,12 @@ static const TableIndexDef catalog_table_index_definitions[_MAX_CATALOG_TABLES]
[CONTINUOUS_AGGS_MATERIALIZATION_INVALIDATION_LOG_IDX] = "continuous_aggs_materialization_invalidation_log_idx",
},
},
[CONTINUOUS_AGGS_WATERMARK] = {
.length = _MAX_CONTINUOUS_AGGS_WATERMARK_INDEX,
.names = (char *[]) {
[CONTINUOUS_AGGS_WATERMARK_PKEY] = "continuous_aggs_watermark_pkey",
},
},
[HYPERTABLE_COMPRESSION] = {
.length = _MAX_HYPERTABLE_COMPRESSION_INDEX,
.names = (char *[]) {
Expand Down
34 changes: 34 additions & 0 deletions src/ts_catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ typedef enum CatalogTable
CHUNK_COPY_OPERATION,
CONTINUOUS_AGGS_BUCKET_FUNCTION,
JOB_ERRORS,
CONTINUOUS_AGGS_WATERMARK,
/* Don't forget updating catalog.c when adding new tables! */
_MAX_CATALOG_TABLES,
} CatalogTable;
Expand Down Expand Up @@ -1172,6 +1173,39 @@ typedef enum Anum_continuous_aggs_materialization_invalidation_log_idx
#define Natts_continuous_aggs_materialization_invalidation_log_idx \
(_Anum_continuous_aggs_materialization_invalidation_log_idx_max - 1)

/****** CONTINUOUS_AGGS_WATERMARK_TABLE definitions*/
#define CONTINUOUS_AGGS_WATERMARK_TABLE_NAME "continuous_aggs_watermark"
typedef enum Anum_continuous_aggs_watermark
{
Anum_continuous_aggs_watermark_mat_hypertable_id = 1,
Anum_continuous_aggs_watermark_watermark,
_Anum_continuous_aggs_watermark_max,
} Anum_continuous_aggs_watermark;

#define Natts_continuous_aggs_watermark (_Anum_continuous_aggs_watermark_max - 1)

typedef struct FormData_continuous_aggs_watermark
{
int32 mat_hypertable_id;
int64 watermark;
} FormData_continuous_aggs_watermark;

typedef FormData_continuous_aggs_watermark *Form_continuous_aggs_watermark;

enum
{
CONTINUOUS_AGGS_WATERMARK_PKEY = 0,
_MAX_CONTINUOUS_AGGS_WATERMARK_INDEX,
};

typedef enum Anum_continuous_aggs_watermark_pkey
{
Anum_continuous_aggs_watermark_pkey_mat_hypertable_id = 1,
_Anum_continuous_aggs_watermark_pkey_max,
} Anum_continuous_aggs_watermark_pkey;

#define Natts_continuous_aggs_watermark_pkey (_Anum_continuous_aggs_watermark_pkey_max - 1)

#define HYPERTABLE_COMPRESSION_TABLE_NAME "hypertable_compression"
typedef enum Anum_hypertable_compression
{
Expand Down
Loading

0 comments on commit 9deb2e7

Please sign in to comment.