Perform startup chunk exclusion in parallel leader
The parallel version of the ChunkAppend node uses shared memory to
coordinate the plan selection for the parallel workers. If the workers
perform the startup exclusion individually, they may choose different
subplans (e.g., due to a "constant" function that claims to be constant
but returns different results). In that case, the workers disagree
about the plans, which leads to hard-to-debug problems and
out-of-bounds reads when pstate->next_plan is used for subplan
selection.

With this patch, startup exclusion is performed only in the parallel
leader. The leader stores the result in shared memory. The parallel
workers read it from shared memory and do not perform startup
exclusion themselves.
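
To make the coordination easier to follow, here is a minimal,
self-contained sketch of the scheme (a simplified model, not the
TimescaleDB code from exec.c below): the leader runs startup exclusion
once, publishes the surviving subplans as flags in shared memory, and
every worker derives its subplan list from those flags instead of
repeating the exclusion. The flag names mirror the patch; everything
else (plan_is_excluded_at_startup, the plain arrays standing in for
shared memory) is invented for illustration.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define SUBPLAN_STATE_INCLUDED (1 << 0) /* survived startup exclusion */
#define SUBPLAN_STATE_FINISHED (1 << 1) /* no more tuples to produce */

#define NUM_INITIAL_SUBPLANS 4

/* Stand-in for the decision the leader makes during startup exclusion. */
static bool
plan_is_excluded_at_startup(int plan)
{
	/* pretend the chunk behind subplan 2 can be pruned at startup */
	return plan == 2;
}

/* Leader: run startup exclusion once and publish the result. */
static void
leader_init(uint32_t *subplan_state)
{
	for (int plan = 0; plan < NUM_INITIAL_SUBPLANS; plan++)
	{
		subplan_state[plan] = 0;
		if (!plan_is_excluded_at_startup(plan))
			subplan_state[plan] |= SUBPLAN_STATE_INCLUDED;
	}
}

/* Worker: never re-run startup exclusion, just read the leader's answer. */
static int
worker_collect_plans(const uint32_t *subplan_state, int *my_plans)
{
	int n = 0;

	for (int plan = 0; plan < NUM_INITIAL_SUBPLANS; plan++)
	{
		if (subplan_state[plan] & SUBPLAN_STATE_INCLUDED)
			my_plans[n++] = plan;
	}
	return n;
}

int
main(void)
{
	/* in the real node this array lives in dynamic shared memory */
	uint32_t subplan_state[NUM_INITIAL_SUBPLANS];
	int my_plans[NUM_INITIAL_SUBPLANS];

	leader_init(subplan_state);

	int nplans = worker_collect_plans(subplan_state, my_plans);
	for (int i = 0; i < nplans; i++)
		printf("worker executes subplan %d\n", my_plans[i]);

	return 0;
}

Because every worker sees the same flags, all workers agree on the
number and order of subplans, which is the invariant the commit needs
for pstate->next_plan to stay in bounds.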
jnidzwetzki committed Jul 7, 2023
1 parent 490bc91 commit 21f9986
Showing 6 changed files with 1,338 additions and 16 deletions.
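
The diff below relies on the bit-flag helpers ts_set_flags_32 and
ts_flags_are_set_32 from TimescaleDB's utility headers. Their
definitions are not part of this commit; judging from how they are
used here, they behave roughly like the following sketch (an
assumption, the real helpers may differ in detail):

#include <stdbool.h>
#include <stdint.h>

typedef uint32_t uint32; /* PostgreSQL-style alias, for this sketch only */

/* Set the given flag bits and return the updated flag word. */
static inline uint32
ts_set_flags_32(uint32 flags, uint32 flags_to_set)
{
	return flags | flags_to_set;
}

/* Return true only if all of the given flag bits are set. */
static inline bool
ts_flags_are_set_32(uint32 flags, uint32 flags_to_check)
{
	return (flags & flags_to_check) == flags_to_check;
}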
166 changes: 150 additions & 16 deletions src/nodes/chunk_append/exec.c
@@ -32,10 +32,17 @@
#define INVALID_SUBPLAN_INDEX (-1)
#define NO_MATCHING_SUBPLANS (-2)

typedef enum SubplanState
{
SUBPLAN_STATE_INCLUDED = 1 << 0, /* Used and not removed by startup exclusion */
SUBPLAN_STATE_FINISHED = 1 << 1, /* The subplan is finished */
} SubplanState;

typedef struct ParallelChunkAppendState
{
int next_plan;
bool finished[FLEXIBLE_ARRAY_MEMBER];
int filtered_first_partial_plan;
uint32 subplan_state[FLEXIBLE_ARRAY_MEMBER]; /* See SubplanState */
} ParallelChunkAppendState;

typedef struct ChunkAppendState
@@ -57,6 +64,10 @@ typedef struct ChunkAppendState
bool runtime_initialized;
uint32 limit;

#ifdef USE_ASSERT_CHECKING
bool init_done;
#endif

/* list of subplans after planning */
List *initial_subplans;
/* list of constraints indexed like initial_subplans */
@@ -72,6 +83,8 @@ typedef struct ChunkAppendState
List *filtered_constraints;
/* list of restrictinfo clauses after startup exclusion */
List *filtered_ri_clauses;
/* subplans included by startup exclusion */
Bitmapset *included_subplans_by_se;

/* valid subplans for runtime exclusion */
Bitmapset *valid_subplans;
@@ -88,6 +101,8 @@ typedef struct ChunkAppendState
LWLock *lock;
ParallelContext *pcxt;
ParallelChunkAppendState *pstate;
EState *estate;
int eflags;
void (*choose_next_subplan)(struct ChunkAppendState *);
} ChunkAppendState;

@@ -131,6 +146,8 @@ static void show_sort_group_keys(ChunkAppendState *planstate, List *ancestors, E
static void show_sortorder_options(StringInfo buf, Node *sortexpr, Oid sortOperator, Oid collation,
bool nullsFirst);

static void perform_plan_init(ChunkAppendState *state, EState *estate, int eflags);

Node *
ts_chunk_append_state_create(CustomScan *cscan)
{
@@ -188,6 +205,9 @@ do_startup_exclusion(ChunkAppendState *state)
.glob = &glob,
};

/* Reset included subplans */
state->included_subplans_by_se = NULL;

/*
* clauses and constraints should always have the same length as initial_subplans
*/
@@ -246,6 +266,7 @@ do_startup_exclusion(ChunkAppendState *state)
}
}

state->included_subplans_by_se = bms_add_member(state->included_subplans_by_se, i);
filtered_children = lappend(filtered_children, lfirst(lc_plan));
filtered_ri_clauses = lappend(filtered_ri_clauses, ri_clauses);
filtered_constraints = lappend(filtered_constraints, lfirst(lc_constraints));
@@ -255,6 +276,9 @@ do_startup_exclusion(ChunkAppendState *state)
state->filtered_ri_clauses = filtered_ri_clauses;
state->filtered_constraints = filtered_constraints;
state->filtered_first_partial_plan = filtered_first_partial_plan;

Assert(list_length(state->filtered_subplans) ==
bms_num_members(state->included_subplans_by_se));
}

/*
@@ -267,8 +291,6 @@ chunk_append_begin(CustomScanState *node, EState *estate, int eflags)
{
CustomScan *cscan = castNode(CustomScan, node->ss.ps.plan);
ChunkAppendState *state = (ChunkAppendState *) node;
ListCell *lc;
int i;

/* CustomScan hard-codes the scan and result tuple slot to a fixed
* TTSOpsVirtual ops (meaning it expects the slot ops of the child tuple to
@@ -293,9 +315,59 @@ chunk_append_begin(CustomScanState *node, EState *estate, int eflags)

initialize_constraints(state, lthird(cscan->custom_private));

/* In parallel mode with a parallel_aware plan, the parallel leader performs the startup
 * exclusion and stores the result in shared memory (the flag SUBPLAN_STATE_INCLUDED of
 * pstate->subplan_state is set for all included plans).
 *
 * The parallel workers use the information from shared memory to include the same plans as
 * the parallel leader. This ensures that all workers operate on the same set of subplans and
 * agree on their number, which is necessary for the workers to function correctly and for
 * the next subplan to be processed (pstate->next_plan in shared memory) to point to the
 * same plan in all workers.
 *
 * If the workers performed the startup exclusion individually, they might choose different
 * subplans (e.g., due to a "constant" function that claims to be constant but returns
 * different results). In that case, the workers would disagree about the plans, which would
 * lead to hard-to-debug problems and out-of-bounds reads when pstate->next_plan is used for
 * subplan selection.
 */
if (IsParallelWorker() && node->ss.ps.plan->parallel_aware)
{
/* The included subplans are initialized in chunk_append_initialize_worker. The estate
 * and eflags needed for this delayed initialization are stored here.
 *
 * Note: Due to force_parallel_mode, this node can be used in a parallel worker without
 * a parallel_aware plan. So, we have to check both conditions above to determine
 * whether chunk_append_initialize_worker will be called and whether we can delay the
 * invocation of perform_plan_init().
 */
state->estate = estate;
state->eflags = eflags;
return;
}

if (state->startup_exclusion)
do_startup_exclusion(state);

perform_plan_init(state, estate, eflags);
}

/*
 * Initialize the plan states for the filtered subplans.
*/
static void
perform_plan_init(ChunkAppendState *state, EState *estate, int eflags)
{
ListCell *lc;
int i;

#ifdef USE_ASSERT_CHECKING
Assert(state->init_done == false);
state->init_done = true;
#endif

state->num_subplans = list_length(state->filtered_subplans);

if (state->num_subplans == 0)
@@ -314,7 +386,7 @@ chunk_append_begin(CustomScanState *node, EState *estate, int eflags)
* so explain and planstate_tree_walker can find it
*/
state->subplanstates[i] = ExecInitNode(lfirst(lc), estate, eflags);
node->custom_ps = lappend(node->custom_ps, state->subplanstates[i]);
state->csstate.custom_ps = lappend(state->csstate.custom_ps, state->subplanstates[i]);

/*
* pass down limit to child nodes
@@ -331,7 +403,7 @@ chunk_append_begin(CustomScanState *node, EState *estate, int eflags)
/*
* make sure all params are initialized for runtime exclusion
*/
node->ss.ps.chgParam = bms_copy(state->subplanstates[0]->plan->allParam);
state->csstate.ss.ps.chgParam = bms_copy(state->subplanstates[0]->plan->allParam);
}
}

@@ -462,6 +534,8 @@ chunk_append_exec(CustomScanState *node)
ProjectionInfo *projinfo = node->ss.ps.ps_ProjInfo;
TupleTableSlot *subslot;

Assert(state->init_done == true);

if (state->current == INVALID_SUBPLAN_INDEX)
state->choose_next_subplan(state);

@@ -548,7 +622,8 @@ choose_next_subplan_for_worker(ChunkAppendState *state)

/* mark just completed subplan as finished */
if (state->current >= 0)
pstate->finished[state->current] = true;
pstate->subplan_state[state->current] =
ts_set_flags_32(pstate->subplan_state[state->current], SUBPLAN_STATE_FINISHED);

if (pstate->next_plan == INVALID_SUBPLAN_INDEX)
next_plan = get_next_subplan(state, INVALID_SUBPLAN_INDEX);
@@ -567,7 +642,7 @@ choose_next_subplan_for_worker(ChunkAppendState *state)
start = next_plan;

/* skip finished subplans */
while (pstate->finished[next_plan])
while (ts_flags_are_set_32(pstate->subplan_state[next_plan], SUBPLAN_STATE_FINISHED))
{
next_plan = get_next_subplan(state, next_plan);

@@ -602,7 +677,8 @@ choose_next_subplan_for_worker(ChunkAppendState *state)
* immediately so it does not get assigned another worker
*/
if (next_plan < state->filtered_first_partial_plan)
pstate->finished[next_plan] = true;
pstate->subplan_state[next_plan] =
ts_set_flags_32(pstate->subplan_state[next_plan], SUBPLAN_STATE_FINISHED);

/* advance next_plan for next worker */
pstate->next_plan = get_next_subplan(state, state->current);
@@ -677,8 +753,35 @@ static Size
chunk_append_estimate_dsm(CustomScanState *node, ParallelContext *pcxt)
{
ChunkAppendState *state = (ChunkAppendState *) node;
return add_size(offsetof(ParallelChunkAppendState, finished),
sizeof(bool) * state->num_subplans);
return add_size(offsetof(ParallelChunkAppendState, subplan_state),
sizeof(uint32) * list_length(state->initial_subplans));
}

/*
* Initialize the parallel state.
*/
static void
init_pstate(ChunkAppendState *state, ParallelChunkAppendState *pstate)
{
Assert(state != NULL);
Assert(pstate != NULL);
Assert(state->csstate.pscan_len > 0);

/* The parallel worker state has to be (re-)initialized by the parallel leader */
Assert(!IsParallelWorker());

memset(pstate, 0, state->csstate.pscan_len);

pstate->next_plan = INVALID_SUBPLAN_INDEX;
pstate->filtered_first_partial_plan = state->filtered_first_partial_plan;

/* Mark active subplans in parallel state */
int plan = -1;
while ((plan = bms_next_member(state->included_subplans_by_se, plan)) >= 0)
{
pstate->subplan_state[plan] =
ts_set_flags_32(pstate->subplan_state[plan], SUBPLAN_STATE_INCLUDED);
}
}

/*
@@ -694,11 +797,9 @@ chunk_append_initialize_dsm(CustomScanState *node, ParallelContext *pcxt, void *
{
ChunkAppendState *state = (ChunkAppendState *) node;
ParallelChunkAppendState *pstate = (ParallelChunkAppendState *) coordinate;

memset(pstate, 0, node->pscan_len);
init_pstate(state, pstate);

state->lock = chunk_append_get_lock_pointer();
pstate->next_plan = INVALID_SUBPLAN_INDEX;

/*
* Leader should use the same subplan selection as normal worker threads. If the user wishes to
@@ -726,9 +827,7 @@ chunk_append_reinitialize_dsm(CustomScanState *node, ParallelContext *pcxt, void
{
ChunkAppendState *state = (ChunkAppendState *) node;
ParallelChunkAppendState *pstate = (ParallelChunkAppendState *) coordinate;

pstate->next_plan = INVALID_SUBPLAN_INDEX;
memset(pstate->finished, 0, sizeof(bool) * state->num_subplans);
init_pstate(state, pstate);
}

/*
@@ -744,10 +843,45 @@ chunk_append_initialize_worker(CustomScanState *node, shm_toc *toc, void *coordi
ChunkAppendState *state = (ChunkAppendState *) node;
ParallelChunkAppendState *pstate = (ParallelChunkAppendState *) coordinate;

Assert(IsParallelWorker());
Assert(node->ss.ps.plan->parallel_aware);
Assert(pstate != NULL);
Assert(state->estate != NULL);

/* Read which plans were included by startup exclusion from the parallel state */
state->filtered_first_partial_plan = pstate->filtered_first_partial_plan;

List *filtered_subplans = NIL;
List *filtered_ri_clauses = NIL;
List *filtered_constraints = NIL;

for (int plan = 0; plan < list_length(state->initial_subplans); plan++)
{
if (ts_flags_are_set_32(pstate->subplan_state[plan], SUBPLAN_STATE_INCLUDED))
{
filtered_subplans =
lappend(filtered_subplans, list_nth(state->filtered_subplans, plan));
filtered_ri_clauses =
lappend(filtered_ri_clauses, list_nth(state->filtered_ri_clauses, plan));
filtered_constraints =
lappend(filtered_constraints, list_nth(state->filtered_constraints, plan));
}
}

state->filtered_subplans = filtered_subplans;
state->filtered_ri_clauses = filtered_ri_clauses;
state->filtered_constraints = filtered_constraints;

Assert(list_length(state->filtered_subplans) == list_length(state->filtered_ri_clauses));
Assert(list_length(state->filtered_ri_clauses) == list_length(state->filtered_constraints));

state->lock = chunk_append_get_lock_pointer();
state->choose_next_subplan = choose_next_subplan_for_worker;
state->current = INVALID_SUBPLAN_INDEX;
state->pstate = pstate;

perform_plan_init(state, state->estate, state->eflags);
Assert(state->num_subplans == list_length(state->filtered_subplans));
}

/*