From eaa1206b7f01672b95ea45486bcb7602499ffd25 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Thu, 6 Jul 2023 18:14:08 +0200 Subject: [PATCH] Improvements for bulk decompression * Restore default batch context size to fix a performance regression on sorted batch merge plans. * Support reverse direction. * Improve gorilla decompression by computing prefix sums of tag bitmaps during decompression. --- .github/workflows/libfuzzer.yaml | 3 + src/adts/bit_array_impl.h | 27 +- tsl/src/compression/arrow_c_data_interface.h | 18 +- tsl/src/compression/compression.c | 11 +- tsl/src/compression/compression.h | 10 +- tsl/src/compression/decompress_test_impl.c | 22 +- tsl/src/compression/deltadelta.c | 8 +- tsl/src/compression/deltadelta.h | 3 +- tsl/src/compression/deltadelta_impl.c | 60 +++-- tsl/src/compression/gorilla.c | 14 +- tsl/src/compression/gorilla.h | 2 +- tsl/src/compression/gorilla_impl.c | 118 ++++---- tsl/src/compression/simple8b_rle_bitmap.h | 251 +++++++++++++++--- .../compression/simple8b_rle_decompress_all.h | 68 +++-- tsl/src/nodes/decompress_chunk/exec.c | 189 +++++-------- tsl/src/nodes/decompress_chunk/exec.h | 11 +- tsl/test/expected/compression_algos.out | 6 +- .../transparent_decompress_chunk-12.out | 10 +- .../transparent_decompress_chunk-13.out | 10 +- .../transparent_decompress_chunk-14.out | 10 +- .../transparent_decompress_chunk-15.out | 10 +- tsl/test/src/test_compression.c | 47 +++- 22 files changed, 549 insertions(+), 359 deletions(-) diff --git a/.github/workflows/libfuzzer.yaml b/.github/workflows/libfuzzer.yaml index 55c01617ea8..883d7111371 100644 --- a/.github/workflows/libfuzzer.yaml +++ b/.github/workflows/libfuzzer.yaml @@ -1,5 +1,8 @@ name: Libfuzzer "on": + schedule: + # run daily 1:00 on main branch + - cron: '0 1 * * *' push: branches: - main diff --git a/src/adts/bit_array_impl.h b/src/adts/bit_array_impl.h index 5bea913ed90..4d5846d63c0 100644 --- a/src/adts/bit_array_impl.h +++ b/src/adts/bit_array_impl.h @@ -112,22 +112,13 @@ bit_array_recv(const StringInfo buffer) .num_elements = num_elements, .max_elements = num_elements, .ctx = CurrentMemoryContext, - /* Add one-element padding so that we can relax the checks for incorrect data. */ - .data = palloc((num_elements + 1) * sizeof(uint64)), + .data = palloc(num_elements * sizeof(uint64)), }, }; for (i = 0; i < num_elements; i++) array.buckets.data[i] = pq_getmsgint64(buffer); - /* Zero out the padding for more predictable behavior under fuzzing. 
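Both the bit array reader above and the gorilla changes further down build a mask of the lowest N bits by right-shifting an all-ones word. A minimal standalone sketch of that idiom and of the shift-by-64 hazard it has to avoid (the helper name low_bits_mask is hypothetical, standing in for bit_array_low_bits_mask):

#include <assert.h>
#include <stdint.h>

/* Mask with the n lowest bits set, valid for 1 <= n <= 64. */
static inline uint64_t
low_bits_mask(unsigned n)
{
    /*
     * Shifting a 64-bit word by 64 is undefined behavior in C, so n == 0 must
     * never reach the shift; the gorilla decompressor below clamps its shift
     * count with "& 63" for the same reason.
     */
    assert(n >= 1 && n <= 64);
    return -1ULL >> (64 - n);
}

int
main(void)
{
    assert(low_bits_mask(1) == 0x1);
    assert(low_bits_mask(8) == 0xFF);
    assert(low_bits_mask(64) == -1ULL);
    return 0;
}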
*/ - array.buckets.data[num_elements] = 0; - if (num_elements > 0) - { - CheckCompressedData(bits_used_in_last_bucket > 0); - array.buckets.data[num_elements - 1] &= -1ULL >> (64 - bits_used_in_last_bucket); - } - return array; } @@ -249,15 +240,11 @@ bit_array_iter_next(BitArrayIterator *iter, uint8 num_bits) uint8 num_bits_from_next_bucket; uint64 value = 0; uint64 value_from_next_bucket; - Assert(num_bits <= 64); + CheckCompressedData(num_bits <= 64); if (num_bits == 0) return 0; CheckCompressedData(iter->current_bucket < iter->array->buckets.num_elements); - if (iter->current_bucket == iter->array->buckets.num_elements - 1) - { - Assert(iter->bits_used_in_current_bucket <= iter->array->bits_used_in_last_bucket); - } bits_remaining_in_current_bucket = 64 - iter->bits_used_in_current_bucket; if (bits_remaining_in_current_bucket >= num_bits) @@ -267,11 +254,6 @@ bit_array_iter_next(BitArrayIterator *iter, uint8 num_bits) value &= bit_array_low_bits_mask(num_bits); iter->bits_used_in_current_bucket += num_bits; - if (iter->current_bucket == iter->array->buckets.num_elements - 1) - { - CheckCompressedData(iter->bits_used_in_current_bucket <= - iter->array->bits_used_in_last_bucket); - } return value; } @@ -293,11 +275,6 @@ bit_array_iter_next(BitArrayIterator *iter, uint8 num_bits) iter->current_bucket += 1; iter->bits_used_in_current_bucket = num_bits_from_next_bucket; - if (iter->current_bucket == iter->array->buckets.num_elements - 1) - { - CheckCompressedData(iter->bits_used_in_current_bucket <= - iter->array->bits_used_in_last_bucket); - } return value; } diff --git a/tsl/src/compression/arrow_c_data_interface.h b/tsl/src/compression/arrow_c_data_interface.h index dadcd3c4946..9c073e9ba61 100644 --- a/tsl/src/compression/arrow_c_data_interface.h +++ b/tsl/src/compression/arrow_c_data_interface.h @@ -109,7 +109,7 @@ typedef struct ArrowArray /* * We don't use the schema but have to define it for completeness because we're - * defining ARROW_C_DATA_INTERFACE macro. + * defining the ARROW_C_DATA_INTERFACE macro. */ struct ArrowSchema { @@ -135,22 +135,22 @@ struct ArrowSchema #endif static pg_attribute_always_inline bool -arrow_row_is_valid(const uint64 *bitmap, int row_number) +arrow_row_is_valid(const uint64 *bitmap, size_t row_number) { - const int qword_index = row_number / 64; - const int bit_index = row_number % 64; + const size_t qword_index = row_number / 64; + const size_t bit_index = row_number % 64; const uint64 mask = 1ull << bit_index; - return (bitmap[qword_index] & mask) ? 
1 : 0; + return bitmap[qword_index] & mask; } static pg_attribute_always_inline void -arrow_set_row_validity(uint64 *bitmap, int row_number, bool value) +arrow_set_row_validity(uint64 *bitmap, size_t row_number, bool value) { - const int qword_index = row_number / 64; - const int bit_index = row_number % 64; + const size_t qword_index = row_number / 64; + const size_t bit_index = row_number % 64; const uint64 mask = 1ull << bit_index; - bitmap[qword_index] = (bitmap[qword_index] & ~mask) | (((uint64) !!value) << bit_index); + bitmap[qword_index] = (bitmap[qword_index] & ~mask) | ((-(uint64) value) & mask); Assert(arrow_row_is_valid(bitmap, row_number) == value); } diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c index ffd00df5bd0..ba43c2cdd91 100644 --- a/tsl/src/compression/compression.c +++ b/tsl/src/compression/compression.c @@ -90,18 +90,13 @@ DecompressionIterator *(*tsl_get_decompression_iterator_init(CompressionAlgorith return definitions[algorithm].iterator_init_forward; } -ArrowArray * -tsl_try_decompress_all(CompressionAlgorithms algorithm, Datum compressed_data, Oid element_type) +DecompressAllFunction +tsl_get_decompress_all_function(CompressionAlgorithms algorithm) { if (algorithm >= _END_COMPRESSION_ALGORITHMS) elog(ERROR, "invalid compression algorithm %d", algorithm); - if (definitions[algorithm].decompress_all) - { - return definitions[algorithm].decompress_all(compressed_data, element_type); - } - - return NULL; + return definitions[algorithm].decompress_all; } static Tuplesortstate *compress_chunk_sort_relation(Relation in_rel, int n_keys, diff --git a/tsl/src/compression/compression.h b/tsl/src/compression/compression.h index f43a510dfdb..fbfd6e49b1d 100644 --- a/tsl/src/compression/compression.h +++ b/tsl/src/compression/compression.h @@ -164,11 +164,14 @@ typedef enum TOAST_STORAGE_EXTENDED } CompressionStorage; +typedef ArrowArray *(*DecompressAllFunction)(Datum compressed, Oid element_type, + MemoryContext dest_mctx); + typedef struct CompressionAlgorithmDefinition { DecompressionIterator *(*iterator_init_forward)(Datum, Oid element_type); DecompressionIterator *(*iterator_init_reverse)(Datum, Oid element_type); - ArrowArray *(*decompress_all)(Datum, Oid element_type); + DecompressAllFunction decompress_all; void (*compressed_data_send)(CompressedDataHeader *, StringInfo); Datum (*compressed_data_recv)(StringInfo); @@ -313,8 +316,7 @@ extern void decompress_chunk(Oid in_table, Oid out_table); extern DecompressionIterator *(*tsl_get_decompression_iterator_init( CompressionAlgorithms algorithm, bool reverse))(Datum, Oid element_type); -extern ArrowArray *tsl_try_decompress_all(CompressionAlgorithms algorithm, Datum compressed_data, - Oid element_type); +extern DecompressAllFunction tsl_get_decompress_all_function(CompressionAlgorithms algorithm); typedef struct Chunk Chunk; typedef struct ChunkInsertState ChunkInsertState; @@ -373,7 +375,7 @@ extern RowDecompressor build_decompressor(Relation in_rel, Relation out_rel); #endif #define CheckCompressedData(X) \ - if (!(X)) \ + if (unlikely(!(X))) \ ereport(ERROR, CORRUPT_DATA_MESSAGE) inline static void * diff --git a/tsl/src/compression/decompress_test_impl.c b/tsl/src/compression/decompress_test_impl.c index 42c0c3433fd..69897d45b99 100644 --- a/tsl/src/compression/decompress_test_impl.c +++ b/tsl/src/compression/decompress_test_impl.c @@ -42,14 +42,25 @@ FUNCTION_NAME(ALGO, CTYPE)(const uint8 *Data, size_t Size, bool extra_checks) * For routine fuzzing, we only run bulk decompression 
to make it faster * and the coverage space smaller. */ - tsl_try_decompress_all(algo, compressed_data, PGTYPE); + DecompressAllFunction decompress_all = tsl_get_decompress_all_function(algo); + decompress_all(compressed_data, PGTYPE, CurrentMemoryContext); return 0; } /* - * For normal testing, as opposed to the fuzzing code path above, run - * row-by-row decompression first, so that it's not masked by the more - * strict correctness checks of bulk decompression. + * Test bulk decompression. This might hide some errors in the row-by-row + * decompression, but testing both is significantly more complicated, and + * the row-by-row is old and stable. + */ + ArrowArray *arrow = NULL; + DecompressAllFunction decompress_all = tsl_get_decompress_all_function(algo); + if (decompress_all) + { + arrow = decompress_all(compressed_data, PGTYPE, CurrentMemoryContext); + } + + /* + * Test row-by-row decompression. */ DecompressionIterator *iter = definitions[algo].iterator_init_forward(compressed_data, PGTYPE); DecompressResult results[GLOBAL_MAX_ROWS_PER_COMPRESSION]; @@ -64,9 +75,6 @@ FUNCTION_NAME(ALGO, CTYPE)(const uint8 *Data, size_t Size, bool extra_checks) results[n++] = r; } - /* Test bulk decompression. */ - ArrowArray *arrow = tsl_try_decompress_all(algo, compressed_data, PGTYPE); - /* Check that both ways of decompression match. */ if (arrow) { diff --git a/tsl/src/compression/deltadelta.c b/tsl/src/compression/deltadelta.c index fb0dfb67cc0..e7806a60d0f 100644 --- a/tsl/src/compression/deltadelta.c +++ b/tsl/src/compression/deltadelta.c @@ -588,19 +588,19 @@ delta_delta_decompression_iterator_try_next_forward(DecompressionIterator *iter) #undef ELEMENT_TYPE ArrowArray * -delta_delta_decompress_all(Datum compressed_data, Oid element_type) +delta_delta_decompress_all(Datum compressed_data, Oid element_type, MemoryContext dest_mctx) { switch (element_type) { case INT8OID: case TIMESTAMPOID: case TIMESTAMPTZOID: - return delta_delta_decompress_all_uint64(compressed_data); + return delta_delta_decompress_all_uint64(compressed_data, dest_mctx); case INT4OID: case DATEOID: - return delta_delta_decompress_all_uint32(compressed_data); + return delta_delta_decompress_all_uint32(compressed_data, dest_mctx); case INT2OID: - return delta_delta_decompress_all_uint16(compressed_data); + return delta_delta_decompress_all_uint16(compressed_data, dest_mctx); default: elog(ERROR, "type '%s' is not supported for deltadelta decompression", diff --git a/tsl/src/compression/deltadelta.h b/tsl/src/compression/deltadelta.h index dcc7c474073..850d02ed6bc 100644 --- a/tsl/src/compression/deltadelta.h +++ b/tsl/src/compression/deltadelta.h @@ -43,7 +43,8 @@ delta_delta_decompression_iterator_from_datum_reverse(Datum deltadelta_compresse extern DecompressResult delta_delta_decompression_iterator_try_next_forward(DecompressionIterator *iter); -extern ArrowArray *delta_delta_decompress_all(Datum compressed_data, Oid element_type); +extern ArrowArray *delta_delta_decompress_all(Datum compressed_data, Oid element_type, + MemoryContext dest_mctx); extern DecompressResult delta_delta_decompression_iterator_try_next_reverse(DecompressionIterator *iter); diff --git a/tsl/src/compression/deltadelta_impl.c b/tsl/src/compression/deltadelta_impl.c index f07989dbe72..48562aaec23 100644 --- a/tsl/src/compression/deltadelta_impl.c +++ b/tsl/src/compression/deltadelta_impl.c @@ -13,7 +13,7 @@ #define FUNCTION_NAME(X, Y) FUNCTION_NAME_HELPER(X, Y) static ArrowArray * -FUNCTION_NAME(delta_delta_decompress_all, ELEMENT_TYPE)(Datum 
compressed) +FUNCTION_NAME(delta_delta_decompress_all, ELEMENT_TYPE)(Datum compressed, MemoryContext dest_mctx) { StringInfoData si = { .data = DatumGetPointer(compressed), .len = VARSIZE(compressed) }; DeltaDeltaCompressed *header = consumeCompressedData(&si, sizeof(DeltaDeltaCompressed)); @@ -23,8 +23,14 @@ FUNCTION_NAME(delta_delta_decompress_all, ELEMENT_TYPE)(Datum compressed) Assert(header->has_nulls == 0 || header->has_nulls == 1); - /* Can't use element type here because of zig-zag encoding. */ - int16 num_deltas; + /* + * Can't use element type here because of zig-zag encoding. The deltas are + * computed in uint64, so we can get a delta that is actually larger than + * the element type. We can't just truncate the delta either, because it + * will lead to broken decompression results. The test case is in + * test_delta4(). + */ + uint16 num_deltas; const uint64 *restrict deltas_zigzag = simple8brle_decompress_all_uint64(deltas_compressed, &num_deltas); @@ -35,11 +41,15 @@ FUNCTION_NAME(delta_delta_decompress_all, ELEMENT_TYPE)(Datum compressed) nulls = simple8brle_bitmap_decompress(nulls_compressed); } - const int n_total = has_nulls ? nulls.num_elements : num_deltas; - const int n_total_padded = + /* + * Pad the number of elements to multiple of 64 bytes if needed, so that we + * can work in 64-byte blocks. + */ + const uint16 n_total = has_nulls ? nulls.num_elements : num_deltas; + const uint16 n_total_padded = ((n_total * sizeof(ELEMENT_TYPE) + 63) / 64) * 64 / sizeof(ELEMENT_TYPE); - const int n_notnull = num_deltas; - const int n_notnull_padded = + const uint16 n_notnull = num_deltas; + const uint16 n_notnull_padded = ((n_notnull * sizeof(ELEMENT_TYPE) + 63) / 64) * 64 / sizeof(ELEMENT_TYPE); Assert(n_total_padded >= n_total); Assert(n_notnull_padded >= n_notnull); @@ -47,8 +57,14 @@ FUNCTION_NAME(delta_delta_decompress_all, ELEMENT_TYPE)(Datum compressed) Assert(n_total <= GLOBAL_MAX_ROWS_PER_COMPRESSION); const int validity_bitmap_bytes = sizeof(uint64) * ((n_total + 64 - 1) / 64); - uint64 *restrict validity_bitmap = palloc(validity_bitmap_bytes); - ELEMENT_TYPE *restrict decompressed_values = palloc(sizeof(ELEMENT_TYPE) * n_total_padded); + uint64 *restrict validity_bitmap = MemoryContextAlloc(dest_mctx, validity_bitmap_bytes); + + /* + * We need additional padding at the end of buffer, because the code that + * converts the elements to postres Datum always reads in 8 bytes. + */ + const int buffer_bytes = n_total_padded * sizeof(ELEMENT_TYPE) + 8; + ELEMENT_TYPE *restrict decompressed_values = MemoryContextAlloc(dest_mctx, buffer_bytes); /* Now fill the data w/o nulls. */ ELEMENT_TYPE current_delta = 0; @@ -62,9 +78,9 @@ FUNCTION_NAME(delta_delta_decompress_all, ELEMENT_TYPE)(Datum compressed) */ #define INNER_LOOP_SIZE 8 Assert(n_notnull_padded % INNER_LOOP_SIZE == 0); - for (int outer = 0; outer < n_notnull_padded; outer += INNER_LOOP_SIZE) + for (uint16 outer = 0; outer < n_notnull_padded; outer += INNER_LOOP_SIZE) { - for (int inner = 0; inner < INNER_LOOP_SIZE; inner++) + for (uint16 inner = 0; inner < INNER_LOOP_SIZE; inner++) { current_delta += zig_zag_decode(deltas_zigzag[outer + inner]); current_element += current_delta; @@ -107,19 +123,27 @@ FUNCTION_NAME(delta_delta_decompress_all, ELEMENT_TYPE)(Datum compressed) else { /* - * The validity bitmap is padded at the end to a multiple of 64 bytes. - * Fill the padding with zeros, because the elements corresponding to - * the padding bits are not valid. + * The validity bitmap size is a multiple of 64 bits. 
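As a concrete check of the tail masking applied just below (an illustrative sketch, not part of the patch): with 70 total rows, the second bitmap word keeps only its 6 lowest bits.

#include <assert.h>
#include <stdint.h>

int
main(void)
{
    const unsigned n_total = 70;                      /* 64 valid rows in word 0, 6 in word 1 */
    uint64_t validity_bitmap[2] = { -1ULL, -1ULL };   /* starts as "all rows valid" */

    if (n_total % 64)
    {
        const uint64_t tail_mask = -1ULL >> (64 - n_total % 64);
        validity_bitmap[n_total / 64] &= tail_mask;
    }

    assert(validity_bitmap[0] == -1ULL);
    assert(validity_bitmap[1] == 0x3F);               /* only rows 64..69 stay valid */
    return 0;
}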
Fill the tail bits + * with zeros, because the corresponding elements are not valid. */ - for (int i = n_total; i < validity_bitmap_bytes * 8; i++) + if (n_total % 64) { - arrow_set_row_validity(validity_bitmap, i, false); + const uint64 tail_mask = -1ULL >> (64 - n_total % 64); + validity_bitmap[n_total / 64] &= tail_mask; + +#ifdef USE_ASSERT_CHECKING + for (int i = 0; i < 64; i++) + { + Assert(arrow_row_is_valid(validity_bitmap, (n_total / 64) * 64 + i) == + (i < n_total % 64)); + } +#endif } } /* Return the result. */ - ArrowArray *result = palloc0(sizeof(ArrowArray)); - const void **buffers = palloc(sizeof(void *) * 2); + ArrowArray *result = MemoryContextAllocZero(dest_mctx, sizeof(ArrowArray) + sizeof(void *) * 2); + const void **buffers = (const void **) &result[1]; buffers[0] = validity_bitmap; buffers[1] = decompressed_values; result->n_buffers = 2; diff --git a/tsl/src/compression/gorilla.c b/tsl/src/compression/gorilla.c index 0b32c3be0aa..4389232c543 100644 --- a/tsl/src/compression/gorilla.c +++ b/tsl/src/compression/gorilla.c @@ -826,13 +826,15 @@ gorilla_decompression_iterator_try_next_reverse(DecompressionIterator *iter_base iter_base->element_type); } -#define MAX_NUM_LEADING_ZEROS_PADDED (((GLOBAL_MAX_ROWS_PER_COMPRESSION + 63) / 64) * 64) +#define MAX_NUM_LEADING_ZEROS_PADDED_N64 (((GLOBAL_MAX_ROWS_PER_COMPRESSION + 63) / 64) * 64) + +int16 unpack_leading_zeros_array(BitArray *bitarray, uint8 *restrict dest); /* * Decompress packed 6bit values in lanes that contain a round number of both * packed and unpacked bytes -- 4 6-bit values are packed into 3 8-bit values. */ -pg_attribute_always_inline static int16 +int16 unpack_leading_zeros_array(BitArray *bitarray, uint8 *restrict dest) { #define LANE_INPUTS 3 @@ -851,7 +853,7 @@ unpack_leading_zeros_array(BitArray *bitarray, uint8 *restrict dest) const int16 n_bytes_packed = bitarray->buckets.num_elements * sizeof(uint64); const int16 n_lanes = (n_bytes_packed + LANE_INPUTS - 1) / LANE_INPUTS; const int16 n_outputs = n_lanes * LANE_OUTPUTS; - CheckCompressedData(n_outputs <= MAX_NUM_LEADING_ZEROS_PADDED); + CheckCompressedData(n_outputs <= MAX_NUM_LEADING_ZEROS_PADDED_N64); for (int lane = 0; lane < n_lanes; lane++) { @@ -894,7 +896,7 @@ unpack_leading_zeros_array(BitArray *bitarray, uint8 *restrict dest) #undef ELEMENT_TYPE ArrowArray * -gorilla_decompress_all(Datum datum, Oid element_type) +gorilla_decompress_all(Datum datum, Oid element_type, MemoryContext dest_mctx) { CompressedGorillaData gorilla_data; compressed_gorilla_data_init_from_datum(&gorilla_data, datum); @@ -902,9 +904,9 @@ gorilla_decompress_all(Datum datum, Oid element_type) switch (element_type) { case FLOAT8OID: - return gorilla_decompress_all_uint64(&gorilla_data); + return gorilla_decompress_all_uint64(&gorilla_data, dest_mctx); case FLOAT4OID: - return gorilla_decompress_all_uint32(&gorilla_data); + return gorilla_decompress_all_uint32(&gorilla_data, dest_mctx); default: elog(ERROR, "type '%s' is not supported for gorilla decompression", diff --git a/tsl/src/compression/gorilla.h b/tsl/src/compression/gorilla.h index 2458aa04fd2..6d3fbe81a22 100644 --- a/tsl/src/compression/gorilla.h +++ b/tsl/src/compression/gorilla.h @@ -87,7 +87,7 @@ extern DecompressionIterator * gorilla_decompression_iterator_from_datum_reverse(Datum gorilla_compressed, Oid element_type); extern DecompressResult gorilla_decompression_iterator_try_next_reverse(DecompressionIterator *iter); -extern ArrowArray *gorilla_decompress_all(Datum datum, Oid element_type); +extern ArrowArray 
*gorilla_decompress_all(Datum datum, Oid element_type, MemoryContext dest_mctx); extern void gorilla_compressed_send(CompressedDataHeader *header, StringInfo buffer); extern Datum gorilla_compressed_recv(StringInfo buf); diff --git a/tsl/src/compression/gorilla_impl.c b/tsl/src/compression/gorilla_impl.c index f21e3fc7ac4..41f98a31345 100644 --- a/tsl/src/compression/gorilla_impl.c +++ b/tsl/src/compression/gorilla_impl.c @@ -13,35 +13,49 @@ #define FUNCTION_NAME(X, Y) FUNCTION_NAME_HELPER(X, Y) static ArrowArray * -FUNCTION_NAME(gorilla_decompress_all, ELEMENT_TYPE)(CompressedGorillaData *gorilla_data) +FUNCTION_NAME(gorilla_decompress_all, ELEMENT_TYPE)(CompressedGorillaData *gorilla_data, + MemoryContext dest_mctx) { const bool has_nulls = gorilla_data->nulls != NULL; - const int n_total = + const uint16 n_total = has_nulls ? gorilla_data->nulls->num_elements : gorilla_data->tag0s->num_elements; CheckCompressedData(n_total <= GLOBAL_MAX_ROWS_PER_COMPRESSION); - const int n_total_padded = + /* + * Pad the number of elements to multiple of 64 bytes if needed, so that we + * can work in 64-byte blocks. + */ + const uint16 n_total_padded = ((n_total * sizeof(ELEMENT_TYPE) + 63) / 64) * 64 / sizeof(ELEMENT_TYPE); Assert(n_total_padded >= n_total); - const int n_notnull = gorilla_data->tag0s->num_elements; + /* + * We need additional padding at the end of buffer, because the code that + * converts the elements to postres Datum always reads in 8 bytes. + */ + const int buffer_bytes = n_total_padded * sizeof(ELEMENT_TYPE) + 8; + ELEMENT_TYPE *restrict decompressed_values = MemoryContextAlloc(dest_mctx, buffer_bytes); + + const uint16 n_notnull = gorilla_data->tag0s->num_elements; CheckCompressedData(n_total >= n_notnull); /* Unpack the basic compressed data parts. */ - Simple8bRleBitmap tag0s = simple8brle_bitmap_decompress(gorilla_data->tag0s); - Simple8bRleBitmap tag1s = simple8brle_bitmap_decompress(gorilla_data->tag1s); + Simple8bRleBitmap tag0s = simple8brle_bitmap_prefixsums(gorilla_data->tag0s); + Simple8bRleBitmap tag1s = simple8brle_bitmap_prefixsums(gorilla_data->tag1s); BitArray leading_zeros_bitarray = gorilla_data->leading_zeros; BitArrayIterator leading_zeros_iterator; bit_array_iterator_init(&leading_zeros_iterator, &leading_zeros_bitarray); - uint8 all_leading_zeros[MAX_NUM_LEADING_ZEROS_PADDED]; - const int16 leading_zeros_padded = + uint8 all_leading_zeros[MAX_NUM_LEADING_ZEROS_PADDED_N64]; + const uint16 leading_zeros_padded = unpack_leading_zeros_array(&gorilla_data->leading_zeros, all_leading_zeros); - int16 num_bit_widths; - const uint8 *restrict bit_widths = - simple8brle_decompress_all_uint8(gorilla_data->num_bits_used_per_xor, &num_bit_widths); + uint8 bit_widths[MAX_NUM_LEADING_ZEROS_PADDED_N64]; + const uint16 num_bit_widths = + simple8brle_decompress_all_buf_uint8(gorilla_data->num_bits_used_per_xor, + bit_widths, + MAX_NUM_LEADING_ZEROS_PADDED_N64); BitArray xors_bitarray = gorilla_data->xors; BitArrayIterator xors_iterator; @@ -62,12 +76,12 @@ FUNCTION_NAME(gorilla_decompress_all, ELEMENT_TYPE)(CompressedGorillaData *goril * 1b) Sanity check: the first tag1 must be 1, so that we initialize the bit * widths. */ - CheckCompressedData(simple8brle_bitmap_get_at(&tag1s, 0) == 1); + CheckCompressedData(simple8brle_bitmap_prefix_sum(&tag1s, 0) == 1); /* * 1c) Sanity check: can't have more different elements than notnull elements. 
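The rewritten loop below drops the old per-row branch on tag1 and instead indexes the bit-width and leading-zeros arrays through a prefix sum of the tag bitmap. A small self-contained sketch of why that works (the names here are hypothetical): the prefix sum at row i counts the tag1 bits in rows 0..i, so prefix_sum - 1 is the index of the entry loaded at the most recent row with tag1 == 1.

#include <assert.h>

int
main(void)
{
    /* tag1 bits for six rows: a new bit width starts at rows 0, 2 and 5. */
    const int tag1[6] = { 1, 0, 1, 0, 0, 1 };
    const int bit_widths[3] = { 13, 7, 20 };   /* one entry per tag1 == 1 */

    int prefix_sum = 0;
    for (int i = 0; i < 6; i++)
    {
        prefix_sum += tag1[i];

        /* Same value the old code carried in its "current bit width" variable. */
        const int current_width = bit_widths[prefix_sum - 1];
        assert(current_width == (i < 2 ? 13 : (i < 5 ? 7 : 20)));
    }
    return 0;
}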
*/ - const int n_different = tag1s.num_elements; + const uint16 n_different = tag1s.num_elements; CheckCompressedData(n_different <= n_notnull); /* @@ -77,42 +91,21 @@ FUNCTION_NAME(gorilla_decompress_all, ELEMENT_TYPE)(CompressedGorillaData *goril * having a fast path for stretches of tag1 == 0. */ ELEMENT_TYPE prev = 0; - int next_bit_widths_index = 0; - int16 next_leading_zeros_index = 0; - uint8 current_leading_zeros = 0; - uint8 current_xor_bits = 0; - ELEMENT_TYPE *restrict decompressed_values = palloc(sizeof(ELEMENT_TYPE) * n_total_padded); - for (int i = 0; i < n_different; i++) + for (uint16 i = 0; i < n_different; i++) { - if (simple8brle_bitmap_get_at(&tag1s, i) != 0) - { - /* Load new bit widths. */ - Assert(next_bit_widths_index < num_bit_widths); - current_xor_bits = bit_widths[next_bit_widths_index++]; - - Assert(next_leading_zeros_index < MAX_NUM_LEADING_ZEROS_PADDED); - current_leading_zeros = all_leading_zeros[next_leading_zeros_index++]; - - /* - * More than 64 significant bits don't make sense. Exactly 64 we get for - * the first encoded number. - */ - CheckCompressedData(current_leading_zeros + current_xor_bits <= 64); - - /* - * Theoretically, leading zeros + xor bits == 0 would mean that the - * number is the same as the previous one, and it should have been - * encoded as tag0s == 0. Howewer, we can encounter it in the corrupt - * data. Shifting by 64 bytes left would be undefined behavior. - */ - CheckCompressedData(current_leading_zeros + current_xor_bits > 0); - } + const uint8 current_xor_bits = bit_widths[simple8brle_bitmap_prefix_sum(&tag1s, i) - 1]; + const uint8 current_leading_zeros = + all_leading_zeros[simple8brle_bitmap_prefix_sum(&tag1s, i) - 1]; + + /* + * Truncate the shift here not to cause UB on the corrupt data. + */ + const uint8 shift = (64 - (current_xor_bits + current_leading_zeros)) & 63; const uint64 current_xor = bit_array_iter_next(&xors_iterator, current_xor_bits); - prev ^= current_xor << (64 - (current_leading_zeros + current_xor_bits)); + prev ^= current_xor << shift; decompressed_values[i] = prev; } - Assert(next_bit_widths_index == num_bit_widths); /* * 2) Fill out the stretches of repeated elements, encoded with tag0 = 0. @@ -127,31 +120,22 @@ FUNCTION_NAME(gorilla_decompress_all, ELEMENT_TYPE)(CompressedGorillaData *goril * 2b) Sanity check: tag0s[0] == 1 -- the first element of the sequence is * always "different from the previous one". */ - CheckCompressedData(simple8brle_bitmap_get_at(&tag0s, 0) == 1); + CheckCompressedData(simple8brle_bitmap_prefix_sum(&tag0s, 0) == 1); /* * 2b) Fill the repeated elements. */ - int current_element = n_different - 1; for (int i = n_notnull - 1; i >= 0; i--) { - Assert(current_element >= 0); - Assert(current_element <= i); - decompressed_values[i] = decompressed_values[current_element]; - - if (simple8brle_bitmap_get_at(&tag0s, i)) - { - current_element--; - } + decompressed_values[i] = decompressed_values[simple8brle_bitmap_prefix_sum(&tag0s, i) - 1]; } - Assert(current_element == -1); /* * We have unpacked the non-null data. Now reshuffle it to account for nulls, * and fill the validity bitmap. */ const int validity_bitmap_bytes = sizeof(uint64) * ((n_total + 64 - 1) / 64); - uint64 *restrict validity_bitmap = palloc(validity_bitmap_bytes); + uint64 *restrict validity_bitmap = MemoryContextAlloc(dest_mctx, validity_bitmap_bytes); /* * For starters, set the validity bitmap to all ones. 
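The reshuffling loop itself falls outside this hunk; the sketch below shows one way such a back-to-front fill can look (purely illustrative, with made-up data): nulls only clear bits in the validity bitmap, while the compacted non-null values are moved in place to their final rows, which is why starting from an all-ones bitmap is convenient.

#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

int
main(void)
{
    /* Five output rows, rows 1 and 3 are null; three compacted non-null values. */
    const bool is_null[5] = { false, true, false, true, false };
    int64_t values[5] = { 10, 20, 30, 0, 0 };   /* non-null values packed at the front */
    uint64_t validity = -1ULL;                  /* start with all rows valid */

    int src = 2;                                /* index of the last compacted value */
    for (int row = 4; row >= 0; row--)
    {
        if (is_null[row])
            validity &= ~(1ULL << row);         /* mark the row as null */
        else
            values[row] = values[src--];        /* move the value to its final row */
    }

    assert(values[0] == 10 && values[2] == 20 && values[4] == 30);
    assert((validity & 0x1F) == 0x15);          /* rows 0, 2 and 4 are valid */
    return 0;
}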
We probably have less @@ -190,19 +174,27 @@ FUNCTION_NAME(gorilla_decompress_all, ELEMENT_TYPE)(CompressedGorillaData *goril else { /* - * The validity bitmap is padded at the end to a multiple of 64 bytes. - * Fill the padding with zeros, because the elements corresponding to - * the padding bits are not valid. + * The validity bitmap size is a multiple of 64 bits. Fill the tail bits + * with zeros, because the corresponding elements are not valid. */ - for (int i = n_total; i < validity_bitmap_bytes * 8; i++) + if (n_total % 64) { - arrow_set_row_validity(validity_bitmap, i, false); + const uint64 tail_mask = -1ULL >> (64 - n_total % 64); + validity_bitmap[n_total / 64] &= tail_mask; + +#ifdef USE_ASSERT_CHECKING + for (int i = 0; i < 64; i++) + { + Assert(arrow_row_is_valid(validity_bitmap, (n_total / 64) * 64 + i) == + (i < n_total % 64)); + } +#endif } } /* Return the result. */ - ArrowArray *result = palloc0(sizeof(ArrowArray)); - const void **buffers = palloc(sizeof(void *) * 2); + ArrowArray *result = MemoryContextAllocZero(dest_mctx, sizeof(ArrowArray) + sizeof(void *) * 2); + const void **buffers = (const void **) &result[1]; buffers[0] = validity_bitmap; buffers[1] = decompressed_values; result->n_buffers = 2; diff --git a/tsl/src/compression/simple8b_rle_bitmap.h b/tsl/src/compression/simple8b_rle_bitmap.h index f8d180fb557..629f0f422ea 100644 --- a/tsl/src/compression/simple8b_rle_bitmap.h +++ b/tsl/src/compression/simple8b_rle_bitmap.h @@ -15,20 +15,27 @@ typedef struct Simple8bRleBitmap { - char *bitmap_bools_; - int16 num_elements; - int16 num_ones; + /* Either the bools or prefix sums, depending on the decompression method. */ + void *data; + + uint16 num_elements; + uint16 num_ones; } Simple8bRleBitmap; pg_attribute_always_inline static bool -simple8brle_bitmap_get_at(Simple8bRleBitmap *bitmap, int i) +simple8brle_bitmap_get_at(Simple8bRleBitmap *bitmap, uint16 i) { - Assert(i >= 0); - /* We have some padding on the right but we shouldn't overrun it. */ Assert(i < ((bitmap->num_elements + 63) / 64 + 1) * 64); - return bitmap->bitmap_bools_[i]; + return ((bool *restrict) bitmap->data)[i]; +} + +pg_attribute_always_inline static uint16 +simple8brle_bitmap_prefix_sum(Simple8bRleBitmap *bitmap, uint16 i) +{ + Assert(i < ((bitmap->num_elements + 63) / 64 + 1) * 64); + return ((uint16 *restrict) bitmap->data)[i]; } pg_attribute_always_inline static uint16 @@ -37,19 +44,23 @@ simple8brle_bitmap_num_ones(Simple8bRleBitmap *bitmap) return bitmap->num_ones; } +/* + * Calculate prefix sum of bits instead of bitmap itself, because it's more + * useful for gorilla decompression. Can be unused by other users of this + * header. 
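The two representations stored in the data pointer are interchangeable in principle; a short sketch of the relationship (illustrative only): the prefix sum at position i counts the ones in positions 0..i, so the original bit is the difference of adjacent prefix sums, and a check like prefix_sum(..., 0) == 1 is the prefix-sum form of get_at(..., 0) == 1.

#include <assert.h>
#include <stdint.h>

int
main(void)
{
    const int bits[8] = { 1, 0, 0, 1, 1, 0, 1, 0 };
    uint16_t prefix_sums[8];

    uint16_t running = 0;
    for (int i = 0; i < 8; i++)
    {
        running += bits[i];
        prefix_sums[i] = running;
    }

    /* The bit at position i is the difference of adjacent prefix sums. */
    for (int i = 0; i < 8; i++)
    {
        const uint16_t prev = (i == 0) ? 0 : prefix_sums[i - 1];
        assert(bits[i] == prefix_sums[i] - prev);
    }

    /* prefix_sums[0] == 1 exactly when the first bit is set. */
    assert(prefix_sums[0] == 1);
    return 0;
}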
+ */ +static Simple8bRleBitmap simple8brle_bitmap_prefixsums(Simple8bRleSerialized *compressed) + pg_attribute_unused(); + static Simple8bRleBitmap -simple8brle_bitmap_decompress(Simple8bRleSerialized *compressed) +simple8brle_bitmap_prefixsums(Simple8bRleSerialized *compressed) { - Simple8bRleBitmap result; - result.num_elements = compressed->num_elements; - CheckCompressedData(compressed->num_elements <= GLOBAL_MAX_ROWS_PER_COMPRESSION); CheckCompressedData(compressed->num_blocks <= GLOBAL_MAX_ROWS_PER_COMPRESSION); - const int16 num_elements = compressed->num_elements; - int16 num_ones = 0; + const uint16 num_elements = compressed->num_elements; - const int16 num_selector_slots = + const uint16 num_selector_slots = simple8brle_num_selector_slots_for_num_blocks(compressed->num_blocks); const uint64 *compressed_data = compressed->slots + num_selector_slots; @@ -58,15 +69,17 @@ simple8brle_bitmap_decompress(Simple8bRleSerialized *compressed) * decompression loop and the get() function. Note that for get() we need at * least one byte of padding, hence the next multiple. */ - const int16 num_elements_padded = ((num_elements + 63) / 64 + 1) * 64; - const int16 num_blocks = compressed->num_blocks; + const uint16 num_elements_padded = ((num_elements + 63) / 64 + 1) * 64; + const uint16 num_blocks = compressed->num_blocks; + + uint16 *restrict prefix_sums = palloc(sizeof(uint16) * num_elements_padded); - char *restrict bitmap_bools_ = palloc(num_elements_padded); - int16 decompressed_index = 0; - for (int16 block_index = 0; block_index < num_blocks; block_index++) + uint16 current_prefix_sum = 0; + uint16 decompressed_index = 0; + for (uint16 block_index = 0; block_index < num_blocks; block_index++) { - const int selector_slot = block_index / SIMPLE8B_SELECTORS_PER_SELECTOR_SLOT; - const int selector_pos_in_slot = block_index % SIMPLE8B_SELECTORS_PER_SELECTOR_SLOT; + const uint16 selector_slot = block_index / SIMPLE8B_SELECTORS_PER_SELECTOR_SLOT; + const uint16 selector_pos_in_slot = block_index % SIMPLE8B_SELECTORS_PER_SELECTOR_SLOT; const uint64 slot_value = compressed->slots[selector_slot]; const uint8 selector_shift = selector_pos_in_slot * SIMPLE8B_BITS_PER_SELECTOR; const uint64 selector_mask = 0xFULL << selector_shift; @@ -80,38 +93,31 @@ simple8brle_bitmap_decompress(Simple8bRleSerialized *compressed) /* * RLE block. */ - const int32 n_block_values = simple8brle_rledata_repeatcount(block_data); + const size_t n_block_values = simple8brle_rledata_repeatcount(block_data); CheckCompressedData(n_block_values <= GLOBAL_MAX_ROWS_PER_COMPRESSION); - const uint8 repeated_value = simple8brle_rledata_value(block_data); - CheckCompressedData(repeated_value <= 1); + const bool repeated_value = simple8brle_rledata_value(block_data); CheckCompressedData(decompressed_index + n_block_values <= num_elements); - /* - * If we see an RLE-encoded block in bitmap, this means we had more - * than 64 consecutive bits, otherwise it would be inefficient to - * use RLE. Work in batches of 64 values and then process the tail - * separately. This affects performance on some synthetic data sets. 
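For an RLE block, the prefix sums that the new code fills in just below are either a straight ramp (a run of ones) or a constant (a run of zeros); a tiny illustrative sketch with made-up run lengths:

#include <assert.h>
#include <stdint.h>

int
main(void)
{
    uint16_t prefix_sums[8];
    uint16_t current_prefix_sum = 3;        /* ones seen before this block */

    /* A run of five repeated ones ... */
    for (uint16_t i = 0; i < 5; i++)
        prefix_sums[i] = current_prefix_sum + i + 1;
    current_prefix_sum += 5;

    /* ... followed by a run of three repeated zeros. */
    for (uint16_t i = 0; i < 3; i++)
        prefix_sums[5 + i] = current_prefix_sum;

    assert(prefix_sums[0] == 4 && prefix_sums[4] == 8);
    assert(prefix_sums[5] == 8 && prefix_sums[7] == 8);
    return 0;
}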
- */ - const int16 full_qword_values = (n_block_values / 64) * 64; - for (int16 outer = 0; outer < full_qword_values; outer += 64) + if (repeated_value) { - for (int16 inner = 0; inner < 64; inner++) + for (uint16 i = 0; i < n_block_values; i++) { - bitmap_bools_[decompressed_index + outer + inner] = repeated_value; + prefix_sums[decompressed_index + i] = current_prefix_sum + i + 1; } + current_prefix_sum += n_block_values; } - - for (int16 i = 0; i < n_block_values - full_qword_values; i++) + else { - bitmap_bools_[decompressed_index + full_qword_values + i] = repeated_value; + for (uint16 i = 0; i < n_block_values; i++) + { + prefix_sums[decompressed_index + i] = current_prefix_sum; + } } decompressed_index += n_block_values; Assert(decompressed_index <= num_elements); - - num_ones += repeated_value * n_block_values; } else { @@ -126,15 +132,171 @@ simple8brle_bitmap_decompress(Simple8bRleSerialized *compressed) Assert(SIMPLE8B_BIT_LENGTH[selector_value] == 1); Assert(SIMPLE8B_NUM_ELEMENTS[selector_value] == 64); + /* + * We should require at least one element from the block. Previous + * blocks might have had incorrect lengths, so this is not an + * assertion. + */ + CheckCompressedData(decompressed_index < num_elements); + /* Have to zero out the unused bits, so that the popcnt works properly. */ const int elements_this_block = Min(64, num_elements - decompressed_index); Assert(elements_this_block <= 64); + Assert(elements_this_block > 0); + block_data &= (-1ULL) >> (64 - elements_this_block); + + /* + * The number of block elements should fit within padding. Previous + * blocks might have had incorrect lengths, so this is not an + * assertion. + */ + CheckCompressedData(decompressed_index + 64 < num_elements_padded); + +#ifdef HAVE__BUILTIN_POPCOUNT + for (uint16 i = 0; i < 64; i++) + { + const uint16 word_prefix_sum = + __builtin_popcountll(block_data & (-1ULL >> (63 - i))); + prefix_sums[decompressed_index + i] = current_prefix_sum + word_prefix_sum; + } + current_prefix_sum += __builtin_popcountll(block_data); +#else + /* + * Unfortunatly, we have to have this fallback for Windows. + */ + for (uint16 i = 0; i < 64; i++) + { + const bool this_bit = (block_data >> i) & 1; + current_prefix_sum += this_bit; + prefix_sums[decompressed_index + i] = current_prefix_sum; + } +#endif + decompressed_index += 64; + } + } + + /* + * We might have unpacked more because we work in full blocks, but at least + * we shouldn't have unpacked less. + */ + CheckCompressedData(decompressed_index >= num_elements); + Assert(decompressed_index <= num_elements_padded); + + /* + * Might happen if we have stray ones in the higher unused bits of the last + * block. 
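The per-bit prefix sums computed above mask the block down to bits 0..i and take a population count; a small worked example of that expression (a sketch that assumes __builtin_popcountll is available, which is why the patch keeps a scalar fallback for Windows):

#include <assert.h>
#include <stdint.h>

int
main(void)
{
    const uint64_t block_data = 0xB1;       /* bits 0, 4, 5 and 7 are set */

    /* -1ULL >> (63 - i) keeps bits 0..i, so the popcount is a prefix sum. */
    assert(__builtin_popcountll(block_data & (-1ULL >> (63 - 0))) == 1);
    assert(__builtin_popcountll(block_data & (-1ULL >> (63 - 3))) == 1);
    assert(__builtin_popcountll(block_data & (-1ULL >> (63 - 5))) == 3);
    assert(__builtin_popcountll(block_data & (-1ULL >> (63 - 63))) == 4);
    return 0;
}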
+ */ + CheckCompressedData(current_prefix_sum <= num_elements); + + Simple8bRleBitmap result = { + .data = prefix_sums, + .num_elements = num_elements, + .num_ones = current_prefix_sum, + }; + + return result; +} + +static Simple8bRleBitmap +simple8brle_bitmap_decompress(Simple8bRleSerialized *compressed) +{ + CheckCompressedData(compressed->num_elements <= GLOBAL_MAX_ROWS_PER_COMPRESSION); + CheckCompressedData(compressed->num_blocks <= GLOBAL_MAX_ROWS_PER_COMPRESSION); + + const uint16 num_elements = compressed->num_elements; + uint16 num_ones = 0; + + const uint16 num_selector_slots = + simple8brle_num_selector_slots_for_num_blocks(compressed->num_blocks); + const uint64 *compressed_data = compressed->slots + num_selector_slots; + + /* + * Pad to next multiple of 64 bytes on the right, so that we can simplify the + * decompression loop and the get() function. Note that for get() we need at + * least one byte of padding, hence the next multiple. + */ + const uint16 num_elements_padded = ((num_elements + 63) / 64 + 1) * 64; + const uint16 num_blocks = compressed->num_blocks; + + bool *restrict bitmap_bools_ = palloc(sizeof(bool) * num_elements_padded); + uint16 decompressed_index = 0; + for (uint16 block_index = 0; block_index < num_blocks; block_index++) + { + const uint16 selector_slot = block_index / SIMPLE8B_SELECTORS_PER_SELECTOR_SLOT; + const uint16 selector_pos_in_slot = block_index % SIMPLE8B_SELECTORS_PER_SELECTOR_SLOT; + const uint64 slot_value = compressed->slots[selector_slot]; + const uint8 selector_shift = selector_pos_in_slot * SIMPLE8B_BITS_PER_SELECTOR; + const uint64 selector_mask = 0xFULL << selector_shift; + const uint8 selector_value = (slot_value & selector_mask) >> selector_shift; + Assert(selector_value < 16); + + uint64 block_data = compressed_data[block_index]; + + if (simple8brle_selector_is_rle(selector_value)) + { + /* + * RLE block. + */ + const uint16 n_block_values = simple8brle_rledata_repeatcount(block_data); + CheckCompressedData(n_block_values <= GLOBAL_MAX_ROWS_PER_COMPRESSION); + + /* + * We might get an incorrect value from the corrupt data. Explicitly + * truncate it to 0/1 in case the bool is not a standard bool type + * which would have done it for us. + */ + const bool repeated_value = simple8brle_rledata_value(block_data) & 1; + + CheckCompressedData(decompressed_index + n_block_values <= num_elements); + + /* + * Write out the loop for both true and false, so that it becomes a + * simple memset. + */ + if (repeated_value) + { + for (uint16 i = 0; i < n_block_values; i++) + { + bitmap_bools_[decompressed_index + i] = true; + } + + num_ones += n_block_values; + } + else + { + for (uint16 i = 0; i < n_block_values; i++) + { + bitmap_bools_[decompressed_index + i] = false; + } + } + + decompressed_index += n_block_values; + Assert(decompressed_index <= num_elements); + } + else + { + /* + * Bit-packed block. Since this is a bitmap, this block has 64 bits + * packed. The last block might contain less than maximal possible + * number of elements, but we have 64 bytes of padding on the right + * so we don't care. + */ + CheckCompressedData(selector_value == 1); + + Assert(SIMPLE8B_BIT_LENGTH[selector_value] == 1); + Assert(SIMPLE8B_NUM_ELEMENTS[selector_value] == 64); + /* * We should require at least one element from the block. Previous * blocks might have had incorrect lengths, so this is not an * assertion. 
*/ - CheckCompressedData(elements_this_block > 0); + CheckCompressedData(decompressed_index < num_elements); + + /* Have to zero out the unused bits, so that the popcnt works properly. */ + const int elements_this_block = Min(64, num_elements - decompressed_index); + Assert(elements_this_block <= 64); + Assert(elements_this_block > 0); block_data &= (-1ULL) >> (64 - elements_this_block); /* @@ -147,7 +309,7 @@ simple8brle_bitmap_decompress(Simple8bRleSerialized *compressed) #ifdef HAVE__BUILTIN_POPCOUNT num_ones += __builtin_popcountll(block_data); #endif - for (int16 i = 0; i < 64; i++) + for (uint16 i = 0; i < 64; i++) { const uint64 value = (block_data >> i) & 1; bitmap_bools_[decompressed_index + i] = value; @@ -172,8 +334,11 @@ simple8brle_bitmap_decompress(Simple8bRleSerialized *compressed) */ CheckCompressedData(num_ones <= num_elements); - result.bitmap_bools_ = bitmap_bools_; - result.num_ones = num_ones; + Simple8bRleBitmap result = { + .num_elements = num_elements, + .data = bitmap_bools_, + .num_ones = num_ones, + }; /* Sanity check. */ #ifdef USE_ASSERT_CHECKING diff --git a/tsl/src/compression/simple8b_rle_decompress_all.h b/tsl/src/compression/simple8b_rle_decompress_all.h index a13ea365896..48a168fb581 100644 --- a/tsl/src/compression/simple8b_rle_decompress_all.h +++ b/tsl/src/compression/simple8b_rle_decompress_all.h @@ -11,21 +11,16 @@ * Specialization of bulk simple8brle decompression for a data type specified by * ELEMENT_TYPE macro. */ -static ELEMENT_TYPE * -FUNCTION_NAME(simple8brle_decompress_all, ELEMENT_TYPE)(Simple8bRleSerialized *compressed, - int16 *n_) +static uint16 +FUNCTION_NAME(simple8brle_decompress_all_buf, + ELEMENT_TYPE)(Simple8bRleSerialized *compressed, + ELEMENT_TYPE *restrict decompressed_values, uint16 n_buffer_elements) { - const uint16 num_selector_slots = - simple8brle_num_selector_slots_for_num_blocks(compressed->num_blocks); - const uint16 n_total_values = compressed->num_elements; - Assert(n_total_values <= GLOBAL_MAX_ROWS_PER_COMPRESSION); + const uint16 num_selector_slots = + simple8brle_num_selector_slots_for_num_blocks(compressed->num_blocks); const uint16 num_blocks = compressed->num_blocks; - const uint16 n_padded_values = ((n_total_values + 63) / 64 + 1) * 64; - - ELEMENT_TYPE *restrict decompressed_values = palloc(sizeof(ELEMENT_TYPE) * n_padded_values); - uint32 decompressed_index = 0; /* * Unpack the selector slots to get the selector values. Best done separately, @@ -36,8 +31,8 @@ FUNCTION_NAME(simple8brle_decompress_all, ELEMENT_TYPE)(Simple8bRleSerialized *c const uint64 *restrict slots = compressed->slots; for (uint32 block_index = 0; block_index < num_blocks; block_index++) { - const int selector_slot = block_index / SIMPLE8B_SELECTORS_PER_SELECTOR_SLOT; - const int selector_pos_in_slot = block_index % SIMPLE8B_SELECTORS_PER_SELECTOR_SLOT; + const uint16 selector_slot = block_index / SIMPLE8B_SELECTORS_PER_SELECTOR_SLOT; + const uint16 selector_pos_in_slot = block_index % SIMPLE8B_SELECTORS_PER_SELECTOR_SLOT; const uint64 slot_value = slots[selector_slot]; const uint8 selector_shift = selector_pos_in_slot * SIMPLE8B_BITS_PER_SELECTOR; const uint64 selector_mask = 0xFULL << selector_shift; @@ -48,6 +43,7 @@ FUNCTION_NAME(simple8brle_decompress_all, ELEMENT_TYPE)(Simple8bRleSerialized *c /* * Now decompress the individual blocks. 
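Inside a non-RLE block the values sit back to back at a fixed bit width, and the macro below extracts them with a shift and a mask; a minimal sketch with a made-up 4-bit width (the real width comes from the selector tables):

#include <assert.h>
#include <stdint.h>

int
main(void)
{
    const unsigned bits_per_value = 4;
    const uint64_t bitmask = (1ULL << bits_per_value) - 1;

    /* Pack the values 1, 2 and 3 into the low bits of one 64-bit block. */
    const uint64_t block_data = 1ULL | (2ULL << 4) | (3ULL << 8);

    assert(((block_data >> (bits_per_value * 0)) & bitmask) == 1);
    assert(((block_data >> (bits_per_value * 1)) & bitmask) == 2);
    assert(((block_data >> (bits_per_value * 2)) & bitmask) == 3);

    /* The remaining slots of this block decode as zeros. */
    assert(((block_data >> (bits_per_value * 3)) & bitmask) == 0);
    return 0;
}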
*/ + int decompressed_index = 0; const uint64 *restrict blocks = compressed->slots + num_selector_slots; for (uint32 block_index = 0; block_index < num_blocks; block_index++) { @@ -57,11 +53,11 @@ FUNCTION_NAME(simple8brle_decompress_all, ELEMENT_TYPE)(Simple8bRleSerialized *c /* We don't see RLE blocks so often in the real data, <1% of blocks. */ if (unlikely(simple8brle_selector_is_rle(selector_value))) { - const int n_block_values = simple8brle_rledata_repeatcount(block_data); - CheckCompressedData(decompressed_index + n_block_values <= n_total_values); + const uint16 n_block_values = simple8brle_rledata_repeatcount(block_data); + CheckCompressedData(decompressed_index + n_block_values <= n_buffer_elements); const ELEMENT_TYPE repeated_value = simple8brle_rledata_value(block_data); - for (int i = 0; i < n_block_values; i++) + for (uint16 i = 0; i < n_block_values; i++) { decompressed_values[decompressed_index + i] = repeated_value; } @@ -90,11 +86,11 @@ FUNCTION_NAME(simple8brle_decompress_all, ELEMENT_TYPE)(Simple8bRleSerialized *c * might be incorrect. \ */ \ const uint16 n_block_values = SIMPLE8B_NUM_ELEMENTS[X]; \ - CheckCompressedData(decompressed_index + n_block_values < n_padded_values); \ + CheckCompressedData(decompressed_index + n_block_values < n_buffer_elements); \ \ const uint64 bitmask = simple8brle_selector_get_bitmask(X); \ \ - for (int i = 0; i < n_block_values; i++) \ + for (uint16 i = 0; i < n_block_values; i++) \ { \ const ELEMENT_TYPE value = (block_data >> (bits_per_value * i)) & bitmask; \ decompressed_values[decompressed_index + i] = value; \ @@ -133,11 +129,41 @@ FUNCTION_NAME(simple8brle_decompress_all, ELEMENT_TYPE)(Simple8bRleSerialized *c /* * We can decompress more than expected because we work in full blocks, - * but if we decompressed less, this means broken data. + * but if we decompressed less, this means broken data. Better to report it + * not to have an uninitialized tail. */ CheckCompressedData(decompressed_index >= n_total_values); - Assert(decompressed_index <= n_padded_values); + Assert(decompressed_index <= n_buffer_elements); + + return n_total_values; +} + +/* + * The same function as above, but does palloc instead of taking the buffer as + * an input. We mark it as possibly unused because it is used not for every + * element type we have. + */ +static ELEMENT_TYPE *FUNCTION_NAME(simple8brle_decompress_all, + ELEMENT_TYPE)(Simple8bRleSerialized *compressed, uint16 *n_) + pg_attribute_unused(); + +static ELEMENT_TYPE * +FUNCTION_NAME(simple8brle_decompress_all, ELEMENT_TYPE)(Simple8bRleSerialized *compressed, + uint16 *n_) +{ + const uint16 n_total_values = compressed->num_elements; + Assert(n_total_values <= GLOBAL_MAX_ROWS_PER_COMPRESSION); + + /* + * We need a significant padding of 64 elements, not bytes, here, because we + * work in Simple8B blocks which can contain up to 64 elements. 
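A quick arithmetic check of the padding formula used just below (illustrative): rounding up to whole 64-element blocks and adding one more block always leaves at least 64 elements of slack past the real data.

#include <assert.h>
#include <stdint.h>

int
main(void)
{
    const uint16_t inputs[] = { 1, 63, 64, 1000, 1024 };

    for (int i = 0; i < 5; i++)
    {
        const uint16_t n = inputs[i];
        const uint16_t padded = ((n + 63) / 64 + 1) * 64;

        assert(padded % 64 == 0);       /* whole blocks only */
        assert(padded >= n + 64);       /* at least one spare block */
    }
    return 0;
}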
+ */ + const uint16 n_buffer_elements = ((n_total_values + 63) / 64 + 1) * 64; + + ELEMENT_TYPE *restrict decompressed_values = palloc(sizeof(ELEMENT_TYPE) * n_buffer_elements); + + *n_ = FUNCTION_NAME(simple8brle_decompress_all_buf, + ELEMENT_TYPE)(compressed, decompressed_values, n_buffer_elements); - *n_ = n_total_values; return decompressed_values; } diff --git a/tsl/src/nodes/decompress_chunk/exec.c b/tsl/src/nodes/decompress_chunk/exec.c index d8665f9bd26..2bb89c8eb9f 100644 --- a/tsl/src/nodes/decompress_chunk/exec.c +++ b/tsl/src/nodes/decompress_chunk/exec.c @@ -215,10 +215,6 @@ decompress_set_batch_state_to_unused(DecompressChunkState *chunk_state, int batc ExecClearTuple(batch_state->decompressed_slot_scan); MemoryContextReset(batch_state->per_batch_context); - if (batch_state->arrow_context) - { - MemoryContextReset(batch_state->arrow_context); - } chunk_state->unused_batch_states = bms_add_member(chunk_state->unused_batch_states, batch_id); } @@ -265,12 +261,8 @@ decompress_initialize_batch_state(DecompressChunkState *chunk_state, } batch_state->per_batch_context = AllocSetContextCreate(CurrentMemoryContext, - "DecompressChunk batch", - /* minContextSize = */ 0, - /* initBlockSize = */ 64 * 1024, - /* maxBlockSize = */ 64 * 1024); - /* Initialized on demand to save memory. */ - batch_state->arrow_context = NULL; + "DecompressChunk per_batch", + ALLOCSET_DEFAULT_SIZES); batch_state->columns = palloc0(list_length(chunk_state->decompression_map) * sizeof(DecompressChunkColumnState)); @@ -442,78 +434,6 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags) node->custom_ps = lappend(node->custom_ps, ExecInitNode(compressed_scan, estate, eflags)); } -/* - * Convert Arrow array to an array of Postgres Datum's. - */ -static void -convert_arrow_to_data(DecompressChunkColumnState *column, ArrowArray *arrow) -{ - const int n = arrow->length; - Assert(n > 0); - -/* Unroll the conversion loop to generate more efficient code. */ -#define INNER_LOOP_SIZE 8 - const int n_padded = ((n + INNER_LOOP_SIZE - 1) / INNER_LOOP_SIZE) * (INNER_LOOP_SIZE); - - const int arrow_bitmap_elements = (n + 64 - 1) / 64; - const int n_nulls_padded = arrow_bitmap_elements * 64; - - /* Convert Arrow to Data/nulls arrays. */ - const uint64 *restrict validity_bitmap = arrow->buffers[0]; - const void *restrict arrow_values = arrow->buffers[1]; - Datum *restrict datums = palloc(sizeof(Datum) * n_padded); - bool *restrict nulls = palloc(sizeof(bool) * n_nulls_padded); - -/* - * Specialize the conversion loop for the particular Arrow and Postgres type, to - * generate more efficient code. 
- */ -#define CONVERSION_LOOP(OID, CTYPE, CONVERSION) \ - case OID: \ - for (int outer = 0; outer < n_padded; outer += INNER_LOOP_SIZE) \ - { \ - for (int inner = 0; inner < INNER_LOOP_SIZE; inner++) \ - { \ - const int row = outer + inner; \ - datums[row] = CONVERSION(((CTYPE *) arrow_values)[row]); \ - Assert(row >= 0); \ - Assert(row < n_padded); \ - } \ - } \ - break - - switch (column->typid) - { - CONVERSION_LOOP(INT2OID, int16, Int16GetDatum); - CONVERSION_LOOP(INT4OID, int32, Int32GetDatum); - CONVERSION_LOOP(INT8OID, int64, Int64GetDatum); - CONVERSION_LOOP(FLOAT4OID, float4, Float4GetDatum); - CONVERSION_LOOP(FLOAT8OID, float8, Float8GetDatum); - CONVERSION_LOOP(DATEOID, int32, DateADTGetDatum); - CONVERSION_LOOP(TIMESTAMPOID, int64, TimestampGetDatum); - CONVERSION_LOOP(TIMESTAMPTZOID, int64, TimestampTzGetDatum); - default: - Assert(false); - } -#undef CONVERSION_LOOP -#undef INNER_LOOP_SIZE - - for (int outer = 0; outer < arrow_bitmap_elements; outer++) - { - const uint64 element = validity_bitmap[outer]; - for (int inner = 0; inner < 64; inner++) - { - const int row = outer * 64 + inner; - nulls[row] = ((element >> inner) & 1) ? false : true; - Assert(row >= 0); - Assert(row < n_nulls_padded); - } - } - column->compressed.iterator = NULL; - column->compressed.datums = datums; - column->compressed.nulls = nulls; -} - void decompress_initialize_batch(DecompressChunkState *chunk_state, DecompressBatchState *batch_state, TupleTableSlot *subslot) @@ -616,8 +536,9 @@ decompress_initialize_batch(DecompressChunkState *chunk_state, DecompressBatchSt { case COMPRESSED_COLUMN: { - column->compressed.datums = NULL; column->compressed.iterator = NULL; + column->compressed.arrow = NULL; + column->compressed.value_bytes = -1; value = slot_getattr(batch_state->compressed_slot, column->compressed_scan_attno, &isnull); @@ -627,7 +548,6 @@ decompress_initialize_batch(DecompressChunkState *chunk_state, DecompressBatchSt * The column will have a default value for the entire batch, * set it now. */ - column->compressed.iterator = NULL; AttrNumber attr = AttrNumberGetAttrOffset(column->output_attno); batch_state->decompressed_slot_scan->tts_values[attr] = @@ -640,49 +560,44 @@ decompress_initialize_batch(DecompressChunkState *chunk_state, DecompressBatchSt /* Decompress the entire batch if it is supported. */ CompressedDataHeader *header = (CompressedDataHeader *) PG_DETOAST_DATUM(value); + /* + * For now we disable bulk decompression for batch sorted + * merge plans. They involve keeping many open batches at + * the same time, so the memory usage might increase greatly. + */ ArrowArray *arrow = NULL; - if (!chunk_state->reverse && !chunk_state->sorted_merge_append && - ts_guc_enable_bulk_decompression) + if (!chunk_state->sorted_merge_append && ts_guc_enable_bulk_decompression) { - /* - * In principle, we could do this for reverse decompression - * as well, but have to figure out how to do the batch - * Arrow->Datum conversion while respecting the paddings. - * Maybe have to allocate the output array at offset so that the - * padding is at the beginning. - * - * For now we also disable bulk decompression for batch sorted - * merge plans. They involve keeping many open batches at - * the same time, so the memory usage might increase greatly. 
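The code that follows separates scratch allocations from the returned result by using two memory contexts. A condensed sketch of the pattern (hypothetical helper name; it assumes only PostgreSQL's MemoryContextSwitchTo/MemoryContextReset API and the DecompressAllFunction signature introduced by this patch), rather than the exact executor code:

/*
 * Temporaries produced during decompression go to a scratch context that is
 * reset right away; the ArrowArray result is allocated in the destination
 * context supplied by the caller, so it survives until the batch is done.
 */
static ArrowArray *
decompress_one_column(Datum compressed, Oid typid, DecompressAllFunction decompress_all,
                      MemoryContext scratch_cxt, MemoryContext dest_cxt)
{
    MemoryContext old_cxt = MemoryContextSwitchTo(scratch_cxt);

    ArrowArray *result = decompress_all(compressed, typid, dest_cxt);

    MemoryContextSwitchTo(old_cxt);
    MemoryContextReset(scratch_cxt);

    return result;
}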
- */ - if (batch_state->arrow_context == NULL) + if (chunk_state->bulk_decompression_context == NULL) { - batch_state->arrow_context = + chunk_state->bulk_decompression_context = AllocSetContextCreate(MemoryContextGetParent( batch_state->per_batch_context), - "DecompressChunk Arrow arrays", + "bulk decompression", /* minContextSize = */ 0, /* initBlockSize = */ 64 * 1024, /* maxBlockSize = */ 64 * 1024); } - MemoryContext context_before_decompression = - MemoryContextSwitchTo(batch_state->arrow_context); + DecompressAllFunction decompress_all = + tsl_get_decompress_all_function(header->compression_algorithm); + if (decompress_all) + { + MemoryContext context_before_decompression = + MemoryContextSwitchTo(chunk_state->bulk_decompression_context); + + arrow = decompress_all(PointerGetDatum(header), + column->typid, + batch_state->per_batch_context); + + MemoryContextReset(chunk_state->bulk_decompression_context); - arrow = tsl_try_decompress_all(header->compression_algorithm, - PointerGetDatum(header), - column->typid); - MemoryContextSwitchTo(context_before_decompression); + MemoryContextSwitchTo(context_before_decompression); + } } if (arrow) { - /* - * Currently we're going to convert the Arrow arrays to postgres - * Data right away, but in the future we will perform vectorized - * operations on Arrow arrays directly before that. - */ - if (batch_state->total_batch_rows == 0) { batch_state->total_batch_rows = arrow->length; @@ -692,9 +607,7 @@ decompress_initialize_batch(DecompressChunkState *chunk_state, DecompressBatchSt elog(ERROR, "compressed column out of sync with batch counter"); } - convert_arrow_to_data(column, arrow); - - MemoryContextReset(batch_state->arrow_context); + column->compressed.arrow = arrow; /* * Note the fact that we are using bulk decompression, for @@ -702,6 +615,8 @@ decompress_initialize_batch(DecompressChunkState *chunk_state, DecompressBatchSt */ chunk_state->using_bulk_decompression = true; + column->compressed.value_bytes = get_typlen(column->typid); + break; } @@ -970,6 +885,11 @@ decompress_get_next_tuple_from_batch(DecompressChunkState *chunk_state, Assert(batch_state->total_batch_rows > 0); Assert(batch_state->current_batch_row < batch_state->total_batch_rows); + const int output_row = batch_state->current_batch_row++; + const size_t arrow_row = unlikely(chunk_state->reverse) ? + batch_state->total_batch_rows - 1 - output_row : + output_row; + for (int i = 0; i < chunk_state->num_columns; i++) { DecompressChunkColumnState *column = &batch_state->columns[i]; @@ -993,17 +913,44 @@ decompress_get_next_tuple_from_batch(DecompressChunkState *chunk_state, decompressed_slot_scan->tts_isnull[attr] = result.is_null; decompressed_slot_scan->tts_values[attr] = result.val; } - else if (column->compressed.datums != NULL) + else if (column->compressed.arrow != NULL) { + const char *src = column->compressed.arrow->buffers[1]; + Assert(column->compressed.value_bytes > 0); + + /* + * The conversion of Datum to more narrow types will truncate + * the higher bytes, so we don't care if we read some garbage + * into them. These are unaligned reads, so technically we have + * to do memcpy. + */ + uint64 value; + memcpy(&value, &src[column->compressed.value_bytes * arrow_row], 8); + +#ifdef USE_FLOAT8_BYVAL + Datum datum = Int64GetDatum(value); +#else + /* + * On 32-bit systems, the data larger than 4 bytes go by + * reference, so we have to jump through these hoops. 
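The 8-byte memcpy above reads past the end of narrow elements on purpose; a small standalone sketch of why the subsequent truncation recovers the right value (assuming a little-endian layout, and made-up data):

#include <assert.h>
#include <stdint.h>
#include <string.h>

int
main(void)
{
    /* A packed buffer of three 4-byte values, like an Arrow data buffer. */
    const int32_t buffer[3] = { 100, 200, 300 };
    const char *src = (const char *) buffer;
    const int value_bytes = sizeof(int32_t);
    const int arrow_row = 1;

    /* Unaligned-safe read of a full 8 bytes starting at the element. */
    uint64_t value;
    memcpy(&value, &src[value_bytes * arrow_row], 8);

    /*
     * On a little-endian machine the element occupies the low bytes, so
     * truncating to the column width recovers it. The bytes read past the
     * element are why the decompression buffers carry trailing padding.
     */
    assert((int32_t) value == 200);
    return 0;
}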
+ */ + Datum datum; + if (column->compressed.value_bytes <= 4) + { + datum = Int32GetDatum((uint32) value); + } + else + { + datum = Int64GetDatum(value); + } +#endif + const AttrNumber attr = AttrNumberGetAttrOffset(column->output_attno); + decompressed_slot_scan->tts_values[attr] = datum; decompressed_slot_scan->tts_isnull[attr] = - column->compressed.nulls[batch_state->current_batch_row]; - decompressed_slot_scan->tts_values[attr] = - column->compressed.datums[batch_state->current_batch_row]; + !arrow_row_is_valid(column->compressed.arrow->buffers[0], arrow_row); } } - batch_state->current_batch_row++; - /* * It's a virtual tuple slot, so no point in clearing/storing it * per each row, we can just update the values in-place. This saves diff --git a/tsl/src/nodes/decompress_chunk/exec.h b/tsl/src/nodes/decompress_chunk/exec.h index c51e0f4e4e1..ab3f88a38b9 100644 --- a/tsl/src/nodes/decompress_chunk/exec.h +++ b/tsl/src/nodes/decompress_chunk/exec.h @@ -65,8 +65,8 @@ typedef struct DecompressChunkColumnState DecompressionIterator *iterator; /* For entire batch decompression, mutually exclusive with the above. */ - Datum *datums; - bool *nulls; + ArrowArray *arrow; + int value_bytes; } compressed; }; } DecompressChunkColumnState; @@ -84,7 +84,6 @@ typedef struct DecompressBatchState int total_batch_rows; int current_batch_row; MemoryContext per_batch_context; - MemoryContext arrow_context; } DecompressBatchState; typedef struct DecompressChunkState @@ -111,6 +110,12 @@ typedef struct DecompressChunkState bool using_bulk_decompression; /* For EXPLAIN ANALYZE. */ + /* + * Scratch space for bulk decompression which might need a lot of temporary + * data. + */ + MemoryContext bulk_decompression_context; + /* * Make non-refcounted copies of the tupdesc for reuse across all batch states * and avoid spending CPU in ResourceOwner when creating a big number of table diff --git a/tsl/test/expected/compression_algos.out b/tsl/test/expected/compression_algos.out index 5260977e70e..d2a8a6a5622 100644 --- a/tsl/test/expected/compression_algos.out +++ b/tsl/test/expected/compression_algos.out @@ -1550,9 +1550,9 @@ from ts_read_compressed_data_directory('gorilla', 'float8', (:'TEST_INPUT_DIR' | group by 2 order by 1 desc; count | result -------+-------- - 227 | XX001 - 53 | true - 22 | 08P01 + 224 | XX001 + 55 | true + 23 | 08P01 (3 rows) select count(*), coalesce((rows >= 0)::text, sqlstate) result diff --git a/tsl/test/shared/expected/transparent_decompress_chunk-12.out b/tsl/test/shared/expected/transparent_decompress_chunk-12.out index 994663b615c..35a4ec81fee 100644 --- a/tsl/test/shared/expected/transparent_decompress_chunk-12.out +++ b/tsl/test/shared/expected/transparent_decompress_chunk-12.out @@ -14,7 +14,7 @@ QUERY PLAN Output: _hyper_X_X_chunk."time", _hyper_X_X_chunk.device_id, _hyper_X_X_chunk.v0, _hyper_X_X_chunk.v1, _hyper_X_X_chunk.v2, _hyper_X_X_chunk.v3 -> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_X_X_chunk (actual rows=5 loops=1) Output: _hyper_X_X_chunk."time", _hyper_X_X_chunk.device_id, _hyper_X_X_chunk.v0, _hyper_X_X_chunk.v1, _hyper_X_X_chunk.v2, _hyper_X_X_chunk.v3 - Bulk Decompression: false + Bulk Decompression: true -> Index Scan Backward using compress_hyper_X_X_chunk__compressed_hypertable_4_device_id__t on _timescaledb_internal.compress_hyper_X_X_chunk (actual rows=1 loops=1) Output: compress_hyper_X_X_chunk."time", compress_hyper_X_X_chunk.device_id, compress_hyper_X_X_chunk.v0, compress_hyper_X_X_chunk.v1, compress_hyper_X_X_chunk.v2, 
compress_hyper_X_X_chunk.v3, compress_hyper_X_X_chunk._ts_meta_count, compress_hyper_X_X_chunk._ts_meta_sequence_num, compress_hyper_X_X_chunk._ts_meta_min_1, compress_hyper_X_X_chunk._ts_meta_max_1 Index Cond: (compress_hyper_X_X_chunk.device_id = 1) @@ -380,7 +380,7 @@ QUERY PLAN Output: _hyper_X_X_chunk."time", _hyper_X_X_chunk.device_id, _hyper_X_X_chunk.v0, _hyper_X_X_chunk.v1, _hyper_X_X_chunk.v2, _hyper_X_X_chunk.v3 Filter: (_hyper_X_X_chunk."time" > 'Fri Dec 31 17:00:00 1999 PST'::timestamp with time zone) Rows Removed by Filter: 31 - Bulk Decompression: false + Bulk Decompression: true -> Index Scan Backward using compress_hyper_X_X_chunk__compressed_hypertable_4_device_id__t on _timescaledb_internal.compress_hyper_X_X_chunk (actual rows=1 loops=1) Output: compress_hyper_X_X_chunk."time", compress_hyper_X_X_chunk.device_id, compress_hyper_X_X_chunk.v0, compress_hyper_X_X_chunk.v1, compress_hyper_X_X_chunk.v2, compress_hyper_X_X_chunk.v3, compress_hyper_X_X_chunk._ts_meta_count, compress_hyper_X_X_chunk._ts_meta_sequence_num, compress_hyper_X_X_chunk._ts_meta_min_1, compress_hyper_X_X_chunk._ts_meta_max_1 Index Cond: (compress_hyper_X_X_chunk.device_id = 1) @@ -517,7 +517,7 @@ SET enable_seqscan TO FALSE; QUERY PLAN Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_X_X_chunk (actual rows=3598 loops=1) Output: _hyper_X_X_chunk."time", _hyper_X_X_chunk.device_id - Bulk Decompression: false + Bulk Decompression: true -> Index Scan Backward using compress_hyper_X_X_chunk__compressed_hypertable_4_device_id__t on _timescaledb_internal.compress_hyper_X_X_chunk (actual rows=4 loops=1) Output: compress_hyper_X_X_chunk."time", compress_hyper_X_X_chunk.device_id, compress_hyper_X_X_chunk.v0, compress_hyper_X_X_chunk.v1, compress_hyper_X_X_chunk.v2, compress_hyper_X_X_chunk.v3, compress_hyper_X_X_chunk._ts_meta_count, compress_hyper_X_X_chunk._ts_meta_sequence_num, compress_hyper_X_X_chunk._ts_meta_min_1, compress_hyper_X_X_chunk._ts_meta_max_1 Index Cond: (compress_hyper_X_X_chunk.device_id = 1) @@ -528,7 +528,7 @@ QUERY PLAN QUERY PLAN Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_X_X_chunk (actual rows=3598 loops=1) Output: _hyper_X_X_chunk."time", _hyper_X_X_chunk.device_id, _hyper_X_X_chunk.v0, _hyper_X_X_chunk.v1, _hyper_X_X_chunk.v2, _hyper_X_X_chunk.v3 - Bulk Decompression: false + Bulk Decompression: true -> Index Scan Backward using compress_hyper_X_X_chunk__compressed_hypertable_4_device_id__t on _timescaledb_internal.compress_hyper_X_X_chunk (actual rows=4 loops=1) Output: compress_hyper_X_X_chunk."time", compress_hyper_X_X_chunk.device_id, compress_hyper_X_X_chunk.v0, compress_hyper_X_X_chunk.v1, compress_hyper_X_X_chunk.v2, compress_hyper_X_X_chunk.v3, compress_hyper_X_X_chunk._ts_meta_count, compress_hyper_X_X_chunk._ts_meta_sequence_num, compress_hyper_X_X_chunk._ts_meta_min_1, compress_hyper_X_X_chunk._ts_meta_max_1 Index Cond: (compress_hyper_X_X_chunk.device_id = 1) @@ -539,7 +539,7 @@ QUERY PLAN QUERY PLAN Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_X_X_chunk test_table (actual rows=3598 loops=1) Output: test_table.*, test_table.device_id, test_table."time" - Bulk Decompression: false + Bulk Decompression: true -> Index Scan Backward using compress_hyper_X_X_chunk__compressed_hypertable_4_device_id__t on _timescaledb_internal.compress_hyper_X_X_chunk (actual rows=4 loops=1) Output: compress_hyper_X_X_chunk."time", compress_hyper_X_X_chunk.device_id, compress_hyper_X_X_chunk.v0, compress_hyper_X_X_chunk.v1, 
compress_hyper_X_X_chunk.v2, compress_hyper_X_X_chunk.v3, compress_hyper_X_X_chunk._ts_meta_count, compress_hyper_X_X_chunk._ts_meta_sequence_num, compress_hyper_X_X_chunk._ts_meta_min_1, compress_hyper_X_X_chunk._ts_meta_max_1 Index Cond: (compress_hyper_X_X_chunk.device_id = 1) diff --git a/tsl/test/shared/expected/transparent_decompress_chunk-13.out b/tsl/test/shared/expected/transparent_decompress_chunk-13.out index 092e98d5497..40ba67f35a6 100644 --- a/tsl/test/shared/expected/transparent_decompress_chunk-13.out +++ b/tsl/test/shared/expected/transparent_decompress_chunk-13.out @@ -14,7 +14,7 @@ QUERY PLAN Output: _hyper_X_X_chunk."time", _hyper_X_X_chunk.device_id, _hyper_X_X_chunk.v0, _hyper_X_X_chunk.v1, _hyper_X_X_chunk.v2, _hyper_X_X_chunk.v3 -> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_X_X_chunk (actual rows=5 loops=1) Output: _hyper_X_X_chunk."time", _hyper_X_X_chunk.device_id, _hyper_X_X_chunk.v0, _hyper_X_X_chunk.v1, _hyper_X_X_chunk.v2, _hyper_X_X_chunk.v3 - Bulk Decompression: false + Bulk Decompression: true -> Index Scan Backward using compress_hyper_X_X_chunk__compressed_hypertable_4_device_id__t on _timescaledb_internal.compress_hyper_X_X_chunk (actual rows=1 loops=1) Output: compress_hyper_X_X_chunk."time", compress_hyper_X_X_chunk.device_id, compress_hyper_X_X_chunk.v0, compress_hyper_X_X_chunk.v1, compress_hyper_X_X_chunk.v2, compress_hyper_X_X_chunk.v3, compress_hyper_X_X_chunk._ts_meta_count, compress_hyper_X_X_chunk._ts_meta_sequence_num, compress_hyper_X_X_chunk._ts_meta_min_1, compress_hyper_X_X_chunk._ts_meta_max_1 Index Cond: (compress_hyper_X_X_chunk.device_id = 1) @@ -380,7 +380,7 @@ QUERY PLAN Output: _hyper_X_X_chunk."time", _hyper_X_X_chunk.device_id, _hyper_X_X_chunk.v0, _hyper_X_X_chunk.v1, _hyper_X_X_chunk.v2, _hyper_X_X_chunk.v3 Filter: (_hyper_X_X_chunk."time" > 'Fri Dec 31 17:00:00 1999 PST'::timestamp with time zone) Rows Removed by Filter: 31 - Bulk Decompression: false + Bulk Decompression: true -> Index Scan Backward using compress_hyper_X_X_chunk__compressed_hypertable_4_device_id__t on _timescaledb_internal.compress_hyper_X_X_chunk (actual rows=1 loops=1) Output: compress_hyper_X_X_chunk."time", compress_hyper_X_X_chunk.device_id, compress_hyper_X_X_chunk.v0, compress_hyper_X_X_chunk.v1, compress_hyper_X_X_chunk.v2, compress_hyper_X_X_chunk.v3, compress_hyper_X_X_chunk._ts_meta_count, compress_hyper_X_X_chunk._ts_meta_sequence_num, compress_hyper_X_X_chunk._ts_meta_min_1, compress_hyper_X_X_chunk._ts_meta_max_1 Index Cond: (compress_hyper_X_X_chunk.device_id = 1) @@ -519,7 +519,7 @@ SET enable_seqscan TO FALSE; QUERY PLAN Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_X_X_chunk (actual rows=3598 loops=1) Output: _hyper_X_X_chunk."time", _hyper_X_X_chunk.device_id - Bulk Decompression: false + Bulk Decompression: true -> Index Scan Backward using compress_hyper_X_X_chunk__compressed_hypertable_4_device_id__t on _timescaledb_internal.compress_hyper_X_X_chunk (actual rows=4 loops=1) Output: compress_hyper_X_X_chunk."time", compress_hyper_X_X_chunk.device_id, compress_hyper_X_X_chunk.v0, compress_hyper_X_X_chunk.v1, compress_hyper_X_X_chunk.v2, compress_hyper_X_X_chunk.v3, compress_hyper_X_X_chunk._ts_meta_count, compress_hyper_X_X_chunk._ts_meta_sequence_num, compress_hyper_X_X_chunk._ts_meta_min_1, compress_hyper_X_X_chunk._ts_meta_max_1 Index Cond: (compress_hyper_X_X_chunk.device_id = 1) @@ -530,7 +530,7 @@ QUERY PLAN QUERY PLAN Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_X_X_chunk (actual 
rows=3598 loops=1) Output: _hyper_X_X_chunk."time", _hyper_X_X_chunk.device_id, _hyper_X_X_chunk.v0, _hyper_X_X_chunk.v1, _hyper_X_X_chunk.v2, _hyper_X_X_chunk.v3 - Bulk Decompression: false + Bulk Decompression: true -> Index Scan Backward using compress_hyper_X_X_chunk__compressed_hypertable_4_device_id__t on _timescaledb_internal.compress_hyper_X_X_chunk (actual rows=4 loops=1) Output: compress_hyper_X_X_chunk."time", compress_hyper_X_X_chunk.device_id, compress_hyper_X_X_chunk.v0, compress_hyper_X_X_chunk.v1, compress_hyper_X_X_chunk.v2, compress_hyper_X_X_chunk.v3, compress_hyper_X_X_chunk._ts_meta_count, compress_hyper_X_X_chunk._ts_meta_sequence_num, compress_hyper_X_X_chunk._ts_meta_min_1, compress_hyper_X_X_chunk._ts_meta_max_1 Index Cond: (compress_hyper_X_X_chunk.device_id = 1) @@ -541,7 +541,7 @@ QUERY PLAN QUERY PLAN Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_X_X_chunk test_table (actual rows=3598 loops=1) Output: test_table.*, test_table.device_id, test_table."time" - Bulk Decompression: false + Bulk Decompression: true -> Index Scan Backward using compress_hyper_X_X_chunk__compressed_hypertable_4_device_id__t on _timescaledb_internal.compress_hyper_X_X_chunk (actual rows=4 loops=1) Output: compress_hyper_X_X_chunk."time", compress_hyper_X_X_chunk.device_id, compress_hyper_X_X_chunk.v0, compress_hyper_X_X_chunk.v1, compress_hyper_X_X_chunk.v2, compress_hyper_X_X_chunk.v3, compress_hyper_X_X_chunk._ts_meta_count, compress_hyper_X_X_chunk._ts_meta_sequence_num, compress_hyper_X_X_chunk._ts_meta_min_1, compress_hyper_X_X_chunk._ts_meta_max_1 Index Cond: (compress_hyper_X_X_chunk.device_id = 1) diff --git a/tsl/test/shared/expected/transparent_decompress_chunk-14.out b/tsl/test/shared/expected/transparent_decompress_chunk-14.out index 092e98d5497..40ba67f35a6 100644 --- a/tsl/test/shared/expected/transparent_decompress_chunk-14.out +++ b/tsl/test/shared/expected/transparent_decompress_chunk-14.out @@ -14,7 +14,7 @@ QUERY PLAN Output: _hyper_X_X_chunk."time", _hyper_X_X_chunk.device_id, _hyper_X_X_chunk.v0, _hyper_X_X_chunk.v1, _hyper_X_X_chunk.v2, _hyper_X_X_chunk.v3 -> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_X_X_chunk (actual rows=5 loops=1) Output: _hyper_X_X_chunk."time", _hyper_X_X_chunk.device_id, _hyper_X_X_chunk.v0, _hyper_X_X_chunk.v1, _hyper_X_X_chunk.v2, _hyper_X_X_chunk.v3 - Bulk Decompression: false + Bulk Decompression: true -> Index Scan Backward using compress_hyper_X_X_chunk__compressed_hypertable_4_device_id__t on _timescaledb_internal.compress_hyper_X_X_chunk (actual rows=1 loops=1) Output: compress_hyper_X_X_chunk."time", compress_hyper_X_X_chunk.device_id, compress_hyper_X_X_chunk.v0, compress_hyper_X_X_chunk.v1, compress_hyper_X_X_chunk.v2, compress_hyper_X_X_chunk.v3, compress_hyper_X_X_chunk._ts_meta_count, compress_hyper_X_X_chunk._ts_meta_sequence_num, compress_hyper_X_X_chunk._ts_meta_min_1, compress_hyper_X_X_chunk._ts_meta_max_1 Index Cond: (compress_hyper_X_X_chunk.device_id = 1) @@ -380,7 +380,7 @@ QUERY PLAN Output: _hyper_X_X_chunk."time", _hyper_X_X_chunk.device_id, _hyper_X_X_chunk.v0, _hyper_X_X_chunk.v1, _hyper_X_X_chunk.v2, _hyper_X_X_chunk.v3 Filter: (_hyper_X_X_chunk."time" > 'Fri Dec 31 17:00:00 1999 PST'::timestamp with time zone) Rows Removed by Filter: 31 - Bulk Decompression: false + Bulk Decompression: true -> Index Scan Backward using compress_hyper_X_X_chunk__compressed_hypertable_4_device_id__t on _timescaledb_internal.compress_hyper_X_X_chunk (actual rows=1 loops=1) Output: 
compress_hyper_X_X_chunk."time", compress_hyper_X_X_chunk.device_id, compress_hyper_X_X_chunk.v0, compress_hyper_X_X_chunk.v1, compress_hyper_X_X_chunk.v2, compress_hyper_X_X_chunk.v3, compress_hyper_X_X_chunk._ts_meta_count, compress_hyper_X_X_chunk._ts_meta_sequence_num, compress_hyper_X_X_chunk._ts_meta_min_1, compress_hyper_X_X_chunk._ts_meta_max_1 Index Cond: (compress_hyper_X_X_chunk.device_id = 1) @@ -519,7 +519,7 @@ SET enable_seqscan TO FALSE; QUERY PLAN Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_X_X_chunk (actual rows=3598 loops=1) Output: _hyper_X_X_chunk."time", _hyper_X_X_chunk.device_id - Bulk Decompression: false + Bulk Decompression: true -> Index Scan Backward using compress_hyper_X_X_chunk__compressed_hypertable_4_device_id__t on _timescaledb_internal.compress_hyper_X_X_chunk (actual rows=4 loops=1) Output: compress_hyper_X_X_chunk."time", compress_hyper_X_X_chunk.device_id, compress_hyper_X_X_chunk.v0, compress_hyper_X_X_chunk.v1, compress_hyper_X_X_chunk.v2, compress_hyper_X_X_chunk.v3, compress_hyper_X_X_chunk._ts_meta_count, compress_hyper_X_X_chunk._ts_meta_sequence_num, compress_hyper_X_X_chunk._ts_meta_min_1, compress_hyper_X_X_chunk._ts_meta_max_1 Index Cond: (compress_hyper_X_X_chunk.device_id = 1) @@ -530,7 +530,7 @@ QUERY PLAN QUERY PLAN Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_X_X_chunk (actual rows=3598 loops=1) Output: _hyper_X_X_chunk."time", _hyper_X_X_chunk.device_id, _hyper_X_X_chunk.v0, _hyper_X_X_chunk.v1, _hyper_X_X_chunk.v2, _hyper_X_X_chunk.v3 - Bulk Decompression: false + Bulk Decompression: true -> Index Scan Backward using compress_hyper_X_X_chunk__compressed_hypertable_4_device_id__t on _timescaledb_internal.compress_hyper_X_X_chunk (actual rows=4 loops=1) Output: compress_hyper_X_X_chunk."time", compress_hyper_X_X_chunk.device_id, compress_hyper_X_X_chunk.v0, compress_hyper_X_X_chunk.v1, compress_hyper_X_X_chunk.v2, compress_hyper_X_X_chunk.v3, compress_hyper_X_X_chunk._ts_meta_count, compress_hyper_X_X_chunk._ts_meta_sequence_num, compress_hyper_X_X_chunk._ts_meta_min_1, compress_hyper_X_X_chunk._ts_meta_max_1 Index Cond: (compress_hyper_X_X_chunk.device_id = 1) @@ -541,7 +541,7 @@ QUERY PLAN QUERY PLAN Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_X_X_chunk test_table (actual rows=3598 loops=1) Output: test_table.*, test_table.device_id, test_table."time" - Bulk Decompression: false + Bulk Decompression: true -> Index Scan Backward using compress_hyper_X_X_chunk__compressed_hypertable_4_device_id__t on _timescaledb_internal.compress_hyper_X_X_chunk (actual rows=4 loops=1) Output: compress_hyper_X_X_chunk."time", compress_hyper_X_X_chunk.device_id, compress_hyper_X_X_chunk.v0, compress_hyper_X_X_chunk.v1, compress_hyper_X_X_chunk.v2, compress_hyper_X_X_chunk.v3, compress_hyper_X_X_chunk._ts_meta_count, compress_hyper_X_X_chunk._ts_meta_sequence_num, compress_hyper_X_X_chunk._ts_meta_min_1, compress_hyper_X_X_chunk._ts_meta_max_1 Index Cond: (compress_hyper_X_X_chunk.device_id = 1) diff --git a/tsl/test/shared/expected/transparent_decompress_chunk-15.out b/tsl/test/shared/expected/transparent_decompress_chunk-15.out index bc6faa7817c..6a7b2baa9c1 100644 --- a/tsl/test/shared/expected/transparent_decompress_chunk-15.out +++ b/tsl/test/shared/expected/transparent_decompress_chunk-15.out @@ -14,7 +14,7 @@ QUERY PLAN Output: _hyper_X_X_chunk."time", _hyper_X_X_chunk.device_id, _hyper_X_X_chunk.v0, _hyper_X_X_chunk.v1, _hyper_X_X_chunk.v2, _hyper_X_X_chunk.v3 -> Custom Scan (DecompressChunk) on 
_timescaledb_internal._hyper_X_X_chunk (actual rows=5 loops=1) Output: _hyper_X_X_chunk."time", _hyper_X_X_chunk.device_id, _hyper_X_X_chunk.v0, _hyper_X_X_chunk.v1, _hyper_X_X_chunk.v2, _hyper_X_X_chunk.v3 - Bulk Decompression: false + Bulk Decompression: true -> Index Scan Backward using compress_hyper_X_X_chunk__compressed_hypertable_4_device_id__t on _timescaledb_internal.compress_hyper_X_X_chunk (actual rows=1 loops=1) Output: compress_hyper_X_X_chunk."time", compress_hyper_X_X_chunk.device_id, compress_hyper_X_X_chunk.v0, compress_hyper_X_X_chunk.v1, compress_hyper_X_X_chunk.v2, compress_hyper_X_X_chunk.v3, compress_hyper_X_X_chunk._ts_meta_count, compress_hyper_X_X_chunk._ts_meta_sequence_num, compress_hyper_X_X_chunk._ts_meta_min_1, compress_hyper_X_X_chunk._ts_meta_max_1 Index Cond: (compress_hyper_X_X_chunk.device_id = 1) @@ -382,7 +382,7 @@ QUERY PLAN Output: _hyper_X_X_chunk."time", _hyper_X_X_chunk.device_id, _hyper_X_X_chunk.v0, _hyper_X_X_chunk.v1, _hyper_X_X_chunk.v2, _hyper_X_X_chunk.v3 Filter: (_hyper_X_X_chunk."time" > 'Fri Dec 31 17:00:00 1999 PST'::timestamp with time zone) Rows Removed by Filter: 31 - Bulk Decompression: false + Bulk Decompression: true -> Index Scan Backward using compress_hyper_X_X_chunk__compressed_hypertable_4_device_id__t on _timescaledb_internal.compress_hyper_X_X_chunk (actual rows=1 loops=1) Output: compress_hyper_X_X_chunk."time", compress_hyper_X_X_chunk.device_id, compress_hyper_X_X_chunk.v0, compress_hyper_X_X_chunk.v1, compress_hyper_X_X_chunk.v2, compress_hyper_X_X_chunk.v3, compress_hyper_X_X_chunk._ts_meta_count, compress_hyper_X_X_chunk._ts_meta_sequence_num, compress_hyper_X_X_chunk._ts_meta_min_1, compress_hyper_X_X_chunk._ts_meta_max_1 Index Cond: (compress_hyper_X_X_chunk.device_id = 1) @@ -521,7 +521,7 @@ SET enable_seqscan TO FALSE; QUERY PLAN Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_X_X_chunk (actual rows=3598 loops=1) Output: _hyper_X_X_chunk."time", _hyper_X_X_chunk.device_id - Bulk Decompression: false + Bulk Decompression: true -> Index Scan Backward using compress_hyper_X_X_chunk__compressed_hypertable_4_device_id__t on _timescaledb_internal.compress_hyper_X_X_chunk (actual rows=4 loops=1) Output: compress_hyper_X_X_chunk."time", compress_hyper_X_X_chunk.device_id, compress_hyper_X_X_chunk.v0, compress_hyper_X_X_chunk.v1, compress_hyper_X_X_chunk.v2, compress_hyper_X_X_chunk.v3, compress_hyper_X_X_chunk._ts_meta_count, compress_hyper_X_X_chunk._ts_meta_sequence_num, compress_hyper_X_X_chunk._ts_meta_min_1, compress_hyper_X_X_chunk._ts_meta_max_1 Index Cond: (compress_hyper_X_X_chunk.device_id = 1) @@ -532,7 +532,7 @@ QUERY PLAN QUERY PLAN Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_X_X_chunk (actual rows=3598 loops=1) Output: _hyper_X_X_chunk."time", _hyper_X_X_chunk.device_id, _hyper_X_X_chunk.v0, _hyper_X_X_chunk.v1, _hyper_X_X_chunk.v2, _hyper_X_X_chunk.v3 - Bulk Decompression: false + Bulk Decompression: true -> Index Scan Backward using compress_hyper_X_X_chunk__compressed_hypertable_4_device_id__t on _timescaledb_internal.compress_hyper_X_X_chunk (actual rows=4 loops=1) Output: compress_hyper_X_X_chunk."time", compress_hyper_X_X_chunk.device_id, compress_hyper_X_X_chunk.v0, compress_hyper_X_X_chunk.v1, compress_hyper_X_X_chunk.v2, compress_hyper_X_X_chunk.v3, compress_hyper_X_X_chunk._ts_meta_count, compress_hyper_X_X_chunk._ts_meta_sequence_num, compress_hyper_X_X_chunk._ts_meta_min_1, compress_hyper_X_X_chunk._ts_meta_max_1 Index Cond: (compress_hyper_X_X_chunk.device_id = 1) 
@@ -543,7 +543,7 @@ QUERY PLAN QUERY PLAN Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_X_X_chunk test_table (actual rows=3598 loops=1) Output: test_table.*, test_table.device_id, test_table."time" - Bulk Decompression: false + Bulk Decompression: true -> Index Scan Backward using compress_hyper_X_X_chunk__compressed_hypertable_4_device_id__t on _timescaledb_internal.compress_hyper_X_X_chunk (actual rows=4 loops=1) Output: compress_hyper_X_X_chunk."time", compress_hyper_X_X_chunk.device_id, compress_hyper_X_X_chunk.v0, compress_hyper_X_X_chunk.v1, compress_hyper_X_X_chunk.v2, compress_hyper_X_X_chunk.v3, compress_hyper_X_X_chunk._ts_meta_count, compress_hyper_X_X_chunk._ts_meta_sequence_num, compress_hyper_X_X_chunk._ts_meta_min_1, compress_hyper_X_X_chunk._ts_meta_max_1 Index Cond: (compress_hyper_X_X_chunk.device_id = 1) diff --git a/tsl/test/src/test_compression.c b/tsl/test/src/test_compression.c index 6a0bb16d9f0..41fe7ae7fbc 100644 --- a/tsl/test/src/test_compression.c +++ b/tsl/test/src/test_compression.c @@ -392,7 +392,8 @@ test_gorilla_double(bool have_nulls, bool have_random) /* Forward decompression. */ DecompressionIterator *iter = gorilla_decompression_iterator_from_datum_forward(PointerGetDatum(compressed), FLOAT8OID); - ArrowArray *bulk_result = gorilla_decompress_all(PointerGetDatum(compressed), FLOAT8OID); + ArrowArray *bulk_result = + gorilla_decompress_all(PointerGetDatum(compressed), FLOAT8OID, CurrentMemoryContext); for (int i = 0; i < TEST_ELEMENTS; i++) { DecompressResult r = gorilla_decompression_iterator_try_next_forward(iter); @@ -550,7 +551,8 @@ test_delta3(bool have_nulls, bool have_random) /* Forward decompression. */ DecompressionIterator *iter = delta_delta_decompression_iterator_from_datum_forward(PointerGetDatum(compressed), INT8OID); - ArrowArray *bulk_result = delta_delta_decompress_all(PointerGetDatum(compressed), INT8OID); + ArrowArray *bulk_result = + delta_delta_decompress_all(PointerGetDatum(compressed), INT8OID, CurrentMemoryContext); for (int i = 0; i < TEST_ELEMENTS; i++) { DecompressResult r = delta_delta_decompression_iterator_try_next_forward(iter); @@ -592,6 +594,42 @@ test_delta3(bool have_nulls, bool have_random) TestAssertTrue(r.is_done); } +static int32 test_delta4_case1[] = { -603979776, 1462059044 }; + +static int32 test_delta4_case2[] = { + 0x7979fd07, 0x79797979, 0x79797979, 0x79797979, 0x79797979, 0x79797979, 0x79797979, + 0x79797979, 0x79797979, 0x79797979, 0x79797979, 0x79797979, 0x79797979, 0x79797979, + 0x79797979, 0x50505050, 0xc4c4c4c4, 0xc4c4c4c4, 0x50505050, 0x50505050, 0xc4c4c4c4, +}; + +static void +test_delta4(const int32 *values, int n) +{ + Compressor *compressor = delta_delta_compressor_for_type(INT4OID); + for (int i = 0; i < n; i++) + { + compressor->append_val(compressor, Int32GetDatum(values[i])); + } + Datum compressed = (Datum) compressor->finish(compressor); + + ArrowArray *arrow = delta_delta_decompress_all(compressed, INT4OID, CurrentMemoryContext); + DecompressionIterator *iter = + delta_delta_decompression_iterator_from_datum_forward(compressed, INT4OID); + int i = 0; + for (DecompressResult r = delta_delta_decompression_iterator_try_next_forward(iter); !r.is_done; + r = delta_delta_decompression_iterator_try_next_forward(iter)) + { + TestAssertTrue(!r.is_null); + TestAssertTrue(i < arrow->length); + TestAssertTrue(((int32 *) arrow->buffers[1])[i] == DatumGetInt32(r.val)); + TestAssertTrue(arrow_row_is_valid(arrow->buffers[0], i)); + TestAssertTrue(values[i] == DatumGetInt32(r.val)); + i++; + } 
+	TestAssertTrue(i == arrow->length);
+	TestAssertTrue(i == n);
+}
+
 Datum
 ts_test_compression(PG_FUNCTION_ARGS)
 {
@@ -611,6 +649,11 @@ ts_test_compression(PG_FUNCTION_ARGS)
 	test_delta3(/* have_nulls = */ false, /* have_random = */ true);
 	test_delta3(/* have_nulls = */ true, /* have_random = */ false);
 	test_delta3(/* have_nulls = */ true, /* have_random = */ true);
+
+	/* Some tests for zig-zag encoding overflowing the original element width. */
+	test_delta4(test_delta4_case1, sizeof(test_delta4_case1) / sizeof(*test_delta4_case1));
+	test_delta4(test_delta4_case2, sizeof(test_delta4_case2) / sizeof(*test_delta4_case2));
+
 	PG_RETURN_VOID();
 }
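
For reference, the row-addressing scheme introduced in the exec.c hunk above (mapping an output row to an Arrow row for reverse scans, then doing a fixed-width unaligned read checked against the validity bitmap) can be exercised in isolation. The sketch below is not part of the patch and uses hypothetical toy_* helpers instead of the TimescaleDB structures; it only assumes the Arrow convention that the values buffer is densely packed with fixed-width elements and that bit N of the validity bitmap marks row N as non-null.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Arrow-style validity check: bit N set means row N is not null. */
static bool
toy_row_is_valid(const uint64_t *validity, size_t row)
{
	return (validity[row / 64] >> (row % 64)) & 1;
}

/*
 * Read one fixed-width element from a densely packed values buffer.
 * The read is not necessarily aligned, so it goes through memcpy into
 * a zero-initialized wider integer; only the low value_bytes are set.
 */
static uint64_t
toy_read_value(const char *values, int value_bytes, size_t arrow_row)
{
	uint64_t v = 0;
	memcpy(&v, values + (size_t) value_bytes * arrow_row, (size_t) value_bytes);
	return v;
}

int
main(void)
{
	/* A decompressed batch of four int32 values; row 2 is null. */
	int32_t data[4] = { 10, 20, 0, 40 };
	uint64_t validity[1] = { 0xB }; /* 0b1011: rows 0, 1, 3 are valid. */
	const int total_rows = 4;
	const bool reverse = true;

	for (int output_row = 0; output_row < total_rows; output_row++)
	{
		/* Same mapping as the patch: reverse scans walk the batch backwards. */
		const size_t arrow_row =
			reverse ? (size_t) (total_rows - 1 - output_row) : (size_t) output_row;

		if (!toy_row_is_valid(validity, arrow_row))
		{
			printf("output row %d -> arrow row %zu: NULL\n", output_row, arrow_row);
			continue;
		}

		const uint64_t raw = toy_read_value((const char *) data, sizeof(int32_t), arrow_row);
		printf("output row %d -> arrow row %zu: %d\n", output_row, arrow_row, (int) (int32_t) raw);
	}
	return 0;
}

One deliberate difference from the patch: the patch always copies 8 bytes and relies on the subsequent Datum conversion to discard the garbage high bytes, whereas the sketch copies only value_bytes so the standalone example stays strictly defined without that assumption.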