diff --git a/velox/connectors/hive/HiveConnectorSplit.cpp b/velox/connectors/hive/HiveConnectorSplit.cpp
index ed69eb7d6303..e05411da9424 100644
--- a/velox/connectors/hive/HiveConnectorSplit.cpp
+++ b/velox/connectors/hive/HiveConnectorSplit.cpp
@@ -61,7 +61,8 @@ folly::dynamic HiveConnectorSplit::serialize() const {
     customSplitInfoObj[key] = value;
   }
   obj["customSplitInfo"] = customSplitInfoObj;
-  obj["extraFileInfo"] = *extraFileInfo;
+  obj["extraFileInfo"] =
+      extraFileInfo == nullptr ? nullptr : folly::dynamic(*extraFileInfo);
 
   folly::dynamic serdeParametersObj = folly::dynamic::object;
   for (const auto& [key, value] : serdeParameters) {
@@ -118,8 +119,9 @@ std::shared_ptr<HiveConnectorSplit> HiveConnectorSplit::create(
     customSplitInfo[key.asString()] = value.asString();
   }
 
-  std::shared_ptr<std::string> extraFileInfo =
-      std::make_shared<std::string>(obj["extraFileInfo"].asString());
+  std::shared_ptr<std::string> extraFileInfo = obj["extraFileInfo"].isNull()
+      ? nullptr
+      : std::make_shared<std::string>(obj["extraFileInfo"].asString());
   std::unordered_map<std::string, std::string> serdeParameters;
   for (const auto& [key, value] : obj["serdeParameters"].items()) {
     serdeParameters[key.asString()] = value.asString();
diff --git a/velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp b/velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp
index f44bf9348fe4..828547d74b4f 100644
--- a/velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp
+++ b/velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp
@@ -81,7 +81,11 @@ class HiveConnectorSerDeTest : public exec::test::HiveConnectorTestBase {
       ASSERT_EQ(value, clone->customSplitInfo.at(key));
     }
 
-    ASSERT_EQ(*split.extraFileInfo, *clone->extraFileInfo);
+    if (split.extraFileInfo != nullptr) {
+      ASSERT_EQ(*split.extraFileInfo, *clone->extraFileInfo);
+    } else {
+      ASSERT_EQ(clone->extraFileInfo, nullptr);
+    }
     ASSERT_EQ(split.serdeParameters.size(), clone->serdeParameters.size());
     for (const auto& [key, value] : split.serdeParameters) {
       ASSERT_EQ(value, clone->serdeParameters.at(key));
@@ -216,7 +220,7 @@ TEST_F(HiveConnectorSerDeTest, hiveConnectorSplit) {
   FileProperties fileProperties{
      .fileSize = 2048, .modificationTime = std::nullopt};
   const auto properties = std::optional<FileProperties>(fileProperties);
-  const auto split = HiveConnectorSplit(
+  const auto split1 = HiveConnectorSplit(
       connectorId,
       filePath,
       fileFormat,
@@ -229,8 +233,24 @@ TEST_F(HiveConnectorSerDeTest, hiveConnectorSplit) {
       serdeParameters,
       splitWeight,
       infoColumns,
+      properties);
+  testSerde(split1);
+
+  const auto split2 = HiveConnectorSplit(
+      connectorId,
+      filePath,
+      fileFormat,
+      start,
+      length,
+      {},
+      tableBucketNumber,
+      customSplitInfo,
+      nullptr,
+      {},
+      splitWeight,
+      {},
       std::nullopt);
-  testSerde(split);
+  testSerde(split2);
 }
 
 } // namespace
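The two hunks above make the extraFileInfo round trip null-safe on both the serialize and deserialize sides. A minimal sketch of the intended behavior (illustrative only, not part of the patch; the connector id and file path are made up):

    // extraFileInfo defaults to nullptr, so serialize() now emits a JSON null
    // instead of dereferencing a null shared_ptr.
    auto split = std::make_shared<connector::hive::HiveConnectorSplit>(
        "test-hive", "file:///tmp/t0", dwio::common::FileFormat::DWRF);
    const folly::dynamic obj = split->serialize();
    // create() maps the JSON null back to a nullptr on the clone, which is
    // exactly what the updated testSerde() check asserts.
    const auto clone = connector::hive::HiveConnectorSplit::create(obj);
    VELOX_CHECK_NULL(clone->extraFileInfo);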
diff --git a/velox/exec/CMakeLists.txt b/velox/exec/CMakeLists.txt
index 9972f46b3ecb..3574ddb6ff34 100644
--- a/velox/exec/CMakeLists.txt
+++ b/velox/exec/CMakeLists.txt
@@ -61,6 +61,7 @@ velox_add_library(
   QueryDataWriter.cpp
   QueryMetadataReader.cpp
   QueryMetadataWriter.cpp
+  QuerySplitTracer.cpp
   QueryTraceConfig.cpp
   QueryTraceScan.cpp
   QueryTraceUtil.cpp
diff --git a/velox/exec/QueryDataWriter.cpp b/velox/exec/QueryDataWriter.cpp
index d8e1ed2e64d5..86ae1bc6d2e9 100644
--- a/velox/exec/QueryDataWriter.cpp
+++ b/velox/exec/QueryDataWriter.cpp
@@ -26,15 +26,16 @@
 namespace facebook::velox::exec::trace {
 
 QueryDataWriter::QueryDataWriter(
-    std::string path,
+    std::string traceDir,
     memory::MemoryPool* pool,
     UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB)
-    : dirPath_(std::move(path)),
-      fs_(filesystems::getFileSystem(dirPath_, nullptr)),
+    : traceDir_(std::move(traceDir)),
+      fs_(filesystems::getFileSystem(traceDir_, nullptr)),
       pool_(pool),
       updateAndCheckTraceLimitCB_(std::move(updateAndCheckTraceLimitCB)) {
+  VELOX_CHECK_NOT_NULL(fs_);
   dataFile_ = fs_->openFileForWrite(
-      fmt::format("{}/{}", dirPath_, QueryTraceTraits::kDataFileName));
+      fmt::format("{}/{}", traceDir_, QueryTraceTraits::kDataFileName));
   VELOX_CHECK_NOT_NULL(dataFile_);
 }
 
@@ -83,7 +84,7 @@ void QueryDataWriter::finish(bool limitExceeded) {
 
 void QueryDataWriter::writeSummary(bool limitExceeded) const {
   const auto summaryFilePath =
-      fmt::format("{}/{}", dirPath_, QueryTraceTraits::kDataSummaryFileName);
+      fmt::format("{}/{}", traceDir_, QueryTraceTraits::kDataSummaryFileName);
   const auto file = fs_->openFileForWrite(summaryFilePath);
   folly::dynamic obj = folly::dynamic::object;
   obj[QueryTraceTraits::kDataTypeKey] = dataType_->serialize();
diff --git a/velox/exec/QueryDataWriter.h b/velox/exec/QueryDataWriter.h
index 8e6073dcd3b7..a126e1859cfa 100644
--- a/velox/exec/QueryDataWriter.h
+++ b/velox/exec/QueryDataWriter.h
@@ -30,11 +30,14 @@ namespace facebook::velox::exec::trace {
 class QueryDataWriter {
  public:
   explicit QueryDataWriter(
-      std::string path,
+      std::string traceDir,
       memory::MemoryPool* pool,
       UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB);
 
-  /// Serializes rows and writes out each batch.
+  /// Serializes and writes out each batch, enabling us to replay the execution
+  /// with the same batch numbers and order. Each serialized batch is flushed
+  /// immediately, ensuring that the traced operator can be replayed even if a
+  /// crash occurs during execution.
   void write(const RowVectorPtr& rows);
 
   /// Closes the data file and writes out the data summary.
@@ -49,7 +52,7 @@ class QueryDataWriter {
   // TODO: add more summaries such as number of rows etc.
   void writeSummary(bool limitExceeded = false) const;
 
-  const std::string dirPath_;
+  const std::string traceDir_;
   // TODO: make 'useLosslessTimestamp' configuerable.
   const serializer::presto::PrestoVectorSerde::PrestoOptions options_ = {
       true,
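With the rename from 'path' to 'traceDir', the data writer and the split tracer added below share a single trace directory per traced operator. A sketch of the resulting layout, using the file names defined in QueryTraceTraits (the split files come from QuerySplitTracer; the exact set depends on what gets traced):

    <traceDir>/query_meta.json    -- query metadata (kQueryMetaFileName)
    <traceDir>/trace.data         -- serialized input batches (kDataFileName)
    <traceDir>/data_summary.json  -- summary written on finish (kDataSummaryFileName)
    <traceDir>/trace.split.0      -- one serialized split per file
    <traceDir>/trace.split.1         (kSplitInfoFileSuffix plus a file id)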
diff --git a/velox/exec/QuerySplitTracer.cpp b/velox/exec/QuerySplitTracer.cpp
new file mode 100644
index 000000000000..df467b943724
--- /dev/null
+++ b/velox/exec/QuerySplitTracer.cpp
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "velox/exec/QuerySplitTracer.h"
+#include "QueryTraceUtil.h"
+#include "velox/connectors/hive/HiveConnectorSplit.h"
+#include "velox/exec/QueryTraceTraits.h"
+
+using namespace facebook::velox::connector::hive;
+
+namespace facebook::velox::exec::trace {
+/// Used to record and load the input splits from a tracing 'TableScan'
+/// operator, and to get the traced splits back when replaying 'TableScan'.
+///
+/// Currently, it only works with 'HiveConnectorSplit'. In the future, it will
+/// be extended to handle more types of splits, such as
+/// 'IcebergHiveConnectorSplit'.
+QuerySplitTracer::QuerySplitTracer(std::string traceDir)
+    : traceDir_(std::move(traceDir)),
+      fs_(filesystems::getFileSystem(traceDir_, nullptr)) {
+  VELOX_CHECK_NOT_NULL(fs_);
+}
+
+void QuerySplitTracer::write(const exec::Split& split) {
+  VELOX_CHECK(!split.hasGroup());
+  VELOX_CHECK(split.hasConnectorSplit());
+  const auto splitInfoFile = fs_->openFileForWrite(fmt::format(
+      "{}/trace{}.{}",
+      traceDir_,
+      QueryTraceTraits::kSplitInfoFileSuffix,
+      fileId_));
+  const auto splitObj = split.connectorSplit->serialize();
+  const auto splitJson = folly::toJson(splitObj);
+  splitInfoFile->append(splitJson);
+  splitInfoFile->flush();
+  splitInfoFile->close();
+  ++fileId_;
+}
+
+// static
+int32_t QuerySplitTracer::extractFileIndex(const std::string& str) {
+  std::string capturedStr;
+  if (!RE2::FullMatch(str, kFileRegExp, &capturedStr)) {
+    return -1;
+  }
+  return std::stoul(capturedStr);
+}
+
+std::vector<exec::Split> QuerySplitTracer::read() const {
+  std::vector<exec::Split> splits;
+  std::map<uint32_t, std::string> fileMap;
+  for (const auto& filePath : fs_->list(traceDir_)) {
+    const auto index = extractFileIndex(filePath);
+    if (index == -1) {
+      continue;
+    }
+    fileMap[index] = filePath;
+  }
+
+  for (const auto& [_, filePath] : fileMap) {
+    const auto splitInfoFilePath = fs_->openFileForRead(filePath);
+    folly::dynamic splitInfoObj = getMetadata(filePath, fs_);
+    const auto split =
+        ISerializable::deserialize<HiveConnectorSplit>(splitInfoObj);
+    splits.emplace_back(
+        std::make_shared<HiveConnectorSplit>(
+            split->connectorId,
+            split->filePath,
+            split->fileFormat,
+            split->start,
+            split->length,
+            split->partitionKeys,
+            split->tableBucketNumber,
+            split->customSplitInfo,
+            split->extraFileInfo,
+            split->serdeParameters,
+            split->splitWeight,
+            split->infoColumns,
+            split->properties),
+        -1);
+  }
+  return splits;
+}
+} // namespace facebook::velox::exec::trace
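A minimal usage sketch of the tracer above (illustrative only, not part of the patch; the directory name and the 'split' variable are placeholders, and HiveConnectorSplit::registerSerDe() is assumed to have been called so read() can deserialize):

    // Write side: every call produces one flushed file, trace.split.<id>.
    exec::trace::QuerySplitTracer tracer("/tmp/query_trace");
    tracer.write(split); // 'split' is an exec::Split wrapping a HiveConnectorSplit.

    // Read side: files are ordered by their numeric id, so the replay sees
    // the splits in the same order in which they were traced.
    const std::vector<exec::Split> replayed = tracer.read();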
diff --git a/velox/exec/QuerySplitTracer.h b/velox/exec/QuerySplitTracer.h
new file mode 100644
index 000000000000..9fbf226c1884
--- /dev/null
+++ b/velox/exec/QuerySplitTracer.h
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "velox/common/file/FileSystems.h"
+#include "velox/exec/Split.h"
+
+#include <re2/re2.h>
+
+namespace facebook::velox::exec::trace {
+
+class QuerySplitTracer {
+ public:
+  explicit QuerySplitTracer(std::string traceDir);
+
+  /// Serializes and writes out each split. Each serialized split is
+  /// immediately flushed to a separate file to ensure that we can still
+  /// replay a traced operator even if a crash occurs during execution.
+  void write(const exec::Split& split);
+
+  /// Lists the split info files and deserializes the splits. The splits are
+  /// sorted by the file index generated during the tracing process, allowing
+  /// us to replay the execution in the same order as the original split
+  /// processing.
+  std::vector<exec::Split> read() const;
+
+ private:
+  static int32_t extractFileIndex(const std::string& str);
+
+  const std::string traceDir_;
+  const std::shared_ptr<filesystems::FileSystem> fs_;
+  int32_t fileId_{0};
+
+  static inline RE2 kFileRegExp{std::string(R"(.+\.split\.(\d+)$)")};
+};
+} // namespace facebook::velox::exec::trace
diff --git a/velox/exec/QueryTraceTraits.h b/velox/exec/QueryTraceTraits.h
index ad817115b490..dcf851eb726f 100644
--- a/velox/exec/QueryTraceTraits.h
+++ b/velox/exec/QueryTraceTraits.h
@@ -31,5 +31,6 @@ struct QueryTraceTraits {
   static inline const std::string kQueryMetaFileName = "query_meta.json";
   static inline const std::string kDataSummaryFileName = "data_summary.json";
   static inline const std::string kDataFileName = "trace.data";
+  static inline const std::string kSplitInfoFileSuffix = ".split";
 };
 } // namespace facebook::velox::exec::trace
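The new kSplitInfoFileSuffix ties the writer and reader together: write() builds names as "trace" + kSplitInfoFileSuffix + "." + fileId_, and extractFileIndex() recovers the id through kFileRegExp. A standalone sketch of that round trip (hard-coding the pattern instead of using the class member):

    const std::string name =
        fmt::format("{}/trace{}.{}", "/tmp/trace", ".split", 7);
    // name == "/tmp/trace/trace.split.7"
    std::string id;
    RE2::FullMatch(name, R"(.+\.split\.(\d+)$)", &id); // id == "7"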
diff --git a/velox/exec/tests/QueryTraceTest.cpp b/velox/exec/tests/QueryTraceTest.cpp
index 0f7671672214..3bec5e6f8655 100644
--- a/velox/exec/tests/QueryTraceTest.cpp
+++ b/velox/exec/tests/QueryTraceTest.cpp
@@ -20,11 +20,13 @@
 #include
 
 #include "velox/common/file/FileSystems.h"
+#include "velox/connectors/hive/HiveConnectorSplit.h"
 #include "velox/exec/PartitionFunction.h"
 #include "velox/exec/QueryDataReader.h"
 #include "velox/exec/QueryDataWriter.h"
 #include "velox/exec/QueryMetadataReader.h"
 #include "velox/exec/QueryMetadataWriter.h"
+#include "velox/exec/QuerySplitTracer.h"
 #include "velox/exec/QueryTraceUtil.h"
 #include "velox/exec/tests/utils/ArbitratorTestUtil.h"
 #include "velox/exec/tests/utils/HiveConnectorTestBase.h"
@@ -50,6 +52,7 @@ class QueryTracerTest : public HiveConnectorTestBase {
     connector::hive::LocationHandle::registerSerDe();
     connector::hive::HiveColumnHandle::registerSerDe();
     connector::hive::HiveInsertTableHandle::registerSerDe();
+    connector::hive::HiveConnectorSplit::registerSerDe();
     core::PlanNode::registerSerDe();
     core::ITypedExpr::registerSerDe();
     registerPartitionFunctionSerDe();
@@ -247,6 +250,47 @@ TEST_F(QueryTracerTest, traceMetadata) {
   }
 }
 
+TEST_F(QueryTracerTest, traceSplit) {
+  const auto numSplits = 13;
+  std::vector<exec::Split> splits;
+  for (int i = 0; i < numSplits; ++i) {
+    auto builder = HiveConnectorSplitBuilder(fmt::format("path-{}-{}", i, i));
+    const auto key = fmt::format("k{}", i);
+    const auto value = fmt::format("v{}", i);
+    splits.emplace_back(
+        builder.start(i)
+            .length(i)
+            .connectorId(fmt::format("{}", i))
+            .fileFormat(dwio::common::FileFormat(i + 1))
+            .infoColumn(key, value)
+            .partitionKey(
+                key, i > 1 ? std::nullopt : std::optional<std::string>(value))
+            .tableBucketNumber(i)
+            .build(),
+        -1);
+  }
+
+  const auto traceDir = TempDirectoryPath::create();
+  auto writer = exec::trace::QuerySplitTracer(traceDir->getPath());
+  for (int i = 0; i < numSplits; ++i) {
+    writer.write(splits.at(i));
+  }
+  const auto fs = filesystems::getFileSystem(traceDir->getPath(), nullptr);
+  const auto splitInfoFiles = fs->list(traceDir->getPath());
+  ASSERT_EQ(splitInfoFiles.size(), numSplits);
+
+  const auto reader = exec::trace::QuerySplitTracer(traceDir->getPath());
+  auto actualSplits = reader.read();
+  for (int i = 0; i < numSplits; ++i) {
+    ASSERT_FALSE(actualSplits[i].hasGroup());
+    ASSERT_TRUE(actualSplits[i].hasConnectorSplit());
+    const auto actualConnectorSplit = actualSplits[i].connectorSplit;
+    const auto expectedConnectorSplit = splits[i].connectorSplit;
+    ASSERT_EQ(
+        actualConnectorSplit->toString(), expectedConnectorSplit->toString());
+  }
+}
+
 TEST_F(QueryTracerTest, task) {
   const auto rowType =
       ROW({"c0", "c1", "c2", "c3", "c4", "c5"},