[SPARK-46933][SQL] Add query execution time metric to connectors which use JDBCRDD

### What changes were proposed in this pull request?
This pull request adds measurement of query execution time on the external JDBC data source.
It also changes the access modifier of the JDBCRDD class, which is needed to add another metric (SQL text) in a follow-up PR.

### Why are the changes needed?
Query execution time is an important metric to have.

### Does this PR introduce _any_ user-facing change?
Users can see the query execution time on the SparkPlan graph, under the node metrics tab.
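For illustration (not part of the original description), here is a hedged sketch of a read that exercises this path; the connection options are placeholders and `spark` is assumed to be an existing SparkSession, as in spark-shell. A plain V1 JDBC read is planned as a RowDataSourceScanExec over JDBCRDD, so its scan node now carries the new timing metric in the SQL tab.

```scala
// Hedged sketch: placeholder URL, table, and credentials.
// Any read through the V1 JDBC path (JDBCRelation -> JDBCRDD) is affected.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://db-host:5432/sales") // placeholder
  .option("dbtable", "public.orders")                    // placeholder
  .option("user", "reporting")                           // placeholder
  .option("password", sys.env.getOrElse("DB_PASSWORD", ""))
  .load()

// Run the scan, then open the query's page in the SQL tab of the UI and
// expand the node metrics of the scan node to see "JDBC query execution time".
df.collect()
```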

### How was this patch tested?
Tested using a custom image.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#44969 from urosstan-db/SPARK-46933-Add-scan-metrics-to-jdbc-connector.

Lead-authored-by: Uros Stankovic <uros.stankovic@databricks.com>
Co-authored-by: Uros Stankovic <155642965+urosstan-db@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2 people authored and cloud-fan committed Feb 1, 2024
1 parent e610d1d commit 22586ef
Showing 3 changed files with 63 additions and 4 deletions.
@@ -99,6 +99,10 @@ trait DataSourceScanExec extends LeafExecNode {
def inputRDDs(): Seq[RDD[InternalRow]]
}

object DataSourceScanExec {
val numOutputRowsKey = "numOutputRows"
}

/** Physical plan node for scanning data from a relation. */
case class RowDataSourceScanExec(
output: Seq[Attribute],
@@ -111,8 +115,17 @@ case class RowDataSourceScanExec(
tableIdentifier: Option[TableIdentifier])
extends DataSourceScanExec with InputRDDCodegen {

override lazy val metrics =
Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
override lazy val metrics: Map[String, SQLMetric] = {
val metrics = Map(
DataSourceScanExec.numOutputRowsKey ->
SQLMetrics.createMetric(sparkContext, "number of output rows")
)

rdd match {
case rddWithDSMetrics: DataSourceMetricsMixin => metrics ++ rddWithDSMetrics.getMetrics
case _ => metrics
}
}

protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.execution.metric.SQLMetric

trait DataSourceMetricsMixin {
def getMetrics: Seq[(String, SQLMetric)]
}
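
For context, here is a hedged sketch (not part of this commit) of how a connector RDD could use the new trait. The ExternalServiceRDD name and its metric are hypothetical; the contract itself (expose name/SQLMetric pairs that RowDataSourceScanExec merges into its metrics map) is the one added above.

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.DataSourceMetricsMixin
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Hypothetical connector RDD (not in this PR) that reports how long its
// external calls take. RowDataSourceScanExec merges the pairs returned by
// getMetrics into its own metrics map, so they show up on the scan node.
class ExternalServiceRDD(sc: SparkContext)
  extends RDD[InternalRow](sc, Nil) with DataSourceMetricsMixin {

  // A nano-timing metric is rendered by the UI as a human-readable duration.
  val requestTimeMetric: SQLMetric =
    SQLMetrics.createNanoTimingMetric(sc, name = "external request time")

  override protected def getPartitions: Array[Partition] = Array.empty[Partition]

  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    val start = System.nanoTime()
    val rows: Iterator[InternalRow] = Iterator.empty // call the external system here
    requestTimeMetric.add(System.nanoTime() - start)
    rows
  }

  override def getMetrics: Seq[(String, SQLMetric)] =
    Seq("externalRequestTime" -> requestTimeMetric)
}
```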
@@ -26,7 +26,9 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.execution.datasources.DataSourceMetricsMixin
import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.types._
import org.apache.spark.util.CompletionIterator
@@ -157,7 +159,7 @@ object JDBCRDD extends Logging {
* Both the driver code and the workers must be able to access the database; the driver
* needs to fetch the schema while the workers need to fetch the data.
*/
private[jdbc] class JDBCRDD(
class JDBCRDD(
sc: SparkContext,
getConnection: Int => Connection,
schema: StructType,
@@ -171,7 +173,14 @@ private[jdbc] class JDBCRDD(
limit: Int,
sortOrders: Array[String],
offset: Int)
extends RDD[InternalRow](sc, Nil) {
extends RDD[InternalRow](sc, Nil) with DataSourceMetricsMixin {

/**
* Execution time of the query issued to JDBC connection
*/
val queryExecutionTimeMetric: SQLMetric = SQLMetrics.createNanoTimingMetric(
sparkContext,
name = "JDBC query execution time")

/**
* Retrieve the list of partitions corresponding to this RDD.
@@ -272,11 +281,24 @@ private[jdbc] class JDBCRDD(
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
stmt.setFetchSize(options.fetchSize)
stmt.setQueryTimeout(options.queryTimeout)

val startTime = System.nanoTime
rs = stmt.executeQuery()
val endTime = System.nanoTime

val executionTime = endTime - startTime
queryExecutionTimeMetric.add(executionTime)

val rowsIterator =
JdbcUtils.resultSetToSparkInternalRows(rs, dialect, schema, inputMetrics)

CompletionIterator[InternalRow, Iterator[InternalRow]](
new InterruptibleIterator(context, rowsIterator), close())
}

override def getMetrics: Seq[(String, SQLMetric)] = {
Seq(
"queryExecutionTime" -> queryExecutionTimeMetric
)
}
}
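
As a hedged way to check the metric outside the UI, the merged metrics can also be read from the executed plan after an action on the same Dataset. Assumptions: `spark` is an existing SparkSession, the connection options are placeholders, and queryExecution/executedPlan are internal developer APIs whose shape may change.

```scala
// Hedged sketch: placeholder connection options. The V1 JDBC path plans a
// RowDataSourceScanExec over JDBCRDD, which now contributes the
// "queryExecutionTime" key to the scan node's metrics.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://db-host:5432/sales") // placeholder
  .option("dbtable", "public.orders")                    // placeholder
  .load()

df.collect() // an action on this Dataset executes its own plan and fills its metrics

// The scan is a leaf of the executed plan; the stored value of a nano-timing
// metric is in nanoseconds (the UI renders it as a duration).
val jdbcNanos = df.queryExecution.executedPlan
  .collectLeaves()
  .flatMap(_.metrics.get("queryExecutionTime"))
  .map(_.value)

jdbcNanos.foreach(ns => println(s"JDBC query execution time: $ns ns"))
```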
