Treat segmentby columns same as compressed columns with default value #6817

Merged: 5 commits, Apr 12, 2024
74 changes: 40 additions & 34 deletions tsl/src/nodes/decompress_chunk/compressed_batch.c
@@ -159,7 +159,7 @@ static void
decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state,
TupleTableSlot *compressed_slot, int i)
{
- CompressionColumnDescription *column_description = &dcontext->template_columns[i];
+ CompressionColumnDescription *column_description = &dcontext->compressed_chunk_columns[i];
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
column_values->arrow = NULL;
const AttrNumber attr = AttrNumberGetAttrOffset(column_description->output_attno);
@@ -177,7 +177,7 @@ decompress_column(DecompressContext *dcontext, DecompressBatchState
* The column will have a default value for the entire batch,
* set it now.
*/
- column_values->decompression_type = DT_Default;
+ column_values->decompression_type = DT_Scalar;

*column_values->output_value =
getmissingattr(dcontext->decompressed_slot->tts_tupleDescriptor,
@@ -396,22 +396,21 @@ compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_stat
Var *var = castNode(Var, linitial(args));
CompressionColumnDescription *column_description = NULL;
int column_index = 0;
- for (; column_index < dcontext->num_total_columns; column_index++)
+ for (; column_index < dcontext->num_data_columns; column_index++)
{
- column_description = &dcontext->template_columns[column_index];
+ column_description = &dcontext->compressed_chunk_columns[column_index];
if (column_description->output_attno == var->varattno)
{
break;
}
}
- Ensure(column_index < dcontext->num_total_columns,
+ Ensure(column_index < dcontext->num_data_columns,
"decompressed column %d not found in batch",
var->varattno);
Assert(column_description != NULL);
Assert(column_description->typid == var->vartype);
Ensure(column_description->type == COMPRESSED_COLUMN,
"only compressed columns are supported in vectorized quals");
- Assert(column_index < dcontext->num_compressed_columns);

CompressedColumnValues *column_values = &batch_state->compressed_columns[column_index];

@@ -444,7 +443,7 @@ compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_stat
* with this default value, check if it passes the predicate, and apply
* it to the entire batch.
*/
- Assert(column_values->decompression_type == DT_Default);
+ Assert(column_values->decompression_type == DT_Scalar);

/*
* We saved the actual default value into the decompressed scan slot
@@ -548,7 +547,7 @@ compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_stat
/* Translate the result if the column had a default value. */
if (column_values->arrow == NULL)
{
- Assert(column_values->decompression_type == DT_Default);
+ Assert(column_values->decompression_type == DT_Scalar);
if (!(default_value_predicate_result[0] & 1))
{
/*
@@ -804,20 +803,20 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext,

MemoryContextReset(batch_state->per_batch_context);

- for (int i = 0; i < dcontext->num_total_columns; i++)
+ for (int i = 0; i < dcontext->num_columns_with_metadata; i++)
{
- CompressionColumnDescription *column_description = &dcontext->template_columns[i];
+ CompressionColumnDescription *column_description = &dcontext->compressed_chunk_columns[i];

switch (column_description->type)
{
case COMPRESSED_COLUMN:
{
- Assert(i < dcontext->num_compressed_columns);
/*
* We decompress the compressed columns on demand, so that we can
* skip decompressing some columns if the entire batch doesn't pass
* the quals. Skip them for now.
*/
+ Assert(i < dcontext->num_data_columns);
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
column_values->decompression_type = DT_Invalid;
column_values->arrow = NULL;
@@ -830,25 +829,32 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext,
* and our output tuples are read-only, so it's enough to only
* save it once per batch, which we do here.
*/
+ Assert(i < dcontext->num_data_columns);
+ CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
+ column_values->decompression_type = DT_Scalar;
AttrNumber attr = AttrNumberGetAttrOffset(column_description->output_attno);
- decompressed_tuple->tts_values[attr] =
- slot_getattr(compressed_slot,
- column_description->compressed_scan_attno,
- &decompressed_tuple->tts_isnull[attr]);
+ Datum *output_value = &decompressed_tuple->tts_values[attr];
+ bool *output_isnull = &decompressed_tuple->tts_isnull[attr];
+ column_values->output_value = output_value;
+ column_values->output_isnull = output_isnull;
+ column_values->arrow = NULL;
+
+ *output_value = slot_getattr(compressed_slot,
+ column_description->compressed_scan_attno,
+ output_isnull);

/*
* Note that if it's not a by-value type, we should copy it into
* the slot context.
*/
- if (!column_description->by_value &&
- DatumGetPointer(decompressed_tuple->tts_values[attr]) != NULL)
+ if (!column_description->by_value && !*output_isnull &&
+ DatumGetPointer(*output_value) != NULL)
{
if (column_description->value_bytes < 0)
{
/* This is a varlena type. */
- decompressed_tuple->tts_values[attr] = PointerGetDatum(
- detoaster_detoast_attr_copy((struct varlena *)
- decompressed_tuple->tts_values[attr],
+ *output_value = PointerGetDatum(
+ detoaster_detoast_attr_copy((struct varlena *) *output_value,
&dcontext->detoaster,
batch_state->per_batch_context));
}
@@ -858,9 +864,9 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext,
void *tmp = MemoryContextAlloc(batch_state->per_batch_context,
column_description->value_bytes);
memcpy(tmp,
- DatumGetPointer(decompressed_tuple->tts_values[attr]),
+ DatumGetPointer(*output_value),
column_description->value_bytes);
- decompressed_tuple->tts_values[attr] = PointerGetDatum(tmp);
+ *output_value = PointerGetDatum(tmp);
}
}
break;
@@ -923,8 +929,8 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext,
* We have some rows in the batch that pass the vectorized filters, so
* we have to decompress the rest of the compressed columns.
*/
- const int num_compressed_columns = dcontext->num_compressed_columns;
- for (int i = 0; i < num_compressed_columns; i++)
+ const int num_data_columns = dcontext->num_data_columns;
+ for (int i = 0; i < num_data_columns; i++)
{
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
if (column_values->decompression_type == DT_Invalid)
@@ -965,14 +971,14 @@ store_text_datum(CompressedColumnValues *column_values, int arrow_row)
* Doesn't check the quals.
*/
static void
- make_next_tuple(DecompressBatchState *batch_state, uint16 arrow_row, int num_compressed_columns)
+ make_next_tuple(DecompressBatchState *batch_state, uint16 arrow_row, int num_data_columns)
{
TupleTableSlot *decompressed_scan_slot = &batch_state->decompressed_scan_slot_data.base;

Assert(batch_state->total_batch_rows > 0);
Assert(batch_state->next_batch_row < batch_state->total_batch_rows);

- for (int i = 0; i < num_compressed_columns; i++)
+ for (int i = 0; i < num_data_columns; i++)
{
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
if (column_values->decompression_type == DT_Iterator)
@@ -1035,7 +1041,7 @@ make_next_tuple(DecompressBatchState *batch_state, uint16 arrow_row, int num_com
else
{
/* A compressed column with default value, do nothing. */
- Assert(column_values->decompression_type == DT_Default);
+ Assert(column_values->decompression_type == DT_Scalar);
}
}

@@ -1101,7 +1107,7 @@ compressed_batch_advance(DecompressContext *dcontext, DecompressBatchState *batc
TupleTableSlot *decompressed_scan_slot = &batch_state->decompressed_scan_slot_data.base;

const bool reverse = dcontext->reverse;
- const int num_compressed_columns = dcontext->num_compressed_columns;
+ const int num_data_columns = dcontext->num_data_columns;

for (; batch_state->next_batch_row < batch_state->total_batch_rows;
batch_state->next_batch_row++)
@@ -1116,7 +1122,7 @@ compressed_batch_advance(DecompressContext *dcontext, DecompressBatchState *batc
* This row doesn't pass the vectorized quals. Advance the iterated
* compressed columns if we have any.
*/
- for (int i = 0; i < num_compressed_columns; i++)
+ for (int i = 0; i < num_data_columns; i++)
{
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
if (column_values->decompression_type == DT_Iterator)
@@ -1131,7 +1137,7 @@ compressed_batch_advance(DecompressContext *dcontext, DecompressBatchState *batc
continue;
}

- make_next_tuple(batch_state, arrow_row, num_compressed_columns);
+ make_next_tuple(batch_state, arrow_row, num_data_columns);

if (!postgres_qual(dcontext, batch_state))
{
@@ -1153,7 +1159,7 @@ compressed_batch_advance(DecompressContext *dcontext, DecompressBatchState *batc
* row-by-row have also ended.
*/
Assert(batch_state->next_batch_row == batch_state->total_batch_rows);
- for (int i = 0; i < num_compressed_columns; i++)
+ for (int i = 0; i < num_data_columns; i++)
{
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
if (column_values->decompression_type == DT_Iterator)
@@ -1191,8 +1197,8 @@ compressed_batch_save_first_tuple(DecompressContext *dcontext, DecompressBatchSt
* vectorized decompression is disabled with sorted merge.
*/
#ifdef USE_ASSERT_CHECKING
- const int num_compressed_columns = dcontext->num_compressed_columns;
- for (int i = 0; i < num_compressed_columns; i++)
+ const int num_data_columns = dcontext->num_data_columns;
+ for (int i = 0; i < num_data_columns; i++)
{
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
Assert(column_values->decompression_type != DT_Invalid);
@@ -1202,7 +1208,7 @@ compressed_batch_save_first_tuple(DecompressContext *dcontext, DecompressBatchSt
/* Make the first tuple and save it. */
Assert(batch_state->next_batch_row == 0);
const uint16 arrow_row = dcontext->reverse ? batch_state->total_batch_rows - 1 : 0;
- make_next_tuple(batch_state, arrow_row, dcontext->num_compressed_columns);
+ make_next_tuple(batch_state, arrow_row, dcontext->num_data_columns);
ExecCopySlot(first_tuple_slot, &batch_state->decompressed_scan_slot_data.base);

/*
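The renames above reflect the core idea of the change: a segmentby value and a compressed column's per-batch default value are both scalars that are materialized once per batch. A minimal standalone sketch of that unified handling follows; the types and function are simplified stand-ins for illustration, not the actual TimescaleDB structs.

/*
 * Sketch with simplified stand-in types: a DT_Scalar column (segmentby or
 * batch default) is saved once when the batch is set up, so per-row
 * processing skips it entirely.
 */
typedef enum
{
	DT_Scalar = -2,  /* value already saved in the decompressed slot */
	DT_Iterator = -1,
	DT_Invalid = 0
	/* positive values: arrow array of a fixed-size by-value type */
} DecompressionType;

typedef struct
{
	DecompressionType decompression_type;
} ColumnValues;

static void
make_row(ColumnValues *columns, int num_data_columns)
{
	for (int i = 0; i < num_data_columns; i++)
	{
		if (columns[i].decompression_type == DT_Scalar)
		{
			/* Scalar column: nothing to do per row. */
			continue;
		}
		/* ... fetch this row's value for iterator/arrow columns ... */
	}
}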
20 changes: 18 additions & 2 deletions tsl/src/nodes/decompress_chunk/compressed_batch.h
@@ -14,13 +14,23 @@ typedef struct ArrowArray ArrowArray;
typedef enum
{
DT_ArrowTextDict = -4,

DT_ArrowText = -3,
- DT_Default = -2,

+ /*
+ * The decompressed value is already in the decompressed slot. This is used
+ * for segmentby and compressed columns with default value in batch.
+ */
+ DT_Scalar = -2,

DT_Iterator = -1,

DT_Invalid = 0,

/*
* Any positive number is also valid for the decompression type. It means
- * arrow array of a fixed-size by-value type, with size given by the number.
+ * arrow array of a fixed-size by-value type, with size in bytes given by
+ * the number.
*/
} DecompressionType;

@@ -93,6 +103,12 @@ typedef struct DecompressBatchState
*/
uint64 *restrict vector_qual_result;

+ /*
+ * This follows DecompressContext.compressed_chunk_columns, but includes
+ * only the leading data columns, not the trailing metadata columns. The
+ * data columns are the compressed and segmentby columns; their total
+ * number is given by DecompressContext.num_data_columns.
+ */
CompressedColumnValues compressed_columns[FLEXIBLE_ARRAY_MEMBER];
} DecompressBatchState;

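To make the value scheme above concrete, here is a hypothetical helper (not part of the PR) that decodes a DecompressionType: negative values name the special representations, zero is invalid, and any positive value denotes an arrow array whose element width in bytes is the value itself.

/* Hypothetical helper, for illustration only: decode a decompression type. */
static const char *
decompression_type_name(int type)
{
	if (type > 0)
	{
		/* Arrow array of a fixed-size by-value type: 'type' is the
		 * element size in bytes, e.g. 8 for a bigint column. */
		return "arrow fixed-size by-value";
	}
	switch (type)
	{
		case -4: /* DT_ArrowTextDict */
			return "arrow dictionary-encoded text";
		case -3: /* DT_ArrowText */
			return "arrow text";
		case -2: /* DT_Scalar */
			return "scalar already in the decompressed slot";
		case -1: /* DT_Iterator */
			return "row-by-row iterator";
		default: /* DT_Invalid */
			return "invalid";
	}
}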
20 changes: 17 additions & 3 deletions tsl/src/nodes/decompress_chunk/decompress_context.h
@@ -48,9 +48,23 @@ typedef struct CompressionColumnDescription

typedef struct DecompressContext
{
- CompressionColumnDescription *template_columns;
- int num_total_columns;
- int num_compressed_columns;
+ /*
+ * Note that this array contains only those columns that are decompressed
+ * (output_attno != 0), and the order differs from the compressed chunk
+ * tuple order: the actual data columns come first, followed by the
+ * metadata columns.
+ */
+ CompressionColumnDescription *compressed_chunk_columns;

+ /*
+ * This counts all decompressed columns (output_attno != 0), including the
+ * metadata columns.
+ */
+ int num_columns_with_metadata;

+ /* This excludes the metadata columns. */
+ int num_data_columns;

List *vectorized_quals_constified;
bool reverse;
bool batch_sorted_merge; /* Merge append optimization enabled */
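The two counters introduced here encode a layout invariant: data columns (compressed and segmentby) occupy the leading entries of compressed_chunk_columns, with the metadata columns trailing. A short sketch of the resulting iteration pattern, again with simplified stand-in types rather than the real structs:

/* Sketch with simplified stand-in fields, mirroring the layout described
 * in the comments above. */
typedef struct
{
	int num_data_columns;          /* leading data columns only */
	int num_columns_with_metadata; /* data plus trailing metadata columns */
} ColumnCounts;

static void
process_batch(const ColumnCounts *counts)
{
	/* Batch setup visits every decompressed column, metadata included. */
	for (int i = 0; i < counts->num_columns_with_metadata; i++)
	{
		/* ... unpack the compressed datum or read metadata (e.g. count) ... */
	}

	/* Per-row work only ever touches the leading data columns. */
	for (int i = 0; i < counts->num_data_columns; i++)
	{
		/* ... decompress or emit this column's value for the row ... */
	}
}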