Skip to content

Commit

Permalink
Remove watermark
Browse files Browse the repository at this point in the history
  • Loading branch information
fabriziomello committed Mar 6, 2023
1 parent 3fea8e8 commit 4ad547f
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 45 deletions.
5 changes: 3 additions & 2 deletions src/chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -3892,8 +3892,7 @@ ts_chunk_do_drop_chunks(Hypertable *ht, int64 older_than, int64 newer_than, int3
Chunk *chunks;
const char *schema_name, *table_name;
const int32 hypertable_id = ht->fd.id;
bool has_continuous_aggs;
bool is_materialization_hypertable = false;
bool has_continuous_aggs, is_materialization_hypertable;
const MemoryContext oldcontext = CurrentMemoryContext;
ScanTupLock tuplock = {
.waitpolicy = LockWaitBlock,
Expand All @@ -3913,6 +3912,8 @@ ts_chunk_do_drop_chunks(Hypertable *ht, int64 older_than, int64 newer_than, int3
* well. Do not unlock - let the transaction semantics take care of it. */
lock_referenced_tables(ht->main_table_relid);

is_materialization_hypertable = false;

switch (ts_continuous_agg_hypertable_status(hypertable_id))
{
case HypertableIsMaterialization:
Expand Down
7 changes: 5 additions & 2 deletions src/ts_catalog/continuous_agg.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

#include <postgres.h>
#include <access/htup_details.h>
#include <access/xact.h>
#include <catalog/dependency.h>
#include <catalog/namespace.h>
#include <catalog/pg_trigger.h>
Expand All @@ -24,12 +23,12 @@
#include <utils/date.h>
#include <utils/lsyscache.h>
#include <utils/timestamp.h>
#include <miscadmin.h>

#include "compat/compat.h"

#include "bgw/job.h"
#include "ts_catalog/continuous_agg.h"
#include "ts_catalog/continuous_aggs_watermark.h"
#include "cross_module_fn.h"
#include "hypercube.h"
#include "hypertable.h"
Expand Down Expand Up @@ -1094,6 +1093,7 @@ drop_continuous_agg(FormData_continuous_agg *cadata, bool drop_user_view)
catalog = ts_catalog_get();
LockRelationOid(catalog_get_table_id(catalog, BGW_JOB), RowExclusiveLock);
LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGG), RowExclusiveLock);
LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_WATERMARK), RowExclusiveLock);

raw_hypertable_has_other_caggs =
OidIsValid(raw_hypertable.objectId) &&
Expand Down Expand Up @@ -1164,6 +1164,9 @@ drop_continuous_agg(FormData_continuous_agg *cadata, bool drop_user_view)
{
invalidation_threshold_delete(form.raw_hypertable_id);
}

/* Delete watermark */
ts_cagg_watermark_delete_by_mat_hypertable_id(form.mat_hypertable_id);
}

if (cadata->bucket_width == BUCKET_WIDTH_VARIABLE)
Expand Down
104 changes: 64 additions & 40 deletions src/ts_catalog/continuous_aggs_watermark.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,23 @@
*/

#include <postgres.h>
#include <access/xact.h>
#include <fmgr.h>
#include <miscadmin.h>
#include <utils/acl.h>

#include "ts_catalog/continuous_agg.h"
#include "ts_catalog/continuous_aggs_watermark.h"
#include "hypertable.h"

typedef struct CAggWatermark
typedef struct ContinuousAggregateWatermark
{
int32 hyper_id;
int32 mat_hypertable_id;
MemoryContext mctx;
MemoryContextCallback cb;
CommandId cid;
int64 value;
} CAggWatermark;
} ContinuousAggregateWatermark;

/*
* Globally cache the watermark for better performance (by avoiding repeated
Expand All @@ -38,7 +40,7 @@ typedef struct CAggWatermark
* only a fallback if the planner needs to constify it many times (e.g., if
* used as an index condition on many chunks).
*/
static CAggWatermark *watermark = NULL;
static ContinuousAggregateWatermark *watermark_cache = NULL;

/*
* Callback handler to reset the watermark after the transaction ends. This is
Expand All @@ -47,17 +49,18 @@ static CAggWatermark *watermark = NULL;
static void
cagg_watermark_reset(void *arg)
{
watermark = NULL;
watermark_cache = NULL;
}

/*
* CAggWatermark is valid for the duration of one command execution on the same
* ContinuousAggregateWatermark is valid for the duration of one command execution on the same
* materialized hypertable.
*/
static bool
cagg_watermark_is_valid(const CAggWatermark *w, int32 hyper_id)
cagg_watermark_is_valid(const ContinuousAggregateWatermark *watermark, int32 mat_hypertable_id)
{
return w != NULL && w->hyper_id == hyper_id && w->cid == GetCurrentCommandId(false);
return watermark != NULL && watermark->mat_hypertable_id == mat_hypertable_id &&
watermark->cid == GetCurrentCommandId(false);
}

static void
Expand All @@ -78,8 +81,8 @@ static Datum
cagg_watermark_get(Hypertable *mat_ht, bool *isnull)
{
PG_USED_FOR_ASSERTS_ONLY short count = 0;
Datum datum;
bool value_isnull;
Datum datum = (Datum) 0;
bool value_isnull = true;
ScanIterator iterator =
ts_scan_iterator_create(CONTINUOUS_AGGS_WATERMARK, RowExclusiveLock, CurrentMemoryContext);

Expand All @@ -105,24 +108,25 @@ cagg_watermark_get(Hypertable *mat_ht, bool *isnull)
return datum;
}

static CAggWatermark *
static ContinuousAggregateWatermark *
cagg_watermark_create(const ContinuousAgg *cagg, MemoryContext top_mctx)
{
Hypertable *ht;
const Dimension *dim;
Datum maxdat;
bool max_isnull;
Oid timetype;
CAggWatermark *w;
MemoryContext mctx =
AllocSetContextCreate(top_mctx, "CAggWatermark function", ALLOCSET_DEFAULT_SIZES);

w = MemoryContextAllocZero(mctx, sizeof(CAggWatermark));
w->mctx = mctx;
w->hyper_id = cagg->data.mat_hypertable_id;
w->cid = GetCurrentCommandId(false);
w->cb.func = cagg_watermark_reset;
MemoryContextRegisterResetCallback(mctx, &w->cb);
ContinuousAggregateWatermark *watermark;
MemoryContext mctx = AllocSetContextCreate(top_mctx,
"ContinuousAggregateWatermark function",
ALLOCSET_DEFAULT_SIZES);

watermark = MemoryContextAllocZero(mctx, sizeof(ContinuousAggregateWatermark));
watermark->mctx = mctx;
watermark->mat_hypertable_id = cagg->data.mat_hypertable_id;
watermark->cid = GetCurrentCommandId(false);
watermark->cb.func = cagg_watermark_reset;
MemoryContextRegisterResetCallback(mctx, &watermark->cb);

ht = ts_hypertable_get_by_id(cagg->data.mat_hypertable_id);
Assert(NULL != ht);
Expand All @@ -144,22 +148,22 @@ cagg_watermark_create(const ContinuousAgg *cagg, MemoryContext top_mctx)
* be added to ts_compute_beginning_of_the_next_bucket_variable() as
* an optimization, if necessary.
*/
w->value =
watermark->value =
ts_compute_beginning_of_the_next_bucket_variable(value, cagg->bucket_function);
}
else
{
w->value =
watermark->value =
ts_time_saturating_add(value, ts_continuous_agg_bucket_width(cagg), timetype);
}
}
else
{
/* Nothing materialized, so return min */
w->value = ts_time_get_min(timetype);
watermark->value = ts_time_get_min(timetype);
}

return w;
return watermark;
}

TS_FUNCTION_INFO_V1(ts_continuous_agg_watermark);
Expand All @@ -172,15 +176,17 @@ TS_FUNCTION_INFO_V1(ts_continuous_agg_watermark);
* aggregate. It is used by real-time aggregation as the threshold between the
* materialized data and real-time data in the UNION query.
*
* The watermark is defined as the end of the last (highest) bucket in the
* materialized hypertable of a continuous aggregate.
* The watermark is stored into `_timescaledb_catalog.continuous_aggs_watermark`
* catalog table by the `refresh_continuous_agregate` procedure. It is defined
* as the end of the last (highest) bucket in the materialized hypertable of a
* continuous aggregate.
*
* The materialized hypertable ID is given as input argument.
*/
Datum
ts_continuous_agg_watermark(PG_FUNCTION_ARGS)
{
const int32 hyper_id = PG_GETARG_INT32(0);
const int32 mat_hypertable_id = PG_GETARG_INT32(0);
ContinuousAgg *cagg;
AclResult aclresult;

Expand All @@ -189,28 +195,28 @@ ts_continuous_agg_watermark(PG_FUNCTION_ARGS)
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("materialized hypertable cannot be NULL")));

if (watermark != NULL)
if (watermark_cache != NULL)
{
if (cagg_watermark_is_valid(watermark, hyper_id))
PG_RETURN_INT64(watermark->value);
if (cagg_watermark_is_valid(watermark_cache, mat_hypertable_id))
PG_RETURN_INT64(watermark_cache->value);

MemoryContextDelete(watermark->mctx);
MemoryContextDelete(watermark_cache->mctx);
}

cagg = ts_continuous_agg_find_by_mat_hypertable_id(hyper_id);
cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_hypertable_id);

if (NULL == cagg)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid materialized hypertable ID: %d", hyper_id)));
errmsg("invalid materialized hypertable ID: %d", mat_hypertable_id)));

/* Preemptive permission check to ensure the function complains about lack
* of permissions on the cagg rather than the materialized hypertable */
aclresult = pg_class_aclcheck(cagg->relid, GetUserId(), ACL_SELECT);
aclcheck_error(aclresult, OBJECT_MATVIEW, get_rel_name(cagg->relid));
watermark = cagg_watermark_create(cagg, TopTransactionContext);
watermark_cache = cagg_watermark_create(cagg, TopTransactionContext);

PG_RETURN_INT64(watermark->value);
PG_RETURN_INT64(watermark_cache->value);
}

static ScanTupleResult
Expand All @@ -235,7 +241,7 @@ cagg_watermark_scan_update(TupleInfo *ti, void *data)
static void
cagg_watermark_set_internal(int32 mat_hypertable_id, Datum new_watermark)
{
bool updated_watermark;
bool watermark_updated;
ScanKeyData scankey[1];

ScanKeyInit(&scankey[0],
Expand All @@ -244,7 +250,7 @@ cagg_watermark_set_internal(int32 mat_hypertable_id, Datum new_watermark)
F_INT4EQ,
Int32GetDatum(mat_hypertable_id));

updated_watermark = ts_catalog_scan_one(CONTINUOUS_AGGS_WATERMARK /*=table*/,
watermark_updated = ts_catalog_scan_one(CONTINUOUS_AGGS_WATERMARK /*=table*/,
CONTINUOUS_AGGS_WATERMARK_PKEY /*=indexid*/,
scankey /*=scankey*/,
1 /*=num_keys*/,
Expand All @@ -253,7 +259,7 @@ cagg_watermark_set_internal(int32 mat_hypertable_id, Datum new_watermark)
CONTINUOUS_AGGS_WATERMARK_TABLE_NAME /*=table_name*/,
&new_watermark /*=data*/);

if (!updated_watermark)
if (!watermark_updated)
{
Catalog *catalog = ts_catalog_get();
Relation rel =
Expand All @@ -271,7 +277,7 @@ cagg_watermark_set_internal(int32 mat_hypertable_id, Datum new_watermark)
}
}

void
TSDLLEXPORT void
ts_cagg_watermark_set(Hypertable *mat_ht, Datum watermark, bool watermark_isnull)
{
ContinuousAgg *cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_ht->fd.id);
Expand All @@ -288,3 +294,21 @@ ts_cagg_watermark_set(Hypertable *mat_ht, Datum watermark, bool watermark_isnull

return;
}

TSDLLEXPORT bool
ts_cagg_watermark_delete_by_mat_hypertable_id(int32 mat_hypertable_id)
{
int count = 0;
ScanIterator iterator =
ts_scan_iterator_create(CONTINUOUS_AGGS_WATERMARK, RowExclusiveLock, CurrentMemoryContext);

cagg_watermark_init_scan_by_mat_hypertable_id(&iterator, mat_hypertable_id);

ts_scanner_foreach(&iterator)
{
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti));
count++;
}
return count > 0;
}
1 change: 1 addition & 0 deletions src/ts_catalog/continuous_aggs_watermark.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "export.h"
#include "hypertable.h"

extern TSDLLEXPORT bool ts_cagg_watermark_delete_by_mat_hypertable_id(int32 mat_hypertable_id);
extern TSDLLEXPORT void ts_cagg_watermark_set(Hypertable *mat_ht, Datum watermark,
bool new_watermark_isnull);

Expand Down
3 changes: 2 additions & 1 deletion test/expected/drop_rename_hypertable.out
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ SELECT * FROM _timescaledb_catalog.hypertable;
_timescaledb_catalog | continuous_aggs_hypertable_invalidation_log | table | super_user
_timescaledb_catalog | continuous_aggs_invalidation_threshold | table | super_user
_timescaledb_catalog | continuous_aggs_materialization_invalidation_log | table | super_user
_timescaledb_catalog | continuous_aggs_watermark | table | super_user
_timescaledb_catalog | dimension | table | super_user
_timescaledb_catalog | dimension_partition | table | super_user
_timescaledb_catalog | dimension_slice | table | super_user
Expand All @@ -217,7 +218,7 @@ SELECT * FROM _timescaledb_catalog.hypertable;
_timescaledb_catalog | metadata | table | super_user
_timescaledb_catalog | remote_txn | table | super_user
_timescaledb_catalog | tablespace | table | super_user
(23 rows)
(24 rows)

\dt "_timescaledb_internal".*
List of relations
Expand Down

0 comments on commit 4ad547f

Please sign in to comment.