[QST] TPC-DS Query-93 on gpu is slow #622

Closed
chenrui17 opened this issue Aug 28, 2020 · 30 comments

Assignee: jlowe
Labels: question (Further information is requested)
@chenrui17

This is a follow-on to rapidsai/cudf#6107.
@jlowe I will profile this query with Nsight Systems later and upload the file.

chenrui17 added the "? - Needs Triage" and "question" labels Aug 28, 2020
jlowe removed the "? - Needs Triage" label Aug 28, 2020
jlowe self-assigned this Aug 28, 2020
jlowe changed the title to "[QST] TPC-DS Query-93 on gpu is slow" Aug 28, 2020
@revans2
Collaborator

revans2 commented Aug 28, 2020

@chenrui17 I have not been able to reproduce this yet, but I am going to keep trying.

I have tried with scale factor 200 on a single node on top of spark-3.0.0, but looking at the logs you put in the other issue, it looks like you are using 3.0.1 with adaptive query execution enabled and a 1TB data set.

But the detailed log you gave was for the CPU run, so I am not sure if these configs were the same for both runs.

    "spark.sql.adaptive.coalescePartitions.initialPartitionNum": "3000",
    "spark.sql.files.maxPartitionBytes": "1024m",
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.shuffle.partitions": "3000"

Was the data partitioned? This can have a huge impact on GPU performance because of small files.
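
(For context: one common way to reduce the small-file effect on the scan side is to let Spark pack more files into each input partition. The settings below are standard Spark configs, but the values are purely illustrative, not a recommendation from this thread.)

    spark.sql.files.maxPartitionBytes 1g
    spark.sql.files.openCostInBytes 4194304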

@revans2
Collaborator

revans2 commented Aug 28, 2020

I have been playing around with AQE with this query and I am seeing a lot of slowness when reading the input data because it highly partitions the output data. The slowness you are seeing could be related to fetching lots of small partitions. If you are running with 3000 partitions on the GPU side it could be related to this.

@chenrui17
Author

@revans2 I confirmed that the data is partitioned.

@chenrui17
Author

@chenrui17 I have not been able to reproduce this yet, but I am going to keep trying.

I have tried with scale factor 200 on a single node on top of spark-3.0.0, but looking at the logs you put in the other issue, it looks like you are using 3.0.1 with adaptive query execution enabled and a 1TB data set.

But the detailed log you gave was for the CPU run, so I am not sure if these configs were the same for both runs.

    "spark.sql.adaptive.coalescePartitions.initialPartitionNum": "3000",
    "spark.sql.files.maxPartitionBytes": "1024m",
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.shuffle.partitions": "3000"

Was the data partitioned? This can have a huge impact on GPU performance because of small files.

it looks like you are using 3.0.1 with adaptive query execution enabled and a 1TB data set.

In fact, I am using a 100GB data set; why do you say I am using a 1TB data set? As for the AQE configuration, I will try turning it off for testing.

@chenrui17
Author

chenrui17 commented Aug 31, 2020

I have been playing around with AQE with this query and I am seeing a lot of slowness when reading the input data because it highly partitions the output data. The slowness you are seeing could be related to fetching lots of small partitions. If you are running with 3000 partitions on the GPU side it could be related to this.

about "spark.sql.shuffle.partitions " , I found that it did affect performance , but if i set it smaller , it often OOM on big data set ,like 1TB data set, even I use concurrentTask=1 ,and rapids.BatchSize=256m,but it also OOM to TPC-DS query 14a

@revans2
Collaborator

revans2 commented Aug 31, 2020

In fact, I am using a 100GB data set; why do you say I am using a 1TB data set? As for the AQE configuration, I will try turning it off for testing.

In the log file you attached the input path included TB in it so I assumed it was a 1TB data set.

I ran with a partitioned 100 GB data set on a single 16GB V100. I can try to play around with the data size to see what I end up with, but I had a much larger batch size than that too. It sounds like you might be running into issues with PTDS and memory fragmentation in RMM.

@jlowe what do you think? I have been running with a 1GB batch size and a 1GB input partition size, and I was able to run with shuffle.partitions of 2 with PTDS disabled.

@jlowe
Member

jlowe commented Aug 31, 2020

We've definitely seen increased OOM issues with per-thread default stream enabled, which led us to turn that off in the recent cudf-0.15-SNAPSHOT builds.

@chenrui17 I would recommend retrying with smaller partitions and a cudf built with PTDS disabled. If you are getting the latest cudf-0.15-SNAPSHOT builds from Sonatype then it should have PTDS off. If you're building cudf on your own, make sure you either do not specify PER_THREAD_DEFAULT_STREAM or explicitly specify PER_THREAD_DEFAULT_STREAM=OFF in both the cpp and java builds.

@chenrui17
Author

In the log file you attached the input path included TB in it so I assumed it was a 1TB data set.

The amount of data indicated in the path is inconsistent with the actual data size.
@jlowe That's right, I have been using the cudf jar from Sonatype recently; before that, I used rapids-0.1 to run the TPC-DS queries and they basically passed normally. So now, if I use the latest cudf-0.15-SNAPSHOT jar from Sonatype, is its PTDS off?

@jlowe
Member

jlowe commented Aug 31, 2020

if I use the latest cudf-0.15-SNAPSHOT jar from Sonatype, is its PTDS off?

Yes, it has been disabled in the snapshot builds since the 20200828.210405-58 snapshot.

@chenrui17
Author

@revans2 In addition, I would like to ask a question: do I need to switch the Spark version from 3.0.1 to 3.0.0?

@chenrui17
Author

chenrui17 commented Sep 1, 2020

@jlowe My nsys file and history UI are about 100MB+; how can I send them to you?
Can you access this?
url: https://pan.baidu.com/s/1BTnvHm6wbNpfVpEptuIIlA password: j4ra

@revans2
Collaborator

revans2 commented Sep 1, 2020

@chenrui17 you should not need to switch from 3.0.1 to 3.0.0. We "support" both of them, though until 3.0.1 is released it is a moving target. Note that the Rapids Accelerator requires 3.0.1+ to be able to support Adaptive Query Execution.

@jlowe
Member

jlowe commented Sep 3, 2020

@chenrui17 I looked at the nsys trace. I don't see any task taking 36 seconds, as reported in the GPU join metrics in the history UI. I'm guessing this trace was of an executor that did not run the task that hit that long build time, so it's not necessarily representative of what happened on that task.

None of the build or joins took very long in the trace once the task started processing. I did see what appears to be tasks spread out in the second stage, and I'm assuming this is waiting for the GPU semaphore. I can't know for sure since Java NVTX ranges were missing from the trace. To add them, you would need to build your own cudf jar, as the snapshot-published version has NVTX ranges turned off for performance (i.e.: published snapshot builds libcudf with -DUSE_NVTX=OFF).

We've tried to reproduce this behavior locally but have not seen such large discrepancies between the join build time metric and one of the coalesce batch collect metrics above it.

@jlowe
Member

jlowe commented Sep 3, 2020

I spoke too soon. @revans2 was able to reproduce the issue, and we discovered that GpuCoalesceBatches can call its input iterator's hasNext() outside of the block that measures the time for collectTime(). That hasNext() call chain is what's grabbing the GPU semaphore, leading to the large reported times in the join buildTime while not being reported in the collectTime of GpuCoalesceBatches.
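
For illustration, here is a simplified sketch (not the actual plugin code; the Metric and collectAll names are hypothetical) of how a hasNext() call outside the timed block shifts time out of collectTime and into whatever metric happens to be measuring upstream:

    // Simplified sketch only, not the plugin's API.
    object CoalesceTimingSketch {
      final class Metric { var totalNs: Long = 0L; def add(ns: Long): Unit = totalNs += ns }

      def collectAll[T](iter: Iterator[T], collectTime: Metric): Seq[T] = {
        val out = scala.collection.mutable.ArrayBuffer.empty[T]
        while (iter.hasNext) {          // work triggered here (e.g. acquiring the GPU semaphore) is NOT timed
          val start = System.nanoTime()
          out += iter.next()            // only next() is charged to collectTime
          collectTime.add(System.nanoTime() - start)
        }
        out.toSeq
      }
    }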

@chenrui17 to answer your original question, I'm fairly confident this large amount of time is mostly time the task spent waiting for its turn on the GPU.

@chenrui17
Author

I'm fairly confident this large amount of time is mostly time the task spent waiting for its turn on the GPU.

Do you mean that the task time is mainly spent waiting on the semaphore to get the right to use the GPU?
If so, is the bottleneck the GPU resource? If I use 2 GPUs, like T4s, will performance be significantly improved?

@jlowe
Member

jlowe commented Sep 4, 2020

The main bottleneck during this portion of the query is waiting to acquire the semaphore. However the tasks owning the semaphore are not making full use of the GPU. The main portion of time they are spending is decompressing shuffle data on the CPU and copying it down to the GPU. They need to own the GPU semaphore during this phase because they are placing data onto the GPU. The whole point of the GPU semaphore is to prevent too many tasks from placing data onto the GPU at the same time and exhausting the GPU's memory.

Essentially the main bottleneck in that stage is dealing with the shuffle data and transfer to the GPU, because that's what's taking so long for the tasks holding the GPU semaphore to release it. Once the shuffle data is loaded on the GPU, the rest of the stage processing is quite fast. The RapidsShuffleManager was designed explicitly to target this shuffle problem, as it tries to keep shuffle targets in GPU memory and not rely on the CPU for compression/decompression which can be a bottleneck. Unfortunately there are a number of issues with RapidsShuffleManager that prevent it from working well in all situations, but we're actively working on improving it. Our goal is to eventually have that shuffle manager be the preferred shuffle when using the RAPIDS Accelerator, even if the cluster does not have RDMA-capable hardware.

if i use 2 gpu like T4, Performance will be significantly improved ?

If you add more GPUs (and thus more executors, since an executor with the plugin can only control 1 GPU), yes, performance should be improved to a point. This would be similar to adding executors to a CPU job. If you have enough GPUs in your cluster so that CPU_cores_per_executor == concurrent_GPU_tasks then no task will ever wait on the GPU semaphore.
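
As a concrete illustration of that condition (hypothetical values, not a recommendation from this thread), an executor configured so that concurrentGpuTasks equals its CPU cores never makes a task wait on the semaphore, though memory pressure may make such a setting impractical in practice:

    spark.executor.cores 4
    spark.executor.resource.gpu.amount 1
    spark.task.resource.gpu.amount 0.25
    spark.rapids.sql.concurrentGpuTasks 4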

@JustPlay

JustPlay commented Sep 6, 2020

The main bottleneck during this portion of the query is waiting to acquire the semaphore. However the tasks owning the semaphore are not making full use of the GPU. The main portion of time they are spending is decompressing shuffle data on the CPU and copying it down to the GPU. They need to own the GPU semaphore during this phase because they are placing data onto the GPU. The whole point of the GPU semaphore is to prevent too many tasks from placing data onto the GPU at the same time and exhausting the GPU's memory.

Essentially the main bottleneck in that stage is dealing with the shuffle data and transfer to the GPU, because that's what's taking so long for the tasks holding the GPU semaphore to release it. Once the shuffle data is loaded on the GPU, the rest of the stage processing is quite fast. The RapidsShuffleManager was designed explicitly to target this shuffle problem, as it tries to keep shuffle targets in GPU memory and not rely on the CPU for compression/decompression which can be a bottleneck. Unfortunately there are a number of issues with RapidsShuffleManager that prevent it from working well in all situations, but we're actively working on improving it. Our goal is to eventually have that shuffle manager be the preferred shuffle when using the RAPIDS Accelerator, even if the cluster does not have RDMA-capable hardware.

if i use 2 gpu like T4, Performance will be significantly improved ?

If you add more GPUs (and thus more executors, since an executor with the plugin can only control 1 GPU), yes, performance should be improved to a point. This would be similar to adding executors to a CPU job. If you have enough GPUs in your cluster so that CPU_cores_per_executor == concurrent_GPU_tasks then no task will ever wait on the GPU semaphore.

So, is it only a time-accounting problem? What is the root cause of TPC-DS query 93 being so slow?

@JustPlay

JustPlay commented Sep 6, 2020

The main bottleneck during this portion of the query is waiting to acquire the semaphore. However the tasks owning the semaphore are not making full use of the GPU. The main portion of time they are spending is decompressing shuffle data on the CPU and copying it down to the GPU

Essentially the main bottleneck in that stage is dealing with the shuffle data and transfer to the GPU, because that's what's taking so long for the tasks holding the GPU semaphore to release it. Once the shuffle data is loaded on the GPU, the rest of the stage processing is quite fast.

I think this is common for most queries.

But why does waiting on the GPU semaphore hurt performance?
I think the GPU only goes into starvation when (time needed for CPU processing) / (number of CPU tasks that can run in parallel) is bigger than the time needed for GPU processing. The CPU part works as the producer and the GPU part as the consumer; in this case the number of CPU tasks that can run in parallel equals concurrentGpuTasks, because they are all within the scope of the GPU semaphore. Since we use a multi-core CPU, the CPU part can run in parallel, while the GPU part still runs serially because there is no PTDS.

Increasing concurrentGpuTasks should improve performance significantly if all of the above analysis (yours and mine) holds true.
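
A back-of-the-envelope illustration of that argument, with purely hypothetical numbers:

    // Hypothetical numbers only, to make the starvation condition concrete.
    // Each task holding the semaphore spends cpuSecs on CPU-side shuffle
    // decompression plus the copy to the GPU, and gpuSecs on actual GPU work.
    val cpuSecs = 8.0
    val gpuSecs = 1.0
    val concurrentGpuTasks = 2
    // With concurrentGpuTasks producers, a new batch reaches the GPU roughly
    // every cpuSecs / concurrentGpuTasks seconds; the GPU starves whenever
    // that interval exceeds gpuSecs.
    val feedIntervalSecs = cpuSecs / concurrentGpuTasks                  // 4.0
    val gpuUtilization = gpuSecs / math.max(feedIntervalSecs, gpuSecs)   // ~0.25
    println(f"GPU utilization ~= $gpuUtilization%.2f")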

BTW:
Based on our tests, a larger concurrentGpuTasks improves performance (with concurrent = 2, 4, 6 the query time is ~148s, ~114s, ~89s; concurrent = 6 has a high risk of OOM).
So RMM optimizations (like reducing fragmentation) to lower the OOM risk should be given higher priority; we want concurrentGpuTasks to be as high as possible.

@JustPlay

JustPlay commented Sep 8, 2020

@jlowe

Some questions:

  1. Why is the number of output columnar batches from GpuCoalesceBatches always the same as the number of shuffle partitions when GpuCoalesceBatches comes after GpuColumnarExchange (the reduce stage)?
  2. Why do some queries have a GpuCustomShuffleReader between GpuColumnarExchange and GpuCoalesceBatches while others do not?

BTW:
// spark-shell
--conf spark.sql.shuffle.partitions=n

// spark-defaults.conf
spark.sql.adaptive.enabled true
spark.sql.adaptive.coalescePartitions.enabled true
spark.sql.adaptive.coalescePartitions.initialPartitionNum 64 // always set to be the same as spark.sql.shuffle.partitions
spark.sql.adaptive.coalescePartitions.minPartitionNum 1
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 2147483648
spark.sql.adaptive.skewJoin.skewedPartitionFactor 5
spark.sql.adaptive.advisoryPartitionSizeInBytes 2147483648

@revans2
Collaborator

revans2 commented Sep 8, 2020

@JustPlay

  1. GpuCoalesceBatches is trying to do one of two things.
    a. It is either trying to build up a larger batch so the GPU can process it efficiently. The target is to hit spark.rapids.sql.batchSizeBytes bytes, but it may cut the batch off early if it detects that a string column would be too long, etc. The default value for this is 2GB, and rarely does a single task get more than 2GB of data to process, unless there is data skew.
    b. It is trying to build a single batch so some types of processing can work correctly. This typically happens for sort, the build side for a hash join, and window functions. Because these require a single batch then you will always see a single batch, at least until we can improve sort and some of the others to work on data that is larger than would fit in GPU memory.

  2. GpuCustomShuffleReader is a replacement for the Spark CustomShuffleReader which shows up as a part of some AQE shuffles. According to the docs it is "a wrapper of shuffle query stage, which follows the given partition arrangement." I think that means it allows AQE to adjust the shuffle a bit to improve performance.

@JustPlay

JustPlay commented Sep 9, 2020

  1. GpuCoalesceBatches is trying to do one of two things.
    a. It is either trying to build up a larger batch so the GPU can process it efficiently. The target is to hit spark.rapids.sql.batchSizeBytes bytes,

I cannot find the code snippet(s) used to compare the batch's size (or the coalesced batch's size) with spark.rapids.sql.batchSizeBytes (sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala). @revans2

@JustPlay

JustPlay commented Sep 9, 2020


// pic1 - pic5: query plan screenshots (images not reproduced here)


Two questions:

Why does GpuCoalesceBatches exist on only one side of GpuShuffledHashJoin and not both (e.g. pic1, pic2, pic3, pic4), while in some queries it exists on both sides (e.g. pic5)?

What is the strategy for deciding whether to insert a GpuCoalesceBatches node?

@revans2 @jlowe

@revans2
Collaborator

revans2 commented Sep 9, 2020

It should be on both sides, and is a bug if it is not. What query did you run and what config settings did you use for this? My guess is that it is probably related to AQE in some way, but I am just speculating at this point.

@revans2
Collaborator

revans2 commented Sep 9, 2020

I cannot find the code snippet(s) used to compare the batch's size (or the coalesced batch's size) with spark.rapids.sql.batchSizeBytes (sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala).

while (numRows < Int.MaxValue && onDeck.isEmpty && iter.hasNext) {
  val cb = iter.next()
  val nextRows = cb.numRows()
  numInputBatches += 1
  // filter out empty batches
  if (nextRows > 0) {
    numInputRows += nextRows
    val nextColumnSizes = getColumnSizes(cb)
    val nextBytes = nextColumnSizes.sum
    // calculate the new sizes based on this input batch being added to the current
    // output batch
    val wouldBeRows = numRows + nextRows
    val wouldBeBytes = numBytes + nextBytes
    val wouldBeColumnSizes = columnSizes.zip(nextColumnSizes).map(pair => pair._1 + pair._2)
    // CuDF has a hard limit on the size of string data in a column so we check to make
    // sure that the string columns each use no more than Int.MaxValue bytes. This check is
    // overly cautious because the calculated size includes the offset bytes. When nested
    // types are supported, this logic will need to be enhanced to take offset and validity
    // buffers into account since they could account for a larger percentage of overall
    // memory usage.
    val wouldBeStringColumnSizes =
      stringFieldIndices.map(i => getColumnDataSize(cb, i, wouldBeColumnSizes(i)))
        .zip(stringColumnSizes)
        .map(pair => pair._1 + pair._2)
    if (wouldBeRows > Int.MaxValue) {
      if (goal == RequireSingleBatch) {
        throw new IllegalStateException("A single batch is required for this operation," +
          s" but cuDF only supports ${Int.MaxValue} rows. At least $wouldBeRows are in" +
          s" this partition. Please try increasing your partition count.")
      }
      onDeck = Some(cb)
    } else if (batchRowLimit > 0 && wouldBeRows > batchRowLimit) {
      onDeck = Some(cb)
    } else if (wouldBeBytes > goal.targetSizeBytes && numBytes > 0) {
      onDeck = Some(cb)
    } else if (wouldBeStringColumnSizes.exists(size => size > Int.MaxValue)) {
      if (goal == RequireSingleBatch) {
        throw new IllegalStateException("A single batch is required for this operation," +
          s" but cuDF only supports ${Int.MaxValue} bytes in a single string column." +
          s" At least ${wouldBeStringColumnSizes.max} are in a single column in this" +
          s" partition. Please try increasing your partition count.")
      }
      onDeck = Some(cb)
    } else {
      addBatch(cb)
      numBatches += 1
      numRows = wouldBeRows
      numBytes = wouldBeBytes
      columnSizes = wouldBeColumnSizes
      stringColumnSizes = wouldBeStringColumnSizes
    }
  }
}

The code is in a generic place because we use it for both the host side and device side concat.

The target size is inserted in GpuTransitionOverrides, so an operation can ask for a coalesce batch to be inserted after it, and an operation can also ask for a coalesce to be inserted for one of its inputs. Then, if there are multiple requests, the code decides which one wins (right now it is whichever is larger).

I should also clarify that the only time you should not see a GpuCoalesceBatches after a shuffle is if the data right after the shuffle is being sent directly to the host side. In all other cases it should be inserted.
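
To make the "larger request wins" resolution concrete, here is a minimal sketch using hypothetical goal types (the real plugin has its own coalesce-goal classes and resolution logic in GpuTransitionOverrides):

    // Hypothetical types for illustration only.
    sealed trait CoalesceGoalSketch { def targetSizeBytes: Long }
    case class TargetSizeSketch(targetSizeBytes: Long) extends CoalesceGoalSketch
    case object RequireSingleBatchSketch extends CoalesceGoalSketch {
      val targetSizeBytes: Long = Long.MaxValue  // a single-batch requirement dominates any byte target
    }

    // When two operators request a coalesce at the same point in the plan,
    // keep the goal with the larger target.
    def resolveSketch(a: CoalesceGoalSketch, b: CoalesceGoalSketch): CoalesceGoalSketch =
      if (a.targetSizeBytes >= b.targetSizeBytes) a else b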

@JustPlay

JustPlay commented Sep 9, 2020

It should be on both sides, and is a bug if it is not. What query did you run and what config settings did you use for this? My guess is that it is probably related to AQE in some way, but I am just speculating at this point.

pic1-4, where GpuCoalesceBatches exists on only one side, were with AQE = ON (TPC-DS 18, 56, 33, 14a, 10, ...).
pic5, where GpuCoalesceBatches is on both sides, was with AQE = OFF (I forget which query; I only remember AQE = OFF). @revans2

@JustPlay

JustPlay commented Sep 9, 2020

It should be on both sides, and is a bug if it is not. What query did you run and what config settings did you use for this? My guess is that it is probably related to AQE in some way, but I am just speculating at this point.

pic1-4, where GpuCoalesceBatches exists on only one side, were with AQE = ON (TPC-DS 18, 56, 33, 14a, 10, ...).
pic5, where GpuCoalesceBatches is on both sides, was with AQE = OFF (I forget which query; I only remember AQE = OFF).

But if AQE really takes effect, shouldn't there be a GpuCustomShuffleReader node? In my pics there is no such node. @revans2

@jlowe
Member

jlowe commented Sep 9, 2020

Since this issue originated as a question about slowness on query 93, I filed #698 to track the missing coalesce issue separately.

@andygrove
Contributor

@JustPlay Yes, I would expect to see a GpuCustomShuffleReader after each shuffle exchange when AQE is enabled. I have just been trying to reproduce the issue with the missing coalesce step and was not able to reproduce it with the latest plugin code from branch-0.2 and Spark 3.0.1-rc3.

@jlowe
Member

jlowe commented Oct 21, 2020

@chenrui17 is there anything left to answer or can this be closed?

@jlowe
Member

jlowe commented Nov 19, 2020

Closing since there was no response. Please reopen or file a new question if needed.

jlowe closed this as completed Nov 19, 2020
NVIDIA locked and limited conversation to collaborators Apr 28, 2022
sameerz converted this issue into discussion #5394 Apr 28, 2022
