Fix metrics for writes (#197)
revans2 authored Jun 17, 2020
1 parent 30d8237 commit b3da6a5
Showing 2 changed files with 12 additions and 9 deletions.
@@ -33,7 +33,10 @@ import org.apache.spark.util.SerializableConfiguration
  * An extension of `DataWritingCommand` that allows columnar execution.
  */
 trait GpuDataWritingCommand extends DataWritingCommand {
-  override lazy val metrics: Map[String, SQLMetric] = GpuWriteJobStatsTracker.metrics
+  lazy val basicMetrics: Map[String, SQLMetric] = GpuWriteJobStatsTracker.basicMetrics
+  lazy val taskMetrics: Map[String, SQLMetric] = GpuWriteJobStatsTracker.taskMetrics
+
+  override lazy val metrics: Map[String, SQLMetric] = basicMetrics ++ taskMetrics
 
   override final def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] =
     throw new UnsupportedOperationException(
@@ -44,7 +47,7 @@ trait GpuDataWritingCommand extends DataWritingCommand
   def gpuWriteJobStatsTracker(
       hadoopConf: Configuration): GpuWriteJobStatsTracker = {
     val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
-    GpuWriteJobStatsTracker(serializableHadoopConf)
+    GpuWriteJobStatsTracker(serializableHadoopConf, this)
   }
 
   def requireSingleBatch: Boolean
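Why this matters: `GpuWriteJobStatsTracker.metrics` was a single `lazy val` on a singleton object, so every write command shared the same SQLMetric accumulators and concurrent writes reported each other's numbers. Moving the maps onto each command instance (together with the `lazy val` to `def` change in the object below) gives every command its own metrics. A minimal, self-contained sketch of the difference in plain Scala — the `Metric` class, `Trackers`, and `WriteCommand` are hypothetical stand-ins for illustration, not code from the repository:

// Hypothetical stand-in for SQLMetric, for illustration only.
final class Metric(val name: String) {
  private var total = 0L
  def add(v: Long): Unit = total += v
  def value: Long = total
}

object Trackers {
  // Before the fix: a lazy val on a singleton is built once, so every
  // command that reads it shares the SAME Metric instances.
  lazy val sharedMetrics: Map[String, Metric] =
    Map("writeTime" -> new Metric("writeTime"))

  // After the fix: a def builds fresh Metric instances on every call.
  def freshMetrics: Map[String, Metric] =
    Map("writeTime" -> new Metric("writeTime"))
}

class WriteCommand {
  // Each command instance caches its own copy, as the trait above now does.
  lazy val metrics: Map[String, Metric] = Trackers.freshMetrics
}

object Demo extends App {
  val a = new WriteCommand
  val b = new WriteCommand
  a.metrics("writeTime").add(100)
  println(b.metrics("writeTime").value) // 0: b is isolated from a

  Trackers.sharedMetrics("writeTime").add(100)
  Trackers.sharedMetrics("writeTime").add(100)
  println(Trackers.sharedMetrics("writeTime").value) // 200: one shared accumulator
}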
@@ -16,10 +16,11 @@
 
 package org.apache.spark.sql.rapids
 
+import com.nvidia.spark.rapids.GpuDataWritingCommand
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, WriteTaskStats}
+import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.util.SerializableConfiguration

@@ -60,18 +61,17 @@ object GpuWriteJobStatsTracker {
   val GPU_TIME_KEY = "gpuTime"
   val WRITE_TIME_KEY = "writeTime"
 
-  lazy val basicMetrics: Map[String, SQLMetric] = BasicWriteJobStatsTracker.metrics
+  def basicMetrics: Map[String, SQLMetric] = BasicWriteJobStatsTracker.metrics
 
-  lazy val taskMetrics: Map[String, SQLMetric] = {
+  def taskMetrics: Map[String, SQLMetric] = {
     val sparkContext = SparkContext.getActive.get
     Map(
       GPU_TIME_KEY -> SQLMetrics.createNanoTimingMetric(sparkContext, "GPU time"),
       WRITE_TIME_KEY -> SQLMetrics.createNanoTimingMetric(sparkContext, "write time")
     )
   }
 
-  def metrics: Map[String, SQLMetric] = basicMetrics ++ taskMetrics
-
-  def apply(serializableHadoopConf: SerializableConfiguration): GpuWriteJobStatsTracker =
-    new GpuWriteJobStatsTracker(serializableHadoopConf, basicMetrics, taskMetrics)
+  def apply(serializableHadoopConf: SerializableConfiguration,
+      command: GpuDataWritingCommand): GpuWriteJobStatsTracker =
+    new GpuWriteJobStatsTracker(serializableHadoopConf, command.basicMetrics, command.taskMetrics)
 }
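The companion `apply` now takes the command so the tracker it builds feeds that command's own metric maps; the trait's `gpuWriteJobStatsTracker` passes `this` accordingly. A hedged usage sketch, assuming only the signatures visible in this diff are on the classpath — `TrackerWiring` and `buildTracker` are hypothetical names for illustration:

import com.nvidia.spark.rapids.GpuDataWritingCommand
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.rapids.GpuWriteJobStatsTracker
import org.apache.spark.util.SerializableConfiguration

object TrackerWiring {
  // Equivalent to command.gpuWriteJobStatsTracker(hadoopConf): the tracker is
  // built from the command's basicMetrics/taskMetrics, so the SQL UI rows it
  // drives belong to this command alone, even with concurrent writes.
  def buildTracker(
      command: GpuDataWritingCommand,
      hadoopConf: Configuration): GpuWriteJobStatsTracker =
    GpuWriteJobStatsTracker(new SerializableConfiguration(hadoopConf), command)
}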
