Don't acquire the semaphore for empty input while scanning #4476

Merged · 3 commits · Jan 11, 2022
@@ -308,7 +308,6 @@ abstract class MultiFileCloudPartitionReaderBase(
   private var filesToRead = 0
   protected var currentFileHostBuffers: Option[HostMemoryBuffersWithMetaDataBase] = None
   private var isInitted = false
-  private var isFirstBatch = true
   private val tasks = new ConcurrentLinkedQueue[Future[HostMemoryBuffersWithMetaDataBase]]()
   private val tasksToRun = new Queue[Callable[HostMemoryBuffersWithMetaDataBase]]()
   private[this] val inputMetrics = Option(TaskContext.get).map(_.taskMetrics().inputMetrics)
@@ -426,15 +425,11 @@ abstract class MultiFileCloudPartitionReaderBase(
       next()
     }

-    if (isFirstBatch) {
-      if (batch.isEmpty) {
-        // This is odd, but some operators return data even when there is no input so we need to
-        // be sure that we grab the GPU if there were no batches.
-        GpuSemaphore.acquireIfNecessary(TaskContext.get(), metrics(SEMAPHORE_WAIT_TIME))
-      }
-      isFirstBatch = false
-    }
-
+    // NOTE: At this point, the task may not have yet acquired the semaphore if `batch` is `None`.
+    // We are not acquiring the semaphore here since this next() is getting called from
+    // the `PartitionReaderIterator` which implements a standard iterator pattern, and
+    // advertises `hasNext` as false when we return false here (no downstream tasks should
+    // consume this empty batch)
     batch.isDefined
   }

@@ -564,7 +559,6 @@ abstract class MultiFileCoalescingPartitionReaderBase(
   private val blockIterator: BufferedIterator[SingleDataBlockInfo] =
     clippedBlocks.iterator.buffered
   private[this] val inputMetrics = TaskContext.get.taskMetrics().inputMetrics
-  private[this] var isFirstBatch = true

   private case class CurrentChunkMeta(
     clippedSchema: SchemaBase,
@@ -709,15 +703,11 @@ abstract class MultiFileCoalescingPartitionReaderBase(
       }
     }

-    if (isFirstBatch) {
-      if (batch.isEmpty) {
-        // This is odd, but some operators return data even when there is no input so we need to
-        // be sure that we grab the GPU if there were no batches.
-        GpuSemaphore.acquireIfNecessary(TaskContext.get(), metrics(SEMAPHORE_WAIT_TIME))
-      }
-      isFirstBatch = false
-    }
-
+    // NOTE: At this point, the task may not have yet acquired the semaphore if `batch` is `None`.
+    // We are not acquiring the semaphore here since this next() is getting called from
+    // the `PartitionReaderIterator` which implements a standard iterator pattern, and
+    // advertises `hasNext` as false when we return false here (no downstream tasks should
+    // consume this empty batch)
     batch.isDefined
   }

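The NOTE above relies on the contract of Spark's `PartitionReaderIterator`: once `PartitionReader.next()` returns `false`, the iterator reports `hasNext == false` and never calls `get()`, so nothing downstream ever consumes the empty batch or triggers GPU work for this task. A simplified sketch of that pattern (a paraphrase for illustration, not Spark's exact source):

```scala
import org.apache.spark.sql.connector.read.PartitionReader

// Simplified sketch of the standard iterator pattern the NOTE refers to,
// modeled after Spark's PartitionReaderIterator.
class PartitionReaderIteratorSketch[T](reader: PartitionReader[T]) extends Iterator[T] {
  private var valuePrepared = false

  override def hasNext: Boolean = {
    if (!valuePrepared) {
      valuePrepared = reader.next() // false for an empty scan: no batch was produced
    }
    valuePrepared
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    valuePrepared = false
    reader.get() // only reached when next() returned true, i.e. a real batch exists
  }
}
```

Since `get()` is unreachable after `next()` returns `false`, the reader can safely skip the semaphore for empty input: no consumer will ever observe a batch that was produced without holding the GPU.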
@@ -611,7 +611,6 @@ class GpuOrcPartitionReader(
     override val execMetrics : Map[String, GpuMetric],
     isCaseSensitive: Boolean) extends FilePartitionReaderBase(conf, execMetrics)
   with OrcPartitionReaderBase {
-  private[this] var isFirstBatch = true

   override def next(): Boolean = {
     batch.foreach(_.close())
@@ -621,15 +620,12 @@
     } else {
       metrics(PEAK_DEVICE_MEMORY) += maxDeviceMemory
     }
-    if (isFirstBatch) {
-      if (batch.isEmpty) {
-        // This is odd, but some operators return data even when there is no input so we need to
-        // be sure that we grab the GPU if there were no batches.
-        GpuSemaphore.acquireIfNecessary(TaskContext.get(), metrics(SEMAPHORE_WAIT_TIME))
-      }
-      isFirstBatch = false
-    }

+    // NOTE: At this point, the task may not have yet acquired the semaphore if `batch` is `None`.
+    // We are not acquiring the semaphore here since this next() is getting called from
+    // the `PartitionReaderIterator` which implements a standard iterator pattern, and
+    // advertises `hasNext` as false when we return false here (no downstream tasks should
+    // consume this empty batch)
     batch.isDefined
   }

@@ -1458,7 +1458,6 @@ class ParquetPartitionReader(
   with ParquetPartitionReaderBase {

   private val blockIterator: BufferedIterator[BlockMetaData] = clippedBlocks.iterator.buffered
-  private var isFirstBatch = true

   override def next(): Boolean = {
     batch.foreach(_.close())
@@ -1471,14 +1470,12 @@
         batch = readBatch()
       }
     }
-    if (isFirstBatch) {
-      if (batch.isEmpty) {
-        // This is odd, but some operators return data even when there is no input so we need to
-        // be sure that we grab the GPU if there were no batches.
-        GpuSemaphore.acquireIfNecessary(TaskContext.get(), metrics(SEMAPHORE_WAIT_TIME))
-      }
-      isFirstBatch = false
-    }

+    // NOTE: At this point, the task may not have yet acquired the semaphore if `batch` is `None`.
+    // We are not acquiring the semaphore here since this next() is getting called from
+    // the `PartitionReaderIterator` which implements a standard iterator pattern, and
+    // advertises `hasNext` as false when we return false here (no downstream tasks should
+    // consume this empty batch)
     batch.isDefined
   }

@@ -57,7 +57,6 @@ abstract class GpuTextBasedPartitionReader(
   private val lineReader = new HadoopFileLinesReader(partFile, lineSeparatorInRead, conf)
   private var isFirstChunkForIterator: Boolean = true
  private var isExhausted: Boolean = false
-  private var isFirstBatch: Boolean = true
   private var maxDeviceMemory: Long = 0

   metrics = execMetrics
@@ -220,14 +219,12 @@ abstract class GpuTextBasedPartitionReader(
     } else {
       readBatch()
     }
-    if (isFirstBatch) {
-      if (batch.isEmpty) {
-        // This is odd, but some operators return data even when there is no input so we need to
-        // be sure that we grab the GPU if there were no batches.
-        GpuSemaphore.acquireIfNecessary(TaskContext.get(), metrics(SEMAPHORE_WAIT_TIME))
-      }
-      isFirstBatch = false
-    }

+    // NOTE: At this point, the task may not have yet acquired the semaphore if `batch` is `None`.
+    // We are not acquiring the semaphore here since this next() is getting called from
+    // the `PartitionReaderIterator` which implements a standard iterator pattern, and
+    // advertises `hasNext` as false when we return false here (no downstream tasks should
+    // consume this empty batch)
     batch.isDefined
   }

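Every reader patched above now calls `GpuSemaphore.acquireIfNecessary` only on the path that actually produces a batch. This is safe because the call is idempotent per task: the first call blocks until a permit is available, and later calls by the same task are no-ops. A minimal sketch of that contract (illustrative only; the real `GpuSemaphore` also records wait time in a metric and registers task-completion callbacks):

```scala
import java.util.concurrent.{ConcurrentHashMap, Semaphore}

// Illustrative sketch of the per-task contract the readers rely on;
// the permit count and bookkeeping are simplified, not the real implementation.
object GpuSemaphoreSketch {
  private val permits = new Semaphore(2) // max concurrent GPU tasks (illustrative value)
  private val holders = ConcurrentHashMap.newKeySet[Long]()

  def acquireIfNecessary(taskAttemptId: Long): Unit = {
    if (holders.add(taskAttemptId)) { // only the first call per task blocks
      permits.acquire()
    } // subsequent calls by the same task are no-ops
  }

  def releaseIfNecessary(taskAttemptId: Long): Unit = {
    if (holders.remove(taskAttemptId)) {
      permits.release()
    }
  }
}
```

With the old code, an empty scan grabbed a permit on its first `next()` even though the task had no GPU work to do; the PR instead moves that responsibility to the one operator that really does create GPU rows from empty input, the hash aggregate below.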
@@ -130,6 +130,7 @@ case class GpuHashAggregateMetrics(
     computeAggTime: GpuMetric,
     concatTime: GpuMetric,
     sortTime: GpuMetric,
+    semWaitTime: GpuMetric,
     spillCallback: SpillCallback)

 /** Utility class to convey information on the aggregation modes being used */
@@ -484,6 +485,11 @@ class GpuHashAggregateIterator(
     val aggregateFunctions = aggregateExpressions.map(_.aggregateFunction)
     val defaultValues =
       aggregateFunctions.flatMap(_.initialValues)
+    // We have to grab the semaphore in this scenario, since this is a reduction that produces
+    // rows on the GPU out of empty input, meaning that if a batch has 0 rows, a new single
+    // row is getting created with 0 as the count (if count is the operation), and other default
+    // values.
+    GpuSemaphore.acquireIfNecessary(TaskContext.get(), metrics.semWaitTime)
     val vecs = defaultValues.safeMap { ref =>
       withResource(GpuScalar.from(ref.asInstanceOf[GpuLiteral].value, ref.dataType)) {
         scalar => GpuColumnVector.from(scalar, 1, ref.dataType)
@@ -1392,6 +1398,7 @@ case class GpuHashAggregateExec(
       computeAggTime = gpuLongMetric(AGG_TIME),
       concatTime = gpuLongMetric(CONCAT_TIME),
       sortTime = gpuLongMetric(SORT_TIME),
+      semWaitTime = gpuLongMetric(SEMAPHORE_WAIT_TIME),
       makeSpillCallback(allMetrics))

     // cache in a local variable to avoid serializing the full child plan
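The comment added in `GpuHashAggregateIterator` describes the one case where the semaphore must still be taken for empty input: a reduction with no grouping keys emits exactly one row, built on the GPU from each aggregate's `initialValues`. An illustrative query (mine, not from the PR) showing that behavior, assuming a running `SparkSession` named `spark`:

```scala
import org.apache.spark.sql.functions.count

// A reduction over zero input rows still produces one output row.
val empty = spark.range(0)   // an empty Dataset
empty.agg(count("*")).show()
// +--------+
// |count(1)|
// +--------+
// |       0|
// +--------+
```

Because that single row is materialized as device columns via `GpuScalar.from` and `GpuColumnVector.from`, the task must hold the semaphore before the allocation, which is why `semWaitTime` is threaded through `GpuHashAggregateMetrics` and the acquisition is placed immediately before `defaultValues` is converted to vectors.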