diff --git a/velox/exec/OutputBuffer.h b/velox/exec/OutputBuffer.h index ff20834f7dd1..60554051f26a 100644 --- a/velox/exec/OutputBuffer.h +++ b/velox/exec/OutputBuffer.h @@ -306,8 +306,8 @@ class OutputBuffer { DataAvailableCallback notify, DataConsumerActiveCheckCallback activeCheck); - // Continues any possibly waiting producers. Called when the - // producer task has an error or cancellation. + /// Continues any possibly waiting producers. Called when the producer task + /// has an error or cancellation. void terminate(); std::string toString(); diff --git a/velox/exec/OutputBufferManager.h b/velox/exec/OutputBufferManager.h index 30da04c6e40c..847532cd554e 100644 --- a/velox/exec/OutputBufferManager.h +++ b/velox/exec/OutputBufferManager.h @@ -51,9 +51,9 @@ class OutputBufferManager { /// Returns true if the buffer exists for a given taskId, else returns false. bool updateNumDrivers(const std::string& taskId, uint32_t newNumDrivers); - // Adds data to the outgoing queue for 'destination'. 'data' must not be - // nullptr. 'data' is always added but if the buffers are full the future is - // set to a ContinueFuture that will be realized when there is space. + /// Adds data to the outgoing queue for 'destination'. 'data' must not be + /// nullptr. 'data' is always added but if the buffers are full the future is + /// set to a ContinueFuture that will be realized when there is space. bool enqueue( const std::string& taskId, int destination, @@ -62,12 +62,12 @@ class OutputBufferManager { void noMoreData(const std::string& taskId); - // Returns true if noMoreData has been called and all the accumulated data - // have been fetched and acknowledged. + /// Returns true if noMoreData has been called and all the accumulated data + /// have been fetched and acknowledged. bool isFinished(const std::string& taskId); - // Removes data with sequence number < 'sequence' from the queue for - // 'destination_'. + /// Removes data with sequence number < 'sequence' from the queue for + /// 'destination_'. void acknowledge(const std::string& taskId, int destination, int64_t sequence); diff --git a/velox/exec/PartitionedOutput.cpp b/velox/exec/PartitionedOutput.cpp index d86563a29d11..c1806d1bcec6 100644 --- a/velox/exec/PartitionedOutput.cpp +++ b/velox/exec/PartitionedOutput.cpp @@ -232,12 +232,11 @@ void PartitionedOutput::estimateRowSizes() { } void PartitionedOutput::addInput(RowVectorPtr input) { - initializeInput(std::move(input)); + traceInput(input); + initializeInput(std::move(input)); initializeDestinations(); - initializeSizeBuffers(); - estimateRowSizes(); for (auto& destination : destinations_) { diff --git a/velox/tool/trace/CMakeLists.txt b/velox/tool/trace/CMakeLists.txt index 618a69c75720..3218b4ee30b0 100644 --- a/velox/tool/trace/CMakeLists.txt +++ b/velox/tool/trace/CMakeLists.txt @@ -12,8 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -velox_add_library(velox_query_trace_replayer_base OperatorReplayerBase.cpp - TableWriterReplayer.cpp AggregationReplayer.cpp) +velox_add_library( + velox_query_trace_replayer_base + AggregationReplayer.cpp + OperatorReplayerBase.cpp + PartitionedOutputReplayer.cpp + TableWriterReplayer.cpp) + velox_link_libraries( velox_query_trace_replayer_base velox_aggregates diff --git a/velox/tool/trace/OperatorReplayerBase.cpp b/velox/tool/trace/OperatorReplayerBase.cpp index ba37cd140769..b54ad323a447 100644 --- a/velox/tool/trace/OperatorReplayerBase.cpp +++ b/velox/tool/trace/OperatorReplayerBase.cpp @@ -34,19 +34,20 @@ OperatorReplayerBase::OperatorReplayerBase( std::string nodeId, int32_t pipelineId, std::string operatorType) - : rootDir_(std::move(rootDir)), - taskId_(std::move(taskId)), + : taskId_(std::move(taskId)), nodeId_(std::move(nodeId)), pipelineId_(pipelineId), - operatorType_(std::move(operatorType)) { + operatorType_(std::move(operatorType)), + rootDir_(std::move(rootDir)), + taskDir_(fmt::format("{}/{}", rootDir_, taskId_)), + nodeDir_(fmt::format("{}/{}", taskDir_, nodeId_)) { VELOX_USER_CHECK(!rootDir_.empty()); VELOX_USER_CHECK(!taskId_.empty()); VELOX_USER_CHECK(!nodeId_.empty()); VELOX_USER_CHECK_GE(pipelineId_, 0); VELOX_USER_CHECK(!operatorType_.empty()); - const auto traceTaskDir = fmt::format("{}/{}", rootDir_, taskId_); const auto metadataReader = exec::trace::QueryMetadataReader( - traceTaskDir, memory::MemoryManager::getInstance()->tracePool()); + taskDir_, memory::MemoryManager::getInstance()->tracePool()); metadataReader.read(queryConfigs_, connectorConfigs_, planFragment_); queryConfigs_[core::QueryConfig::kQueryTraceEnabled] = "false"; fs_ = filesystems::getFileSystem(rootDir_, nullptr); @@ -54,7 +55,7 @@ OperatorReplayerBase::OperatorReplayerBase( exec::trace::getNumDrivers(rootDir_, taskId_, nodeId_, pipelineId_, fs_); } -RowVectorPtr OperatorReplayerBase::run() const { +RowVectorPtr OperatorReplayerBase::run() { const auto restoredPlanNode = createPlan(); return exec::test::AssertQueryBuilder(restoredPlanNode) .maxDrivers(maxDrivers_) @@ -67,83 +68,17 @@ core::PlanNodePtr OperatorReplayerBase::createPlan() const { const auto* replayNode = core::PlanNode::findFirstNode( planFragment_.get(), [this](const core::PlanNode* node) { return node->id() == nodeId_; }); - const auto traceDir = fmt::format("{}/{}", rootDir_, taskId_); return exec::test::PlanBuilder() - .traceScan( - fmt::format("{}/{}", traceDir, nodeId_), - exec::trace::getDataType(planFragment_, nodeId_)) - .addNode(addReplayNode(replayNode)) + .traceScan(nodeDir_, exec::trace::getDataType(planFragment_, nodeId_)) + .addNode(replayNodeFactory(replayNode)) .planNode(); } std::function -OperatorReplayerBase::addReplayNode(const core::PlanNode* node) const { +OperatorReplayerBase::replayNodeFactory(const core::PlanNode* node) const { return [=](const core::PlanNodeId& nodeId, const core::PlanNodePtr& source) -> core::PlanNodePtr { return createPlanNode(node, nodeId, source); }; } - -void OperatorReplayerBase::printSummary( - const std::string& rootDir, - const std::string& taskId, - bool shortSummary) { - const auto fs = filesystems::getFileSystem(rootDir, nullptr); - const auto taskIds = exec::trace::getTaskIds(rootDir, fs); - if (taskIds.empty()) { - LOG(ERROR) << "No traced query task under " << rootDir; - return; - } - - std::ostringstream summary; - summary << "\n++++++Query trace summary++++++\n"; - summary << "Number of tasks: " << taskIds.size() << "\n"; - summary << "Task ids: " << folly::join(",", taskIds); - - if (shortSummary) { - LOG(INFO) << summary.str(); - return; - } - - const auto summaryTaskIds = - taskId.empty() ? taskIds : std::vector{taskId}; - for (const auto& taskId : summaryTaskIds) { - summary << "\n++++++Query configs and plan of task " << taskId - << ":++++++\n"; - const auto traceTaskDir = fmt::format("{}/{}", rootDir, taskId); - const auto queryMetaFile = fmt::format( - "{}/{}", - traceTaskDir, - exec::trace::QueryTraceTraits::kQueryMetaFileName); - const auto metaObj = exec::trace::getMetadata(queryMetaFile, fs); - const auto& configObj = - metaObj[exec::trace::QueryTraceTraits::kQueryConfigKey]; - summary << "++++++Query configs++++++\n"; - summary << folly::toJson(configObj) << "\n"; - summary << "++++++Query plan++++++\n"; - const auto queryPlan = ISerializable::deserialize( - metaObj[exec::trace::QueryTraceTraits::kPlanNodeKey], - memory::MemoryManager::getInstance()->tracePool()); - summary << queryPlan->toString(true, true); - } - LOG(INFO) << summary.str(); -} - -std::string OperatorReplayerBase::usage() { - std::ostringstream usage; - usage - << "++++++Query Trace Tool Usage++++++\n" - << "The following options are available:\n" - << "--usage: Show the usage\n" - << "--root: Root dir of the query tracing, it must be set\n" - << "--summary: Show the summary of the tracing including number of tasks" - << "and task ids. It also print the query metadata including" - << "query configs, connectors properties, and query plan in JSON format.\n" - << "--short_summary: Only show number of tasks and task ids.\n" - << "--pretty: Show the summary of the tracing in pretty JSON.\n" - << "--task_id: Specify the target task id, if empty, show the summary of " - << "all the traced query task.\n"; - return usage.str(); -} - } // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/OperatorReplayerBase.h b/velox/tool/trace/OperatorReplayerBase.h index 77f45ff41ca8..863a8f5f80f6 100644 --- a/velox/tool/trace/OperatorReplayerBase.h +++ b/velox/tool/trace/OperatorReplayerBase.h @@ -19,6 +19,10 @@ #include "velox/common/file/FileSystems.h" #include "velox/core/PlanNode.h" +namespace facebook::velox::exec { +class Task; +} + namespace facebook::velox::tool::trace { class OperatorReplayerBase { public: @@ -36,38 +40,34 @@ class OperatorReplayerBase { OperatorReplayerBase& operator=(OperatorReplayerBase&& other) noexcept = delete; - RowVectorPtr run() const; - - static void printSummary( - const std::string& rootDir, - const std::string& taskId, - bool shortSummary); - - static std::string usage(); + virtual RowVectorPtr run(); protected: - virtual core::PlanNodePtr createPlan() const; - - virtual std::function - addReplayNode(const core::PlanNode* node) const; - virtual core::PlanNodePtr createPlanNode( const core::PlanNode* node, const core::PlanNodeId& nodeId, const core::PlanNodePtr& source) const = 0; - const std::string rootDir_; + core::PlanNodePtr createPlan() const; + const std::string taskId_; const std::string nodeId_; const int32_t pipelineId_; const std::string operatorType_; + const std::string rootDir_; + const std::string taskDir_; + const std::string nodeDir_; + std::unordered_map queryConfigs_; std::unordered_map> connectorConfigs_; core::PlanNodePtr planFragment_; std::shared_ptr fs_; int32_t maxDrivers_{1}; -}; + private: + std::function + replayNodeFactory(const core::PlanNode* node) const; +}; } // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/PartitionedOutputReplayer.cpp b/velox/tool/trace/PartitionedOutputReplayer.cpp new file mode 100644 index 000000000000..54910d1d4766 --- /dev/null +++ b/velox/tool/trace/PartitionedOutputReplayer.cpp @@ -0,0 +1,164 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "velox/common/memory/Memory.h" +#include "velox/exec/PartitionedOutput.h" +#include "velox/exec/QueryTraceUtil.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/tool/trace/PartitionedOutputReplayer.h" + +using namespace facebook::velox; +using namespace facebook::velox::exec; +using namespace facebook::velox::exec::test; + +namespace facebook::velox::tool::trace { +namespace { +std::shared_ptr createQueryContext( + const std::unordered_map& config, + folly::Executor* executor) { + return core::QueryCtx::create(executor, core::QueryConfig(std::move(config))); +} + +std::vector> getData( + const std::shared_ptr& bufferManager, + const std::string& taskId, + int destination, + int64_t sequence, + folly::Executor* executor) { + auto [promise, semiFuture] = + folly::makePromiseContract>>(); + VELOX_CHECK(bufferManager->getData( + taskId, + destination, + exec::PartitionedOutput::kMinDestinationSize, + sequence, + [result = std::make_shared< + folly::Promise>>>( + std::move(promise))]( + std::vector> pages, + int64_t /*inSequence*/, + std::vector /*remainingBytes*/) { + result->setValue(std::move(pages)); + })); + auto future = std::move(semiFuture).via(executor); + future.wait(std::chrono::seconds{10}); + VELOX_CHECK(future.isReady()); + return std::move(future).value(); +} +} // namespace + +void consumeAllData( + const std::shared_ptr& bufferManager, + const std::string& taskId, + uint32_t numPartitions, + folly::Executor* driverExecutor, + folly::ThreadPoolExecutor* consumerExecutor, + std::function)> consumer) { + std::vector consumerThreads; + std::vector sequences; + consumerThreads.reserve(numPartitions); + sequences.reserve(numPartitions); + sequences.resize(numPartitions); + for (uint32_t i = 0; i < numPartitions; i++) { + consumerExecutor->add([&, partition = i]() { + bool finished{false}; + while (!finished) { + std::vector> pages; + { + pages = getData( + bufferManager, + taskId, + partition, + sequences[partition], + driverExecutor); + } + for (auto& page : pages) { + if (page) { + consumer(partition, std::move(page)); + sequences[partition]++; + } else { + // Null page indicates this buffer is finished. + bufferManager->deleteResults(taskId, partition); + finished = true; + } + } + } + }); + } + consumerExecutor->join(); +} + +PartitionedOutputReplayer::PartitionedOutputReplayer( + const std::string& rootDir, + const std::string& taskId, + const std::string& nodeId, + const int32_t pipelineId, + const std::string& operatorType, + const ConsumerCallBack& consumerCb) + : OperatorReplayerBase(rootDir, taskId, nodeId, pipelineId, operatorType), + originalNode_(dynamic_cast( + core::PlanNode::findFirstNode( + planFragment_.get(), + [this](const core::PlanNode* node) { + return node->id() == nodeId_; + }))), + consumerCb_(consumerCb) { + VELOX_CHECK_NOT_NULL(originalNode_); + consumerExecutor_ = std::make_unique( + originalNode_->numPartitions(), + std::make_shared("Consumer")); +} + +RowVectorPtr PartitionedOutputReplayer::run() { + auto task = Task::create( + "local://partitioned-output-replayer", + core::PlanFragment{createPlan()}, + 0, + createQueryContext(queryConfigs_, executor_.get()), + Task::ExecutionMode::kParallel); + task->start(maxDrivers_); + + consumeAllData( + bufferManager_, + task->taskId(), + originalNode_->numPartitions(), + executor_.get(), + consumerExecutor_.get(), + consumerCb_); + return nullptr; +} + +core::PlanNodePtr PartitionedOutputReplayer::createPlanNode( + const core::PlanNode* node, + const core::PlanNodeId& nodeId, + const core::PlanNodePtr& source) const { + auto originalNode = dynamic_cast(node); + VELOX_CHECK_NOT_NULL(originalNode); + return std::make_shared( + nodeId, + originalNode->kind(), + originalNode->keys(), + originalNode->numPartitions(), + originalNode->isReplicateNullsAndAny(), + originalNode->partitionFunctionSpecPtr(), + originalNode->outputType(), + source); +} + +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/PartitionedOutputReplayer.h b/velox/tool/trace/PartitionedOutputReplayer.h new file mode 100644 index 000000000000..76c31cbd34e1 --- /dev/null +++ b/velox/tool/trace/PartitionedOutputReplayer.h @@ -0,0 +1,69 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "velox/core/PlanNode.h" +#include "velox/exec/OutputBufferManager.h" +#include "velox/tool/trace/OperatorReplayerBase.h" + +namespace facebook::velox::tool::trace { + +/// Concurrently gets all partitioned buffer content (vec) for every +/// partition. +void consumeAllData( + const std::shared_ptr& bufferManager, + const std::string& taskId, + uint32_t numPartitions, + folly::Executor* executor, + folly::ThreadPoolExecutor* consumerExecutor, + std::function)> consumer); + +/// The replayer to replay the traced 'PartitionedOutput' operator. +class PartitionedOutputReplayer final : public OperatorReplayerBase { + public: + using ConsumerCallBack = + std::function)>; + + PartitionedOutputReplayer( + const std::string& rootDir, + const std::string& taskId, + const std::string& nodeId, + const int32_t pipelineId, + const std::string& operatorType, + const ConsumerCallBack& consumerCb = [](auto partition, auto page) {}); + + RowVectorPtr run() override; + + private: + core::PlanNodePtr createPlanNode( + const core::PlanNode* node, + const core::PlanNodeId& nodeId, + const core::PlanNodePtr& source) const override; + + const core::PartitionedOutputNode* const originalNode_; + const std::shared_ptr bufferManager_{ + exec::OutputBufferManager::getInstance().lock()}; + const std::unique_ptr executor_{ + std::make_unique( + std::thread::hardware_concurrency(), + std::make_shared("Driver"))}; + const ConsumerCallBack& consumerCb_; + std::unique_ptr consumerExecutor_; +}; +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/QueryReplayer.cpp b/velox/tool/trace/QueryReplayer.cpp index 2067f2b543b7..4923b571e706 100644 --- a/velox/tool/trace/QueryReplayer.cpp +++ b/velox/tool/trace/QueryReplayer.cpp @@ -27,19 +27,30 @@ #include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h" #include "velox/core/PlanNode.h" #include "velox/exec/PartitionFunction.h" +#include "velox/exec/QueryTraceUtil.h" #include "velox/tool/trace/AggregationReplayer.h" #include "velox/tool/trace/OperatorReplayerBase.h" +#include "velox/tool/trace/PartitionedOutputReplayer.h" #include "velox/tool/trace/TableWriterReplayer.h" #include "velox/type/Type.h" -DEFINE_bool(usage, false, "Show the usage"); -DEFINE_string(root, "", "Root dir of the query tracing"); -DEFINE_bool(summary, false, "Show the summary of the tracing"); +DEFINE_string( + root_dir, + "", + "Root directory where the replayer is reading the traced data, it must be " + "set"); +DEFINE_bool( + summary, + false, + "Show the summary of the tracing including number of tasks and task ids. " + "It also print the query metadata including query configs, connectors " + "properties, and query plan in JSON format."); DEFINE_bool(short_summary, false, "Only show number of tasks and task ids"); DEFINE_string( task_id, "", - "Specify the target task id, if empty, show the summary of all the traced query task."); + "Specify the target task id, if empty, show the summary of all the traced " + "query task."); DEFINE_string(node_id, "", "Specify the target node id."); DEFINE_int32(pipeline_id, 0, "Specify the target pipeline id."); DEFINE_string(operator_type, "", "Specify the target operator type."); @@ -65,6 +76,7 @@ void init() { Type::registerSerDe(); core::PlanNode::registerSerDe(); core::ITypedExpr::registerSerDe(); + common::Filter::registerSerDe(); exec::registerPartitionFunctionSerDe(); connector::hive::HiveTableHandle::registerSerDe(); connector::hive::LocationHandle::registerSerDe(); @@ -86,62 +98,107 @@ void init() { connector::registerConnector(hiveConnector); } -std::unique_ptr createReplayer( - const std::string& operatorType) { - std::unique_ptr replayer = nullptr; - if (operatorType == "TableWriter") { +std::unique_ptr createReplayer() { + std::unique_ptr replayer; + if (FLAGS_operator_type == "TableWriter") { replayer = std::make_unique( - FLAGS_root, + FLAGS_root_dir, FLAGS_task_id, FLAGS_node_id, FLAGS_pipeline_id, FLAGS_operator_type, FLAGS_table_writer_output_dir); - } else if (operatorType == "Aggregation") { + } else if (FLAGS_operator_type == "Aggregation") { replayer = std::make_unique( - FLAGS_root, + FLAGS_root_dir, + FLAGS_task_id, + FLAGS_node_id, + FLAGS_pipeline_id, + FLAGS_operator_type); + } else if (FLAGS_operator_type == "PartitionedOutput") { + replayer = std::make_unique( + FLAGS_root_dir, FLAGS_task_id, FLAGS_node_id, FLAGS_pipeline_id, FLAGS_operator_type); } else { - VELOX_FAIL("Unsupported opeartor type: {}", FLAGS_operator_type); + VELOX_UNSUPPORTED("Unsupported operator type: {}", FLAGS_operator_type); } VELOX_USER_CHECK_NOT_NULL(replayer); return replayer; } -} // namespace -int main(int argc, char** argv) { - if (argc == 1) { - LOG(ERROR) << "\n" << tool::trace::OperatorReplayerBase::usage(); - return 1; +void printSummary( + const std::string& rootDir, + const std::string& taskId, + bool shortSummary) { + const auto fs = filesystems::getFileSystem(rootDir, nullptr); + const auto taskIds = exec::trace::getTaskIds(rootDir, fs); + if (taskIds.empty()) { + LOG(ERROR) << "No traced query task under " << rootDir; + return; } - gflags::ParseCommandLineFlags(&argc, &argv, true); - if (FLAGS_usage) { - LOG(INFO) << "\n" << tool::trace::OperatorReplayerBase::usage(); - return 0; - } + std::ostringstream summary; + summary << "\n++++++Query trace summary++++++\n"; + summary << "Number of tasks: " << taskIds.size() << "\n"; + summary << "Task ids: " << folly::join(",", taskIds); - if (FLAGS_root.empty()) { - LOG(ERROR) << "Root dir is not provided!\n" - << tool::trace::OperatorReplayerBase::usage(); - return 1; + if (shortSummary) { + LOG(INFO) << summary.str(); + return; } - init(); - if (FLAGS_summary || FLAGS_short_summary) { - tool::trace::OperatorReplayerBase::printSummary( - FLAGS_root, FLAGS_task_id, FLAGS_short_summary); - return 0; + const auto summaryTaskIds = + taskId.empty() ? taskIds : std::vector{taskId}; + for (const auto& taskId : summaryTaskIds) { + summary << "\n++++++Query configs and plan of task " << taskId + << ":++++++\n"; + const auto traceTaskDir = fmt::format("{}/{}", rootDir, taskId); + const auto queryMetaFile = fmt::format( + "{}/{}", + traceTaskDir, + exec::trace::QueryTraceTraits::kQueryMetaFileName); + const auto metaObj = exec::trace::getMetadata(queryMetaFile, fs); + const auto& configObj = + metaObj[exec::trace::QueryTraceTraits::kQueryConfigKey]; + summary << "++++++Query configs++++++\n"; + summary << folly::toJson(configObj) << "\n"; + summary << "++++++Query plan++++++\n"; + const auto queryPlan = ISerializable::deserialize( + metaObj[exec::trace::QueryTraceTraits::kPlanNodeKey], + memory::MemoryManager::getInstance()->tracePool()); + summary << queryPlan->toString(true, true); } + LOG(INFO) << summary.str(); +} +} // namespace + +int main(int argc, char** argv) { + gflags::ParseCommandLineFlags(&argc, &argv, true); - const auto replayer = createReplayer(FLAGS_operator_type); - VELOX_USER_CHECK_NOT_NULL( - replayer, "Unsupported opeartor type: {}", FLAGS_operator_type); + if (argc == 1) { + gflags::ShowUsageWithFlags(argv[0]); + return -1; + } + if (FLAGS_root_dir.empty()) { + gflags::SetUsageMessage("--root_dir must be provided."); + gflags::ShowUsageWithFlags(argv[0]); + return -1; + } - replayer->run(); + try { + init(); + if (FLAGS_summary || FLAGS_short_summary) { + printSummary(FLAGS_root_dir, FLAGS_task_id, FLAGS_short_summary); + return 0; + } + createReplayer()->run(); + } catch (const VeloxException& e) { + LOG(ERROR) << e.what(); + return -1; + } return 0; } diff --git a/velox/tool/trace/TableWriterReplayer.cpp b/velox/tool/trace/TableWriterReplayer.cpp index fb827efef05c..3543011aed9f 100644 --- a/velox/tool/trace/TableWriterReplayer.cpp +++ b/velox/tool/trace/TableWriterReplayer.cpp @@ -14,20 +14,18 @@ * limitations under the License. */ -#include "velox/tool/trace/TableWriterReplayer.h" -#include "velox/exec/QueryDataReader.h" +#include + #include "velox/exec/QueryTraceUtil.h" #include "velox/exec/TableWriter.h" -#include "velox/exec/Task.h" -#include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/tool/trace/TableWriterReplayer.h" using namespace facebook::velox; using namespace facebook::velox::exec; using namespace facebook::velox::exec::test; namespace facebook::velox::tool::trace { - namespace { std::shared_ptr @@ -66,7 +64,6 @@ std::shared_ptr createInsertTableHanlde( return std::make_shared( connectorId, makeHiveInsertTableHandle(node, std::move(targetDir))); } - } // namespace core::PlanNodePtr TableWriterReplayer::createPlanNode( @@ -74,6 +71,7 @@ core::PlanNodePtr TableWriterReplayer::createPlanNode( const core::PlanNodeId& nodeId, const core::PlanNodePtr& source) const { const auto* tableWriterNode = dynamic_cast(node); + VELOX_CHECK_NOT_NULL(tableWriterNode); const auto insertTableHandle = createInsertTableHanlde("test-hive", tableWriterNode, replayOutputDir_); return std::make_shared( diff --git a/velox/tool/trace/TableWriterReplayer.h b/velox/tool/trace/TableWriterReplayer.h index e5c0b1578f46..c0c675ba6333 100644 --- a/velox/tool/trace/TableWriterReplayer.h +++ b/velox/tool/trace/TableWriterReplayer.h @@ -22,6 +22,7 @@ #include "velox/tool/trace/OperatorReplayerBase.h" namespace facebook::velox::tool::trace { + /// The replayer to replay the traced 'TableWriter' operator. class TableWriterReplayer final : public OperatorReplayerBase { public: diff --git a/velox/tool/trace/tests/AggregationReplayerTest.cpp b/velox/tool/trace/tests/AggregationReplayerTest.cpp index fa575de301bd..290a7c20acc0 100644 --- a/velox/tool/trace/tests/AggregationReplayerTest.cpp +++ b/velox/tool/trace/tests/AggregationReplayerTest.cpp @@ -198,9 +198,10 @@ TEST_F(AggregationReplayerTest, test) { .split(makeHiveConnectorSplit(sourceFilePath->getPath())) .copyResults(pool(), task); - const auto tableWriterReplayer = AggregationReplayer( - traceRoot, task->taskId(), traceNodeId_, 0, "TableWriter"); - const auto replayingResult = tableWriterReplayer.run(); + const auto replayingResult = + AggregationReplayer( + traceRoot, task->taskId(), traceNodeId_, 0, "TableWriter") + .run(); assertEqualResults({results}, {replayingResult}); } } diff --git a/velox/tool/trace/tests/CMakeLists.txt b/velox/tool/trace/tests/CMakeLists.txt index 5ed9db65a1b8..03d1705eebe7 100644 --- a/velox/tool/trace/tests/CMakeLists.txt +++ b/velox/tool/trace/tests/CMakeLists.txt @@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -add_executable(velox_tool_trace_test TableWriterReplayerTest.cpp - AggregationReplayerTest.cpp) +add_executable( + velox_tool_trace_test + AggregationReplayerTest.cpp PartitionedOutputReplayerTest.cpp + TableWriterReplayerTest.cpp) add_test( NAME velox_tool_trace_test diff --git a/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp b/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp new file mode 100644 index 000000000000..b5ab4015a0af --- /dev/null +++ b/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp @@ -0,0 +1,194 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "folly/dynamic.h" +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/file/FileSystems.h" +#include "velox/common/hyperloglog/SparseHll.h" +#include "velox/exec/PartitionFunction.h" +#include "velox/exec/PartitionedOutput.h" +#include "velox/exec/QueryTraceUtil.h" +#include "velox/exec/TableWriter.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" +#include "velox/serializers/PrestoSerializer.h" +#include "velox/tool/trace/PartitionedOutputReplayer.h" + +using namespace facebook::velox; +using namespace facebook::velox::core; +using namespace facebook::velox::common; +using namespace facebook::velox::exec; +using namespace facebook::velox::exec::test; + +namespace facebook::velox::tool::trace::test { +class PartitionedOutputReplayerTest : public HiveConnectorTestBase { + protected: + static void SetUpTestCase() { + HiveConnectorTestBase::SetUpTestCase(); + filesystems::registerLocalFileSystem(); + if (!isRegisteredVectorSerde()) { + serializer::presto::PrestoVectorSerde::registerVectorSerde(); + } + Type::registerSerDe(); + common::Filter::registerSerDe(); + connector::hive::HiveTableHandle::registerSerDe(); + connector::hive::LocationHandle::registerSerDe(); + connector::hive::HiveColumnHandle::registerSerDe(); + connector::hive::HiveInsertTableHandle::registerSerDe(); + core::PlanNode::registerSerDe(); + core::ITypedExpr::registerSerDe(); + registerPartitionFunctionSerDe(); + } + + std::vector makeBatches( + vector_size_t numBatches, + std::function makeVector) { + std::vector batches; + batches.reserve(numBatches); + for (int32_t i = 0; i < numBatches; ++i) { + batches.push_back(makeVector(i)); + } + return batches; + } + + std::shared_ptr createQueryContext( + const std::unordered_map& config) { + return core::QueryCtx::create( + executor_.get(), core::QueryConfig(std::move(config))); + } + + const std::shared_ptr bufferManager_{ + exec::OutputBufferManager::getInstance().lock()}; +}; + +TEST_F(PartitionedOutputReplayerTest, basic) { + struct TestParam { + std::string testName; + uint32_t numPartitions; + RowVectorPtr input; + std::string debugString() { + return fmt::format( + "testName {}, numPartitions {}, input type {}", + testName, + numPartitions, + input->toString()); + } + }; + std::vector testParams = { + // 10 partitions, 1000 row vector[int, int] + {"small-dataset", + 10, + makeRowVector( + {"key", "value"}, + {makeFlatVector(1'000, [](auto row) { return row; }), + makeFlatVector( + 1'000, [](auto row) { return row * 2; }, nullEvery(7))})}, + // 4 partitions, 80'000 row vector[int, string] with each string being + // 1024 bytes size + {"large-dataset", + 4, + makeRowVector( + {"key", "value"}, + {makeFlatVector(80'000, [](auto row) { return row; }), + makeFlatVector( + 80'000, [](auto row) { return std::string(1024, 'x'); })})}}; + + for (auto& testParam : testParams) { + SCOPED_TRACE(testParam.debugString()); + std::string planNodeId; + auto plan = + PlanBuilder() + .values({testParam.input}, false) + .partitionedOutput( + {"key"}, testParam.numPartitions, false, {"key", "value"}) + .capturePlanNodeId(planNodeId) + .planNode(); + const auto testDir = TempDirectoryPath::create(); + const auto traceRoot = + fmt::format("{}/{}", testDir->getPath(), "traceRoot"); + auto originalTask = Task::create( + fmt::format( + "local://test-partitioned-output-replayer-basic-{}", + testParam.testName), + core::PlanFragment{plan}, + 0, + createQueryContext( + {{core::QueryConfig::kQueryTraceEnabled, "true"}, + {core::QueryConfig::kQueryTraceDir, traceRoot}, + {core::QueryConfig::kQueryTraceMaxBytes, + std::to_string(100UL << 30)}, + {core::QueryConfig::kQueryTraceTaskRegExp, ".*"}, + {core::QueryConfig::kQueryTraceNodeIds, planNodeId}, + {core::QueryConfig::kMaxPartitionedOutputBufferSize, + std::to_string(8UL << 20)}, + {core::QueryConfig::kMaxOutputBufferSize, + std::to_string(8UL << 20)}}), + Task::ExecutionMode::kParallel); + originalTask->start(1); + + std::vector>> + originalPartitionedResults; + originalPartitionedResults.reserve(testParam.numPartitions); + originalPartitionedResults.resize(testParam.numPartitions); + auto consumerExecutor = std::make_unique( + testParam.numPartitions, + std::make_shared("Consumer")); + consumeAllData( + bufferManager_, + originalTask->taskId(), + testParam.numPartitions, + executor_.get(), + consumerExecutor.get(), + [&](auto partition, auto page) { + originalPartitionedResults[partition].push_back(std::move(page)); + }); + + std::vector>> + replayedPartitionedResults; + replayedPartitionedResults.reserve(testParam.numPartitions); + replayedPartitionedResults.resize(testParam.numPartitions); + PartitionedOutputReplayer( + traceRoot, + originalTask->taskId(), + planNodeId, + 0, + "PartitionedOutput", + [&](auto partition, auto page) { + replayedPartitionedResults[partition].push_back(std::move(page)); + }) + .run(); + + ASSERT_EQ(replayedPartitionedResults.size(), testParam.numPartitions); + for (uint32_t partition = 0; partition < testParam.numPartitions; + partition++) { + const auto& originalBufList = originalPartitionedResults.at(partition); + const auto& replayedBufList = replayedPartitionedResults[partition]; + ASSERT_EQ(replayedBufList.size(), originalBufList.size()); + for (uint32_t i = 0; i < replayedBufList.size(); i++) { + ASSERT_EQ( + replayedBufList[i]->computeChainDataLength(), + originalBufList[i]->computeChainDataLength()); + } + } + } +} +} // namespace facebook::velox::tool::trace::test diff --git a/velox/tool/trace/tests/TableWriterReplayerTest.cpp b/velox/tool/trace/tests/TableWriterReplayerTest.cpp index 7bda92ec6213..e923ec9f7256 100644 --- a/velox/tool/trace/tests/TableWriterReplayerTest.cpp +++ b/velox/tool/trace/tests/TableWriterReplayerTest.cpp @@ -21,25 +21,19 @@ #include #include "folly/dynamic.h" -#include "folly/experimental/EventCount.h" #include "velox/common/base/Fs.h" #include "velox/common/file/FileSystems.h" #include "velox/common/hyperloglog/SparseHll.h" -#include "velox/common/testutil/TestValue.h" -#include "velox/dwio/dwrf/writer/Writer.h" #include "velox/exec/PartitionFunction.h" #include "velox/exec/QueryDataReader.h" #include "velox/exec/QueryTraceUtil.h" #include "velox/exec/TableWriter.h" -#include "velox/exec/tests/utils/ArbitratorTestUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" -#include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h" #include "velox/serializers/PrestoSerializer.h" #include "velox/tool/trace/TableWriterReplayer.h" -#include "velox/vector/fuzzer/VectorFuzzer.h" #include "velox/vector/tests/utils/VectorTestBase.h" using namespace facebook::velox; @@ -268,14 +262,15 @@ TEST_F(TableWriterReplayerTest, basic) { .split(makeHiveConnectorSplit(sourceFilePath->getPath())) .copyResults(pool(), task); const auto traceOutputDir = TempDirectoryPath::create(); - const auto tableWriterReplayer = TableWriterReplayer( - traceRoot, - task->taskId(), - "1", - 0, - "TableWriter", - traceOutputDir->getPath()); - const auto result = tableWriterReplayer.run(); + const auto result = TableWriterReplayer( + traceRoot, + task->taskId(), + "1", + 0, + "TableWriter", + traceOutputDir->getPath()) + .run(); + // Second column contains details about written files. const auto details = results->childAt(TableWriteTraits::kFragmentChannel) ->as>(); @@ -392,14 +387,14 @@ TEST_F(TableWriterReplayerTest, partitionWrite) { rowType); const auto traceOutputDir = TempDirectoryPath::create(); - const auto tableWriterReplayer = TableWriterReplayer( + TableWriterReplayer( traceRoot, task->taskId(), tableWriteNodeId, 0, "TableWriter", - traceOutputDir->getPath()); - tableWriterReplayer.run(); + traceOutputDir->getPath()) + .run(); actualPartitionDirectories = getLeafSubdirectories(traceOutputDir->getPath()); checkWriteResults( actualPartitionDirectories,