Skip to content

Commit

Permalink
Addressed a few more comments
Browse files Browse the repository at this point in the history
  • Loading branch information
revans2 committed May 10, 2022
1 parent 645117e commit 62de08b
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -342,15 +342,11 @@ private case class ParquetFileInfoWithBlockMeta(filePath: Path, blocks: Seq[Bloc

/**
* A parquet compatible stream that allows reading from a HostMemoryBuffer to Parquet.
* The majority of the code here was copied from Parquet's DelegatingSeekableInputStream with
* minor modifications to have it be make it Scala and call into the
* HostMemoryInputStreamMixIn's state.
*/
class HMBSeekableInputStream(
val hmb: HostMemoryBuffer,
val hmbLength: Long) extends SeekableInputStream
with HostMemoryInputStreamMixIn {
private val temp = new Array[Byte](8192)

override def seek(offset: Long): Unit = {
pos = offset
Expand All @@ -361,7 +357,7 @@ class HMBSeekableInputStream(
val amountRead = read(buffer)
val remaining = buffer.length - amountRead
if (remaining > 0) {
throw new EOFException("Reached the end of stream with " + remaining + " bytes left to read")
throw new EOFException(s"Reached the end of stream with $remaining bytes left to read")
}
}

Expand All @@ -370,83 +366,32 @@ class HMBSeekableInputStream(
val amountRead = read(buffer, offset, length)
val remaining = length - amountRead
if (remaining > 0) {
throw new EOFException("Reached the end of stream with " + remaining + " bytes left to read")
throw new EOFException(s"Reached the end of stream with $remaining bytes left to read")
}
}

@throws[IOException]
override def read(buf: ByteBuffer): Int =
if (buf.hasArray) {
readHeapBuffer(buf)
if (pos >= hmbLength) {
-1
} else {
readDirectBuffer(buf)
val requested = buf.remaining()
val toRead = Math.min(requested, available())
val fromBuf = hmb.asByteBuffer(pos, toRead)
pos = pos + toRead
fromBuf.put(buf)
toRead
}

@throws[IOException]
override def readFully(buf: ByteBuffer): Unit = {
if (buf.hasArray) {
readFullyHeapBuffer(buf)
} else {
readFullyDirectBuffer(buf)
}
}

private def readHeapBuffer(buf: ByteBuffer) = {
val bytesRead = read(buf.array, buf.arrayOffset + buf.position, buf.remaining)
if (bytesRead < 0) {
bytesRead
} else {
buf.position(buf.position + bytesRead)
bytesRead
}
}

private def readFullyHeapBuffer(buf: ByteBuffer): Unit = {
readFully(buf.array, buf.arrayOffset + buf.position, buf.remaining)
buf.position(buf.limit)
}

private def readDirectBuffer(buf: ByteBuffer): Int = {
var nextReadLength = Math.min(buf.remaining, temp.length)
var totalBytesRead = 0
var bytesRead = 0
totalBytesRead = 0
bytesRead = read(temp, 0, nextReadLength)
while (bytesRead == temp.length) {
buf.put(temp)
totalBytesRead += bytesRead

nextReadLength = Math.min(buf.remaining, temp.length)
bytesRead = read(temp, 0, nextReadLength)
}
if (bytesRead < 0) {
if (totalBytesRead == 0) {
-1
} else {
totalBytesRead
}
} else {
buf.put(temp, 0, bytesRead)
totalBytesRead += bytesRead
totalBytesRead
}
}

private def readFullyDirectBuffer(buf: ByteBuffer): Unit = {
var nextReadLength = Math.min(buf.remaining, temp.length)
var bytesRead = 0
bytesRead = 0
bytesRead = read(temp, 0, nextReadLength)
while (nextReadLength > 0 && bytesRead >= 0) {
buf.put(temp, 0, bytesRead)

nextReadLength = Math.min(buf.remaining, temp.length)
bytesRead = read(temp, 0, nextReadLength)
}
if (bytesRead < 0 && buf.remaining > 0) {
throw new EOFException("Reached the end of stream with " +
buf.remaining + " bytes left to read")
val requested = buf.remaining()
val avail = available()
if (requested < avail) {
throw new EOFException(s"Not enough bytes remaining ($avail) to satisfy the " +
s"${buf.remaining} bytes requested.")
}
read(buf)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1696,7 +1696,7 @@ class RapidsConf(conf: Map[String, String]) extends Logging {
case "NATIVE" => ParquetFooterReaderType.NATIVE
case "JAVA" => ParquetFooterReaderType.JAVA
case other =>
throw new IllegalArgumentException(s"Internal Error $other is not a supported for " +
throw new IllegalArgumentException(s"Internal Error $other is not supported for " +
s"${PARQUET_READER_FOOTER_TYPE.key}")
}
}
Expand Down

0 comments on commit 62de08b

Please sign in to comment.