Avoid redundant collection conversions (#2210)
Contributes to #2109 
- unnecessary toSeq
- .size on Array or String causes conversion to SeqLike
- use mnemonic isEmpty/nonEmpty where possible
- substitute map for unzip

Signed-off-by: Gera Shegalov <gera@apache.org>
gerashegalov authored Apr 21, 2021
1 parent 0a73d48 commit d0964b5
Showing 23 changed files with 48 additions and 48 deletions.
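The four idioms from the commit message, gathered into one small, self-contained sketch; every identifier below is illustrative and does not come from the diff.

```scala
object CollectionIdioms extends App {
  // 1. Unnecessary toSeq: an Array already satisfies a Seq-typed parameter via the
  //    standard implicit conversion, so an explicit .toSeq only adds a wrapper/copy.
  def describe(xs: Seq[String]): String = xs.mkString(", ")
  val parts: Array[String] = Array("yyyy-MM-dd", "HH:mm:ss")
  val withConversion    = describe(parts.toSeq) // redundant
  val withoutConversion = describe(parts)       // preferred

  // 2. .size on an Array or String goes through an implicit wrapper (ArrayOps/StringOps);
  //    .length reads the underlying field directly.
  val n1 = parts.length // preferred
  val n2 = parts.size   // same value, extra conversion

  // 3. isEmpty/nonEmpty state the intent directly instead of comparing a length to zero.
  val sb = new StringBuilder("abc")
  val viaLength   = sb.length > 0
  val viaNonEmpty = sb.nonEmpty // preferred

  // 4. map(_._2) projects one side of a pair collection; unzip._2 builds both halves
  //    and throws one away.
  val table = Seq(1 -> "a", 2 -> "b")
  val viaUnzip = table.unzip._2
  val viaMap   = table.map(_._2) // preferred

  assert(withConversion == withoutConversion && n1 == n2 &&
    viaLength == viaNonEmpty && viaUnzip == viaMap)
  println("old and new forms agree; the new forms avoid the extra conversions")
}
```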
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -151,7 +151,7 @@ object DateUtils {
format.map(character => {
// We are checking to see if this char is a part of a previously read pattern
// or start of a new one.
- if (sb.length == 0 || sb.charAt(sb.length - 1) == character) {
+ if (sb.isEmpty || sb.last == character) {
if (unsupportedCharacter(character)) {
throw TimestampFormatConversionException(s"Unsupported character: $character")
}
@@ -173,7 +173,7 @@ object DateUtils {
}
index = index + 1
})
- if (sb.length > 0) {
+ if (sb.nonEmpty) {
val word = sb.toString()
if (unsupportedWordContextAware(word)) {
throw TimestampFormatConversionException(s"Unsupported word: $word")
@@ -269,10 +269,10 @@ object GpuCSVScan {
}
val tsFormat = parsedOptions.timestampFormat
val parts = tsFormat.split("'T'", 2)
- if (parts.length == 0) {
+ if (parts.isEmpty) {
meta.willNotWorkOnGpu(s"the timestamp format '$tsFormat' is not supported")
}
- if (parts.length > 0 && !supportedDateFormats.contains(parts(0))) {
+ if (parts.headOption.exists(h => !supportedDateFormats.contains(h))) {
meta.willNotWorkOnGpu(s"the timestamp format '$tsFormat' is not supported")
}
if (parts.length > 1 && !supportedTsPortionFormats.contains(parts(1))) {
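The GpuCSVScan change above replaces a length guard plus positional access with a single Option-based check. A minimal sketch of the same pattern, with an invented set of supported formats:

```scala
object HeadOptionCheck extends App {
  // Hypothetical stand-in for the supported-date-format set consulted above.
  val supportedDateFormats = Set("yyyy-MM-dd", "yyyy/MM/dd")

  def firstPartUnsupported(parts: Array[String]): Boolean =
    // Before: parts.length > 0 && !supportedDateFormats.contains(parts(0))
    // After: the length guard and the parts(0) access collapse into one step.
    parts.headOption.exists(h => !supportedDateFormats.contains(h))

  assert(!firstPartUnsupported(Array.empty[String]))          // nothing to reject
  assert(firstPartUnsupported(Array("dd.MM.yyyy")))           // unsupported first part
  assert(!firstPartUnsupported(Array("yyyy-MM-dd", "HH:mm"))) // supported first part
}
```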
@@ -120,7 +120,7 @@ class AcceleratedColumnarToRowIterator(

private[this] def loadNextBatch(): Unit = {
closeCurrentBatch()
- if (!pendingCvs.isEmpty) {
+ if (pendingCvs.nonEmpty) {
setCurrentBatch(pendingCvs.dequeue())
} else {
while (batches.hasNext) {
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -79,7 +79,7 @@ object GpuDeviceManager extends Logging {
// loop multiple times to see if a GPU was released or something unexpected happened that
// we couldn't acquire on first try
var numRetries = 2
- val addrsToTry = ArrayBuffer.empty ++= (0 to (deviceCount - 1))
+ val addrsToTry = ArrayBuffer.empty ++= (0 until deviceCount)
while (numRetries > 0) {
val addr = addrsToTry.find(tryToSetGpuDeviceAndAcquire)
if (addr.isDefined) {
@@ -103,7 +103,7 @@ object GpuDeviceManager extends Logging {
def getGPUAddrFromResources(resources: Map[String, ResourceInformation]): Option[Int] = {
if (resources.contains("gpu")) {
val addrs = resources("gpu").addresses
- if (addrs.size > 1) {
+ if (addrs.length > 1) {
// Throw an exception since we assume one GPU per executor.
// If multiple GPUs are allocated by spark, then different tasks could get assigned
// different GPUs but RMM would only be initialized for 1. We could also just get
@@ -3034,7 +3034,7 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging {
if (!exp.equalsIgnoreCase("NONE")) {
wrap.tagForExplain()
val explain = wrap.explain(exp.equalsIgnoreCase("ALL"))
- if (!explain.isEmpty) {
+ if (explain.nonEmpty) {
logWarning(s"\n$explain")
if (conf.optimizerExplain.equalsIgnoreCase("ALL") && optimizations.nonEmpty) {
logWarning(s"Cost-based optimizations applied:\n${optimizations.mkString("\n")}")
@@ -663,7 +663,7 @@ abstract class FileParquetPartitionReaderBase(

val precisions = getPrecisionsList(clippedSchema.asGroupType().getFields.asScala)
// check if there are cols with precision that can be stored in an int
- val typeCastingNeeded = precisions.filter(p => p <= Decimal.MAX_INT_DIGITS).nonEmpty
+ val typeCastingNeeded = precisions.exists(p => p <= Decimal.MAX_INT_DIGITS)
if (readDataSchema.length > inputTable.getNumberOfColumns || typeCastingNeeded) {
// Spark+Parquet schema evolution is relatively simple with only adding/removing columns
// To type casting or anyting like that
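The precision check above is the general `filter(p).nonEmpty` → `exists(p)` rewrite: `exists` short-circuits on the first match and allocates nothing, whereas `filter` materializes an intermediate collection first. A sketch with invented precisions (9 mirrors Spark's `Decimal.MAX_INT_DIGITS`):

```scala
object ExistsVsFilter extends App {
  val precisions = Seq(3, 12, 25, 38) // made-up decimal precisions
  val maxIntDigits = 9

  val viaFilter = precisions.filter(p => p <= maxIntDigits).nonEmpty // builds a Seq, then tests it
  val viaExists = precisions.exists(p => p <= maxIntDigits)          // stops at the first hit

  assert(viaFilter == viaExists)
  println(s"type casting needed: $viaExists")
}
```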
@@ -890,7 +890,7 @@ object MultiFileThreadPoolFactory {
private def initThreadPool(
maxThreads: Int = 20,
keepAliveSeconds: Long = 60): ThreadPoolExecutor = synchronized {
- if (!threadPool.isDefined) {
+ if (threadPool.isEmpty) {
val threadFactory = new ThreadFactoryBuilder()
.setNameFormat("parquet reader worker-%d")
.setDaemon(true)
@@ -1084,13 +1084,13 @@ class MultiFileParquetPartitionReader(
private def buildAndConcatPartitionColumns(
rowsPerPartition: Array[Long],
inPartitionValues: Array[InternalRow]): Array[GpuColumnVector] = {
- val numCols = partitionSchema.fields.size
+ val numCols = partitionSchema.fields.length
val allPartCols = new Array[GpuColumnVector](numCols)
// build the partitions vectors for all partitions within each column
// and concatenate those together then go to the next column
for ((field, colIndex) <- partitionSchema.fields.zipWithIndex) {
val dataType = field.dataType
- withResource(new Array[GpuColumnVector](inPartitionValues.size)) {
+ withResource(new Array[GpuColumnVector](inPartitionValues.length)) {
partitionColumns =>
for ((rowsInPart, partIndex) <- rowsPerPartition.zipWithIndex) {
val partInternalRow = inPartitionValues(partIndex)
@@ -1138,10 +1138,10 @@ class MultiFileParquetPartitionReader(
inPartitionValues: Array[InternalRow],
rowsPerPartition: Array[Long],
partitionSchema: StructType): Option[ColumnarBatch] = {
- assert(rowsPerPartition.size == inPartitionValues.size)
+ assert(rowsPerPartition.length == inPartitionValues.length)
if (partitionSchema.nonEmpty) {
batch.map { cb =>
- val numPartitions = inPartitionValues.size
+ val numPartitions = inPartitionValues.length
if (numPartitions > 1) {
concatAndAddPartitionColsToBatch(cb, rowsPerPartition, inPartitionValues)
} else {
@@ -1291,7 +1291,7 @@ class MultiFileParquetPartitionReader(
blockIterator.head.schema.asGroupType().getFields.asScala.map(_.getName)
val schemaCurrentfile =
currentClippedSchema.asGroupType().getFields.asScala.map(_.getName)
- if (!schemaNextfile.sameElements(schemaCurrentfile)) {
+ if (!(schemaNextfile == schemaCurrentfile)) {
logInfo(s"File schema for the next file ${blockIterator.head.filePath}" +
s" doesn't match current $currentFile, splitting it into another batch!")
return
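The `sameElements` → `==` swap above works because both schema name lists come from `asScala.map`, i.e. they are Scala `Buffer`s, and `==` on a `Seq` compares elements. On raw Arrays, `==` is reference equality, which is why `sameElements` is the usual Array idiom; a quick illustration:

```scala
object SeqVsArrayEquality extends App {
  val a = Seq("id", "name")
  val b = Seq("id", "name")
  assert(a == b) // Seq equality is structural

  val x = Array("id", "name")
  val y = Array("id", "name")
  assert(x != y)            // Array == compares references
  assert(x.sameElements(y)) // element-wise comparison for Arrays
}
```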
@@ -1409,7 +1409,7 @@ class MultiFileCloudParquetPartitionReader(
val hostBuffers = new ArrayBuffer[(HostMemoryBuffer, Long)]
try {
val fileBlockMeta = filterHandler.filterBlocks(file, conf, filters, readDataSchema)
- if (fileBlockMeta.blocks.length == 0) {
+ if (fileBlockMeta.blocks.isEmpty) {
val bytesRead = fileSystemBytesRead() - startingBytesRead
// no blocks so return null buffer and size 0
return HostMemoryBuffersWithMetaData(fileBlockMeta.isCorrectedRebaseMode,
@@ -1503,7 +1503,7 @@ class MultiFileCloudParquetPartitionReader(
}

private def addNextTaskIfNeeded(): Unit = {
- if (tasksToRun.size > 0 && !isDone) {
+ if (tasksToRun.nonEmpty && !isDone) {
val runner = tasksToRun.dequeue()
tasks.add(MultiFileThreadPoolFactory.submitToThreadPool(runner, numThreads))
}
@@ -1550,7 +1550,7 @@ class MultiFileCloudParquetPartitionReader(

// this shouldn't happen but if somehow the batch is None and we still
// have work left skip to the next file
- if (!batch.isDefined && filesToRead > 0 && !isDone) {
+ if (batch.isEmpty && filesToRead > 0 && !isDone) {
next()
}

@@ -825,7 +825,7 @@ case class GpuRowToColumnarExec(child: SparkPlan, goal: CoalesceGoal)
// The cudf kernel only supports up to 1.5 KB per row which means at most 184 double/long
// values. Spark by default limits codegen to 100 fields "spark.sql.codegen.maxFields".
// So, we are going to be cautious and start with that until we have tested it more.
- if (output.length > 0 && output.length < 100 &&
+ if ((1 until 100).contains(output.length) &&
CudfRowTransitions.areAllSupported(output)) {
val localOutput = output
rowBased.mapPartitions(rowIter => GeneratedUnsafeRowToCudfRowIterator(
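The field-count guard above becomes a Range membership test: `1 until 100` is exactly 1 through 99, so `(1 until 100).contains(n)` matches the original `n > 0 && n < 100` pair of comparisons, and `Range.contains` on an `Int` is a bounds check rather than a scan. A minimal check with an invented method name:

```scala
object RangeContainsCheck extends App {
  def inCodegenLimit(numFields: Int): Boolean =
    (1 until 100).contains(numFields) // same as numFields > 0 && numFields < 100

  assert(!inCodegenLimit(0))
  assert(inCodegenLimit(1))
  assert(inCodegenLimit(99))
  assert(!inCodegenLimit(100))
}
```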
@@ -78,7 +78,7 @@ object ConfHelper {

def byteFromString(str: String, unit: ByteUnit): Long = {
val (input, multiplier) =
- if (str.length() > 0 && str.charAt(0) == '-') {
+ if (str.nonEmpty && str.head == '-') {
(str.substring(1), -1)
} else {
(str, 1)
@@ -564,9 +564,9 @@ abstract class SparkPlanMeta[INPUT <: SparkPlan](plan: INPUT,
private def fixUpExchangeOverhead(): Unit = {
childPlans.foreach(_.fixUpExchangeOverhead())
if (wrapped.isInstanceOf[ShuffleExchangeExec] &&
- childPlans.filter(_.canThisBeReplaced).isEmpty &&
+ !childPlans.exists(_.canThisBeReplaced) &&
(plan.conf.adaptiveExecutionEnabled ||
- parent.filter(_.canThisBeReplaced).isEmpty)) {
+ !parent.exists(_.canThisBeReplaced))) {
willNotWorkOnGpu("Columnar exchange without columnar children is inefficient")
}
}
@@ -69,7 +69,7 @@ class RapidsShuffleHeartbeatManager extends Logging {

executors.append(ExecutorRegistration(id, System.nanoTime))

- lastRegistrationSeen.put(id, new MutableLong(allExecutors.size))
+ lastRegistrationSeen.put(id, new MutableLong(allExecutors.length))
RapidsExecutorUpdateMsg(allExecutors)
}

@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -153,7 +153,7 @@ class RapidsShuffleIterator(
case class BlockIdMapIndex(id: ShuffleBlockBatchId, mapIndex: Int)

def start(): Unit = {
logInfo(s"Fetching ${blocksByAddress.size} blocks.")
logInfo(s"Fetching ${blocksByAddress.length} blocks.")

// issue local fetches first
val (local, remote) = blocksByAddress.partition(ba => ba._1.host == localHost)
@@ -58,7 +58,7 @@ object GpuPartitioningUtils {
.getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)

// filter out non-data path and get unique leaf dirs of inputFiles
- val leafDirs: Seq[Path] = leafFiles.filter(isDataPath).map(_.getParent).toSet.toSeq
+ val leafDirs: Seq[Path] = leafFiles.filter(isDataPath).map(_.getParent).distinct

val basePathOption = parameters.get(BASE_PATH_PARAM).map(file => {
// need to replace the base path
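`distinct` above replaces the `toSet.toSeq` round trip. Both deduplicate, but `distinct` keeps the first-encounter order and skips the intermediate Set; a sketch with made-up directory strings (the real code deduplicates Hadoop `Path` parents):

```scala
object DistinctVsToSet extends App {
  val parents = Seq("/data/part=1", "/data/part=0", "/data/part=1")

  val viaDistinct = parents.distinct    // Seq("/data/part=1", "/data/part=0"), order preserved
  val viaSet      = parents.toSet.toSeq // same elements, ordering left to the Set

  assert(viaDistinct == Seq("/data/part=1", "/data/part=0"))
  assert(viaDistinct.toSet == viaSet.toSet)
}
```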
@@ -138,11 +138,11 @@ case class GpuAggregateInPandasExec(
// Schema of input rows to the python runner
val aggInputSchema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
StructField(s"_$i", dt)
- }.toSeq)
+ })

// Map grouped rows to ArrowPythonRunner results, Only execute if partition is not empty
inputRDD.mapPartitionsInternal { iter => if (iter.isEmpty) iter else {
- val prunedProj = UnsafeProjection.create(allInputs.toSeq, child.output)
+ val prunedProj = UnsafeProjection.create(allInputs, child.output)

val grouped = if (groupingExpressions.isEmpty) {
// Use an empty unsafe row as a place holder for the grouping key
@@ -318,7 +318,7 @@ case class GpuFileSourceScanExec(
}

/** SQL metrics generated only for scans using dynamic partition pruning. */
- private lazy val staticMetrics = if (partitionFilters.filter(isDynamicPruningFilter).nonEmpty) {
+ private lazy val staticMetrics = if (partitionFilters.exists(isDynamicPruningFilter)) {
Map("staticFilesNum" -> createMetric(ESSENTIAL_LEVEL, "static number of files read"),
"staticFilesSize" -> createSizeMetric(ESSENTIAL_LEVEL, "static size of files read"))
} else {
@@ -331,7 +331,7 @@ case class GpuFileSourceScanExec(
static: Boolean): Unit = {
val filesNum = partitions.map(_.files.size.toLong).sum
val filesSize = ShimLoader.getSparkShims.getPartitionFileStatusSize(partitions)
- if (!static || partitionFilters.filter(isDynamicPruningFilter).isEmpty) {
+ if (!static || !partitionFilters.exists(isDynamicPruningFilter)) {
driverMetrics("numFiles") = filesNum
driverMetrics("filesSize") = filesSize
} else {
@@ -83,7 +83,7 @@ trait OrcFiltersBase {
.groupBy(_._1.toLowerCase(Locale.ROOT))
.filter(_._2.size == 1)
.mapValues(_.head._2)
- CaseInsensitiveMap(dedupPrimitiveFields.toMap)
+ CaseInsensitiveMap(dedupPrimitiveFields)
}
}
}
@@ -1622,7 +1622,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite {
// verify nothing ran on the gpu
if (gpuPlan.conf.getAllConfs(RapidsConf.SQL_ENABLED.key).toBoolean) {
val execNode = gpuPlan.find(_.isInstanceOf[GpuHashAggregateExec])
- assert(!execNode.isDefined)
+ assert(execNode.isEmpty)
}
}}

@@ -1637,7 +1637,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite {
// verify nothing ran on the gpu
if (gpuPlan.conf.getAllConfs(RapidsConf.SQL_ENABLED.key).toBoolean) {
val execNode = gpuPlan.find(_.isInstanceOf[GpuHashAggregateExec])
- assert(!execNode.isDefined)
+ assert(execNode.isEmpty)
}
}}

@@ -75,7 +75,7 @@ class ImplicitsTestSuite extends FlatSpec with Matchers {
} catch {
case t: Throwable => {
threw = true
- assert(t.getSuppressed().size == 5)
+ assert(t.getSuppressed().length == 5)
}
}
assert(threw)
@@ -137,7 +137,7 @@ class ImplicitsTestSuite extends FlatSpec with Matchers {
} catch {
case t: Throwable => {
threw = true
- assert(t.getSuppressed().size == 5)
+ assert(t.getSuppressed().length == 5)
}
}
batch.close()
@@ -26,7 +26,7 @@ class RapidsShuffleHeartbeatManagerTest extends RapidsShuffleTestHelper {
val hbMgr = new RapidsShuffleHeartbeatManager
val updateMsg = hbMgr.registerExecutor(
TrampolineUtil.newBlockManagerId("1", "peer", 123, Some("rapids=123")))
- assertResult(0)(updateMsg.ids.size)
+ assertResult(0)(updateMsg.ids.length)
}

test("a heartbeat picks up new executors") {
Expand All @@ -45,7 +45,7 @@ class RapidsShuffleHeartbeatManagerTest extends RapidsShuffleTestHelper {

match {
case RapidsExecutorUpdateMsg(idAndExecutorData) =>
- assertResult(1)(idAndExecutorData.size)
+ assertResult(1)(idAndExecutorData.length)
val peerBlockManager = idAndExecutorData.head
assertResult(exec2)(peerBlockManager)
}
@@ -69,11 +69,11 @@ class RapidsShuffleHeartbeatManagerTest extends RapidsShuffleTestHelper {

// second heartbeat from exec1, should be empty
val secondUpdate = hbMgr.executorHeartbeat(exec1)
- assertResult(0)(secondUpdate.ids.size)
+ assertResult(0)(secondUpdate.ids.length)

// first heartbeat from exec2 returns empty, it already knew about exec1 on registration
val firstUpdateExec2 = hbMgr.executorHeartbeat(exec2)
- assertResult(0)(firstUpdateExec2.ids.size)
+ assertResult(0)(firstUpdateExec2.ids.length)
}

test("an executor tells heartbeat handler about new peers") {
6 changes: 3 additions & 3 deletions udf-compiler/src/main/scala/com/nvidia/spark/udf/CFG.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2020, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -203,7 +203,7 @@ object CFG {
} :+ default
collectLabelsAndEdges(
codeIterator, constPool,
- labels ++ table.unzip._2,
+ labels ++ table.map(_._2),
edges + (offset -> table))
case Opcode.LOOKUPSWITCH =>
val defaultOffset = (offset + 4) / 4 * 4
@@ -217,7 +217,7 @@ object CFG {
} :+ default
collectLabelsAndEdges(
codeIterator, constPool,
- labels ++ table.unzip._2,
+ labels ++ table.map(_._2),
edges + (offset -> table))
case Opcode.GOTO | Opcode.GOTO_W =>
// goto statements have a single address target, we must go there
@@ -87,7 +87,7 @@ case class CatalystExpressionBuilder(private val function: AnyRef) extends Loggi
}
}

- if (compiled == None) {
+ if (compiled.isEmpty) {
logDebug(s"[CatalystExpressionBuilder] failed to compile")
} else {
logDebug(s"[CatalystExpressionBuilder] compiled expression: ${compiled.get.toString}")
@@ -490,7 +490,7 @@ case class Instruction(opcode: Int, operand: Int, instructionStr: String) extend
newState
}
} else if (declaringClassName.equals("scala.collection.mutable.ArrayBuffer") ||
- ((!args.isEmpty && args.head.isInstanceOf[Repr.ArrayBuffer]) &&
+ ((args.nonEmpty && args.head.isInstanceOf[Repr.ArrayBuffer]) &&
((declaringClassName.equals("scala.collection.AbstractSeq") &&
opcode == Opcode.INVOKEVIRTUAL) ||
(declaringClassName.equals("scala.collection.TraversableOnce") &&
@@ -56,7 +56,7 @@ case class LogicalPlanRules() extends Rule[LogicalPlan] with Logging {
exp
} else {
try {
- if (exp.children != null && !exp.children.exists(x => x == null)) {
+ if (exp.children != null && !exp.children.contains(null)) {
exp.withNewChildren(exp.children.map(c => {
if (c != null && c.isInstanceOf[Expression]) {
attemptToReplaceExpression(plan, c)
@@ -2313,7 +2313,7 @@ class OpcodeSuite extends FunSuite {

test("Conditional array buffer processing") {
def cond(s: String): Boolean = {
- s == null || s.trim.length == 0
+ s == null || s.trim.isEmpty
}

def transform(str: String): String = {