Skip to content

Commit

Permalink
[WIP] PO DEBUG
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Oct 10, 2024
1 parent 3ce3fb1 commit 727ea5b
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 9 deletions.
3 changes: 3 additions & 0 deletions velox/common/memory/ByteStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,9 @@ void ByteOutputStream::extend(int32_t bytes) {
ranges_.emplace_back();
current_ = &ranges_.back();
lastRangeEnd_ = 0;
if (bytes == 0) {
return;
}
arena_->newRange(
newRangeSize(bytes),
ranges_.size() == 1 ? nullptr : &ranges_[ranges_.size() - 2],
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/PartitionedOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ class Destination {
return bytesInCurrent_;
}

int64_t actualSerializedBytes() {
return current_->totalLengthRecursive();
}

/// Adds stats from 'this' to runtime stats of 'op'.
void updateStats(Operator* op);

Expand Down
47 changes: 38 additions & 9 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1387,8 +1387,8 @@ class VectorStream {

// The first element in the offsets in the wire format is always 0 for
// nested types.
lengths_.startWrite(sizeof(vector_size_t));
lengths_.appendOne<int32_t>(0);
lengths_.startWrite(0);
// lengths_.appendOne<int32_t>(0);
}
return;
}
Expand Down Expand Up @@ -1437,6 +1437,17 @@ class VectorStream {
initializeFlatStream(vector, initialNumRows);
}

int64_t totalLengthRecursive() {
if (children_.empty()) {
return totalLength_;
}
int64_t totalLength{0};
for (auto& child : children_){
totalLength += child->totalLengthRecursive();
}
return totalLength + totalLength_;
}

void flattenStream(const VectorPtr& vector, int32_t initialNumRows) {
VELOX_CHECK_EQ(nullCount_, 0);
VELOX_CHECK_EQ(nonNullCount_, 0);
Expand Down Expand Up @@ -1506,6 +1517,9 @@ class VectorStream {

void appendLength(int32_t length) {
totalLength_ += length;
if (UNLIKELY(lengths_.size() == 0 && type_->size() > 0)) {
lengths_.appendOne<int32_t>(0);
}
lengths_.appendOne<int32_t>(totalLength_);
}

Expand Down Expand Up @@ -1721,7 +1735,7 @@ class VectorStream {
if (type_->kind() == TypeKind::ROW || type_->kind() == TypeKind::ARRAY ||
type_->kind() == TypeKind::MAP) {
// A complex type has a 0 as first length.
lengths_.appendOne<int32_t>(0);
// lengths_.appendOne<int32_t>(0);
}
}
nulls_.startWrite(nulls_.size());
Expand All @@ -1736,7 +1750,7 @@ class VectorStream {
std::optional<VectorPtr> vector,
vector_size_t initialNumRows) {
initializeHeader(typeToEncodingName(type_), *streamArena_);
nulls_.startWrite(1 + (initialNumRows / 8));
nulls_.startWrite(0);

switch (type_->kind()) {
case TypeKind::ROW:
Expand All @@ -1745,7 +1759,7 @@ class VectorStream {
[[fallthrough]];
case TypeKind::MAP:
hasLengths_ = true;
lengths_.startWrite(initialNumRows * sizeof(vector_size_t));
lengths_.startWrite(0);
children_.resize(type_->size());
for (int32_t i = 0; i < type_->size(); ++i) {
children_[i] = std::make_unique<VectorStream>(
Expand All @@ -1758,20 +1772,20 @@ class VectorStream {
}
// The first element in the offsets in the wire format is always 0 for
// nested types.
lengths_.appendOne<int32_t>(0);
// lengths_.appendOne<int32_t>(0);
break;
case TypeKind::VARCHAR:
[[fallthrough]];
case TypeKind::VARBINARY:
hasLengths_ = true;
lengths_.startWrite(initialNumRows * sizeof(vector_size_t));
lengths_.startWrite(0);
if (values_.ranges().empty()) {
values_.startWrite(initialNumRows * 10);
values_.startWrite(0);
}
break;
default:
if (values_.ranges().empty()) {
values_.startWrite(initialNumRows * 4);
values_.startWrite(0);
}
break;
}
Expand Down Expand Up @@ -1972,6 +1986,8 @@ void serializeWrapped(
}
}

// Serialized layout for RowVector
// null bits (1 bit per row) | serialized child_0 | serialized child_1 | ...
void serializeRowVector(
const VectorPtr& vector,
const folly::Range<const IndexRange*>& ranges,
Expand Down Expand Up @@ -3938,6 +3954,14 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer {
}
}

int64_t totalLengthRecursive() {
int64_t totalLength{0};
for (auto& stream : streams_) {
totalLength += stream->totalLengthRecursive();
}
return totalLength;
}

void append(
const RowVectorPtr& vector,
const folly::Range<const IndexRange*>& ranges,
Expand All @@ -3952,6 +3976,11 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer {
}
}

static int64_t& maxLength() {
static int64_t maxLength{0};
return maxLength;
}

void append(
const RowVectorPtr& vector,
const folly::Range<const vector_size_t*>& rows,
Expand Down
5 changes: 5 additions & 0 deletions velox/vector/VectorStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class IterativeVectorSerializer {
public:
virtual ~IterativeVectorSerializer() = default;

virtual int64_t totalLengthRecursive() {return 0;}
/// Serialize a subset of rows in a vector.
virtual void append(
const RowVectorPtr& vector,
Expand Down Expand Up @@ -268,6 +269,10 @@ class VectorStreamGroup : public StreamArena {
: StreamArena(pool),
serde_(serde != nullptr ? serde : getVectorSerde()) {}

int64_t totalLengthRecursive() {
return serializer_->totalLengthRecursive();
}

void createStreamTree(
RowTypePtr type,
int32_t numRows,
Expand Down

0 comments on commit 727ea5b

Please sign in to comment.