Allow metrics to be configurable by level #1671

Merged (5 commits) on Feb 5, 2021
docs/configs.md (1 addition, 0 deletions)
@@ -77,6 +77,7 @@ Name | Description | Default Value
<a name="sql.improvedTimeOps.enabled"></a>spark.rapids.sql.improvedTimeOps.enabled|When set to true, some operators will avoid overflowing by converting epoch days directly to seconds without first converting to microseconds|false
<a name="sql.incompatibleDateFormats.enabled"></a>spark.rapids.sql.incompatibleDateFormats.enabled|When parsing strings as dates and timestamps in functions like unix_timestamp, setting this to true will force all parsing onto GPU even for formats that can result in incorrect results when parsing invalid inputs.|false
<a name="sql.incompatibleOps.enabled"></a>spark.rapids.sql.incompatibleOps.enabled|For operations that work, but are not 100% compatible with the Spark equivalent set if they should be enabled by default or disabled by default.|false
<a name="sql.metrics.level"></a>spark.rapids.sql.metrics.level|GPU plans can produce a lot more metrics than CPU plans do. In very large queries this can sometimes result in going over the max result size limit for the driver. Supported values include DEBUG which will enable all metrics supported and typically only needs to be enabled when debugging the plugin. MODERATE which should output enough metrics to understand how long each part of the query is taking and how much data is going to each part of the query. ESSENTIAL which disables most metrics except those Apache Spark CPU plans will also report or their equivalents.|MODERATE
<a name="sql.python.gpu.enabled"></a>spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false
<a name="sql.reader.batchSizeBytes"></a>spark.rapids.sql.reader.batchSizeBytes|Soft limit on the maximum number of bytes the reader reads per batch. The readers will read chunks of data until this limit is met or exceeded. Note that the reader may estimate the number of bytes that will be used on the GPU in some cases based on the schema and number of rows in each batch.|2147483647
<a name="sql.reader.batchSizeRows"></a>spark.rapids.sql.reader.batchSizeRows|Soft limit on the maximum number of rows the reader will read per batch. The orc and parquet readers will read row groups until this limit is met or exceeded. The limit is respected by the csv reader.|2147483647
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,7 +17,6 @@
package com.nvidia.spark.rapids.shims.spark300

import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.GpuMetricNames._

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -28,9 +27,7 @@ import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.rapids.execution.{GpuHashJoin, SerializeConcatHostBuffersDeserializeBatch}
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
@@ -96,12 +93,15 @@ case class GpuBroadcastHashJoinExec(
condition: Option[Expression],
left: SparkPlan,
right: SparkPlan) extends BinaryExecNode with GpuHashJoin {
import GpuMetric._

override lazy val additionalMetrics: Map[String, SQLMetric] = Map(
"joinOutputRows" -> SQLMetrics.createMetric(sparkContext, "join output rows"),
"streamTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "stream time"),
"joinTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "join time"),
"filterTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "filter time"))
override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))

override def requiredChildDistribution: Seq[Distribution] = {
val mode = HashedRelationBroadcastMode(buildKeys)
@@ -125,17 +125,17 @@ case class GpuBroadcastHashJoinExec(
throw new IllegalStateException(
"GpuBroadcastHashJoin does not support row-based processing")

override def doExecuteColumnar() : RDD[ColumnarBatch] = {
val numOutputRows = longMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = longMetric(NUM_OUTPUT_BATCHES)
val totalTime = longMetric(TOTAL_TIME)
val streamTime = longMetric("streamTime")
val joinTime = longMetric("joinTime")
val filterTime = longMetric("filterTime")
val joinOutputRows = longMetric("joinOutputRows")
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val totalTime = gpuLongMetric(TOTAL_TIME)
val streamTime = gpuLongMetric(STREAM_TIME)
val joinTime = gpuLongMetric(JOIN_TIME)
val filterTime = gpuLongMetric(FILTER_TIME)
val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)

val broadcastRelation = broadcastExchange
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()

val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))
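
The hunks above replace direct `SQLMetrics.create*` calls with level-tagged calls such as `createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS)`, but the `GpuMetric` and `MetricsLevel` implementations themselves are not part of this diff. The following is only an illustrative Scala sketch of one way such level gating could be wired up; every name in it is a placeholder, not the plugin's real API.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Hypothetical ordering: ESSENTIAL < MODERATE < DEBUG, so a configured level
// also enables every level below it.
sealed abstract class Level(val priority: Int)
case object EssentialLevel extends Level(0)
case object ModerateLevel extends Level(1)
case object DebugLevel extends Level(2)

// A metric is only materialized when its declared level fits within the level
// configured via spark.rapids.sql.metrics.level; otherwise the caller gets None.
class LevelGatedMetricFactory(sc: SparkContext, configured: Level) {
  private def enabled(level: Level): Boolean = level.priority <= configured.priority

  def createMetric(level: Level, desc: String): Option[SQLMetric] =
    if (enabled(level)) Some(SQLMetrics.createMetric(sc, desc)) else None

  def createNanoTimingMetric(level: Level, desc: String): Option[SQLMetric] =
    if (enabled(level)) Some(SQLMetrics.createNanoTimingMetric(sc, desc)) else None
}
```

An alternative that matches the unconditional metric updates in `doExecuteColumnar` more closely is sketched after the next file's diff.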

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,7 +17,6 @@
package com.nvidia.spark.rapids.shims.spark300

import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.GpuMetricNames._

import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
@@ -27,9 +26,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, ShuffledHashJoinExec}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.rapids.execution.{GpuHashJoin, GpuShuffledHashJoinBase}
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch

object GpuJoinUtils {
@@ -94,14 +91,17 @@ case class GpuShuffledHashJoinExec(
right,
isSkewJoin)
with GpuHashJoin {

override lazy val additionalMetrics: Map[String, SQLMetric] = Map(
"buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "build side size"),
"buildTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "build time"),
"streamTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "stream time"),
"joinTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "join time"),
"joinOutputRows" -> SQLMetrics.createMetric(sparkContext, "join output rows"),
"filterTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "filter time"))
import GpuMetric._

override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
BUILD_DATA_SIZE -> createSizeMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_DATA_SIZE),
BUILD_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_TIME),
STREAM_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))

override def requiredChildDistribution: Seq[Distribution] =
HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
@@ -117,15 +117,15 @@ case class GpuShuffledHashJoinExec(
}

override def doExecuteColumnar() : RDD[ColumnarBatch] = {
val buildDataSize = longMetric("buildDataSize")
val numOutputRows = longMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = longMetric(NUM_OUTPUT_BATCHES)
val totalTime = longMetric(TOTAL_TIME)
val buildTime = longMetric("buildTime")
val streamTime = longMetric("streamTime")
val joinTime = longMetric("joinTime")
val filterTime = longMetric("filterTime")
val joinOutputRows = longMetric("joinOutputRows")
val buildDataSize = gpuLongMetric(BUILD_DATA_SIZE)
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val totalTime = gpuLongMetric(TOTAL_TIME)
val buildTime = gpuLongMetric(BUILD_TIME)
val streamTime = gpuLongMetric(STREAM_TIME)
val joinTime = gpuLongMetric(JOIN_TIME)
val filterTime = gpuLongMetric(FILTER_TIME)
val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)

val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))
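
In `doExecuteColumnar` above the exec still fetches and updates every metric unconditionally via `gpuLongMetric(...)`, which suggests that a metric filtered out by the configured level degrades to something cheap rather than disappearing. A hedged Scala sketch of that idea (an assumption about the design, not the plugin's actual `GpuMetric` code):

```scala
// A tiny metric interface: call sites update unconditionally, and a metric
// whose level was filtered out simply ignores the updates.
trait SimpleMetric {
  def +=(delta: Long): Unit
  def value: Long
}

// Handed out for metrics below the configured level: every update is dropped.
object NoopMetric extends SimpleMetric {
  override def +=(delta: Long): Unit = ()
  override def value: Long = 0L
}

// Handed out for enabled metrics: accumulates normally.
final class LongMetric extends SimpleMetric {
  private var total = 0L
  override def +=(delta: Long): Unit = total += delta
  override def value: Long = total
}

object CallSiteExample {
  // The join code can always do `joinOutputRows += rows` without checking
  // whether the metric is actually enabled.
  def recordBatch(joinOutputRows: SimpleMetric, rows: Long): Unit =
    joinOutputRows += rows
}
```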

@@ -1,22 +1,22 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims.spark301

import com.nvidia.spark.rapids.{BaseExprMeta, DataFromReplacementRule, GpuBindReferences, GpuBroadcastJoinMeta, GpuBuildLeft, GpuBuildRight, GpuBuildSide, GpuColumnVector, GpuExec, GpuOverrides, GpuProjectExec, RapidsConf, RapidsMeta}
import com.nvidia.spark.rapids.GpuMetricNames.{NUM_OUTPUT_BATCHES, NUM_OUTPUT_ROWS, TOTAL_TIME}
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.shims.spark300.GpuJoinUtils

import org.apache.spark.rdd.RDD
@@ -27,10 +27,8 @@ import org.apache.spark.sql.catalyst.plans.physical.{BroadcastDistribution, Dist
import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight, HashedRelationBroadcastMode}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.rapids.execution.{GpuHashJoin, SerializeConcatHostBuffersDeserializeBatch}
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch

class GpuBroadcastHashJoinMeta(
@@ -93,12 +91,15 @@ case class GpuBroadcastHashJoinExec(
condition: Option[Expression],
left: SparkPlan,
right: SparkPlan) extends BinaryExecNode with GpuHashJoin {
import GpuMetric._

override lazy val additionalMetrics: Map[String, SQLMetric] = Map(
"joinOutputRows" -> SQLMetrics.createMetric(sparkContext, "join output rows"),
"streamTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "stream time"),
"joinTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "join time"),
"filterTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "filter time"))
override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))

override def requiredChildDistribution: Seq[Distribution] = {
val mode = HashedRelationBroadcastMode(buildKeys)
@@ -119,19 +120,20 @@ case class GpuBroadcastHashJoinExec(
}

override def doExecute(): RDD[InternalRow] =
throw new IllegalStateException("GpuBroadcastHashJoin does not support row-based processing")

override def doExecuteColumnar() : RDD[ColumnarBatch] = {
val numOutputRows = longMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = longMetric(NUM_OUTPUT_BATCHES)
val totalTime = longMetric(TOTAL_TIME)
val streamTime = longMetric("streamTime")
val joinTime = longMetric("joinTime")
val filterTime = longMetric("filterTime")
val joinOutputRows = longMetric("joinOutputRows")
throw new IllegalStateException(
"GpuBroadcastHashJoin does not support row-based processing")

override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val totalTime = gpuLongMetric(TOTAL_TIME)
val streamTime = gpuLongMetric(STREAM_TIME)
val joinTime = gpuLongMetric(JOIN_TIME)
val filterTime = gpuLongMetric(FILTER_TIME)
val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)

val broadcastRelation = broadcastExchange
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()

val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))
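
Because `spark.rapids.sql.metrics.level` is a plain string setting, the plugin has to map `DEBUG`, `MODERATE`, or `ESSENTIAL` onto an internal level somewhere. A small sketch of that translation with placeholder types; the real parsing and error handling may well differ:

```scala
// Placeholder level type, re-declared so this snippet stands alone.
sealed trait Level
case object EssentialLevel extends Level
case object ModerateLevel extends Level
case object DebugLevel extends Level

object MetricsLevelParser {
  // Accepts the three documented values case-insensitively and rejects
  // anything else with an explicit error.
  def parse(raw: String): Level = raw.trim.toUpperCase match {
    case "ESSENTIAL" => EssentialLevel
    case "MODERATE"  => ModerateLevel
    case "DEBUG"     => DebugLevel
    case other => throw new IllegalArgumentException(
      s"spark.rapids.sql.metrics.level must be DEBUG, MODERATE, or ESSENTIAL, got: $other")
  }
}
```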

@@ -1,11 +1,11 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,7 +16,6 @@
package com.nvidia.spark.rapids.shims.spark301db

import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.GpuMetricNames.{NUM_OUTPUT_BATCHES, NUM_OUTPUT_ROWS, TOTAL_TIME}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -27,10 +26,8 @@ import org.apache.spark.sql.catalyst.plans.physical.{BroadcastDistribution, Dist
import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelationBroadcastMode}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.rapids.execution.{GpuHashJoin, SerializeConcatHostBuffersDeserializeBatch}
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch

class GpuBroadcastHashJoinMeta(
@@ -93,12 +90,15 @@ case class GpuBroadcastHashJoinExec(
condition: Option[Expression],
left: SparkPlan,
right: SparkPlan) extends BinaryExecNode with GpuHashJoin {
import GpuMetric._

override lazy val additionalMetrics: Map[String, SQLMetric] = Map(
"joinOutputRows" -> SQLMetrics.createMetric(sparkContext, "join output rows"),
"streamTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "stream time"),
"joinTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "join time"),
"filterTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "filter time"))
override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))

override def requiredChildDistribution: Seq[Distribution] = {
val mode = HashedRelationBroadcastMode(buildKeys)
@@ -119,19 +119,20 @@ case class GpuBroadcastHashJoinExec(
}

override def doExecute(): RDD[InternalRow] =
throw new IllegalStateException("GpuBroadcastHashJoin does not support row-based processing")

override def doExecuteColumnar() : RDD[ColumnarBatch] = {
val numOutputRows = longMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = longMetric(NUM_OUTPUT_BATCHES)
val totalTime = longMetric(TOTAL_TIME)
val streamTime = longMetric("streamTime")
val joinTime = longMetric("joinTime")
val filterTime = longMetric("filterTime")
val joinOutputRows = longMetric("joinOutputRows")
throw new IllegalStateException(
"GpuBroadcastHashJoin does not support row-based processing")

override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val totalTime = gpuLongMetric(TOTAL_TIME)
val streamTime = gpuLongMetric(STREAM_TIME)
val joinTime = gpuLongMetric(JOIN_TIME)
val filterTime = gpuLongMetric(FILTER_TIME)
val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)

val broadcastRelation = broadcastExchange
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()

val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))
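
Every exec touched by this PR also overrides `outputRowsLevel` and `outputBatchesLevel`, implying that the levels of the shared output-row and output-batch metrics are chosen per operator rather than globally. A final sketch, again with placeholder names and only assumed structure, of how a base trait could consume those overrides:

```scala
// Placeholder level type, re-declared so this snippet stands alone.
sealed trait Level
case object EssentialLevel extends Level
case object ModerateLevel extends Level
case object DebugLevel extends Level

trait HasDefaultMetricLevels {
  // Conservative defaults; concrete operators can promote them.
  def outputRowsLevel: Level = DebugLevel
  def outputBatchesLevel: Level = DebugLevel

  // The base trait attaches the chosen levels to the metrics that every
  // operator shares, here keyed by illustrative metric names.
  def defaultMetricLevels: Map[String, Level] = Map(
    "numOutputRows" -> outputRowsLevel,
    "numOutputBatches" -> outputBatchesLevel)
}

// An operator that wants its row count reported even at the ESSENTIAL level,
// mirroring the overrides in the join execs above.
class ExampleJoinMetricLevels extends HasDefaultMetricLevels {
  override def outputRowsLevel: Level = EssentialLevel
  override def outputBatchesLevel: Level = ModerateLevel
}
```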
