Skip to content

Commit

Permalink
Add QuerySplitTracer to records and rebuild splits
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Oct 9, 2024
1 parent 2a42006 commit cb32bbb
Show file tree
Hide file tree
Showing 9 changed files with 233 additions and 14 deletions.
8 changes: 5 additions & 3 deletions velox/connectors/hive/HiveConnectorSplit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ folly::dynamic HiveConnectorSplit::serialize() const {
customSplitInfoObj[key] = value;
}
obj["customSplitInfo"] = customSplitInfoObj;
obj["extraFileInfo"] = *extraFileInfo;
obj["extraFileInfo"] =
extraFileInfo == nullptr ? nullptr : folly::dynamic(*extraFileInfo);

folly::dynamic serdeParametersObj = folly::dynamic::object;
for (const auto& [key, value] : serdeParameters) {
Expand Down Expand Up @@ -118,8 +119,9 @@ std::shared_ptr<HiveConnectorSplit> HiveConnectorSplit::create(
customSplitInfo[key.asString()] = value.asString();
}

std::shared_ptr<std::string> extraFileInfo =
std::make_shared<std::string>(obj["extraFileInfo"].asString());
std::shared_ptr<std::string> extraFileInfo = obj["extraFileInfo"].isNull()
? nullptr
: std::make_shared<std::string>(obj["extraFileInfo"].asString());
std::unordered_map<std::string, std::string> serdeParameters;
for (const auto& [key, value] : obj["serdeParameters"].items()) {
serdeParameters[key.asString()] = value.asString();
Expand Down
26 changes: 23 additions & 3 deletions velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ class HiveConnectorSerDeTest : public exec::test::HiveConnectorTestBase {
ASSERT_EQ(value, clone->customSplitInfo.at(key));
}

ASSERT_EQ(*split.extraFileInfo, *clone->extraFileInfo);
if (split.extraFileInfo != nullptr) {
ASSERT_EQ(*split.extraFileInfo, *clone->extraFileInfo);
} else {
ASSERT_EQ(clone->extraFileInfo, nullptr);
}
ASSERT_EQ(split.serdeParameters.size(), clone->serdeParameters.size());
for (const auto& [key, value] : split.serdeParameters) {
ASSERT_EQ(value, clone->serdeParameters.at(key));
Expand Down Expand Up @@ -216,7 +220,7 @@ TEST_F(HiveConnectorSerDeTest, hiveConnectorSplit) {
FileProperties fileProperties{
.fileSize = 2048, .modificationTime = std::nullopt};
const auto properties = std::optional<FileProperties>(fileProperties);
const auto split = HiveConnectorSplit(
const auto split1 = HiveConnectorSplit(
connectorId,
filePath,
fileFormat,
Expand All @@ -229,8 +233,24 @@ TEST_F(HiveConnectorSerDeTest, hiveConnectorSplit) {
serdeParameters,
splitWeight,
infoColumns,
properties);
testSerde(split1);

const auto split2 = HiveConnectorSplit(
connectorId,
filePath,
fileFormat,
start,
length,
{},
tableBucketNumber,
customSplitInfo,
nullptr,
{},
splitWeight,
{},
std::nullopt);
testSerde(split);
testSerde(split2);
}

} // namespace
Expand Down
1 change: 1 addition & 0 deletions velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ velox_add_library(
QueryDataWriter.cpp
QueryMetadataReader.cpp
QueryMetadataWriter.cpp
QuerySplitTracer.cpp
QueryTraceConfig.cpp
QueryTraceScan.cpp
QueryTraceUtil.cpp
Expand Down
11 changes: 6 additions & 5 deletions velox/exec/QueryDataWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,16 @@
namespace facebook::velox::exec::trace {

QueryDataWriter::QueryDataWriter(
std::string path,
std::string traceDir,
memory::MemoryPool* pool,
UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB)
: dirPath_(std::move(path)),
fs_(filesystems::getFileSystem(dirPath_, nullptr)),
: traceDir_(std::move(traceDir)),
fs_(filesystems::getFileSystem(traceDir_, nullptr)),
pool_(pool),
updateAndCheckTraceLimitCB_(std::move(updateAndCheckTraceLimitCB)) {
VELOX_CHECK_NOT_NULL(fs_);
dataFile_ = fs_->openFileForWrite(
fmt::format("{}/{}", dirPath_, QueryTraceTraits::kDataFileName));
fmt::format("{}/{}", traceDir_, QueryTraceTraits::kDataFileName));
VELOX_CHECK_NOT_NULL(dataFile_);
}

Expand Down Expand Up @@ -83,7 +84,7 @@ void QueryDataWriter::finish(bool limitExceeded) {

void QueryDataWriter::writeSummary(bool limitExceeded) const {
const auto summaryFilePath =
fmt::format("{}/{}", dirPath_, QueryTraceTraits::kDataSummaryFileName);
fmt::format("{}/{}", traceDir_, QueryTraceTraits::kDataSummaryFileName);
const auto file = fs_->openFileForWrite(summaryFilePath);
folly::dynamic obj = folly::dynamic::object;
obj[QueryTraceTraits::kDataTypeKey] = dataType_->serialize();
Expand Down
9 changes: 6 additions & 3 deletions velox/exec/QueryDataWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@ namespace facebook::velox::exec::trace {
class QueryDataWriter {
public:
explicit QueryDataWriter(
std::string path,
std::string traceDir,
memory::MemoryPool* pool,
UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB);

/// Serializes rows and writes out each batch.
/// Serializes and writes out each batch, enabling us to replay the execution
/// with the same batch numbers and order. Each serialized batch is flushed
/// immediately, ensuring that the traced operator can be replayed even if a
/// crash occurs during execution.
void write(const RowVectorPtr& rows);

/// Closes the data file and writes out the data summary.
Expand All @@ -49,7 +52,7 @@ class QueryDataWriter {
// TODO: add more summaries such as number of rows etc.
void writeSummary(bool limitExceeded = false) const;

const std::string dirPath_;
const std::string traceDir_;
// TODO: make 'useLosslessTimestamp' configurable.
const serializer::presto::PrestoVectorSerde::PrestoOptions options_ = {
true,
Expand Down
97 changes: 97 additions & 0 deletions velox/exec/QuerySplitTracer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/exec/QuerySplitTracer.h"
#include "QueryTraceUtil.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/exec/QueryTraceTraits.h"

using namespace facebook::velox::connector::hive;

namespace facebook::velox::exec::trace {
/// Used to record and load the input splits from a tracing 'TableScan'
/// operator, and for getting the traced splits when replaies 'TableScan'.
///
/// Currently, it only works with 'HiveConnectorSplit'. In the future, it will
/// be extended to handle more types of splits, such as
/// 'IcebergHiveConnectorSplit'.
QuerySplitTracer::QuerySplitTracer(std::string traceDir)
: traceDir_(std::move(traceDir)),
fs_(filesystems::getFileSystem(traceDir_, nullptr)) {
VELOX_CHECK_NOT_NULL(fs_);
}

/// Writes 'split' as JSON into its own file under the trace directory. The
/// file name is built from the split info suffix followed by the current file
/// id, and the file id is advanced after the file has been closed.
///
/// The split must carry a connector split and must not belong to a group.
void QuerySplitTracer::write(const exec::Split& split) {
  VELOX_CHECK(!split.hasGroup());
  VELOX_CHECK(split.hasConnectorSplit());

  const auto outputPath = fmt::format(
      "{}/trace{}.{}",
      traceDir_,
      QueryTraceTraits::kSplitInfoFileSuffix,
      fileId_);
  const auto outputFile = fs_->openFileForWrite(outputPath);

  // Serialize after the file is opened, then push the JSON out right away so
  // the split is on storage before the next one is traced.
  const auto serializedSplit =
      folly::toJson(split.connectorSplit->serialize());
  outputFile->append(serializedSplit);
  outputFile->flush();
  outputFile->close();

  ++fileId_;
}

// static
/// Extracts the numeric index that terminates a split info file path.
///
/// @param str File path to inspect.
/// @return The captured index, or -1 if 'str' does not fully match
/// 'kFileRegExp' or the captured digits do not fit in an int32_t.
int32_t QuerySplitTracer::extractFileIndex(const std::string& str) {
  // Let RE2 parse the captured digits directly into an int32_t. RE2 fails the
  // match when the number overflows the target type, which avoids both the
  // exception thrown by std::stoul on very long digit runs and the silent
  // unsigned long -> int32_t narrowing that could wrap to a negative index.
  int32_t index{-1};
  if (!RE2::FullMatch(str, kFileRegExp, &index)) {
    return -1;
  }
  return index;
}

std::vector<exec::Split> QuerySplitTracer::read() const {
std::vector<exec::Split> splits;
std::map<int32_t, std::string> fileMap;
for (const auto& filePath : fs_->list(traceDir_)) {
const auto index = extractFileIndex(filePath);
if (index == -1) {
continue;
}
fileMap[index] = filePath;
}

for (const auto& [_, filePath] : fileMap) {
const auto splitInfoFilePath = fs_->openFileForRead(filePath);
folly::dynamic splitInfoObj = getMetadata(filePath, fs_);
const auto split =
ISerializable::deserialize<HiveConnectorSplit>(splitInfoObj);
splits.emplace_back(
std::make_shared<HiveConnectorSplit>(
split->connectorId,
split->filePath,
split->fileFormat,
split->start,
split->length,
split->partitionKeys,
split->tableBucketNumber,
split->customSplitInfo,
split->extraFileInfo,
split->serdeParameters,
split->splitWeight,
split->infoColumns,
split->properties),
-1);
}
return splits;
}
} // namespace facebook::velox::exec::trace
50 changes: 50 additions & 0 deletions velox/exec/QuerySplitTracer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/common/file/FileSystems.h"
#include "velox/exec/Split.h"

#include <re2/re2.h>

namespace facebook::velox::exec::trace {

/// Writes the splits received by a traced operator into a trace directory and
/// reads them back for replay. Each split is stored in its own file whose name
/// ends with a monotonically increasing index.
class QuerySplitTracer {
 public:
  /// @param traceDir Directory in which the split info files are stored.
  explicit QuerySplitTracer(std::string traceDir);

  /// Serializes and writes out each split. Each serialized split is immediately
  /// flushed to a separate file to ensure that we can still replay a traced
  /// operator even if a crash occurs during execution.
  void write(const exec::Split& split);

  /// Lists the split info files and deserializes the splits. The splits are
  /// sorted by the file index generated during the tracing process, allowing us
  /// to replay the execution in the same order as the original split
  /// processing.
  std::vector<exec::Split> read() const;

 private:
  /// Returns the index captured by 'kFileRegExp' from 'str', or -1 if 'str'
  /// is not a split info file path.
  static int32_t extractFileIndex(const std::string& str);

  /// Directory holding the split info files.
  const std::string traceDir_;
  /// File system used for every access to 'traceDir_'.
  const std::shared_ptr<filesystems::FileSystem> fs_;
  /// Index used in the name of the next split info file to write.
  int32_t fileId_{0};

  /// Matches a split info file path and captures its trailing numeric index.
  /// Declared const so the pattern shared by all instances cannot be
  /// reassigned after initialization.
  static inline const RE2 kFileRegExp{std::string(R"(.+\.split\.(\d+)$)")};
};
} // namespace facebook::velox::exec::trace
1 change: 1 addition & 0 deletions velox/exec/QueryTraceTraits.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ struct QueryTraceTraits {
static inline const std::string kQueryMetaFileName = "query_meta.json";
static inline const std::string kDataSummaryFileName = "data_summary.json";
static inline const std::string kDataFileName = "trace.data";
static inline const std::string kSplitInfoFileSuffix = ".split";
};
} // namespace facebook::velox::exec::trace
44 changes: 44 additions & 0 deletions velox/exec/tests/QueryTraceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
#include <memory>

#include "velox/common/file/FileSystems.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/exec/PartitionFunction.h"
#include "velox/exec/QueryDataReader.h"
#include "velox/exec/QueryDataWriter.h"
#include "velox/exec/QueryMetadataReader.h"
#include "velox/exec/QueryMetadataWriter.h"
#include "velox/exec/QuerySplitTracer.h"
#include "velox/exec/QueryTraceUtil.h"
#include "velox/exec/tests/utils/ArbitratorTestUtil.h"
#include "velox/exec/tests/utils/HiveConnectorTestBase.h"
Expand All @@ -50,6 +52,7 @@ class QueryTracerTest : public HiveConnectorTestBase {
connector::hive::LocationHandle::registerSerDe();
connector::hive::HiveColumnHandle::registerSerDe();
connector::hive::HiveInsertTableHandle::registerSerDe();
connector::hive::HiveConnectorSplit::registerSerDe();
core::PlanNode::registerSerDe();
core::ITypedExpr::registerSerDe();
registerPartitionFunctionSerDe();
Expand Down Expand Up @@ -247,6 +250,47 @@ TEST_F(QueryTracerTest, traceMetadata) {
}
}

// Writes a batch of distinct Hive splits through QuerySplitTracer, then reads
// them back with a second tracer on the same directory and checks that the
// splits come back complete and in the order they were written.
TEST_F(QueryTracerTest, traceSplit) {
  const auto numSplits = 13;
  std::vector<exec::Split> splits;
  for (int i = 0; i < numSplits; ++i) {
    auto builder = HiveConnectorSplitBuilder(fmt::format("path-{}-{}", i, i));
    const auto key = fmt::format("k{}", i);
    const auto value = fmt::format("v{}", i);
    // Vary every field with 'i' so that a mix-up between splits is visible in
    // the toString() comparison below.
    splits.emplace_back(
        builder.start(i)
            .length(i)
            .connectorId(fmt::format("{}", i))
            .fileFormat(dwio::common::FileFormat(i + 1))
            .infoColumn(key, value)
            .partitionKey(
                key, i > 1 ? std::nullopt : std::optional<std::string>(value))
            .tableBucketNumber(i)
            .build(),
        -1);
  }

  const auto traceDir = TempDirectoryPath::create();
  auto writer = exec::trace::QuerySplitTracer(traceDir->getPath());
  for (int i = 0; i < numSplits; ++i) {
    writer.write(splits.at(i));
  }
  // Exactly one file per split is expected in the trace directory.
  const auto fs = filesystems::getFileSystem(traceDir->getPath(), nullptr);
  const auto splitInfoFiles = fs->list(traceDir->getPath());
  ASSERT_EQ(splitInfoFiles.size(), numSplits);

  const auto reader = exec::trace::QuerySplitTracer(traceDir->getPath());
  auto actualSplits = reader.read();
  // Check the count before indexing so that a short read fails the test
  // instead of reading past the end of 'actualSplits'.
  ASSERT_EQ(actualSplits.size(), numSplits);
  for (int i = 0; i < numSplits; ++i) {
    ASSERT_FALSE(actualSplits[i].hasGroup());
    ASSERT_TRUE(actualSplits[i].hasConnectorSplit());
    const auto actualConnectorSplit = actualSplits[i].connectorSplit;
    const auto expectedConnectorSplit = splits[i].connectorSplit;
    ASSERT_EQ(
        actualConnectorSplit->toString(), expectedConnectorSplit->toString());
  }
}

TEST_F(QueryTracerTest, task) {
const auto rowType =
ROW({"c0", "c1", "c2", "c3", "c4", "c5"},
Expand Down

0 comments on commit cb32bbb

Please sign in to comment.