Skip to content

Commit

Permalink
Improvements for bulk decompression
Browse files Browse the repository at this point in the history
* Restore default batch context size to fix a performance regression on
  sorted batch merge plans.
* Support reverse direction.
* Improve gorilla decompression by computing prefix sums of tag bitmaps
  during decompression.
  • Loading branch information
akuzm committed Jul 6, 2023
1 parent 7657efe commit eaa1206
Show file tree
Hide file tree
Showing 22 changed files with 549 additions and 359 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/libfuzzer.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
name: Libfuzzer
"on":
schedule:
# run daily at 1:00 on the main branch
- cron: '0 1 * * *'
push:
branches:
- main
Expand Down
27 changes: 2 additions & 25 deletions src/adts/bit_array_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,22 +112,13 @@ bit_array_recv(const StringInfo buffer)
.num_elements = num_elements,
.max_elements = num_elements,
.ctx = CurrentMemoryContext,
/* Add one-element padding so that we can relax the checks for incorrect data. */
.data = palloc((num_elements + 1) * sizeof(uint64)),
.data = palloc(num_elements * sizeof(uint64)),
},
};

for (i = 0; i < num_elements; i++)
array.buckets.data[i] = pq_getmsgint64(buffer);

/* Zero out the padding for more predictable behavior under fuzzing. */
array.buckets.data[num_elements] = 0;
if (num_elements > 0)
{
CheckCompressedData(bits_used_in_last_bucket > 0);
array.buckets.data[num_elements - 1] &= -1ULL >> (64 - bits_used_in_last_bucket);
}

return array;
}

Expand Down Expand Up @@ -249,15 +240,11 @@ bit_array_iter_next(BitArrayIterator *iter, uint8 num_bits)
uint8 num_bits_from_next_bucket;
uint64 value = 0;
uint64 value_from_next_bucket;
Assert(num_bits <= 64);
CheckCompressedData(num_bits <= 64);
if (num_bits == 0)
return 0;

CheckCompressedData(iter->current_bucket < iter->array->buckets.num_elements);
if (iter->current_bucket == iter->array->buckets.num_elements - 1)
{
Assert(iter->bits_used_in_current_bucket <= iter->array->bits_used_in_last_bucket);
}

bits_remaining_in_current_bucket = 64 - iter->bits_used_in_current_bucket;
if (bits_remaining_in_current_bucket >= num_bits)
Expand All @@ -267,11 +254,6 @@ bit_array_iter_next(BitArrayIterator *iter, uint8 num_bits)
value &= bit_array_low_bits_mask(num_bits);
iter->bits_used_in_current_bucket += num_bits;

if (iter->current_bucket == iter->array->buckets.num_elements - 1)
{
CheckCompressedData(iter->bits_used_in_current_bucket <=
iter->array->bits_used_in_last_bucket);
}
return value;
}

Expand All @@ -293,11 +275,6 @@ bit_array_iter_next(BitArrayIterator *iter, uint8 num_bits)
iter->current_bucket += 1;
iter->bits_used_in_current_bucket = num_bits_from_next_bucket;

if (iter->current_bucket == iter->array->buckets.num_elements - 1)
{
CheckCompressedData(iter->bits_used_in_current_bucket <=
iter->array->bits_used_in_last_bucket);
}
return value;
}

Expand Down
18 changes: 9 additions & 9 deletions tsl/src/compression/arrow_c_data_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ typedef struct ArrowArray

/*
 * We don't use the schema but have to define it for completeness because we're
 * defining the ARROW_C_DATA_INTERFACE macro.
 */
struct ArrowSchema
{
Expand All @@ -135,22 +135,22 @@ struct ArrowSchema
#endif

static pg_attribute_always_inline bool
arrow_row_is_valid(const uint64 *bitmap, int row_number)
arrow_row_is_valid(const uint64 *bitmap, size_t row_number)
{
const int qword_index = row_number / 64;
const int bit_index = row_number % 64;
const size_t qword_index = row_number / 64;
const size_t bit_index = row_number % 64;
const uint64 mask = 1ull << bit_index;
return (bitmap[qword_index] & mask) ? 1 : 0;
return bitmap[qword_index] & mask;
}

static pg_attribute_always_inline void
arrow_set_row_validity(uint64 *bitmap, int row_number, bool value)
arrow_set_row_validity(uint64 *bitmap, size_t row_number, bool value)
{
const int qword_index = row_number / 64;
const int bit_index = row_number % 64;
const size_t qword_index = row_number / 64;
const size_t bit_index = row_number % 64;
const uint64 mask = 1ull << bit_index;

bitmap[qword_index] = (bitmap[qword_index] & ~mask) | (((uint64) !!value) << bit_index);
bitmap[qword_index] = (bitmap[qword_index] & ~mask) | ((-(uint64) value) & mask);

Assert(arrow_row_is_valid(bitmap, row_number) == value);
}
11 changes: 3 additions & 8 deletions tsl/src/compression/compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,13 @@ DecompressionIterator *(*tsl_get_decompression_iterator_init(CompressionAlgorith
return definitions[algorithm].iterator_init_forward;
}

ArrowArray *
tsl_try_decompress_all(CompressionAlgorithms algorithm, Datum compressed_data, Oid element_type)
DecompressAllFunction
tsl_get_decompress_all_function(CompressionAlgorithms algorithm)
{
if (algorithm >= _END_COMPRESSION_ALGORITHMS)
elog(ERROR, "invalid compression algorithm %d", algorithm);

if (definitions[algorithm].decompress_all)
{
return definitions[algorithm].decompress_all(compressed_data, element_type);
}

return NULL;
return definitions[algorithm].decompress_all;
}

static Tuplesortstate *compress_chunk_sort_relation(Relation in_rel, int n_keys,
Expand Down
10 changes: 6 additions & 4 deletions tsl/src/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,14 @@ typedef enum
TOAST_STORAGE_EXTENDED
} CompressionStorage;

typedef ArrowArray *(*DecompressAllFunction)(Datum compressed, Oid element_type,
MemoryContext dest_mctx);

typedef struct CompressionAlgorithmDefinition
{
DecompressionIterator *(*iterator_init_forward)(Datum, Oid element_type);
DecompressionIterator *(*iterator_init_reverse)(Datum, Oid element_type);
ArrowArray *(*decompress_all)(Datum, Oid element_type);
DecompressAllFunction decompress_all;
void (*compressed_data_send)(CompressedDataHeader *, StringInfo);
Datum (*compressed_data_recv)(StringInfo);

Expand Down Expand Up @@ -313,8 +316,7 @@ extern void decompress_chunk(Oid in_table, Oid out_table);
extern DecompressionIterator *(*tsl_get_decompression_iterator_init(
CompressionAlgorithms algorithm, bool reverse))(Datum, Oid element_type);

extern ArrowArray *tsl_try_decompress_all(CompressionAlgorithms algorithm, Datum compressed_data,
Oid element_type);
extern DecompressAllFunction tsl_get_decompress_all_function(CompressionAlgorithms algorithm);

typedef struct Chunk Chunk;
typedef struct ChunkInsertState ChunkInsertState;
Expand Down Expand Up @@ -373,7 +375,7 @@ extern RowDecompressor build_decompressor(Relation in_rel, Relation out_rel);
#endif

#define CheckCompressedData(X) \
if (!(X)) \
if (unlikely(!(X))) \
ereport(ERROR, CORRUPT_DATA_MESSAGE)

inline static void *
Expand Down
22 changes: 15 additions & 7 deletions tsl/src/compression/decompress_test_impl.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,25 @@ FUNCTION_NAME(ALGO, CTYPE)(const uint8 *Data, size_t Size, bool extra_checks)
* For routine fuzzing, we only run bulk decompression to make it faster
* and the coverage space smaller.
*/
tsl_try_decompress_all(algo, compressed_data, PGTYPE);
DecompressAllFunction decompress_all = tsl_get_decompress_all_function(algo);
decompress_all(compressed_data, PGTYPE, CurrentMemoryContext);
return 0;
}

/*
* For normal testing, as opposed to the fuzzing code path above, run
* row-by-row decompression first, so that it's not masked by the more
* strict correctness checks of bulk decompression.
* Test bulk decompression. This might hide some errors in the row-by-row
* decompression, but testing both is significantly more complicated, and
* the row-by-row is old and stable.
*/
ArrowArray *arrow = NULL;
DecompressAllFunction decompress_all = tsl_get_decompress_all_function(algo);
if (decompress_all)
{
arrow = decompress_all(compressed_data, PGTYPE, CurrentMemoryContext);
}

/*
* Test row-by-row decompression.
*/
DecompressionIterator *iter = definitions[algo].iterator_init_forward(compressed_data, PGTYPE);
DecompressResult results[GLOBAL_MAX_ROWS_PER_COMPRESSION];
Expand All @@ -64,9 +75,6 @@ FUNCTION_NAME(ALGO, CTYPE)(const uint8 *Data, size_t Size, bool extra_checks)
results[n++] = r;
}

/* Test bulk decompression. */
ArrowArray *arrow = tsl_try_decompress_all(algo, compressed_data, PGTYPE);

/* Check that both ways of decompression match. */
if (arrow)
{
Expand Down
8 changes: 4 additions & 4 deletions tsl/src/compression/deltadelta.c
Original file line number Diff line number Diff line change
Expand Up @@ -588,19 +588,19 @@ delta_delta_decompression_iterator_try_next_forward(DecompressionIterator *iter)
#undef ELEMENT_TYPE

ArrowArray *
delta_delta_decompress_all(Datum compressed_data, Oid element_type)
delta_delta_decompress_all(Datum compressed_data, Oid element_type, MemoryContext dest_mctx)
{
switch (element_type)
{
case INT8OID:
case TIMESTAMPOID:
case TIMESTAMPTZOID:
return delta_delta_decompress_all_uint64(compressed_data);
return delta_delta_decompress_all_uint64(compressed_data, dest_mctx);
case INT4OID:
case DATEOID:
return delta_delta_decompress_all_uint32(compressed_data);
return delta_delta_decompress_all_uint32(compressed_data, dest_mctx);
case INT2OID:
return delta_delta_decompress_all_uint16(compressed_data);
return delta_delta_decompress_all_uint16(compressed_data, dest_mctx);
default:
elog(ERROR,
"type '%s' is not supported for deltadelta decompression",
Expand Down
3 changes: 2 additions & 1 deletion tsl/src/compression/deltadelta.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ delta_delta_decompression_iterator_from_datum_reverse(Datum deltadelta_compresse
extern DecompressResult
delta_delta_decompression_iterator_try_next_forward(DecompressionIterator *iter);

extern ArrowArray *delta_delta_decompress_all(Datum compressed_data, Oid element_type);
extern ArrowArray *delta_delta_decompress_all(Datum compressed_data, Oid element_type,
MemoryContext dest_mctx);

extern DecompressResult
delta_delta_decompression_iterator_try_next_reverse(DecompressionIterator *iter);
Expand Down
60 changes: 42 additions & 18 deletions tsl/src/compression/deltadelta_impl.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#define FUNCTION_NAME(X, Y) FUNCTION_NAME_HELPER(X, Y)

static ArrowArray *
FUNCTION_NAME(delta_delta_decompress_all, ELEMENT_TYPE)(Datum compressed)
FUNCTION_NAME(delta_delta_decompress_all, ELEMENT_TYPE)(Datum compressed, MemoryContext dest_mctx)
{
StringInfoData si = { .data = DatumGetPointer(compressed), .len = VARSIZE(compressed) };
DeltaDeltaCompressed *header = consumeCompressedData(&si, sizeof(DeltaDeltaCompressed));
Expand All @@ -23,8 +23,14 @@ FUNCTION_NAME(delta_delta_decompress_all, ELEMENT_TYPE)(Datum compressed)

Assert(header->has_nulls == 0 || header->has_nulls == 1);

/* Can't use element type here because of zig-zag encoding. */
int16 num_deltas;
/*
* Can't use element type here because of zig-zag encoding. The deltas are
* computed in uint64, so we can get a delta that is actually larger than
* the element type. We can't just truncate the delta either, because it
* will lead to broken decompression results. The test case is in
* test_delta4().
*/
uint16 num_deltas;
const uint64 *restrict deltas_zigzag =
simple8brle_decompress_all_uint64(deltas_compressed, &num_deltas);

Expand All @@ -35,20 +41,30 @@ FUNCTION_NAME(delta_delta_decompress_all, ELEMENT_TYPE)(Datum compressed)
nulls = simple8brle_bitmap_decompress(nulls_compressed);
}

const int n_total = has_nulls ? nulls.num_elements : num_deltas;
const int n_total_padded =
/*
* Pad the number of elements to multiple of 64 bytes if needed, so that we
* can work in 64-byte blocks.
*/
const uint16 n_total = has_nulls ? nulls.num_elements : num_deltas;
const uint16 n_total_padded =
((n_total * sizeof(ELEMENT_TYPE) + 63) / 64) * 64 / sizeof(ELEMENT_TYPE);
const int n_notnull = num_deltas;
const int n_notnull_padded =
const uint16 n_notnull = num_deltas;
const uint16 n_notnull_padded =
((n_notnull * sizeof(ELEMENT_TYPE) + 63) / 64) * 64 / sizeof(ELEMENT_TYPE);
Assert(n_total_padded >= n_total);
Assert(n_notnull_padded >= n_notnull);
Assert(n_total >= n_notnull);
Assert(n_total <= GLOBAL_MAX_ROWS_PER_COMPRESSION);

const int validity_bitmap_bytes = sizeof(uint64) * ((n_total + 64 - 1) / 64);
uint64 *restrict validity_bitmap = palloc(validity_bitmap_bytes);
ELEMENT_TYPE *restrict decompressed_values = palloc(sizeof(ELEMENT_TYPE) * n_total_padded);
uint64 *restrict validity_bitmap = MemoryContextAlloc(dest_mctx, validity_bitmap_bytes);

/*
* We need additional padding at the end of the buffer, because the code that
* converts the elements to postgres Datum always reads in 8 bytes.
*/
const int buffer_bytes = n_total_padded * sizeof(ELEMENT_TYPE) + 8;
ELEMENT_TYPE *restrict decompressed_values = MemoryContextAlloc(dest_mctx, buffer_bytes);

/* Now fill the data w/o nulls. */
ELEMENT_TYPE current_delta = 0;
Expand All @@ -62,9 +78,9 @@ FUNCTION_NAME(delta_delta_decompress_all, ELEMENT_TYPE)(Datum compressed)
*/
#define INNER_LOOP_SIZE 8
Assert(n_notnull_padded % INNER_LOOP_SIZE == 0);
for (int outer = 0; outer < n_notnull_padded; outer += INNER_LOOP_SIZE)
for (uint16 outer = 0; outer < n_notnull_padded; outer += INNER_LOOP_SIZE)
{
for (int inner = 0; inner < INNER_LOOP_SIZE; inner++)
for (uint16 inner = 0; inner < INNER_LOOP_SIZE; inner++)
{
current_delta += zig_zag_decode(deltas_zigzag[outer + inner]);
current_element += current_delta;
Expand Down Expand Up @@ -107,19 +123,27 @@ FUNCTION_NAME(delta_delta_decompress_all, ELEMENT_TYPE)(Datum compressed)
else
{
/*
* The validity bitmap is padded at the end to a multiple of 64 bytes.
* Fill the padding with zeros, because the elements corresponding to
* the padding bits are not valid.
* The validity bitmap size is a multiple of 64 bits. Fill the tail bits
* with zeros, because the corresponding elements are not valid.
*/
for (int i = n_total; i < validity_bitmap_bytes * 8; i++)
if (n_total % 64)
{
arrow_set_row_validity(validity_bitmap, i, false);
const uint64 tail_mask = -1ULL >> (64 - n_total % 64);
validity_bitmap[n_total / 64] &= tail_mask;

#ifdef USE_ASSERT_CHECKING
for (int i = 0; i < 64; i++)
{
Assert(arrow_row_is_valid(validity_bitmap, (n_total / 64) * 64 + i) ==
(i < n_total % 64));
}
#endif
}
}

/* Return the result. */
ArrowArray *result = palloc0(sizeof(ArrowArray));
const void **buffers = palloc(sizeof(void *) * 2);
ArrowArray *result = MemoryContextAllocZero(dest_mctx, sizeof(ArrowArray) + sizeof(void *) * 2);
const void **buffers = (const void **) &result[1];
buffers[0] = validity_bitmap;
buffers[1] = decompressed_values;
result->n_buffers = 2;
Expand Down
Loading

0 comments on commit eaa1206

Please sign in to comment.