Skip to content

Commit

Permalink
Don't acquire the semaphore for empty input while scanning (#4476)
Browse files Browse the repository at this point in the history
* Don't acquire the semaphore for empty batches while scanning

Signed-off-by: Alessandro Bellina <abellina@nvidia.com>

* Update copyrights

* Update comments in scans per review suggestion
  • Loading branch information
abellina authored Jan 11, 2022
1 parent 7564f22 commit b17c685
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 50 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -308,7 +308,6 @@ abstract class MultiFileCloudPartitionReaderBase(
private var filesToRead = 0
protected var currentFileHostBuffers: Option[HostMemoryBuffersWithMetaDataBase] = None
private var isInitted = false
private var isFirstBatch = true
private val tasks = new ConcurrentLinkedQueue[Future[HostMemoryBuffersWithMetaDataBase]]()
private val tasksToRun = new Queue[Callable[HostMemoryBuffersWithMetaDataBase]]()
private[this] val inputMetrics = Option(TaskContext.get).map(_.taskMetrics().inputMetrics)
Expand Down Expand Up @@ -426,15 +425,12 @@ abstract class MultiFileCloudPartitionReaderBase(
next()
}

if (isFirstBatch) {
if (batch.isEmpty) {
// This is odd, but some operators return data even when there is no input so we need to
// be sure that we grab the GPU if there were no batches.
GpuSemaphore.acquireIfNecessary(TaskContext.get(), metrics(SEMAPHORE_WAIT_TIME))
}
isFirstBatch = false
}

// NOTE: At this point, the task may not have yet acquired the semaphore if `batch` is `None`.
// We are not acquiring the semaphore here since this next() is getting called from
// the `PartitionReaderIterator` which implements a standard iterator pattern, and
// advertises `hasNext` as false when we return false here. No downstream tasks should
// try to call next after `hasNext` returns false, and any task that produces some kind of
// data when `hasNext` is false is responsible for acquiring the semaphore itself.
batch.isDefined
}

Expand Down Expand Up @@ -564,7 +560,6 @@ abstract class MultiFileCoalescingPartitionReaderBase(
private val blockIterator: BufferedIterator[SingleDataBlockInfo] =
clippedBlocks.iterator.buffered
private[this] val inputMetrics = TaskContext.get.taskMetrics().inputMetrics
private[this] var isFirstBatch = true

private case class CurrentChunkMeta(
clippedSchema: SchemaBase,
Expand Down Expand Up @@ -709,15 +704,12 @@ abstract class MultiFileCoalescingPartitionReaderBase(
}
}

if (isFirstBatch) {
if (batch.isEmpty) {
// This is odd, but some operators return data even when there is no input so we need to
// be sure that we grab the GPU if there were no batches.
GpuSemaphore.acquireIfNecessary(TaskContext.get(), metrics(SEMAPHORE_WAIT_TIME))
}
isFirstBatch = false
}

// NOTE: At this point, the task may not have yet acquired the semaphore if `batch` is `None`.
// We are not acquiring the semaphore here since this next() is getting called from
// the `PartitionReaderIterator` which implements a standard iterator pattern, and
// advertises `hasNext` as false when we return false here. No downstream tasks should
// try to call next after `hasNext` returns false, and any task that produces some kind of
// data when `hasNext` is false is responsible for acquiring the semaphore itself.
batch.isDefined
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -611,7 +611,6 @@ class GpuOrcPartitionReader(
override val execMetrics : Map[String, GpuMetric],
isCaseSensitive: Boolean) extends FilePartitionReaderBase(conf, execMetrics)
with OrcPartitionReaderBase {
private[this] var isFirstBatch = true

override def next(): Boolean = {
batch.foreach(_.close())
Expand All @@ -621,15 +620,13 @@ class GpuOrcPartitionReader(
} else {
metrics(PEAK_DEVICE_MEMORY) += maxDeviceMemory
}
if (isFirstBatch) {
if (batch.isEmpty) {
// This is odd, but some operators return data even when there is no input so we need to
// be sure that we grab the GPU if there were no batches.
GpuSemaphore.acquireIfNecessary(TaskContext.get(), metrics(SEMAPHORE_WAIT_TIME))
}
isFirstBatch = false
}

// NOTE: At this point, the task may not have yet acquired the semaphore if `batch` is `None`.
// We are not acquiring the semaphore here since this next() is getting called from
// the `PartitionReaderIterator` which implements a standard iterator pattern, and
// advertises `hasNext` as false when we return false here. No downstream tasks should
// try to call next after `hasNext` returns false, and any task that produces some kind of
// data when `hasNext` is false is responsible for acquiring the semaphore itself.
batch.isDefined
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1458,7 +1458,6 @@ class ParquetPartitionReader(
with ParquetPartitionReaderBase {

private val blockIterator: BufferedIterator[BlockMetaData] = clippedBlocks.iterator.buffered
private var isFirstBatch = true

override def next(): Boolean = {
batch.foreach(_.close())
Expand All @@ -1471,14 +1470,13 @@ class ParquetPartitionReader(
batch = readBatch()
}
}
if (isFirstBatch) {
if (batch.isEmpty) {
// This is odd, but some operators return data even when there is no input so we need to
// be sure that we grab the GPU if there were no batches.
GpuSemaphore.acquireIfNecessary(TaskContext.get(), metrics(SEMAPHORE_WAIT_TIME))
}
isFirstBatch = false
}

// NOTE: At this point, the task may not have yet acquired the semaphore if `batch` is `None`.
// We are not acquiring the semaphore here since this next() is getting called from
// the `PartitionReaderIterator` which implements a standard iterator pattern, and
// advertises `hasNext` as false when we return false here. No downstream tasks should
// try to call next after `hasNext` returns false, and any task that produces some kind of
// data when `hasNext` is false is responsible for acquiring the semaphore itself.
batch.isDefined
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ abstract class GpuTextBasedPartitionReader(
private val lineReader = new HadoopFileLinesReader(partFile, lineSeparatorInRead, conf)
private var isFirstChunkForIterator: Boolean = true
private var isExhausted: Boolean = false
private var isFirstBatch: Boolean = true
private var maxDeviceMemory: Long = 0

metrics = execMetrics
Expand Down Expand Up @@ -220,14 +219,13 @@ abstract class GpuTextBasedPartitionReader(
} else {
readBatch()
}
if (isFirstBatch) {
if (batch.isEmpty) {
// This is odd, but some operators return data even when there is no input so we need to
// be sure that we grab the GPU if there were no batches.
GpuSemaphore.acquireIfNecessary(TaskContext.get(), metrics(SEMAPHORE_WAIT_TIME))
}
isFirstBatch = false
}

// NOTE: At this point, the task may not have yet acquired the semaphore if `batch` is `None`.
// We are not acquiring the semaphore here since this next() is getting called from
// the `PartitionReaderIterator` which implements a standard iterator pattern, and
// advertises `hasNext` as false when we return false here. No downstream tasks should
// try to call next after `hasNext` returns false, and any task that produces some kind of
// data when `hasNext` is false is responsible for acquiring the semaphore itself.
batch.isDefined
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ case class GpuHashAggregateMetrics(
computeAggTime: GpuMetric,
concatTime: GpuMetric,
sortTime: GpuMetric,
semWaitTime: GpuMetric,
spillCallback: SpillCallback)

/** Utility class to convey information on the aggregation modes being used */
Expand Down Expand Up @@ -484,6 +485,11 @@ class GpuHashAggregateIterator(
val aggregateFunctions = aggregateExpressions.map(_.aggregateFunction)
val defaultValues =
aggregateFunctions.flatMap(_.initialValues)
// We have to grab the semaphore in this scenario, since this is a reduction that produces
// rows on the GPU out of empty input: if a batch has 0 rows, a new single row is created
// with 0 as the count (if count is the operation) and the default (initial) values of the
// other aggregate functions.
GpuSemaphore.acquireIfNecessary(TaskContext.get(), metrics.semWaitTime)
val vecs = defaultValues.safeMap { ref =>
withResource(GpuScalar.from(ref.asInstanceOf[GpuLiteral].value, ref.dataType)) {
scalar => GpuColumnVector.from(scalar, 1, ref.dataType)
Expand Down Expand Up @@ -1392,6 +1398,7 @@ case class GpuHashAggregateExec(
computeAggTime = gpuLongMetric(AGG_TIME),
concatTime = gpuLongMetric(CONCAT_TIME),
sortTime = gpuLongMetric(SORT_TIME),
semWaitTime = gpuLongMetric(SEMAPHORE_WAIT_TIME),
makeSpillCallback(allMetrics))

// cache in a local variable to avoid serializing the full child plan
Expand Down

0 comments on commit b17c685

Please sign in to comment.