From 7e66f53f6af8efdfa4420193d6b9d713f069be4d Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Tue, 20 Apr 2021 18:34:06 -0700 Subject: [PATCH] Fix index-based access to the head elements (#2192) Signed-off-by: Gera Shegalov Fix simple collection operations. Contributes to #2109 - replace map with foreach when the return value is not used - forall where appropriate - access head elements without specifying indices. - simplify index iteration --- .../spark/rapids/StringFunctionSuite.scala | 10 +++--- .../spark300/GpuBroadcastHashJoinExec.scala | 9 +++-- .../spark300/GpuShuffledHashJoinExec.scala | 8 +++-- .../shims/spark300/GpuSortMergeJoinExec.scala | 5 +-- .../spark301/GpuBroadcastHashJoinExec.scala | 9 +++-- .../rapids/shims/spark301/Spark301Shims.scala | 4 +-- .../spark301db/GpuBroadcastHashJoinExec.scala | 9 +++-- .../spark301db/GpuShuffledHashJoinExec.scala | 8 +++-- .../spark301db/GpuSortMergeJoinExec.scala | 5 +-- .../spark311/GpuBroadcastHashJoinExec.scala | 9 +++-- .../spark311/GpuShuffledHashJoinExec.scala | 8 +++-- .../shims/spark311/GpuSortMergeJoinExec.scala | 5 +-- .../rapids/shims/spark311/Spark311Shims.scala | 6 ++-- .../com/nvidia/spark/rapids/DateUtils.scala | 2 +- .../nvidia/spark/rapids/GpuExpressions.scala | 31 +++++++++-------- .../nvidia/spark/rapids/GpuOverrides.scala | 34 +++++++++++-------- .../spark/rapids/GpuWindowExpression.scala | 16 ++++----- .../com/nvidia/spark/rapids/RapidsMeta.scala | 29 ++++++++-------- .../com/nvidia/spark/rapids/aggregate.scala | 4 +-- .../spark/rapids/conditionalExpressions.scala | 4 +-- .../spark/sql/rapids/GpuDataSource.scala | 6 ++-- .../execution/GpuBroadcastExchangeExec.scala | 2 +- .../GpuBroadcastNestedLoopJoinExec.scala | 8 ++--- .../execution/GpuShuffleExchangeExec.scala | 4 +-- .../rapids/execution/ShuffledBatchRDD.scala | 2 +- .../GpuFlatMapCoGroupsInPandasExec.scala | 7 ++-- .../rapids/Spark310ParquetWriterSuite.scala | 2 +- .../shuffle/WindowedBlockIteratorSuite.scala | 3 +- 28 files changed, 129 insertions(+), 120 deletions(-) diff --git a/integration_tests/src/test/scala/com/nvidia/spark/rapids/StringFunctionSuite.scala b/integration_tests/src/test/scala/com/nvidia/spark/rapids/StringFunctionSuite.scala index 461f7e33865..28ee863ba90 100644 --- a/integration_tests/src/test/scala/com/nvidia/spark/rapids/StringFunctionSuite.scala +++ b/integration_tests/src/test/scala/com/nvidia/spark/rapids/StringFunctionSuite.scala @@ -208,7 +208,7 @@ class StringOperatorsDiagnostics extends SparkQueryCompareTestSuite { println("\u001b[1;36mSummary of diffs:\u001b[0m") println("\u001b[1;36mCodepoint:\u001b[0m ") - for (i <- 0 until fromCpu.length) { + for (i <- fromCpu.indices) { if (fromCpu(i) != fromGpu(i)) { val codepoint = TestCodepoints.validCodepointIndices(i) print(f"$codepoint%5d, ") @@ -219,7 +219,7 @@ class StringOperatorsDiagnostics extends SparkQueryCompareTestSuite { println("\u001b[1;36mDetails:") println("Codepoint CPU GPU") println("single -> single mappings\u001b[0m"); - for (i <- 0 until fromCpu.length) { + for (i <- fromCpu.indices) { if (fromCpu(i) != fromGpu(i) && fromCpu(i).getString(0).length == 1) { val codepoint = TestCodepoints.validCodepointIndices(i) @@ -230,7 +230,7 @@ class StringOperatorsDiagnostics extends SparkQueryCompareTestSuite { } } println("\u001b[1;36msingle -> multi mappings\u001b[0m"); - for (i <- 0 until fromCpu.length) { + for (i <- fromCpu.indices) { if (fromCpu(i) != fromGpu(i) && fromCpu(i).getString(0).length > 1) { var cpu_str = fromCpu(i).getString(0) var gpu_str = fromGpu(i).getString(0) @@ -264,7 +264,7 @@ class StringOperatorsDiagnostics extends SparkQueryCompareTestSuite { // upper results val (fromCpuUpper, fromGpuUpper) = generateResults(upper) - for (i <- 0 until fromCpuUpper.length) { + for (i <- fromCpuUpper.indices) { if (fromCpuUpper(i) != fromGpuUpper(i) && fromGpuUpper(i).getString(0).length == 1) { val codepoint = TestCodepoints.validCodepointIndices(i) @@ -276,7 +276,7 @@ class StringOperatorsDiagnostics extends SparkQueryCompareTestSuite { // lower results val (fromCpuLower, fromGpuLower) = generateResults(lower) - for (i <- 0 until fromCpuLower.length) { + for (i <- fromCpuLower.indices) { if (fromCpuLower(i) != fromGpuLower(i) && fromGpuLower(i).getString(0).length == 1) { val codepoint = TestCodepoints.validCodepointIndices(i) diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala index 492198a8519..4cfc2ad9157 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala @@ -51,10 +51,10 @@ class GpuBroadcastHashJoinMeta( override def tagPlanForGpu(): Unit = { GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) - + val Seq(leftChild, rightChild) = childPlans val buildSide = join.buildSide match { - case BuildLeft => childPlans(0) - case BuildRight => childPlans(1) + case BuildLeft => leftChild + case BuildRight => rightChild } if (!buildSide.canThisBeReplaced) { @@ -67,8 +67,7 @@ class GpuBroadcastHashJoinMeta( } override def convertToGpu(): GpuExec = { - val left = childPlans(0).convertIfNeeded() - val right = childPlans(1).convertIfNeeded() + val Seq(left, right) = childPlans.map(_.convertIfNeeded()) // The broadcast part of this must be a BroadcastExchangeExec val buildSide = join.buildSide match { case BuildLeft => left diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala index 941aabdadc6..ff87fe64af9 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala @@ -56,16 +56,18 @@ class GpuShuffledHashJoinMeta( GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) } - override def convertToGpu(): GpuExec = + override def convertToGpu(): GpuExec = { + val Seq(left, right) = childPlans.map(_.convertIfNeeded) GpuShuffledHashJoinExec( leftKeys.map(_.convertToGpu()), rightKeys.map(_.convertToGpu()), join.joinType, GpuJoinUtils.getGpuBuildSide(join.buildSide), condition.map(_.convertToGpu()), - childPlans(0).convertIfNeeded(), - childPlans(1).convertIfNeeded(), + left, + right, isSkewJoin = false) + } } case class GpuShuffledHashJoinExec( diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuSortMergeJoinExec.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuSortMergeJoinExec.scala index f74c6279645..4d57f837fc9 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuSortMergeJoinExec.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuSortMergeJoinExec.scala @@ -77,14 +77,15 @@ class GpuSortMergeJoinMeta( } else { throw new IllegalStateException(s"Cannot build either side for ${join.joinType} join") } + val Seq(left, right) = childPlans.map(_.convertIfNeeded()) GpuShuffledHashJoinExec( leftKeys.map(_.convertToGpu()), rightKeys.map(_.convertToGpu()), join.joinType, GpuJoinUtils.getGpuBuildSide(buildSide), condition.map(_.convertToGpu()), - childPlans(0).convertIfNeeded(), - childPlans(1).convertIfNeeded(), + left, + right, join.isSkewJoin) } diff --git a/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala b/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala index d67d5585caa..ab414e9bea1 100644 --- a/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala +++ b/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala @@ -49,10 +49,10 @@ class GpuBroadcastHashJoinMeta( override def tagPlanForGpu(): Unit = { GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) - + val Seq(leftChild, rightChild) = childPlans val buildSide = join.buildSide match { - case BuildLeft => childPlans(0) - case BuildRight => childPlans(1) + case BuildLeft => leftChild + case BuildRight => rightChild } if (!canBuildSideBeReplaced(buildSide)) { @@ -65,8 +65,7 @@ class GpuBroadcastHashJoinMeta( } override def convertToGpu(): GpuExec = { - val left = childPlans(0).convertIfNeeded() - val right = childPlans(1).convertIfNeeded() + val Seq(left, right) = childPlans.map(_.convertIfNeeded()) // The broadcast part of this must be a BroadcastExchangeExec val buildSide = join.buildSide match { case BuildLeft => left diff --git a/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/Spark301Shims.scala b/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/Spark301Shims.scala index ed9322e559f..1943f988724 100644 --- a/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/Spark301Shims.scala +++ b/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/Spark301Shims.scala @@ -68,7 +68,7 @@ class Spark301Shims extends Spark300Shims { Seq(ParamCheck("input", TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all))), (a, conf, p, r) => new ExprMeta[First](a, conf, p, r) { override def convertToGpu(): GpuExpression = - GpuFirst(childExprs(0).convertToGpu(), a.ignoreNulls) + GpuFirst(childExprs.head.convertToGpu(), a.ignoreNulls) }), GpuOverrides.expr[Last]( "last aggregate operator", @@ -76,7 +76,7 @@ class Spark301Shims extends Spark300Shims { Seq(ParamCheck("input", TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all))), (a, conf, p, r) => new ExprMeta[Last](a, conf, p, r) { override def convertToGpu(): GpuExpression = - GpuLast(childExprs(0).convertToGpu(), a.ignoreNulls) + GpuLast(childExprs.head.convertToGpu(), a.ignoreNulls) }) ).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastHashJoinExec.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastHashJoinExec.scala index 4fbfd9713db..cafe239a674 100644 --- a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastHashJoinExec.scala +++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastHashJoinExec.scala @@ -48,10 +48,10 @@ class GpuBroadcastHashJoinMeta( override def tagPlanForGpu(): Unit = { GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) - + val Seq(leftChild, rightChild) = childPlans val buildSide = join.buildSide match { - case BuildLeft => childPlans(0) - case BuildRight => childPlans(1) + case BuildLeft => leftChild + case BuildRight => rightChild } if (!canBuildSideBeReplaced(buildSide)) { @@ -64,8 +64,7 @@ class GpuBroadcastHashJoinMeta( } override def convertToGpu(): GpuExec = { - val left = childPlans(0).convertIfNeeded() - val right = childPlans(1).convertIfNeeded() + val Seq(left, right) = childPlans.map(_.convertIfNeeded()) // The broadcast part of this must be a BroadcastExchangeExec val buildSide = join.buildSide match { case BuildLeft => left diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala index ebf511cce29..a957a364812 100644 --- a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala +++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala @@ -57,15 +57,17 @@ class GpuShuffledHashJoinMeta( GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) } - override def convertToGpu(): GpuExec = + override def convertToGpu(): GpuExec = { + val Seq(leftChild, rightChild) = childPlans.map(_.convertIfNeeded()) GpuShuffledHashJoinExec( leftKeys.map(_.convertToGpu()), rightKeys.map(_.convertToGpu()), join.joinType, GpuJoinUtils.getGpuBuildSide(join.buildSide), condition.map(_.convertToGpu()), - childPlans(0).convertIfNeeded(), - childPlans(1).convertIfNeeded()) + leftChild, + rightChild) + } } case class GpuShuffledHashJoinExec( diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuSortMergeJoinExec.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuSortMergeJoinExec.scala index c5329ccd5c3..e7c6c8fc717 100644 --- a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuSortMergeJoinExec.scala +++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuSortMergeJoinExec.scala @@ -78,14 +78,15 @@ class GpuSortMergeJoinMeta( } else { throw new IllegalStateException(s"Cannot build either side for ${join.joinType} join") } + val Seq(leftChild, rightChild) = childPlans.map(_.convertIfNeeded()) GpuShuffledHashJoinExec( leftKeys.map(_.convertToGpu()), rightKeys.map(_.convertToGpu()), join.joinType, GpuJoinUtils.getGpuBuildSide(buildSide), condition.map(_.convertToGpu()), - childPlans(0).convertIfNeeded(), - childPlans(1).convertIfNeeded()) + leftChild, + rightChild) } /** diff --git a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuBroadcastHashJoinExec.scala b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuBroadcastHashJoinExec.scala index 3a36fb2c7f2..012e29fe5d9 100644 --- a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuBroadcastHashJoinExec.scala +++ b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuBroadcastHashJoinExec.scala @@ -53,10 +53,10 @@ class GpuBroadcastHashJoinMeta( override def tagPlanForGpu(): Unit = { GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) - + val Seq(leftChild, rightChild) = childPlans val buildSide = join.buildSide match { - case BuildLeft => childPlans(0) - case BuildRight => childPlans(1) + case BuildLeft => leftChild + case BuildRight => rightChild } if (!canBuildSideBeReplaced(buildSide)) { @@ -69,8 +69,7 @@ class GpuBroadcastHashJoinMeta( } override def convertToGpu(): GpuExec = { - val left = childPlans(0).convertIfNeeded() - val right = childPlans(1).convertIfNeeded() + val Seq(left, right) = childPlans.map(_.convertIfNeeded()) // The broadcast part of this must be a BroadcastExchangeExec val buildSide = join.buildSide match { case BuildLeft => left diff --git a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuShuffledHashJoinExec.scala b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuShuffledHashJoinExec.scala index fa2b3439681..e25927e0c28 100644 --- a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuShuffledHashJoinExec.scala +++ b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuShuffledHashJoinExec.scala @@ -57,16 +57,18 @@ class GpuShuffledHashJoinMeta( GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) } - override def convertToGpu(): GpuExec = + override def convertToGpu(): GpuExec = { + val Seq(left, right) = childPlans.map(_.convertIfNeeded()) GpuShuffledHashJoinExec( leftKeys.map(_.convertToGpu()), rightKeys.map(_.convertToGpu()), join.joinType, GpuJoinUtils.getGpuBuildSide(join.buildSide), condition.map(_.convertToGpu()), - childPlans(0).convertIfNeeded(), - childPlans(1).convertIfNeeded(), + left, + right, isSkewJoin = false) + } } case class GpuShuffledHashJoinExec( diff --git a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuSortMergeJoinExec.scala b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuSortMergeJoinExec.scala index d36aa5829b0..7792e47ac07 100644 --- a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuSortMergeJoinExec.scala +++ b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuSortMergeJoinExec.scala @@ -78,14 +78,15 @@ class GpuSortMergeJoinMeta( } else { throw new IllegalStateException(s"Cannot build either side for ${join.joinType} join") } + val Seq(left, right) = childPlans.map(_.convertIfNeeded()) GpuShuffledHashJoinExec( leftKeys.map(_.convertToGpu()), rightKeys.map(_.convertToGpu()), join.joinType, GpuJoinUtils.getGpuBuildSide(buildSide), condition.map(_.convertToGpu()), - childPlans(0).convertIfNeeded(), - childPlans(1).convertIfNeeded(), + left, + right, join.isSkewJoin) } diff --git a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala index 243b2a9da06..f6d5ea29a53 100644 --- a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala +++ b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala @@ -191,10 +191,8 @@ class Spark311Shims extends Spark301Shims { } } override def convertToGpu(): GpuExpression = { - GpuStringReplace( - childExprs(0).convertToGpu(), - childExprs(1).convertToGpu(), - childExprs(2).convertToGpu()) + val Seq(child0, child1, child2) = childExprs.map(_.convertToGpu()) + GpuStringReplace(child0, child1, child2) } }), // Spark 3.1.1-specific LEAD expression, using custom OffsetWindowFunctionMeta. diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala index 1d2b2fc7be4..efdc1a4789c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala @@ -148,7 +148,7 @@ object DateUtils { var sb = new StringBuilder() var index = 0; val patterns = new ListBuffer[FormatKeywordToReplace] - format.map(character => { + format.foreach(character => { // We are checking to see if this char is a part of a previously read pattern // or start of a new one. if (sb.isEmpty || sb.last == character) { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala index f7322af3a03..29ee0c5eb51 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala @@ -272,46 +272,47 @@ trait GpuTernaryExpression extends TernaryExpression with GpuExpression { def doColumnar(numRows: Int, val0: Scalar, val1: Scalar, val2: Scalar): ColumnVector override def columnarEval(batch: ColumnarBatch): Any = { - withResourceIfAllowed(children(0).columnarEval(batch)) { val0 => - withResourceIfAllowed(children(1).columnarEval(batch)) { val1 => - withResourceIfAllowed(children(2).columnarEval(batch)) { val2 => + val Seq(child0, child1, child2) = children + withResourceIfAllowed(child0.columnarEval(batch)) { val0 => + withResourceIfAllowed(child1.columnarEval(batch)) { val1 => + withResourceIfAllowed(child2.columnarEval(batch)) { val2 => (val0, val1, val2) match { case (v0: GpuColumnVector, v1: GpuColumnVector, v2: GpuColumnVector) => doColumnar(v0, v1, v2) case (v0, v1: GpuColumnVector, v2: GpuColumnVector) => - withResource(GpuScalar.from(v0, children(0).dataType)) { scalar0 => + withResource(GpuScalar.from(v0, child0.dataType)) { scalar0 => GpuColumnVector.from(doColumnar(scalar0, v1, v2), dataType) } case (v0: GpuColumnVector, v1, v2: GpuColumnVector) => - withResource(GpuScalar.from(v1, children(1).dataType)) { scalar1 => + withResource(GpuScalar.from(v1, child1.dataType)) { scalar1 => GpuColumnVector.from(doColumnar(v0, scalar1, v2), dataType) } case (v0: GpuColumnVector, v1: GpuColumnVector, v2) => - withResource(GpuScalar.from(v2, children(2).dataType)) { scalar2 => + withResource(GpuScalar.from(v2, child2.dataType)) { scalar2 => GpuColumnVector.from(doColumnar(v0, v1, scalar2), dataType) } case (v0, v1, v2: GpuColumnVector) => - withResource(GpuScalar.from(v0, children(0).dataType)) { scalar0 => - withResource(GpuScalar.from(v1, children(1).dataType)) { scalar1 => + withResource(GpuScalar.from(v0, child0.dataType)) { scalar0 => + withResource(GpuScalar.from(v1, child1.dataType)) { scalar1 => GpuColumnVector.from(doColumnar(scalar0, scalar1, v2), dataType) } } case (v0, v1: GpuColumnVector, v2) => - withResource(GpuScalar.from(v0, children(0).dataType)) { scalar0 => - withResource(GpuScalar.from(v2, children(2).dataType)) { scalar2 => + withResource(GpuScalar.from(v0, child0.dataType)) { scalar0 => + withResource(GpuScalar.from(v2, child2.dataType)) { scalar2 => GpuColumnVector.from(doColumnar(scalar0, v1, scalar2), dataType) } } case (v0: GpuColumnVector, v1, v2) => - withResource(GpuScalar.from(v1, children(1).dataType)) { scalar1 => - withResource(GpuScalar.from(v2, children(2).dataType)) { scalar2 => + withResource(GpuScalar.from(v1, child1.dataType)) { scalar1 => + withResource(GpuScalar.from(v2, child2.dataType)) { scalar2 => GpuColumnVector.from(doColumnar(v0, scalar1, scalar2), dataType) } } case (v0, v1, v2) if v0 != null && v1 != null && v2 != null => - withResource(GpuScalar.from(v0, children(0).dataType)) { v0Scalar => - withResource(GpuScalar.from(v1, children(1).dataType)) { v1Scalar => - withResource(GpuScalar.from(v2, children(2).dataType)) { v2Scalar => + withResource(GpuScalar.from(v0, child0.dataType)) { v0Scalar => + withResource(GpuScalar.from(v1, child1.dataType)) { v1Scalar => + withResource(GpuScalar.from(v2, child2.dataType)) { v2Scalar => GpuColumnVector.from(doColumnar(batch.numRows(), v0Scalar, v1Scalar, v2Scalar), dataType) } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index efb78ef454b..cf7591fa1cc 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -1550,7 +1550,8 @@ object GpuOverrides { // types were and cannot recover it. As such for now we are going to do what Spark does, // but we have to recompute/recheck the temporary precision to be sure it will fit // on the GPU. - (childExprs.head.dataType, childExprs(1).dataType) match { + val Seq(leftDataType, rightDataType) = childExprs.map(_.dataType) + (leftDataType, rightDataType) match { case (l: DecimalType, r: DecimalType) => val intermediateResult = GpuMultiplyUtil.decimalDataType(l, r) if (intermediateResult.precision > DType.DECIMAL64_MAX_PRECISION) { @@ -1758,7 +1759,8 @@ object GpuOverrides { // effectively calculating an extra digit of precision. Because cudf does not support this // right now we actually increase the scale (and corresponding precision) to get an extra // decimal place so we can round it in GpuCheckOverflow - (childExprs.head.dataType, childExprs(1).dataType) match { + val Seq(leftDataType, rightDataType) = childExprs.map(_.dataType) + (leftDataType, rightDataType) match { case (l: DecimalType, r: DecimalType) => val outputType = GpuDivideUtil.decimalDataType(l, r) // Case 1: OutputType.precision doesn't get truncated @@ -1851,9 +1853,9 @@ object GpuOverrides { } catch { case _: Exception => val resultMethod = a.getClass.getMethod("resultIds") - resultMethod.invoke(a).asInstanceOf[Seq[ExprId]](0) + resultMethod.invoke(a).asInstanceOf[Seq[ExprId]].head } - GpuAggregateExpression(childExprs(0).convertToGpu().asInstanceOf[GpuAggregateFunction], + GpuAggregateExpression(childExprs.head.convertToGpu().asInstanceOf[GpuAggregateFunction], a.mode, a.isDistinct, filter.map(_.convertToGpu()), resultId) } }), @@ -1911,8 +1913,10 @@ object GpuOverrides { " pivot values provided") } } - override def convertToGpu(childExprs: Seq[Expression]): GpuExpression = - GpuPivotFirst(childExprs(0), childExprs(1), pivot.pivotColumnValues) + override def convertToGpu(childExprs: Seq[Expression]): GpuExpression = { + val Seq(pivotColumn, valueColumn) = childExprs + GpuPivotFirst(pivotColumn, valueColumn, pivot.pivotColumnValues) + } }), expr[Count]( "Count aggregate operator", @@ -2479,7 +2483,7 @@ object GpuOverrides { TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL + TypeSig.ARRAY), (TypeSig.ARRAY + TypeSig.MAP).nested(TypeSig.all)), (a, conf, p, r) => new GeneratorExprMeta[Explode](a, conf, p, r) { - override def convertToGpu(): GpuExpression = GpuExplode(childExprs(0).convertToGpu()) + override def convertToGpu(): GpuExpression = GpuExplode(childExprs.head.convertToGpu()) }), expr[PosExplode]( "Given an input array produces a sequence of rows for each value in the array. " @@ -2495,7 +2499,7 @@ object GpuOverrides { TypeSig.ARRAY.nested( TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL + TypeSig.ARRAY)), (a, conf, p, r) => new GeneratorExprMeta[PosExplode](a, conf, p, r) { - override def convertToGpu(): GpuExpression = GpuPosExplode(childExprs(0).convertToGpu()) + override def convertToGpu(): GpuExpression = GpuPosExplode(childExprs.head.convertToGpu()) }), expr[CollectList]( "Collect a list of elements, now only supported by windowing.", @@ -2680,7 +2684,7 @@ object GpuOverrides { override def convertToGpu(): GpuExec = GpuProjectExec( // Force list to avoid recursive Java serialization of lazy list Seq implementation childExprs.map(_.convertToGpu()).toList, - childPlans(0).convertIfNeeded() + childPlans.head.convertIfNeeded() ) } }), @@ -2704,7 +2708,7 @@ object GpuOverrides { Seq(GpuOverrides.wrapScan(p.scan, conf, Some(this))) override def convertToGpu(): GpuExec = - GpuBatchScanExec(p.output, childScans(0).convertToGpu()) + GpuBatchScanExec(p.output, childScans.head.convertToGpu()) }), exec[CoalesceExec]( "The backend for the dataframe coalesce method", @@ -2805,7 +2809,7 @@ object GpuOverrides { TypeSig.ARRAY + TypeSig.DECIMAL).nested(), TypeSig.all), (filter, conf, p, r) => new SparkPlanMeta[FilterExec](filter, conf, p, r) { override def convertToGpu(): GpuExec = - GpuFilterExec(childExprs(0).convertToGpu(), childPlans(0).convertIfNeeded()) + GpuFilterExec(childExprs.head.convertToGpu(), childPlans.head.convertIfNeeded()) }), exec[ShuffleExchangeExec]( "The backend for most data being exchanged between processes", @@ -2849,12 +2853,14 @@ object GpuOverrides { override val childExprs: Seq[BaseExprMeta[_]] = condition.toSeq - override def convertToGpu(): GpuExec = + override def convertToGpu(): GpuExec = { + val Seq(left, right) = childPlans.map(_.convertIfNeeded()) GpuCartesianProductExec( - childPlans.head.convertIfNeeded(), - childPlans(1).convertIfNeeded(), + left, + right, condition.map(_.convertToGpu()), conf.gpuTargetBatchSizeBytes) + } }) .disabledByDefault("large joins can cause out of memory errors"), exec[HashAggregateExec]( diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala index 90e6a17c2c3..6d8892f1d5c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala @@ -122,11 +122,10 @@ class GpuWindowExpressionMeta( /** * Convert what this wraps to a GPU enabled version. */ - override def convertToGpu(): GpuExpression = - GpuWindowExpression( - childExprs.head.convertToGpu(), - childExprs(1).convertToGpu().asInstanceOf[GpuWindowSpecDefinition] - ) + override def convertToGpu(): GpuExpression = { + val Seq(left, right) = childExprs.map(_.convertToGpu()) + GpuWindowExpression(left, right.asInstanceOf[GpuWindowSpecDefinition]) + } } case class GpuWindowExpression(windowFunction: Expression, windowSpec: GpuWindowSpecDefinition) @@ -556,9 +555,10 @@ class GpuSpecifiedWindowFrameMeta( } } - override def convertToGpu(): GpuExpression = - GpuSpecifiedWindowFrame(windowFrame.frameType, childExprs.head.convertToGpu(), - childExprs(1).convertToGpu()) + override def convertToGpu(): GpuExpression = { + val Seq(left, right) = childExprs.map(_.convertToGpu()) + GpuSpecifiedWindowFrame(windowFrame.frameType, left, right) + } } trait GpuWindowFrame extends GpuExpression with GpuUnevaluable { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala index 3fad5f5faf4..5265a37df06 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala @@ -531,7 +531,7 @@ abstract class SparkPlanMeta[INPUT <: SparkPlan](plan: INPUT, childPlans.foreach(_.tagForExplain()) } - override val childPlans: Seq[SparkPlanMeta[_]] = + override val childPlans: Seq[SparkPlanMeta[SparkPlan]] = plan.children.map(GpuOverrides.wrapPlan(_, conf, Some(this))) override val childExprs: Seq[BaseExprMeta[_]] = plan.expressions.map(GpuOverrides.wrapExpr(_, conf, Some(this))) @@ -636,7 +636,7 @@ abstract class SparkPlanMeta[INPUT <: SparkPlan](plan: INPUT, } else if (childPlans.size > 1) { throw new IllegalStateException("can't remove when plan has more than 1 child") } - childPlans(0).convertIfNeeded() + childPlans.head.convertIfNeeded() } else { if (canThisBeReplaced) { convertToGpu() @@ -802,7 +802,7 @@ abstract class UnaryExprMeta[INPUT <: UnaryExpression]( extends ExprMeta[INPUT](expr, conf, parent, rule) { override final def convertToGpu(): GpuExpression = - convertToGpu(childExprs(0).convertToGpu()) + convertToGpu(childExprs.head.convertToGpu()) def convertToGpu(child: Expression): GpuExpression } @@ -818,7 +818,7 @@ abstract class AggExprMeta[INPUT <: AggregateFunction]( extends ExprMeta[INPUT](expr, conf, parent, rule) { override final def convertToGpu(): GpuExpression = - convertToGpu(childExprs(0).convertToGpu()) + convertToGpu(childExprs.head.convertToGpu()) def convertToGpu(child: Expression): GpuExpression } @@ -849,8 +849,10 @@ abstract class BinaryExprMeta[INPUT <: BinaryExpression]( rule: DataFromReplacementRule) extends ExprMeta[INPUT](expr, conf, parent, rule) { - override final def convertToGpu(): GpuExpression = - convertToGpu(childExprs(0).convertToGpu(), childExprs(1).convertToGpu()) + override final def convertToGpu(): GpuExpression = { + val Seq(lhs, rhs) = childExprs.map(_.convertToGpu()) + convertToGpu(lhs, rhs) + } def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression } @@ -865,9 +867,10 @@ abstract class TernaryExprMeta[INPUT <: TernaryExpression]( rule: DataFromReplacementRule) extends ExprMeta[INPUT](expr, conf, parent, rule) { - override final def convertToGpu(): GpuExpression = - convertToGpu(childExprs(0).convertToGpu(), childExprs(1).convertToGpu(), - childExprs(2).convertToGpu()) + override final def convertToGpu(): GpuExpression = { + val Seq(child0, child1, child2) = childExprs.map(_.convertToGpu()) + convertToGpu(child0, child1, child2) + } def convertToGpu(val0: Expression, val1: Expression, val2: Expression): GpuExpression @@ -881,12 +884,8 @@ abstract class String2TrimExpressionMeta[INPUT <: String2TrimExpression]( extends ExprMeta[INPUT](expr, conf, parent, rule) { override final def convertToGpu(): GpuExpression = { - val trimParam = if (childExprs.size > 1) { - Some(childExprs(1).convertToGpu()) - } else { - None - } - convertToGpu(childExprs(0).convertToGpu(), trimParam) + val gpuCol :: gpuTrimParam = childExprs.map(_.convertToGpu()) + convertToGpu(gpuCol, gpuTrimParam.headOption) } def convertToGpu(column: Expression, target: Option[Expression] = None): GpuExpression diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index 1e4fab6bfd8..150ae38064e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -161,7 +161,7 @@ class GpuHashAggregateMeta( aggregateAttributes.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]], agg.initialInputBufferOffset, resultExpressions.map(_.convertToGpu()).asInstanceOf[Seq[NamedExpression]], - childPlans(0).convertIfNeeded()) + childPlans.head.convertIfNeeded()) } } @@ -276,7 +276,7 @@ class GpuSortAggregateMeta( aggregateAttributes.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]], agg.initialInputBufferOffset, resultExpressions.map(_.convertToGpu()).asInstanceOf[Seq[NamedExpression]], - childPlans(0).convertIfNeeded()) + childPlans.head.convertIfNeeded()) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala index a02756e85ae..def4f1fc718 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -173,7 +173,7 @@ case class GpuCaseWhen( override def nullable: Boolean = { // Result is nullable if any of the branch is nullable, or if the else value is nullable - branches.exists(_._2.nullable) || elseValue.map(_.nullable).getOrElse(true) + branches.exists(_._2.nullable) || elseValue.forall(_.nullable) } override def checkInputDataTypes(): TypeCheckResult = { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuDataSource.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuDataSource.scala index 0e41488b3f6..f24cb481efd 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuDataSource.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuDataSource.scala @@ -81,12 +81,10 @@ case class GpuDataSource( * Whether or not paths should be globbed before being used to access files. */ def globPaths: Boolean = { - options.get(GpuDataSource.GLOB_PATHS_KEY) - .map(_ == "true") - .getOrElse(true) + options.get(GpuDataSource.GLOB_PATHS_KEY).forall(_ == "true") } - bucketSpec.map { bucket => + bucketSpec.foreach { bucket => SchemaUtils.checkColumnNameDuplication( bucket.bucketColumnNames, "in the bucket definition", equality) SchemaUtils.checkColumnNameDuplication( diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala index b72b84296c9..cf253500851 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala @@ -230,7 +230,7 @@ class GpuBroadcastMeta( override def convertToGpu(): GpuExec = { ShimLoader.getSparkShims.getGpuBroadcastExchangeExec( - exchange.mode, childPlans(0).convertIfNeeded()) + exchange.mode, childPlans.head.convertIfNeeded()) } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala index 42a962dcd8d..5a53c2be7e0 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala @@ -56,9 +56,10 @@ class GpuBroadcastNestedLoopJoinMeta( } val gpuBuildSide = ShimLoader.getSparkShims.getBuildSide(join) + val Seq(leftPlan, rightPlan) = childPlans val buildSide = gpuBuildSide match { - case GpuBuildLeft => childPlans.head - case GpuBuildRight => childPlans(1) + case GpuBuildLeft => leftPlan + case GpuBuildRight => rightPlan } if (!canBuildSideBeReplaced(buildSide)) { @@ -72,8 +73,7 @@ class GpuBroadcastNestedLoopJoinMeta( } override def convertToGpu(): GpuExec = { - val left = childPlans.head.convertIfNeeded() - val right = childPlans(1).convertIfNeeded() + val Seq(left, right) = childPlans.map(_.convertIfNeeded()) // The broadcast part of this must be a BroadcastExchangeExec val gpuBuildSide = ShimLoader.getSparkShims.getBuildSide(join) val buildSide = gpuBuildSide match { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExec.scala index 75ad76b7ea5..34dfc0a953c 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExec.scala @@ -67,8 +67,8 @@ class GpuShuffleMeta( override def convertToGpu(): GpuExec = ShimLoader.getSparkShims.getGpuShuffleExchangeExec( - childParts(0).convertToGpu(), - childPlans(0).convertIfNeeded(), + childParts.head.convertToGpu(), + childPlans.head.convertIfNeeded(), Some(shuffle)) } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/ShuffledBatchRDD.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/ShuffledBatchRDD.scala index 130eef3a5fc..4eeb544cd56 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/ShuffledBatchRDD.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/ShuffledBatchRDD.scala @@ -53,7 +53,7 @@ class CoalescedBatchPartitioner(val parent: Partitioner, val partitionStartIndic @transient private lazy val parentPartitionMapping = { val n = parent.numPartitions val result = new Array[Int](n) - for (i <- 0 until partitionStartIndices.length) { + for (i <- partitionStartIndices.indices) { val start = partitionStartIndices(i) val end = if (i < partitionStartIndices.length - 1) partitionStartIndices(i + 1) else n for (j <- start until end) { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuFlatMapCoGroupsInPandasExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuFlatMapCoGroupsInPandasExec.scala index a063bccc047..120a0264106 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuFlatMapCoGroupsInPandasExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuFlatMapCoGroupsInPandasExec.scala @@ -49,13 +49,16 @@ class GpuFlatMapCoGroupsInPandasExecMeta( // Ignore the expressions since columnar way is not supported yet override val childExprs: Seq[BaseExprMeta[_]] = Seq.empty - override def convertToGpu(): GpuExec = + override def convertToGpu(): GpuExec = { + val Seq(left, right) = childPlans.map(_.convertIfNeeded()) GpuFlatMapCoGroupsInPandasExec( flatPandas.leftGroup, flatPandas.rightGroup, flatPandas.func, flatPandas.output, - childPlans.head.convertIfNeeded(), childPlans(1).convertIfNeeded() + left, + right ) + } } /* diff --git a/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/Spark310ParquetWriterSuite.scala b/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/Spark310ParquetWriterSuite.scala index a984f488fde..7862b7d42d2 100644 --- a/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/Spark310ParquetWriterSuite.scala +++ b/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/Spark310ParquetWriterSuite.scala @@ -108,7 +108,7 @@ class Spark310ParquetWriterSuite extends SparkQueryCompareTestSuite { val splitRange = scala.Range(rowsAllowedInABatch.toInt, rows, rowsAllowedInABatch.toInt) scala.Range(0, cb.numCols()).indices.foreach { i => val spyCol = cb.column(i).asInstanceOf[GpuColumnVector].getBase - val splitCols0 = scala.Range(0, splitRange.length).map { _ => + val splitCols0 = splitRange.indices.map { _ => val spySplitCol = spy(ColumnVector.fromBytes(4, 5, 6)) when(spySplitCol.getRowCount()).thenReturn(rowsAllowedInABatch) spySplitCol diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIteratorSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIteratorSuite.scala index 69728157214..47d32633062 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIteratorSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIteratorSuite.scala @@ -104,8 +104,7 @@ class WindowedBlockIteratorSuite extends RapidsShuffleTestHelper { val blockRanges = wbi.next() assertResult(2)(blockRanges.size) - val firstBlock = blockRanges(0) - val secondBlock = blockRanges(1) + val Seq(firstBlock, secondBlock) = blockRanges assertResult(1000)(firstBlock.rangeSize()) assertResult(0)(firstBlock.rangeStart)