Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-44897][SQL] Propagating local properties to subquery broadcast exec #42587

Conversation

ChenMichael
Copy link

@ChenMichael ChenMichael commented Aug 21, 2023

What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-32748 previously proposed propagating these local properties to the subquery broadcast exec threads but was then reverted since it was said that local properties would already be propagated to the broadcast threads.
I believe this is not always true. In the scenario where a separate BroadcastExchangeExec is the first to compute the broadcast, this is fine. However, in the scenario where the SubqueryBroadcastExec is the first to compute the broadcast, then the local properties that are propagated to the broadcast threads would not have been propagated correctly. This is because the local properties from the subquery broadcast exec were not propagated to its Future thread.
It is difficult to write a unit test that reproduces this behavior because usually BroadcastExchangeExec is the first computing the broadcast variable. However, by adding a Thread.sleep(10) to SubqueryBroadcastExec.doPrepare after relationFuture is initialized, the added test will consistently fail.

Why are the changes needed?

Local properties are not propagated correctly to SubqueryBroadcastExec

Does this PR introduce any user-facing change?

No

How was this patch tested?

Following test can reproduce the bug and test the solution by adding sleep to SubqueryBroadcastExec.doPrepare

protected override def doPrepare(): Unit = {
    relationFuture
    Thread.sleep(10)
}
    withSQLConf(StaticSQLConf.BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD.key -> "1") {
      withTable("a", "b") {
        val confKey = "spark.sql.y"
        val confValue1 = UUID.randomUUID().toString()
        val confValue2 = UUID.randomUUID().toString()
        Seq((confValue1, "1")).toDF("key", "value")
          .write
          .format("parquet")
          .partitionBy("key")
          .mode("overwrite")
          .saveAsTable("a")
        val df1 = spark.table("a")

        def generateBroadcastDataFrame(confKey: String, confValue: String): Dataset[String] = {
          val df = spark.range(1).mapPartitions { _ =>
            Iterator(TaskContext.get.getLocalProperty(confKey))
          }.filter($"value".contains(confValue)).as("c")
          df.hint("broadcast")
        }

        // set local property and assert
        val df2 = generateBroadcastDataFrame(confKey, confValue1)
        spark.sparkContext.setLocalProperty(confKey, confValue1)
        val checkDF = df1.join(df2).where($"a.key" === $"c.value").select($"a.key", $"c.value")
        val checks = checkDF.collect()
        assert(checks.forall(_.toSeq == Seq(confValue1, confValue1)))

        // change local property and re-assert
        Seq((confValue2, "1")).toDF("key", "value")
          .write
          .format("parquet")
          .partitionBy("key")
          .mode("overwrite")
          .saveAsTable("b")
        val df3 = spark.table("b")
        val df4 = generateBroadcastDataFrame(confKey, confValue2)
        spark.sparkContext.setLocalProperty(confKey, confValue2)
        val checks2DF = df3.join(df4).where($"b.key" === $"c.value").select($"b.key", $"c.value")
        val checks2 = checks2DF.collect()
        assert(checks2.forall(_.toSeq == Seq(confValue2, confValue2)))
        assert(checks2.nonEmpty)
      }
    }
  }

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the SQL label Aug 21, 2023
@ChenMichael ChenMichael force-pushed the SPARK-44897-local-property-propagation-to-subquery-broadcast-exec branch from 72c3b5f to cb71086 Compare August 21, 2023 18:54
@ChenMichael ChenMichael force-pushed the SPARK-44897-local-property-propagation-to-subquery-broadcast-exec branch from cb71086 to 62b940c Compare August 21, 2023 20:01
@ChenMichael ChenMichael marked this pull request as draft August 21, 2023 20:02
@ChenMichael ChenMichael marked this pull request as ready for review August 22, 2023 23:03
@ChenMichael
Copy link
Author

ChenMichael commented Aug 24, 2023

cc @cloud-fan, @maropu
Is there some mechanism that is guaranteeing the separate BroadcastExchangeExec will be computing the relationFuture instead of the child of the SubqueryBroadcastExec?

@HyukjinKwon HyukjinKwon changed the title [SPARK-44897] - Propagating local properties to subquery broadcast exec [SPARK-44897][SQL] Propagating local properties to subquery broadcast exec Aug 25, 2023
@@ -191,6 +191,52 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
assert(checks2.forall(_.toSeq == Seq(true, true)))
}
}

test("SPARK-44897 propagate local properties to subquery broadcast execuction thread") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this test can't reproduce the bug directly, we can put it in the PR descriptions with instructions about the extra change to reproduce the bug (add sleep).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Does it make sense to leave this test in the code still or do you think putting it in the description with instructions is enough

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the test case from the code, @ChenMichael . PR description (with your reproducible instruction is enough).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Done

@cloud-fan
Copy link
Contributor

thanks, merging to master/3.5!

@cloud-fan cloud-fan closed this in 4a48562 Aug 28, 2023
cloud-fan pushed a commit that referenced this pull request Aug 28, 2023
… exec

### What changes were proposed in this pull request?
https://issues.apache.org/jira/browse/SPARK-32748 previously proposed propagating these local properties to the subquery broadcast exec threads but was then reverted since it was said that local properties would already be propagated to the broadcast threads.
I believe this is not always true. In the scenario where a separate `BroadcastExchangeExec` is the first to compute the broadcast, this is fine. However, in the scenario where the `SubqueryBroadcastExec` is the first to compute the broadcast, then the local properties that are propagated to the broadcast threads would not have been propagated correctly. This is because the local properties from the subquery broadcast exec were not propagated to its Future thread.
It is difficult to write a unit test that reproduces this behavior because usually `BroadcastExchangeExec` is the first computing the broadcast variable. However, by adding a `Thread.sleep(10)` to `SubqueryBroadcastExec.doPrepare` after `relationFuture` is initialized, the added test will consistently fail.

### Why are the changes needed?
Local properties are not propagated correctly to `SubqueryBroadcastExec`

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Following test can reproduce the bug and test the solution by adding sleep to `SubqueryBroadcastExec.doPrepare`
```
protected override def doPrepare(): Unit = {
    relationFuture
    Thread.sleep(10)
}
```

```test("SPARK-44897 propagate local properties to subquery broadcast execuction thread") {
    withSQLConf(StaticSQLConf.BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD.key -> "1") {
      withTable("a", "b") {
        val confKey = "spark.sql.y"
        val confValue1 = UUID.randomUUID().toString()
        val confValue2 = UUID.randomUUID().toString()
        Seq((confValue1, "1")).toDF("key", "value")
          .write
          .format("parquet")
          .partitionBy("key")
          .mode("overwrite")
          .saveAsTable("a")
        val df1 = spark.table("a")

        def generateBroadcastDataFrame(confKey: String, confValue: String): Dataset[String] = {
          val df = spark.range(1).mapPartitions { _ =>
            Iterator(TaskContext.get.getLocalProperty(confKey))
          }.filter($"value".contains(confValue)).as("c")
          df.hint("broadcast")
        }

        // set local property and assert
        val df2 = generateBroadcastDataFrame(confKey, confValue1)
        spark.sparkContext.setLocalProperty(confKey, confValue1)
        val checkDF = df1.join(df2).where($"a.key" === $"c.value").select($"a.key", $"c.value")
        val checks = checkDF.collect()
        assert(checks.forall(_.toSeq == Seq(confValue1, confValue1)))

        // change local property and re-assert
        Seq((confValue2, "1")).toDF("key", "value")
          .write
          .format("parquet")
          .partitionBy("key")
          .mode("overwrite")
          .saveAsTable("b")
        val df3 = spark.table("b")
        val df4 = generateBroadcastDataFrame(confKey, confValue2)
        spark.sparkContext.setLocalProperty(confKey, confValue2)
        val checks2DF = df3.join(df4).where($"b.key" === $"c.value").select($"b.key", $"c.value")
        val checks2 = checks2DF.collect()
        assert(checks2.forall(_.toSeq == Seq(confValue2, confValue2)))
        assert(checks2.nonEmpty)
      }
    }
  }
  ```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #42587 from ChenMichael/SPARK-44897-local-property-propagation-to-subquery-broadcast-exec.

Authored-by: Michael Chen <mike.chen@workday.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 4a48562)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@ChenMichael ChenMichael deleted the SPARK-44897-local-property-propagation-to-subquery-broadcast-exec branch September 11, 2023 23:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants