
Support GpuSubqueryBroadcast for DPP [databricks] #4150

Merged: 17 commits into NVIDIA:branch-22.02 from gpu_subquery_broadcast, Dec 17, 2021

Conversation

@sperlingxx (Collaborator) commented Nov 18, 2021

Signed-off-by: sperlingxx <lovedreamf@gmail.com>

Closes #4027

This PR supports reusing the broadcast exchange for SubqueryBroadcast (which is inserted by DPP) on the GPU. To achieve this, the following steps are essential:

  1. Transform the dynamic partition filters of FileSourceScanExec. Capture, tag, and convert SubqueryBroadcastExec inside DynamicPruningExpression. We build independent RapidsMeta instances for SubqueryBroadcastExec inside the dynamic partition filters rather than adding them as children of the scan meta, because the FileSourceScan may be on the CPU while the dynamic partitionFilters are on the GPU, and vice versa.
  2. Convert SubqueryBroadcastExec and the underlying exchange to the GPU if possible. The rule PlanDynamicPruningFilters inserts SubqueryBroadcastExec when there is a broadcast exchange available for reuse. The plan stack looks like:
    SubqueryBroadcast -> BroadcastExchange -> executedPlan
    Since the GPU overrides rule has already been applied to executedPlan, if the wrapped subquery can run on the GPU, the plan stack becomes:
    SubqueryBroadcast -> BroadcastExchange -> GpuColumnarToRow -> GpuPlanStack...
    To reuse the BroadcastExchange on the GPU, we transform the above pattern into (a sketch of this rewrite follows the description below):
    GpuSubqueryBroadcast -> GpuBroadcastExchange -> GpuPlanStack...
  3. Run GpuSubqueryBroadcastExec, which is similar to GpuBroadcastToCpuExec. The major difference is whether an existing GpuBroadcastExec is reused.

In addition, this PR can only reuse the GpuBroadcast when AQE is off. We need to modify GpuBroadcastToCpuExec to reuse the GpuBroadcast with AQE on.
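
For illustration, here is a minimal sketch of the step-2 rewrite. The helper name rewriteForGpuReuse is hypothetical, the constructor shapes of SubqueryBroadcastExec, GpuSubqueryBroadcastExec and GpuBroadcastExchangeExec are simplified, and the plugin-side imports are omitted; the real rule also builds RapidsMeta wrappers and tags the plan before converting it.

import org.apache.spark.sql.execution.{SparkPlan, SubqueryBroadcastExec}
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec

def rewriteForGpuReuse(plan: SparkPlan): SparkPlan = plan match {
  // A CPU SubqueryBroadcast sitting on a BroadcastExchange whose child already runs on
  // the GPU (surfaced through a GpuColumnarToRow transition).
  case SubqueryBroadcastExec(name, index, keys,
      BroadcastExchangeExec(mode, c2r: GpuColumnarToRowExecParent)) =>
    // Drop the columnar-to-row transition and rebuild both nodes on the GPU, so the
    // exchange can be matched against, and reused by, the GPU join side.
    GpuSubqueryBroadcastExec(name, index, keys,
      GpuBroadcastExchangeExec(mode, c2r.child))
  case other => other
}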

Signed-off-by: sperlingxx <lovedreamf@gmail.com>
Signed-off-by: sperlingxx <lovedreamf@gmail.com>
@sperlingxx
Collaborator Author

build

@sperlingxx changed the base branch from branch-21.12 to branch-22.02 on November 24, 2021 04:12
@sperlingxx
Collaborator Author

build

Signed-off-by: sperlingxx <lovedreamf@gmail.com>
@sperlingxx
Collaborator Author

build

@pxLi changed the title from "Support GpuSubqueryBroadcast for DPP" to "Support GpuSubqueryBroadcast for DPP [databricks]" on Nov 26, 2021
@pxLi
Collaborator

pxLi commented Nov 26, 2021

build

@pxLi
Collaborator

pxLi commented Nov 26, 2021

add [databricks] to enable CI stages

Signed-off-by: sperlingxx <lovedreamf@gmail.com>
@sperlingxx
Collaborator Author

build

Signed-off-by: sperlingxx <lovedreamf@gmail.com>
@sperlingxx
Collaborator Author

build

@pytest.mark.parametrize('store_format', ['parquet', 'orc'], ids=idfn)
@pytest.mark.parametrize('s_index', list(range(len(_statements))), ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Only in Spark 3.2.0+ AQE and DPP can be both enabled")
-def test_dpp_reuse_broadcast_exchange(aqe_on, store_format, s_index, spark_tmp_table_factory):
+def test_dpp_reuse_broadcast_exchange_aqe_on(store_format, s_index, spark_tmp_table_factory):
Collaborator

Did we miss the corresponding test with AQE off, or is that covered in some other existing test and was not really needed?

Collaborator Author

The test with AQE off is named test_dpp_reuse_broadcast_exchange; I appended the suffix _aqe_off to it to clarify the intention of the tests.

# When AQE is enabled, the broadcast exchange cannot currently be reused, because spark-rapids
# plans GpuBroadcastToCpu for the exchange reuse, while the original broadcast exchange is
# simply replaced by GpuBroadcastExchange. Therefore, the reuse cannot work, since
# GpuBroadcastToCpu is not semantically equal to GpuBroadcastExchange.
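
To illustrate the reuse mechanism this comment refers to: Spark's exchange reuse keeps the first exchange it sees for a given result and substitutes later, equivalent exchanges with a ReusedExchangeExec. The sketch below is only a rough approximation (a map keyed by canonicalized plans, rather than the real rule), meant to show why a GpuBroadcastToCpu node on the DPP side never matches the GpuBroadcastExchange used by the join side.

import scala.collection.mutable
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}

// First exchange seen for each canonicalized plan.
val seen = mutable.Map.empty[SparkPlan, Exchange]

def reuseIfPossible(exchange: Exchange): SparkPlan =
  seen.get(exchange.canonicalized) match {
    // An equivalent exchange already exists: point at it instead of re-executing it.
    case Some(existing) => ReusedExchangeExec(exchange.output, existing)
    case None =>
      seen(exchange.canonicalized) = exchange
      exchange
  }

// GpuBroadcastToCpu(child) and GpuBroadcastExchange(child) canonicalize to different plans,
// so with AQE on, the DPP filter and the join end up with two separate broadcasts.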
Collaborator

Is this something that we should fix? Should we combine the two classes together so that they are the same thing and it does not matter if you are reading the data on the CPU or the GPU?

Collaborator Author

I think so. IMO, with the help of the new method SerializeConcatHostBuffersDeserializeBatch.hostBatches, we can change the role of GpuBroadcastToCpu, making it a wrapper of GpuBroadcastExchangeExec. Then we can reuse the GpuBroadcast in terms of the serialized host buffers. I tried it in my local environment and it works. I would like to create a separate PR for this change.
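
A rough sketch of that follow-up idea, assuming hostBatches returns the host-side ColumnarBatches (its exact return type and the surrounding node/imports are assumptions here):

// Inside a GpuBroadcastToCpuExec-like node whose child is the reused GpuBroadcastExchangeExec:
// reuse the broadcast that the GPU exchange already produced instead of building a new one ...
val broadcasted = child.executeBroadcast[SerializeConcatHostBuffersDeserializeBatch]()
// ... then read it back purely on the host (return type assumed), so the same exchange can
// serve both the GPU join and any CPU-side consumer without a second broadcast.
val hostBatches: Array[ColumnarBatch] = broadcasted.value.hostBatches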

Collaborator

Sounds like a good plan.

willNotWorkOnGpu("underlying BroadcastExchange can not run on the GPU.")
}
case _ =>
willNotWorkOnGpu("no available BroadcastExchange for reuse.")
Collaborator

I am really confused by this. We cannot run the SubqueryBroadcastExec on the GPU because "no available BroadcastExchange for reuse."? Can we have a better explanation? Our end users will read this and get confused. I am also a little concerned that they will think it is an issue that they need to try and fix.

Collaborator Author

Yes, I refined the reason here.

override val childPlans: Seq[SparkPlanMeta[SparkPlan]] = Nil

override def tagPlanForGpu(): Unit = s.child match {
case ex @ BroadcastExchangeExec(_, c2r: GpuColumnarToRowExecParent) =>
Collaborator

I am more than a little confused. When exactly does this happen? Moving the comments from below up closer to the top would be good.

Collaborator Author

Done.

// GpuSubqueryBroadcast -> GpuBroadcastExchange -> GpuPlanStack...
override def convertToGpu(): GpuExec = s.child match {
case ex @ BroadcastExchangeExec(_, c2r: GpuColumnarToRowExecParent) =>
val exMeta = new GpuBroadcastMeta(ex.copy(child = c2r.child), conf, p, r)
Collaborator

nit: we do this twice. Once to tag and once here. It would be nice if we could cache it so we are not wasting work in the common case.
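
For instance (a hedged sketch; the field name cachedExchangeMeta is hypothetical, and the constructor call mirrors the snippet above), the meta built from the wrapped exchange could be computed once and shared by tagPlanForGpu and convertToGpu:

// Build the GPU broadcast meta once; tagging and conversion both read this field.
private lazy val cachedExchangeMeta: Option[GpuBroadcastMeta] = s.child match {
  case ex @ BroadcastExchangeExec(_, c2r: GpuColumnarToRowExecParent) =>
    Some(new GpuBroadcastMeta(ex.copy(child = c2r.child), conf, p, r))
  case _ => None
}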

Collaborator Author

Refined.

SQLExecution.withExecutionId(sparkSession, executionId) {
  withResource(new NvtxWithMetrics("broadcast collect", NvtxColor.GREEN,
      collectTime)) { _ =>
    val serBatch = child.executeBroadcast[SerializeConcatHostBuffersDeserializeBatch]().value
Collaborator

This is running on the driver, but assumes that it has access to a GPU. It does not. We have to do any/all transformation on the CPU.

Collaborator Author

Hi @revans2, I overlooked the GPU access issue the first time. I refactored the implementation; for now, GpuSubqueryBroadcast runs entirely on the host.

Collaborator

Could we please try to test this on the YARN cluster or some place where there is no GPU and ideally no CUDA on the nodes that are running the driver? I just want to be sure that we don't accidentally initialize the CUDA context when we try to touch the HostColumnVectors. I think in the other places we only touched buffers.

Collaborator Author

@sperlingxx commented Dec 13, 2021

Hi @revans2, I ran a Spark-on-YARN test with a driver image that contained neither the NVIDIA driver nor CUDA. GpuSubqueryBroadcast did not throw any exceptions.

@sperlingxx
Collaborator Author

build

@sperlingxx
Collaborator Author

build

@revans2 (Collaborator) previously approved these changes Dec 7, 2021

I think this is better than we have today, but because this is related to DPP and AQE I really would like more eyes on this. @jlowe and @andygrove could you also take a look?


@sameerz added and removed the performance (A performance related task/issue) label Dec 10, 2021
Signed-off-by: sperlingxx <lovedreamf@gmail.com>
@sperlingxx
Collaborator Author

build

@andygrove
Contributor

> I think this is better than we have today, but because this is related to DPP and AQE I really would like more eyes on this. @jlowe and @andygrove could you also take a look?

Sorry, I missed this notification. I am going to review this today.

Contributor

@andygrove left a comment

LGTM. I would like to review the follow-on PR related to refactoring how we handle GpuBroadcastToCpu as well.

@sperlingxx merged commit 0953911 into NVIDIA:branch-22.02 on Dec 17, 2021
@sperlingxx deleted the gpu_subquery_broadcast branch on December 17, 2021 01:45
@tgravescs mentioned this pull request on Dec 17, 2021