From 727ea5bac1f057c8c7c8e416a540000d31dd351e Mon Sep 17 00:00:00 2001 From: Jialiang Tan Date: Wed, 9 Oct 2024 17:12:04 -0700 Subject: [PATCH] [WIP] PO DEBUG --- velox/common/memory/ByteStream.cpp | 3 ++ velox/exec/PartitionedOutput.h | 4 +++ velox/serializers/PrestoSerializer.cpp | 47 +++++++++++++++++++++----- velox/vector/VectorStream.h | 5 +++ 4 files changed, 50 insertions(+), 9 deletions(-) diff --git a/velox/common/memory/ByteStream.cpp b/velox/common/memory/ByteStream.cpp index ae503a19237e..7ff3bd1d80d4 100644 --- a/velox/common/memory/ByteStream.cpp +++ b/velox/common/memory/ByteStream.cpp @@ -350,6 +350,9 @@ void ByteOutputStream::extend(int32_t bytes) { ranges_.emplace_back(); current_ = &ranges_.back(); lastRangeEnd_ = 0; + if (bytes == 0) { + return; + } arena_->newRange( newRangeSize(bytes), ranges_.size() == 1 ? nullptr : &ranges_[ranges_.size() - 2], diff --git a/velox/exec/PartitionedOutput.h b/velox/exec/PartitionedOutput.h index 4589646a167f..62aea5c3a5b4 100644 --- a/velox/exec/PartitionedOutput.h +++ b/velox/exec/PartitionedOutput.h @@ -86,6 +86,10 @@ class Destination { return bytesInCurrent_; } + int64_t actualSerializedBytes() { + return current_->totalLengthRecursive(); + } + /// Adds stats from 'this' to runtime stats of 'op'. void updateStats(Operator* op); diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index ecbf1be4b911..e075859db461 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -1387,8 +1387,8 @@ class VectorStream { // The first element in the offsets in the wire format is always 0 for // nested types. - lengths_.startWrite(sizeof(vector_size_t)); - lengths_.appendOne(0); + lengths_.startWrite(0); +// lengths_.appendOne(0); } return; } @@ -1437,6 +1437,17 @@ class VectorStream { initializeFlatStream(vector, initialNumRows); } + int64_t totalLengthRecursive() { + if (children_.empty()) { + return totalLength_; + } + int64_t totalLength{0}; + for (auto& child : children_){ + totalLength += child->totalLengthRecursive(); + } + return totalLength + totalLength_; + } + void flattenStream(const VectorPtr& vector, int32_t initialNumRows) { VELOX_CHECK_EQ(nullCount_, 0); VELOX_CHECK_EQ(nonNullCount_, 0); @@ -1506,6 +1517,9 @@ class VectorStream { void appendLength(int32_t length) { totalLength_ += length; + if (UNLIKELY(lengths_.size() == 0 && type_->size() > 0)) { + lengths_.appendOne(0); + } lengths_.appendOne(totalLength_); } @@ -1721,7 +1735,7 @@ class VectorStream { if (type_->kind() == TypeKind::ROW || type_->kind() == TypeKind::ARRAY || type_->kind() == TypeKind::MAP) { // A complex type has a 0 as first length. - lengths_.appendOne(0); +// lengths_.appendOne(0); } } nulls_.startWrite(nulls_.size()); @@ -1736,7 +1750,7 @@ class VectorStream { std::optional vector, vector_size_t initialNumRows) { initializeHeader(typeToEncodingName(type_), *streamArena_); - nulls_.startWrite(1 + (initialNumRows / 8)); + nulls_.startWrite(0); switch (type_->kind()) { case TypeKind::ROW: @@ -1745,7 +1759,7 @@ class VectorStream { [[fallthrough]]; case TypeKind::MAP: hasLengths_ = true; - lengths_.startWrite(initialNumRows * sizeof(vector_size_t)); + lengths_.startWrite(0); children_.resize(type_->size()); for (int32_t i = 0; i < type_->size(); ++i) { children_[i] = std::make_unique( @@ -1758,20 +1772,20 @@ class VectorStream { } // The first element in the offsets in the wire format is always 0 for // nested types. - lengths_.appendOne(0); +// lengths_.appendOne(0); break; case TypeKind::VARCHAR: [[fallthrough]]; case TypeKind::VARBINARY: hasLengths_ = true; - lengths_.startWrite(initialNumRows * sizeof(vector_size_t)); + lengths_.startWrite(0); if (values_.ranges().empty()) { - values_.startWrite(initialNumRows * 10); + values_.startWrite(0); } break; default: if (values_.ranges().empty()) { - values_.startWrite(initialNumRows * 4); + values_.startWrite(0); } break; } @@ -1972,6 +1986,8 @@ void serializeWrapped( } } +// Serialized layout for RowVector +// null bits (1 bit per row) | serialized child_0 | serialized child_1 | ... void serializeRowVector( const VectorPtr& vector, const folly::Range& ranges, @@ -3938,6 +3954,14 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer { } } + int64_t totalLengthRecursive() { + int64_t totalLength{0}; + for (auto& stream : streams_) { + totalLength += stream->totalLengthRecursive(); + } + return totalLength; + } + void append( const RowVectorPtr& vector, const folly::Range& ranges, @@ -3952,6 +3976,11 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer { } } + static int64_t& maxLength() { + static int64_t maxLength{0}; + return maxLength; + } + void append( const RowVectorPtr& vector, const folly::Range& rows, diff --git a/velox/vector/VectorStream.h b/velox/vector/VectorStream.h index b8304bdc2e28..9a90d875be29 100644 --- a/velox/vector/VectorStream.h +++ b/velox/vector/VectorStream.h @@ -45,6 +45,7 @@ class IterativeVectorSerializer { public: virtual ~IterativeVectorSerializer() = default; + virtual int64_t totalLengthRecursive() {return 0;} /// Serialize a subset of rows in a vector. virtual void append( const RowVectorPtr& vector, @@ -268,6 +269,10 @@ class VectorStreamGroup : public StreamArena { : StreamArena(pool), serde_(serde != nullptr ? serde : getVectorSerde()) {} + int64_t totalLengthRecursive() { + return serializer_->totalLengthRecursive(); + } + void createStreamTree( RowTypePtr type, int32_t numRows,