From edd34daa82c2e1e8d2b6ecec61fd7c0e9b3dd35a Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Mon, 12 Oct 2020 22:17:49 -0500 Subject: [PATCH 1/6] Adds iterator to make it easier to work with ranges of blocks Signed-off-by: Alessandro Bellina --- .../shuffle/WindowedBlockIterator.scala | 150 ++++++++++++++++++ .../shuffle/WindowedBlockIteratorSuite.scala | 122 ++++++++++++++ 2 files changed, 272 insertions(+) create mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala create mode 100644 tests/src/test/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIteratorSuite.scala diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala new file mode 100644 index 00000000000..56e8080216a --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shuffle + +import scala.collection.mutable.ArrayBuffer + +// Helper trait that callers can use to add blocks to the iterator +// as long as they can provide a size +trait BlockWithSize { + /** + * Abstract method to return the size in bytes of this block + * @return Long - size in bytes + */ + def size: Long +} + +/** + * Specifies a start and end range of byes for a block. + * @param block - a BlockWithSize instance + * @param rangeStart - byte offset for the start of the range + * @param rangeEnd - byte offset for the end of the range + * @tparam T - the specific type of `BlockWithSize` + */ +case class BlockRange[T <: BlockWithSize]( + block: T, rangeStart: Long, rangeEnd: Long) { + + /** + * Returns the size of this range in bytes + * @return - Long - size in bytes + */ + def rangeSize(): Long = rangeEnd - rangeStart + 1 + + def isComplete(): Boolean = rangeEnd == block.size - 1 +} + +/** + * Given a set of blocks, this iterator returns BlockRanges + * of such blocks that fit `windowSize`. + * + * If a block is too large for the window, the block will be + * returned in `next()` until the full block could be covered. + * + * @param transferBlocks - sequence of blocks to manage + * @param windowSize - the size (in bytes) that block ranges should fit + * @tparam T - the specific type of `BlockWithSize` + * @note this class does not own `transferBlocks` + * @note this class is not thread safe + */ +class WindowedBlockIterator[T <: BlockWithSize](blocks: Seq[T], windowSize: Long) + extends Iterator[Seq[BlockRange[T]]] { + + require(windowSize > 0, s"Invalid window size specified $windowSize") + + case class BlockWindow(start: Long, size: Long) { + val end = start + size - 1 + def move(): BlockWindow = { + val windowLength = size - start + BlockWindow(start + size, size) + } + } + + // start the window at byte 0 + private[this] var window = BlockWindow(0, windowSize) + private[this] var done = false + + // helper class that captures the start/end byte offset + // for `block` on creation + case class BlockWithOffset[T <: BlockWithSize]( + block: T, startOffset: Long, endOffset: Long) + + private var lastOffset = 0L + private[this] val blocksWithOffsets = blocks.map { block => + require(block.size > 0, "Invalid 0-byte block") + val startOffset = lastOffset + val endOffset = startOffset + block.size - 1 + lastOffset = endOffset + 1 // for next block + BlockWithOffset(block, startOffset, endOffset) + } + + case class BlocksForWindow(lastBlockIndex: Option[Int], + blockRanges: Seq[BlockRange[T]], + hasMoreBlocks: Boolean) + + private def getBlocksForWindow( + window: BlockWindow, + startingBlock: Int = 0): BlocksForWindow = { + val blockRangesInWindow = new ArrayBuffer[BlockRange[T]]() + var continue = true + var thisBlock = startingBlock + var lastBlockIndex: Option[Int] = None + while (continue && thisBlock < blocksWithOffsets.size) { + val b = blocksWithOffsets(thisBlock) + // if at least 1 byte fits within the window, this block should be included + if (window.start <= b.endOffset && window.end >= b.startOffset) { + var rangeStart = window.start - b.startOffset + if (rangeStart < 0) { + rangeStart = 0 + } + var rangeEnd = window.end - b.startOffset + if (window.end > b.endOffset) { + rangeEnd = b.endOffset - b.startOffset + } + blockRangesInWindow.append(BlockRange[T](b.block, rangeStart, rangeEnd)) + lastBlockIndex = Some(thisBlock) + } else { + // skip this block, unless it's before our window starts + continue = b.endOffset < window.start + } + thisBlock = thisBlock + 1 + } + val lastBlock = blockRangesInWindow.last + BlocksForWindow(lastBlockIndex, + blockRangesInWindow, + !continue || !lastBlock.isComplete()) + } + + private var lastSeenBlock = 0 + def next(): Seq[BlockRange[T]] = { + if (!hasNext) { + throw new NoSuchElementException(s"BounceBufferWindow $window has been exhausted.") + } + + val blocksForWindow = getBlocksForWindow(window, lastSeenBlock) + lastSeenBlock = blocksForWindow.lastBlockIndex.getOrElse(0) + + if (blocksForWindow.hasMoreBlocks) { + window = window.move() + } else { + done = true + } + + blocksForWindow.blockRanges + } + + override def hasNext: Boolean = !done && blocksWithOffsets.nonEmpty +} diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIteratorSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIteratorSuite.scala new file mode 100644 index 00000000000..9d245b4d2d5 --- /dev/null +++ b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIteratorSuite.scala @@ -0,0 +1,122 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shuffle + +import java.util.NoSuchElementException + +import org.mockito.Mockito._ + +class WindowedBlockIteratorSuite extends RapidsShuffleTestHelper { + test ("empty iterator throws on next") { + val wbi = new WindowedBlockIterator[BlockWithSize](Seq.empty, 1024) + assertResult(false)(wbi.hasNext) + assertThrows[NoSuchElementException](wbi.next) + } + + test ("0-byte blocks are not allowed") { + val block = mock[BlockWithSize] + when(block.size).thenReturn(0) + assertThrows[IllegalArgumentException]( + new WindowedBlockIterator[BlockWithSize](Seq(block), 1024)) + } + + test ("1024 1-byte blocks all fit in 1 1024-byte window") { + val mockBlocks = (0 until 1024).map { i => + val block = mock[BlockWithSize] + when(block.size).thenReturn(1) + block + } + val wbi = new WindowedBlockIterator[BlockWithSize](mockBlocks, 1024) + assertResult(true)(wbi.hasNext) + val blockRange = wbi.next() + assertResult(1024)(blockRange.size) + blockRange.foreach { br => + assertResult(1)(br.rangeSize()) + assertResult(0)(br.rangeStart) + assertResult(0)(br.rangeEnd) + } + assertResult(false)(wbi.hasNext) + assertThrows[NoSuchElementException](wbi.next) + } + + test ("a block larger than the window is split between calls to next") { + val block = mock[BlockWithSize] + when(block.size).thenReturn(2049) + + val wbi = new WindowedBlockIterator[BlockWithSize](Seq(block), 1024) + assertResult(true)(wbi.hasNext) + val blockRanges = wbi.next() + assertResult(1)(blockRanges.size) + + val blockRange = blockRanges.head + assertResult(1024)(blockRange.rangeSize()) + assertResult(0)(blockRange.rangeStart) + assertResult(1023)(blockRange.rangeEnd) + assertResult(true)(wbi.hasNext) + + val blockRangesMiddle = wbi.next() + val blockRangeMiddle = blockRangesMiddle.head + assertResult(1024)(blockRangeMiddle.rangeSize()) + assertResult(1024)(blockRangeMiddle.rangeStart) + assertResult(2047)(blockRangeMiddle.rangeEnd) + assertResult(true)(wbi.hasNext) + + val blockRangesLastByte = wbi.next() + val blockRangeLastByte = blockRangesLastByte.head + assertResult(1)(blockRangeLastByte.rangeSize()) + assertResult(2048)(blockRangeLastByte.rangeStart) + assertResult(2048)(blockRangeLastByte.rangeEnd) + + assertResult(false)(wbi.hasNext) + assertThrows[NoSuchElementException](wbi.next) + } + + test ("a block fits entirely, but a subsequent block doesn't") { + val block = mock[BlockWithSize] + when(block.size).thenReturn(1000) + + val block2 = mock[BlockWithSize] + when(block2.size).thenReturn(1000) + + val wbi = new WindowedBlockIterator[BlockWithSize](Seq(block, block2), 1024) + assertResult(true)(wbi.hasNext) + val blockRanges = wbi.next() + assertResult(2)(blockRanges.size) + + val firstBlock = blockRanges(0) + val secondBlock = blockRanges(1) + + assertResult(1000)(firstBlock.rangeSize()) + assertResult(0)(firstBlock.rangeStart) + assertResult(999)(firstBlock.rangeEnd) + assertResult(true)(wbi.hasNext) + + assertResult(24)(secondBlock.rangeSize()) + assertResult(0)(secondBlock.rangeStart) + assertResult(23)(secondBlock.rangeEnd) + assertResult(true)(wbi.hasNext) + + val blockRangesLastByte = wbi.next() + val blockRangeLastByte = blockRangesLastByte.head + assertResult(976)(blockRangeLastByte.rangeSize()) + assertResult(24)(blockRangeLastByte.rangeStart) + assertResult(999)(blockRangeLastByte.rangeEnd) + + assertResult(false)(wbi.hasNext) + assertThrows[NoSuchElementException](wbi.next) + } +} From d56214f30705b22eaa8dac82f2731a8ca74a5895 Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Mon, 19 Oct 2020 16:08:20 -0500 Subject: [PATCH 2/6] Add a require statement for range boundaries, tests, and comment fixes --- .../shuffle/WindowedBlockIterator.scala | 34 ++++++++++++++++--- .../shuffle/WindowedBlockIteratorSuite.scala | 7 ++++ 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala index 56e8080216a..42d2a0e5d77 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala @@ -29,7 +29,7 @@ trait BlockWithSize { } /** - * Specifies a start and end range of byes for a block. + * Specifies a start and end range of bytes for a block. * @param block - a BlockWithSize instance * @param rangeStart - byte offset for the start of the range * @param rangeEnd - byte offset for the end of the range @@ -37,6 +37,8 @@ trait BlockWithSize { */ case class BlockRange[T <: BlockWithSize]( block: T, rangeStart: Long, rangeEnd: Long) { + require(rangeStart <= rangeEnd, + s"Instantiated a BlockRange with invalid ranges: $rangeStart to $rangeEnd") /** * Returns the size of this range in bytes @@ -49,12 +51,35 @@ case class BlockRange[T <: BlockWithSize]( /** * Given a set of blocks, this iterator returns BlockRanges - * of such blocks that fit `windowSize`. + * of such blocks that fit `windowSize`. The ranges are just logical + * chunks of the blocks, so this class performs no memory management or copying. * * If a block is too large for the window, the block will be - * returned in `next()` until the full block could be covered. + * returned in `next()` until the full block can be covered. * - * @param transferBlocks - sequence of blocks to manage + * For example, given a block that is 4 window-sizes in length: + * block = [sb1, sb2, sb3, sb4] + * + * The window will return on `next()` four "sub-blocks", governed by `windowSize`: + * window.next() // sb1 + * window.next() // sb2 + * window.next() // sb3 + * window.next() // sb4 + * + * If blocks are smaller than the `windowSize`, they will be packed: + * block1 = [b1] + * block2 = [b2] + * window.next() // [b1, b2] + * + * A mix of both scenarios above is possible: + * block1 = [sb11, sb12, sb13] // where sb13 is smaller than window length + * block2 = [b2] + * + * window.next() // sb11 + * window.next() // sb12 + * window.next() // [sb13, b2] + * + * @param blocks - sequence of blocks to manage * @param windowSize - the size (in bytes) that block ranges should fit * @tparam T - the specific type of `BlockWithSize` * @note this class does not own `transferBlocks` @@ -68,7 +93,6 @@ class WindowedBlockIterator[T <: BlockWithSize](blocks: Seq[T], windowSize: Long case class BlockWindow(start: Long, size: Long) { val end = start + size - 1 def move(): BlockWindow = { - val windowLength = size - start BlockWindow(start + size, size) } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIteratorSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIteratorSuite.scala index 9d245b4d2d5..fa5ccc11769 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIteratorSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIteratorSuite.scala @@ -27,6 +27,13 @@ class WindowedBlockIteratorSuite extends RapidsShuffleTestHelper { assertThrows[NoSuchElementException](wbi.next) } + test ("1-byte+ ranges are allowed, but 0-byte or negative ranges are not") { + assertResult(1)(BlockRange(null, 123, 123).rangeSize()) + assertResult(2)(BlockRange(null, 123, 124).rangeSize()) + assertThrows[IllegalArgumentException](BlockRange(null, 123, 122)) + assertThrows[IllegalArgumentException](BlockRange(null, 123, 121)) + } + test ("0-byte blocks are not allowed") { val block = mock[BlockWithSize] when(block.size).thenReturn(0) From 2cb97718298dc371485930019754ccd982fd99fa Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Mon, 19 Oct 2020 16:09:55 -0500 Subject: [PATCH 3/6] Invalid ranges -> Invalid boundaries. Rephrase --- .../com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala index 42d2a0e5d77..4ece6c1c7a0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala @@ -38,7 +38,7 @@ trait BlockWithSize { case class BlockRange[T <: BlockWithSize]( block: T, rangeStart: Long, rangeEnd: Long) { require(rangeStart <= rangeEnd, - s"Instantiated a BlockRange with invalid ranges: $rangeStart to $rangeEnd") + s"Instantiated a BlockRange with invalid boundaries: $rangeStart to $rangeEnd") /** * Returns the size of this range in bytes From f6ced8e5a924637030393c937b373cd502e86841 Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Mon, 19 Oct 2020 16:43:00 -0500 Subject: [PATCH 4/6] Make rangeEnd exclusive, clarify in javadocs, update tests Signed-off-by: Alessandro Bellina --- .../shuffle/WindowedBlockIterator.scala | 38 ++++++++++--------- .../shuffle/WindowedBlockIteratorSuite.scala | 20 +++++----- 2 files changed, 30 insertions(+), 28 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala index 4ece6c1c7a0..c63f9d92fc6 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala @@ -31,22 +31,22 @@ trait BlockWithSize { /** * Specifies a start and end range of bytes for a block. * @param block - a BlockWithSize instance - * @param rangeStart - byte offset for the start of the range - * @param rangeEnd - byte offset for the end of the range + * @param rangeStart - byte offset for the start of the range (inclusive) + * @param rangeEnd - byte offset for the end of the range (exclusive) * @tparam T - the specific type of `BlockWithSize` */ case class BlockRange[T <: BlockWithSize]( block: T, rangeStart: Long, rangeEnd: Long) { - require(rangeStart <= rangeEnd, + require(rangeStart < rangeEnd, s"Instantiated a BlockRange with invalid boundaries: $rangeStart to $rangeEnd") /** * Returns the size of this range in bytes * @return - Long - size in bytes */ - def rangeSize(): Long = rangeEnd - rangeStart + 1 + def rangeSize(): Long = rangeEnd - rangeStart - def isComplete(): Boolean = rangeEnd == block.size - 1 + def isComplete(): Boolean = rangeEnd == block.size } /** @@ -90,8 +90,8 @@ class WindowedBlockIterator[T <: BlockWithSize](blocks: Seq[T], windowSize: Long require(windowSize > 0, s"Invalid window size specified $windowSize") - case class BlockWindow(start: Long, size: Long) { - val end = start + size - 1 + private case class BlockWindow(start: Long, size: Long) { + val end = start + size // non-exclusive end offset def move(): BlockWindow = { BlockWindow(start + size, size) } @@ -103,16 +103,18 @@ class WindowedBlockIterator[T <: BlockWithSize](blocks: Seq[T], windowSize: Long // helper class that captures the start/end byte offset // for `block` on creation - case class BlockWithOffset[T <: BlockWithSize]( + private case class BlockWithOffset[T <: BlockWithSize]( block: T, startOffset: Long, endOffset: Long) - private var lastOffset = 0L - private[this] val blocksWithOffsets = blocks.map { block => - require(block.size > 0, "Invalid 0-byte block") - val startOffset = lastOffset - val endOffset = startOffset + block.size - 1 - lastOffset = endOffset + 1 // for next block - BlockWithOffset(block, startOffset, endOffset) + private[this] val blocksWithOffsets = { + var lastOffset = 0L + blocks.map { block => + require(block.size > 0, "Invalid 0-byte block") + val startOffset = lastOffset + val endOffset = startOffset + block.size + lastOffset = endOffset // for next block + BlockWithOffset(block, startOffset, endOffset) + } } case class BlocksForWindow(lastBlockIndex: Option[Int], @@ -129,20 +131,20 @@ class WindowedBlockIterator[T <: BlockWithSize](blocks: Seq[T], windowSize: Long while (continue && thisBlock < blocksWithOffsets.size) { val b = blocksWithOffsets(thisBlock) // if at least 1 byte fits within the window, this block should be included - if (window.start <= b.endOffset && window.end >= b.startOffset) { + if (window.start < b.endOffset && window.end > b.startOffset) { var rangeStart = window.start - b.startOffset if (rangeStart < 0) { rangeStart = 0 } var rangeEnd = window.end - b.startOffset - if (window.end > b.endOffset) { + if (window.end >= b.endOffset) { rangeEnd = b.endOffset - b.startOffset } blockRangesInWindow.append(BlockRange[T](b.block, rangeStart, rangeEnd)) lastBlockIndex = Some(thisBlock) } else { // skip this block, unless it's before our window starts - continue = b.endOffset < window.start + continue = b.endOffset <= window.start } thisBlock = thisBlock + 1 } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIteratorSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIteratorSuite.scala index fa5ccc11769..65211f89394 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIteratorSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIteratorSuite.scala @@ -28,10 +28,10 @@ class WindowedBlockIteratorSuite extends RapidsShuffleTestHelper { } test ("1-byte+ ranges are allowed, but 0-byte or negative ranges are not") { - assertResult(1)(BlockRange(null, 123, 123).rangeSize()) - assertResult(2)(BlockRange(null, 123, 124).rangeSize()) + assertResult(1)(BlockRange(null, 123, 124).rangeSize()) + assertResult(2)(BlockRange(null, 123, 125).rangeSize()) + assertThrows[IllegalArgumentException](BlockRange(null, 123, 123)) assertThrows[IllegalArgumentException](BlockRange(null, 123, 122)) - assertThrows[IllegalArgumentException](BlockRange(null, 123, 121)) } test ("0-byte blocks are not allowed") { @@ -54,7 +54,7 @@ class WindowedBlockIteratorSuite extends RapidsShuffleTestHelper { blockRange.foreach { br => assertResult(1)(br.rangeSize()) assertResult(0)(br.rangeStart) - assertResult(0)(br.rangeEnd) + assertResult(1)(br.rangeEnd) } assertResult(false)(wbi.hasNext) assertThrows[NoSuchElementException](wbi.next) @@ -72,21 +72,21 @@ class WindowedBlockIteratorSuite extends RapidsShuffleTestHelper { val blockRange = blockRanges.head assertResult(1024)(blockRange.rangeSize()) assertResult(0)(blockRange.rangeStart) - assertResult(1023)(blockRange.rangeEnd) + assertResult(1024)(blockRange.rangeEnd) assertResult(true)(wbi.hasNext) val blockRangesMiddle = wbi.next() val blockRangeMiddle = blockRangesMiddle.head assertResult(1024)(blockRangeMiddle.rangeSize()) assertResult(1024)(blockRangeMiddle.rangeStart) - assertResult(2047)(blockRangeMiddle.rangeEnd) + assertResult(2048)(blockRangeMiddle.rangeEnd) assertResult(true)(wbi.hasNext) val blockRangesLastByte = wbi.next() val blockRangeLastByte = blockRangesLastByte.head assertResult(1)(blockRangeLastByte.rangeSize()) assertResult(2048)(blockRangeLastByte.rangeStart) - assertResult(2048)(blockRangeLastByte.rangeEnd) + assertResult(2049)(blockRangeLastByte.rangeEnd) assertResult(false)(wbi.hasNext) assertThrows[NoSuchElementException](wbi.next) @@ -109,19 +109,19 @@ class WindowedBlockIteratorSuite extends RapidsShuffleTestHelper { assertResult(1000)(firstBlock.rangeSize()) assertResult(0)(firstBlock.rangeStart) - assertResult(999)(firstBlock.rangeEnd) + assertResult(1000)(firstBlock.rangeEnd) assertResult(true)(wbi.hasNext) assertResult(24)(secondBlock.rangeSize()) assertResult(0)(secondBlock.rangeStart) - assertResult(23)(secondBlock.rangeEnd) + assertResult(24)(secondBlock.rangeEnd) assertResult(true)(wbi.hasNext) val blockRangesLastByte = wbi.next() val blockRangeLastByte = blockRangesLastByte.head assertResult(976)(blockRangeLastByte.rangeSize()) assertResult(24)(blockRangeLastByte.rangeStart) - assertResult(999)(blockRangeLastByte.rangeEnd) + assertResult(1000)(blockRangeLastByte.rangeEnd) assertResult(false)(wbi.hasNext) assertThrows[NoSuchElementException](wbi.next) From 831245a3b3f50269b2f8af4dd6debbdc83808719 Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Mon, 19 Oct 2020 18:04:22 -0500 Subject: [PATCH 5/6] Non-exclusive -> exclusive, typo --- .../com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala index c63f9d92fc6..b8ef7364373 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala @@ -91,7 +91,7 @@ class WindowedBlockIterator[T <: BlockWithSize](blocks: Seq[T], windowSize: Long require(windowSize > 0, s"Invalid window size specified $windowSize") private case class BlockWindow(start: Long, size: Long) { - val end = start + size // non-exclusive end offset + val end = start + size // exclusive end offset def move(): BlockWindow = { BlockWindow(start + size, size) } From 8ba316050a056f37ac603b799845e26f8b044a91 Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Mon, 19 Oct 2020 22:04:03 -0500 Subject: [PATCH 6/6] Move lastSeenBlock up and make it private[this] --- .../nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala index b8ef7364373..4d9ab267133 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala @@ -117,6 +117,10 @@ class WindowedBlockIterator[T <: BlockWithSize](blocks: Seq[T], windowSize: Long } } + // the last block index that made it into a window, which + // is an index into the `blocksWithOffsets` sequence + private[this] var lastSeenBlock = 0 + case class BlocksForWindow(lastBlockIndex: Option[Int], blockRanges: Seq[BlockRange[T]], hasMoreBlocks: Boolean) @@ -154,7 +158,6 @@ class WindowedBlockIterator[T <: BlockWithSize](blocks: Seq[T], windowSize: Long !continue || !lastBlock.isComplete()) } - private var lastSeenBlock = 0 def next(): Seq[BlockRange[T]] = { if (!hasNext) { throw new NoSuchElementException(s"BounceBufferWindow $window has been exhausted.")