Skip to content

Commit

Permalink
GH-43129: [C++][Compute] Fix the unnecessary allocation of extra byte…
Browse files Browse the repository at this point in the history
…s when encoding row table (#43125)

### Rationale for this change

As described in #43129 , current row table occupies more memory than expected. The memory consumption is double of necessary. The reason listed below.

When encoding var length columns into into the row table:
https://github.com/apache/arrow/blob/e59832fb05dc40a85fa63297c77c8f134c9ac8e0/cpp/src/arrow/compute/row/encode_internal.cc#L155-L162

We first call `AppendEmpty` to reserve space for `x` rows but `0` bytes. This is to reserve enough size for the underlying fixed-length buffers: null masks and offsets (for var-length columns).

Then we call `GetRowOffsetsSelected` to populate the offsets.

At last we call `AppendEmpty` again with `0` rows but `y` bytes, where `y` is the last offset element which is essentially the whole size of the var-length columns.

Sounds all reasonable so far.

However, `AppendEmpty` calls `ResizeOptionalVaryingLengthBuffer`, in which:
https://github.com/apache/arrow/blob/e59832fb05dc40a85fa63297c77c8f134c9ac8e0/cpp/src/arrow/compute/row/row_internal.cc#L294-L303

We calculate `bytes_capacity_new` by keeping doubling it until it's big enough for `num_bytes + num_extra_bytes`.

Note by the time of this point, `num_bytes == offsets()[num_rows_]` is already `y`, meanwhile `num_extra_bytes` is also `y`, hence the unexpected doubled size than necessary.

### What changes are included in this PR?

Fix the wasted half size for buffers in row table. Also add tests to make sure the buffer size is as expected.

### Are these changes tested?

UT included.

### Are there any user-facing changes?

None.

* GitHub Issue: #43129

Authored-by: Ruoxi Sun <zanmato1984@gmail.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
zanmato1984 committed Jul 10, 2024
1 parent 031497d commit 3b7ad9d
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 12 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ add_arrow_test(internals_test
key_hash_test.cc
row/compare_test.cc
row/grouper_test.cc
row/row_test.cc
util_internal_test.cc)

add_arrow_compute_test(expression_test SOURCES expression_test.cc)
Expand Down
17 changes: 12 additions & 5 deletions cpp/src/arrow/compute/row/encode_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,21 @@ void RowTableEncoder::PrepareEncodeSelected(int64_t start_row, int64_t num_rows,
Status RowTableEncoder::EncodeSelected(RowTableImpl* rows, uint32_t num_selected,
const uint16_t* selection) {
rows->Clean();
RETURN_NOT_OK(
rows->AppendEmpty(static_cast<uint32_t>(num_selected), static_cast<uint32_t>(0)));

// First AppendEmpty with num_selected rows and zero extra bytes to resize the
// fixed-length buffers (including buffer for offsets).
RETURN_NOT_OK(
rows->AppendEmpty(static_cast<uint32_t>(num_selected),
/*num_extra_bytes_to_append=*/static_cast<uint32_t>(0)));
// Then populate the offsets of the var-length columns, which will be used as the target
// size of the var-length buffers resizing below.
EncoderOffsets::GetRowOffsetsSelected(rows, batch_varbinary_cols_, num_selected,
selection);

RETURN_NOT_OK(rows->AppendEmpty(static_cast<uint32_t>(0),
static_cast<uint32_t>(rows->offsets()[num_selected])));
// Last AppendEmpty with zero rows and zero extra bytes to resize the var-length buffers
// based on the populated offsets.
RETURN_NOT_OK(
rows->AppendEmpty(/*num_rows_to_append=*/static_cast<uint32_t>(0),
/*num_extra_bytes_to_append=*/static_cast<uint32_t>(0)));

for (size_t icol = 0; icol < batch_all_cols_.size(); ++icol) {
if (batch_all_cols_[icol].metadata().is_fixed_length) {
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/arrow/compute/row/row_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,13 @@ int64_t RowTableImpl::size_rows_varying_length(int64_t num_bytes) const {
}

void RowTableImpl::UpdateBufferPointers() {
buffers_[0] = null_masks_->mutable_data();
buffers_[0] = null_masks_.get();
if (metadata_.is_fixed_length) {
buffers_[1] = rows_->mutable_data();
buffers_[1] = rows_.get();
buffers_[2] = nullptr;
} else {
buffers_[1] = offsets_->mutable_data();
buffers_[2] = rows_->mutable_data();
buffers_[1] = offsets_.get();
buffers_[2] = rows_.get();
}
}

Expand Down
18 changes: 15 additions & 3 deletions cpp/src/arrow/compute/row/row_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,17 @@ class ARROW_EXPORT RowTableImpl {
// Accessors into the table's buffers
const uint8_t* data(int i) const {
ARROW_DCHECK(i >= 0 && i < kMaxBuffers);
return buffers_[i];
if (ARROW_PREDICT_TRUE(buffers_[i])) {
return buffers_[i]->data();
}
return NULLPTR;
}
uint8_t* mutable_data(int i) {
ARROW_DCHECK(i >= 0 && i < kMaxBuffers);
return buffers_[i];
if (ARROW_PREDICT_TRUE(buffers_[i])) {
return buffers_[i]->mutable_data();
}
return NULLPTR;
}
const uint32_t* offsets() const { return reinterpret_cast<const uint32_t*>(data(1)); }
uint32_t* mutable_offsets() { return reinterpret_cast<uint32_t*>(mutable_data(1)); }
Expand All @@ -207,6 +213,12 @@ class ARROW_EXPORT RowTableImpl {
/// successive calls
bool has_any_nulls(const LightContext* ctx) const;

/// \brief Size of the table's buffers
int64_t buffer_size(int i) const {
ARROW_DCHECK(i >= 0 && i < kMaxBuffers);
return buffers_[i]->size();
}

private:
Status ResizeFixedLengthBuffers(int64_t num_extra_rows);
Status ResizeOptionalVaryingLengthBuffer(int64_t num_extra_bytes);
Expand Down Expand Up @@ -236,7 +248,7 @@ class ARROW_EXPORT RowTableImpl {
// Stores the fixed-length parts of the rows
std::unique_ptr<ResizableBuffer> rows_;
static constexpr int kMaxBuffers = 3;
uint8_t* buffers_[kMaxBuffers];
ResizableBuffer* buffers_[kMaxBuffers];
// The number of rows in the table
int64_t num_rows_;
// The number of rows that can be stored in the table without resizing
Expand Down
129 changes: 129 additions & 0 deletions cpp/src/arrow/compute/row/row_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <numeric>

#include "arrow/compute/row/encode_internal.h"
#include "arrow/compute/row/row_internal.h"
#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"

namespace arrow {
namespace compute {

namespace {

Result<RowTableImpl> MakeRowTableFromColumn(const std::shared_ptr<Array>& column,
int64_t num_rows, int row_alignment,
int string_alignment) {
DCHECK_GE(column->length(), num_rows);
MemoryPool* pool = default_memory_pool();

std::vector<KeyColumnArray> column_arrays;
std::vector<Datum> values{column};
ExecBatch batch(std::move(values), num_rows);
RETURN_NOT_OK(ColumnArraysFromExecBatch(batch, &column_arrays));

std::vector<KeyColumnMetadata> column_metadatas;
RETURN_NOT_OK(ColumnMetadatasFromExecBatch(batch, &column_metadatas));
RowTableMetadata table_metadata;
table_metadata.FromColumnMetadataVector(column_metadatas, row_alignment,
string_alignment);

RowTableImpl row_table;
RETURN_NOT_OK(row_table.Init(pool, table_metadata));

RowTableEncoder row_encoder;
row_encoder.Init(column_metadatas, row_alignment, string_alignment);
row_encoder.PrepareEncodeSelected(0, num_rows, column_arrays);

std::vector<uint16_t> row_ids(num_rows);
std::iota(row_ids.begin(), row_ids.end(), 0);

RETURN_NOT_OK(row_encoder.EncodeSelected(&row_table, static_cast<uint32_t>(num_rows),
row_ids.data()));

return row_table;
}

} // namespace

// GH-43129: Ensure that the memory consumption of the row table is reasonable, that is,
// with the growth factor of 2, the actual memory usage does not exceed twice the amount
// of memory actually needed.
TEST(RowTableMemoryConsumption, Encode) {
constexpr int64_t num_rows_max = 8192;
constexpr int64_t padding_for_vectors = 64;

ASSERT_OK_AND_ASSIGN(
auto fixed_length_column,
::arrow::gen::Constant(std::make_shared<UInt32Scalar>(0))->Generate(num_rows_max));
ASSERT_OK_AND_ASSIGN(auto var_length_column,
::arrow::gen::Constant(std::make_shared<BinaryScalar>("X"))
->Generate(num_rows_max));

for (int64_t num_rows : {1023, 1024, 1025, 4095, 4096, 4097}) {
// Fixed length column.
{
SCOPED_TRACE("encoding fixed length column of " + std::to_string(num_rows) +
" rows");
ASSERT_OK_AND_ASSIGN(auto row_table,
MakeRowTableFromColumn(fixed_length_column, num_rows,
uint32()->byte_width(), 0));
ASSERT_NE(row_table.data(0), NULLPTR);
ASSERT_NE(row_table.data(1), NULLPTR);
ASSERT_EQ(row_table.data(2), NULLPTR);

int64_t actual_null_mask_size =
num_rows * row_table.metadata().null_masks_bytes_per_row;
ASSERT_LE(actual_null_mask_size, row_table.buffer_size(0) - padding_for_vectors);
ASSERT_GT(actual_null_mask_size * 2,
row_table.buffer_size(0) - padding_for_vectors);

int64_t actual_rows_size = num_rows * uint32()->byte_width();
ASSERT_LE(actual_rows_size, row_table.buffer_size(1) - padding_for_vectors);
ASSERT_GT(actual_rows_size * 2, row_table.buffer_size(1) - padding_for_vectors);
}

// Var length column.
{
SCOPED_TRACE("encoding var length column of " + std::to_string(num_rows) + " rows");
ASSERT_OK_AND_ASSIGN(auto row_table,
MakeRowTableFromColumn(var_length_column, num_rows, 4, 4));
ASSERT_NE(row_table.data(0), NULLPTR);
ASSERT_NE(row_table.data(1), NULLPTR);
ASSERT_NE(row_table.data(2), NULLPTR);

int64_t actual_null_mask_size =
num_rows * row_table.metadata().null_masks_bytes_per_row;
ASSERT_LE(actual_null_mask_size, row_table.buffer_size(0) - padding_for_vectors);
ASSERT_GT(actual_null_mask_size * 2,
row_table.buffer_size(0) - padding_for_vectors);

int64_t actual_offset_size = num_rows * sizeof(uint32_t);
ASSERT_LE(actual_offset_size, row_table.buffer_size(1) - padding_for_vectors);
ASSERT_GT(actual_offset_size * 2, row_table.buffer_size(1) - padding_for_vectors);

int64_t actual_rows_size = num_rows * row_table.offsets()[1];
ASSERT_LE(actual_rows_size, row_table.buffer_size(2) - padding_for_vectors);
ASSERT_GT(actual_rows_size * 2, row_table.buffer_size(2) - padding_for_vectors);
}
}
}

} // namespace compute
} // namespace arrow

0 comments on commit 3b7ad9d

Please sign in to comment.