Skip to content

Commit

Permalink
Pushdown ORDER BY for realtime caggs
Browse files Browse the repository at this point in the history
Previously, ordered queries on realtime caggs would always lead to a full
table scan, because the query plan would have a Sort node with the Limit on
top of it. With this patch the ORDER BY is pushed down into the subqueries
of the realtime cagg, so the query can benefit from the ordered append
optimization and no longer requires a full table scan.

Since the internal structure is different on PG 14 and 15, this
optimization will only be available on PG 16 and 17.

Fixes #4861
  • Loading branch information
svenklemm committed Oct 15, 2024
1 parent d2706a2 commit 9c7085a
Show file tree
Hide file tree
Showing 18 changed files with 748 additions and 160 deletions.
1 change: 1 addition & 0 deletions .unreleased/pr_7271
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7271 Push down ORDER BY in real time continuous aggregate queries
2 changes: 1 addition & 1 deletion src/cross_module_fn.c
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ ts_tsl_loaded(PG_FUNCTION_ARGS)
}

static void
preprocess_query_tsl_default_fn_community(Query *parse)
preprocess_query_tsl_default_fn_community(Query *parse, int *cursor_opts)
{
/* No op in community licensed code */
}
Expand Down
2 changes: 1 addition & 1 deletion src/cross_module_fn.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ typedef struct CrossModuleFunctions
PGFunction chunk_unfreeze_chunk;
PGFunction recompress_chunk_segmentwise;
PGFunction get_compressed_chunk_index_for_recompression;
void (*preprocess_query_tsl)(Query *parse);
void (*preprocess_query_tsl)(Query *parse, int *cursor_opts);
} CrossModuleFunctions;

extern TSDLLEXPORT CrossModuleFunctions *ts_cm_functions;
Expand Down
17 changes: 17 additions & 0 deletions src/guc.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <utils/regproc.h>
#include <utils/varlena.h>

#include "compat/compat.h"
#include "config.h"
#include "extension.h"
#include "guc.h"
Expand Down Expand Up @@ -66,6 +67,9 @@ bool ts_guc_enable_qual_propagation = true;
bool ts_guc_enable_cagg_reorder_groupby = true;
bool ts_guc_enable_now_constify = true;
bool ts_guc_enable_foreign_key_propagation = true;
#if PG16_GE
TSDLLEXPORT bool ts_guc_enable_cagg_sort_pushdown = true;
#endif
TSDLLEXPORT bool ts_guc_enable_cagg_watermark_constify = true;
TSDLLEXPORT int ts_guc_cagg_max_individual_materializations = 10;
bool ts_guc_enable_osm_reads = true;
Expand Down Expand Up @@ -563,6 +567,19 @@ _guc_init(void)
NULL,
NULL);

#if PG16_GE
DefineCustomBoolVariable(MAKE_EXTOPTION("enable_cagg_sort_pushdown"),
"Enable sort pushdown for continuous aggregates",
"Enable pushdown of ORDER BY clause for continuous aggregates",
&ts_guc_enable_cagg_sort_pushdown,
true,
PGC_USERSET,
0,
NULL,
NULL,
NULL);
#endif

DefineCustomBoolVariable(MAKE_EXTOPTION("enable_cagg_watermark_constify"),
"Enable cagg watermark constify",
"Enable constifying cagg watermark for real-time caggs",
Expand Down
7 changes: 6 additions & 1 deletion src/guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#pragma once

#include <postgres.h>

#include "compat/compat.h"
#include "config.h"
#include "export.h"

Expand All @@ -27,8 +29,11 @@ extern bool ts_guc_enable_cagg_reorder_groupby;
extern TSDLLEXPORT int ts_guc_cagg_max_individual_materializations;
extern bool ts_guc_enable_now_constify;
extern bool ts_guc_enable_foreign_key_propagation;
extern TSDLLEXPORT bool ts_guc_enable_cagg_watermark_constify;
extern bool ts_guc_enable_osm_reads;
#if PG16_GE
extern TSDLLEXPORT bool ts_guc_enable_cagg_sort_pushdown;
#endif
extern TSDLLEXPORT bool ts_guc_enable_cagg_watermark_constify;
extern TSDLLEXPORT bool ts_guc_enable_dml_decompression;
extern TSDLLEXPORT bool ts_guc_enable_dml_decompression_tuple_filtering;
extern TSDLLEXPORT bool ts_guc_enable_compressed_direct_batch_delete;
Expand Down
2 changes: 1 addition & 1 deletion src/planner/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ timescaledb_planner(Query *parse, const char *query_string, int cursor_opts,
preprocess_query((Node *) parse, &context);

if (ts_guc_enable_optimizations)
ts_cm_functions->preprocess_query_tsl(parse);
ts_cm_functions->preprocess_query_tsl(parse, &cursor_opts);
}

if (prev_planner_hook != NULL)
Expand Down
103 changes: 103 additions & 0 deletions tsl/src/continuous_aggs/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -355,3 +355,106 @@ constify_cagg_watermark(Query *parse)
if (context.valid_query)
replace_watermark_with_const(&context);
}

#if PG16_GE
/*
 * Push down a single-column ORDER BY on the time column into the two
 * subqueries (materialized data and not yet materialized data) of a realtime
 * continuous aggregate view.
 *
 * parse       - the query to inspect; it is modified in place when the
 *               pushdown applies (the outer sort clause and the sort clause
 *               of the cagg view subquery are removed, the two inner
 *               subqueries receive a sort clause instead).
 * cursor_opts - planner cursor options; CURSOR_OPT_PARALLEL_OK is cleared
 *               when the pushdown is applied.
 *
 * The function is a no-op unless the query has exactly one range table entry
 * and exactly one sort clause with a valid sort operator.
 *
 * This is only enabled on PG16 and above because the internal structure is
 * different in previous versions.
 */
void
cagg_sort_pushdown(Query *parse, int *cursor_opts)
{
	ListCell *lc;

	/* Nothing to do if we have no valid sort clause */
	if (list_length(parse->rtable) != 1 || list_length(parse->sortClause) != 1 ||
		!OidIsValid(linitial_node(SortGroupClause, parse->sortClause)->sortop))
		return;

	Cache *cache = ts_hypertable_cache_pin();

	foreach (lc, parse->rtable)
	{
		RangeTblEntry *rte = lfirst(lc);

		/*
		 * Realtime cagg view will have 2 rtable entries, one for the materialized data and one for
		 * the not yet materialized data.
		 */
		if (rte->rtekind != RTE_SUBQUERY || rte->relkind != RELKIND_VIEW ||
			list_length(rte->subquery->rtable) != 2)
			continue;

		ContinuousAgg *cagg = ts_continuous_agg_find_by_relid(rte->relid);

		/*
		 * This optimization only applies to realtime caggs.
		 */
		if (!cagg || !cagg->data.finalized || cagg->data.materialized_only)
			continue;

		Hypertable *ht = ts_hypertable_cache_get_entry_by_id(cache, cagg->data.mat_hypertable_id);

		/* Defensive: do not dereference a failed cache lookup */
		if (!ht)
			continue;

		Dimension const *dim = hyperspace_get_open_dimension(ht->space, 0);

		/* We should only encounter hypertables with an open dimension */
		if (!dim)
			continue;

		SortGroupClause *sort = linitial_node(SortGroupClause, parse->sortClause);
		TargetEntry *tle = get_sortgroupref_tle(sort->tleSortGroupRef, parse->targetList);

		/*
		 * We only pushdown ORDER BY when it's single column
		 * ORDER BY on the time column.
		 */
		AttrNumber time_col = dim->column_attno;
		if (!IsA(tle->expr, Var) || castNode(Var, tle->expr)->varattno != time_col)
			continue;

		RangeTblEntry *mat_rte = linitial_node(RangeTblEntry, rte->subquery->rtable);
		RangeTblEntry *rt_rte = lsecond_node(RangeTblEntry, rte->subquery->rtable);

		/*
		 * Each subquery needs its own copy of the SortGroupClause node because
		 * tleSortGroupRef is rewritten per subquery below. list_copy() would
		 * only copy the list cells and leave both subqueries (and the outer
		 * query) pointing at the same node, so the second assignment would
		 * silently overwrite the first.
		 */
		mat_rte->subquery->sortClause = copyObject(parse->sortClause);
		rt_rte->subquery->sortClause = copyObject(parse->sortClause);

		/* Point each pushed down sort clause at the time column of its own target list */
		TargetEntry *mat_tle = list_nth(mat_rte->subquery->targetList, time_col - 1);
		TargetEntry *rt_tle = list_nth(rt_rte->subquery->targetList, time_col - 1);
		linitial_node(SortGroupClause, mat_rte->subquery->sortClause)->tleSortGroupRef =
			mat_tle->ressortgroupref;
		linitial_node(SortGroupClause, rt_rte->subquery->sortClause)->tleSortGroupRef =
			rt_tle->ressortgroupref;

		/*
		 * Align the sort operator of the group clause on the time column of
		 * the realtime subquery with the sort operator of the ORDER BY. This
		 * allows the GroupAggregate node to produce the requested order and
		 * avoids having to resort.
		 *
		 * NOTE(review): this assumes the group clause for the time column is
		 * at position (ressortgroupref - 1) in groupClause - confirm.
		 */
		SortGroupClause *cagg_group =
			list_nth(rt_rte->subquery->groupClause, rt_tle->ressortgroupref - 1);
		cagg_group->sortop = sort->sortop;
		cagg_group->nulls_first = sort->nulls_first;

		Oid placeholder;
		int16 strategy;
		get_ordering_op_properties(sort->sortop, &placeholder, &placeholder, &strategy);

		/*
		 * For a descending sort swap the two subqueries so that the not yet
		 * materialized data comes first in the rtable, followed by the
		 * materialized data.
		 */
		if (strategy == BTGreaterStrategyNumber)
		{
			rte->subquery->rtable = list_make2(rt_rte, mat_rte);
		}

		/*
		 * We have to prevent parallelism when we do this optimization because
		 * the subplans of the Append have to be processed sequentially.
		 */
		*cursor_opts = *cursor_opts & ~CURSOR_OPT_PARALLEL_OK;

		/* The ordering is now produced by the subqueries, so drop the outer sorts */
		parse->sortClause = NIL;
		rte->subquery->sortClause = NIL;
	}
	ts_cache_release(cache);
}
#endif
1 change: 1 addition & 0 deletions tsl/src/continuous_aggs/planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@
#include "planner/planner.h"

void constify_cagg_watermark(Query *parse);
void cagg_sort_pushdown(Query *parse, int *cursor_opts);

#endif
10 changes: 9 additions & 1 deletion tsl/src/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ tsl_set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntr
* Run preprocess query optimizations
*/
void
tsl_preprocess_query(Query *parse)
tsl_preprocess_query(Query *parse, int *cursor_opts)
{
Assert(parse != NULL);

Expand All @@ -220,6 +220,14 @@ tsl_preprocess_query(Query *parse)
{
constify_cagg_watermark(parse);
}

#if PG16_GE
/* Push down ORDER BY and LIMIT for realtime cagg (PG16+ only) */
if (ts_guc_enable_cagg_sort_pushdown)
{
cagg_sort_pushdown(parse, cursor_opts);
}
#endif
}

/*
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ void tsl_create_upper_paths_hook(PlannerInfo *, UpperRelationKind, RelOptInfo *,
void tsl_set_rel_pathlist_query(PlannerInfo *, RelOptInfo *, Index, RangeTblEntry *, Hypertable *);
void tsl_set_rel_pathlist_dml(PlannerInfo *, RelOptInfo *, Index, RangeTblEntry *, Hypertable *);
void tsl_set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry *rte);
void tsl_preprocess_query(Query *parse);
void tsl_preprocess_query(Query *parse, int *cursor_opts);
void tsl_postprocess_plan(PlannedStmt *stmt);
Loading

0 comments on commit 9c7085a

Please sign in to comment.