
Create a PrioritySemaphore to back the GpuSemaphore #11376

Merged · 3 commits into NVIDIA:branch-24.10 on Aug 22, 2024

Conversation

@zpuller (Collaborator) commented Aug 22, 2024

Closes #8301

This PR adds a PrioritySemaphore, which has a similar interface to the Java Semaphore but also uses a priority queue to determine the order in which to wake blocked threads. It changes the existing GpuSemaphore, which was backed by a regular Semaphore, to use this new version, specifying a priority based on when a given task most recently held the semaphore. This has the effect of reducing spill, as tasks with data already loaded on the GPU are more likely to resume processing that data before it is evicted by a new task.
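
For reference, here is a condensed sketch of the idea — not the actual class from the PR, just its shape (ThreadInfo and waitingQueue approximate the names in the diff quoted below; the real implementation has more bookkeeping):

import java.util.Comparator
import java.util.PriorityQueue
import java.util.concurrent.locks.{Condition, ReentrantLock}

// Sketch of a priority-ordered semaphore: blocked acquirers are woken
// highest-priority-first instead of FIFO.
class PrioritySemaphoreSketch(private var permits: Int) {
  private case class ThreadInfo(priority: Long, condition: Condition)

  private val lock = new ReentrantLock()
  // Highest priority value first. GpuSemaphore passes the time a task last
  // held the semaphore, so recently active tasks get back on the GPU sooner.
  private val waitingQueue = new PriorityQueue[ThreadInfo](
    Comparator.comparingLong[ThreadInfo](_.priority).reversed())

  def acquire(numPermits: Int, priority: Long): Unit = {
    lock.lock()
    try {
      val info = ThreadInfo(priority, lock.newCondition())
      var queued = false
      while (permits < numPermits) {
        // Enqueue at most once, so a spurious wakeup cannot add a
        // duplicate entry (the issue raised in the review below).
        if (!queued) { waitingQueue.add(info); queued = true }
        info.condition.await()
      }
      if (queued) waitingQueue.remove(info)
      permits -= numPermits
      // Pass the signal along if permits remain for another waiter.
      if (permits > 0 && !waitingQueue.isEmpty) {
        waitingQueue.peek().condition.signal()
      }
    } finally {
      lock.unlock()
    }
  }

  def release(numPermits: Int): Unit = {
    lock.lock()
    try {
      permits += numPermits
      // Wake the highest-priority waiter; it re-checks under the lock and
      // removes itself from the queue once it can actually proceed.
      if (!waitingQueue.isEmpty) waitingQueue.peek().condition.signal()
    } finally {
      lock.unlock()
    }
  }
}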

@zpuller (Collaborator, Author) commented Aug 22, 2024

build

@revans2 (Collaborator) previously approved these changes Aug 22, 2024 and left a comment

Just some nits

@zpuller (Collaborator, Author) commented Aug 22, 2024

build

@abellina (Collaborator) left a comment

Looks great, I just had a question about a test that is a non-blocker.

@zpuller zpuller merged commit 35d2163 into NVIDIA:branch-24.10 Aug 22, 2024
42 of 43 checks passed
Comment on lines +61 to +64
while (!canAcquire(numPermits)) {
waitingQueue.enqueue(ThreadInfo(priority, condition))
condition.await()
}
@jlowe (Member) commented

Don't we have an issue here with spurious wakeups? It seems like we could enqueue multiple ThreadInfo instances for the same thread. Something like this would protect against it:

var queued = false
while (!canAcquire(numPermits)) {
  if (!queued) {
    waitingQueue.enqueue(ThreadInfo(priority, condition))
    queued = true
  }
  condition.await()
}

@zpuller (Collaborator, Author) commented Aug 22, 2024

When you say spurious wakeups, do you mean something triggered by another thread releasing, or by some other means? In the former case, the original ThreadInfo is dequeued. In the latter, I think I agree with you; I'm just not sure how that can happen exactly.

@jlowe (Member) commented

Spurious wakeups can theoretically occur any time one waits on a condition variable. It's often OS-dependent, but it's documented that it can happen and that programmers need to account for it. See the documentation for java.util.concurrent.locks.Condition.awaitNanos (which Condition.await points to), which mentions spurious wakeup as a possibility.

@zpuller (Collaborator, Author) commented

Ack, I can push a fix for that

(Collaborator) commented

Nice catch @jlowe

@binmahone (Collaborator) commented

Hi @zpuller @revans2

PrioritySemaphore is a great idea, I like it. I am just wondering which threads should have the higher priority: the one that most recently held the semaphore, or the most ancient one? Currently the choice is the most recent one, correct?

I'm comparing the two approaches, and I found that choosing the most ancient one is better in terms of the spill problem, as well as in terms of end-to-end time. I don't know why yet, but I think it's worth investigating. It's also worth mentioning that I deliberately limited spillStorageSize to aggravate spilling.

The scripts I'm using:

12:50:31 › cat ./work_0909_compare_semaphore_priority.sh
#!/bin/zsh


# run the 2410 latest jar three times
for i in {1..3}
do
       echo "reproduce spill problem, at 2410 latest" && bin/spark-shell    \
       --master 'local[8]'  --driver-memory 20g  --conf spark.rapids.sql.concurrentGpuTasks=2  \
       --conf spark.celeborn.client.shuffle.compression.codec=zstd --conf spark.io.compression.codec=zstd \
       --conf spark.rapids.memory.pinnedPool.size=10G --conf spark.rapids.memory.host.spillStorageSize=10G \
       --conf spark.sql.files.maxPartitionBytes=2g \
       --conf spark.driver.extraJavaOptions=-Dai.rapids.cudf.nvtx.enabled=true \
       --conf spark.plugins=com.nvidia.spark.SQLPlugin \
       --conf  spark.rapids.sql.metrics.level='DEBUG' \
       --conf spark.eventLog.enabled=true \
       --conf spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager \
       --conf spark.celeborn.master.endpoints=10.19.129.151:9097 \
       --jars /home/hongbin/develop/spark-3.2.1-bin-hadoop2.7/rapids_jars/2410_by0909.jar -i temp.scala   2>&1 | tee spill_`date +'%Y-%m-%d-%H-%M-%S'`.output
done

# run the modified 2410 latest jar (with priority modified to favor ancient) three times

for i in {1..3}
do
       echo "reproduce spill problem, at 2410 latest + favor ancient" && bin/spark-shell    \
       --master 'local[8]'  --driver-memory 20g  --conf spark.rapids.sql.concurrentGpuTasks=2  \
       --conf spark.celeborn.client.shuffle.compression.codec=zstd --conf spark.io.compression.codec=zstd \
       --conf spark.rapids.memory.pinnedPool.size=10G --conf spark.rapids.memory.host.spillStorageSize=10G \
       --conf spark.sql.files.maxPartitionBytes=2g \
       --conf spark.driver.extraJavaOptions=-Dai.rapids.cudf.nvtx.enabled=true \
       --conf spark.plugins=com.nvidia.spark.SQLPlugin \
       --conf  spark.rapids.sql.metrics.level='DEBUG' \
       --conf spark.eventLog.enabled=true \
       --conf spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager \
       --conf spark.celeborn.master.endpoints=10.19.129.151:9097 \
       --jars /home/hongbin/develop/spark-3.2.1-bin-hadoop2.7/rapids_jars/2410_fresh.jar -i temp.scala   2>&1 | tee spill_`date +'%Y-%m-%d-%H-%M-%S'`.output
done

with temp.scala being:

13:04:50 › cat  temp.scala
spark.conf.set("spark.sql.shuffle.partitions", 8)
spark.conf.set("spark.rapids.sql.agg.singlePassPartialSortEnabled", false)

spark.time(spark.range(0, 3000000000L, 1, 8).selectExpr("cast(CAST(rand(0) * 1000000000 AS LONG) DIV 1 as string) as id", "id % 2 as data").groupBy("id").agg(count(lit(1)), avg(col("data"))).orderBy("id").show())

System.exit(0)

duration time snapshot: [screenshot 20240909-130759]

my code changes: [screenshot of the diff]
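
Roughly, the change just flips which end of the priority order wins — an approximation of the experiment, not the actual diff (the screenshot above has the real one):

// Approximation only, assuming the semaphore wakes the waiter with the
// highest priority value. lastHeld is the timestamp the current code uses.
def favorMostRecent(lastHeld: Long): Long = lastHeld   // current behavior
def favorMostAncient(lastHeld: Long): Long = -lastHeld // the experiment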

@revans2 (Collaborator) commented Sep 9, 2024

@binmahone

The original idea was to have a consistent priority for all task threads. The spill framework uses task-id followed by thread-id to give priority to different threads: https://github.com/NVIDIA/spark-rapids-jni/blob/bb696ae944f286b3fe5eb5774a9abe921c1425a4/src/main/cpp/src/SparkResourceAdaptorJni.cpp#L135-L190 The idea was to do the same for tasks here, as the semaphore only lets a task in or not, not individual threads.

The code here is using the timestamp of the last access for the priority, but I don't know how much work was done to see which would be the ideal choice. Could you also try running your tests using the task-id for the priority instead, so that it is consistent with the spill code? I just want to get a few more data points.
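
Something like this sketch is what I have in mind (untested; taskAttemptId comes from Spark's TaskContext):

import org.apache.spark.TaskContext

// Sketch: use the Spark task attempt id as the priority, mirroring the
// spill framework's ordering. Lower ids belong to older tasks, so negate
// to give the oldest task the highest priority.
def taskIdPriority(): Long = {
  val tc = TaskContext.get() // null when called off a task thread
  if (tc != null) -tc.taskAttemptId() else Long.MinValue
}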

@binmahone (Collaborator) commented Sep 10, 2024

I didn't consider the case where a task has multiple threads, so in my previous description I was using "task priority" and "thread priority" interchangeably.

Sure, I can do the test; I'll update later. Just to make sure we're on the same page: "the task-id for the priority" is very similar to my "favor ancient", right?

@revans2 (Collaborator) commented Sep 10, 2024

@binmahone yes, the task-id as the priority should be very similar to "favor ancient". Spark should hand out task ids in ascending order, so in theory the oldest tasks should have the lowest ids. But because of I/O on reads, etc., the task with the lowest id is not guaranteed to actually be the first one to run on the GPU. The main reason I would like this is that it is a simple way to keep all operations that might want a priority consistent. I.e. if we want to pick a buffer to spill, we can avoid spilling from tasks that would be first in line to get the GPU; or if we want to pick a task to pause to avoid spilling more memory, pausing a task that is going to be last in line for the GPU, and is therefore more likely to have its data spilled, feels like something we would want to do. I just want to know what the impact to performance would be for your use case.

@binmahone (Collaborator) commented Oct 8, 2024

@revans2 I have tested the version using task-id as the priority (please check the code at https://github.com/binmahone/spark-rapids/tree/241008_taskid_as_prioriry; I hope I understood your intention correctly):

  • latest 2412 in red box
  • latest 2412 + "favor ancient" in green box
  • latest 2412 + "task-id as the priority" in blue box

[benchmark duration screenshot]

It's a surprise that "task-id as the priority" has the worst performance. I haven't investigated why. Please let me know if you need anything else.

@binmahone (Collaborator) commented

Note that even though "favor ancient" outperforms the other two implementations in my single test, I'm still not sure if it's a better choice in general scenarios. What kind of scenarios should we consider before we can confidently make it the default?

@revans2 (Collaborator) commented Oct 8, 2024

> Note that even though "favor ancient" outperforms the other two implementations in my single test, I'm still not sure if it's a better choice in general scenarios. What kind of scenarios should we consider before we can confidently make it the default?

The cases we care about all revolve around situations where tasks end up processing more than a single batch of data. We often call it the round robin problem because a task will get on the GPU; do some processing, but not finish completely; release the GPU so other tasks can get on, leaving partial data in the GPU's memory; do some I/O; and then try to get back onto the GPU again. Here are a few cases I have seen that would be good to cover. It would be nice if we could have a defined, repeatable set of tests/benchmarks, because there are other related changes that we want to be able to try out. Ideally we can use the metrics from the history files to see how much data was spilled as a part of this too.

  1. A join explodes and for most tasks there are multiple output batches going into a file write or a shuffle (sketched after this list).
  2. A large window operation where the cardinality of the partition by keys make it so that most tasks end up with multiple batches worth of data. The window operation itself should ideally be a running window operation just to make that processing simpler.
  3. A large aggregation where there is a lot of input data, meaning multiple batches per task, but the output data is less than a single batch in size.
  4. A regular NDS run to check for regressions in the happy path.
  5. A project where there are multiple batches being processed per task, but all of the batches are independent of each other.
  6. A parquet read + filter where the parquet read itself is likely to explode in size to multiple batches, and the filter does not filter much out.

Those are the ones that come to mind. I think we could come up with others, but the general idea is that we want a few different patterns of processing.
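
For scenario 1, a spark-shell sketch in the same style as the temp.scala above could look like this (sizes and the output path are illustrative assumptions, not a tuned benchmark):

// Scenario 1 sketch: a join that explodes, so most tasks emit multiple
// output batches into a file write. Each left row matches ~10 right rows,
// turning 1e8 input rows into ~1e9 output rows.
val left = spark.range(0, 100000000L, 1, 8)
  .selectExpr("id % 1000 as key", "id as l_val")
val right = spark.range(0, 10000L, 1, 8)
  .selectExpr("id % 1000 as key", "id as r_val")

spark.time(
  left.join(right, "key")
    .write.mode("overwrite")
    .parquet("/tmp/join_explode_out")) // illustrative output path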

@binmahone (Collaborator) commented

> The cases we care about all revolve around situations where tasks end up processing more than a single batch of data. […]

This is a quite comprehensive list, thanks! We'll update you when we have progress on this benchmark.

Successfully merging this pull request may close these issues: [FEA] semaphore prioritization (#8301)