Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-3861][SQL] Avoid rebuilding hash tables for broadcast joins on each partition #2727

Closed
wants to merge 11 commits into from

Conversation

rxin
Copy link
Contributor

@rxin rxin commented Oct 9, 2014

No description provided.

BroadcastHashJoin builds a new hash table for each partition. We can build it once per node and reuse the hash table.
…sh-1

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21520/Test FAILed.

@SparkQA
Copy link

SparkQA commented Oct 9, 2014

QA tests have started for PR 2727 at commit 18eb214.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Oct 9, 2014

QA tests have finished for PR 2727 at commit 18eb214.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Oct 9, 2014

QA tests have started for PR 2727 at commit 7fcffb5.

  • This patch merges cleanly.


override def get(key: Row) = {
val v = hashTable.get(key)
if (v eq null) null else CompactBuffer(v)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will that cause too many CompactBuffer object created if there are so many duplicated records in stream side with single match in build side? Or the GeneralHashedRelation performs great enough?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will have a new operator that specializes for unique key joins.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I mean for each row in stream side, will create a CompactBuffer instance if it finds a matched row in build side, this probably too heavy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea. What I meant was we will add a new operator that specializes for unique key joins, and that operator would just call getValue, bypassing the creation of CompactBuffer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even so can't we reuse the same compact buffer? Also should the semantic be to return null or an empty buffer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be null, since that's what a normal hashmap would return, no?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't really a normal hashmap its for joins, and an empty compact buffer seems like a pretty clear way to indicate no matches found. Then you don't have to special case null on the other side. You just join with whatever rows are returned.

Though I guess that doesn't work great with your getValue idea below....

@SparkQA
Copy link

SparkQA commented Oct 9, 2014

QA tests have finished for PR 2727 at commit 7fcffb5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21523/Test PASSed.

@rxin
Copy link
Contributor Author

rxin commented Oct 9, 2014

Ok I updated the code to reuse the CompactBuffer.

@SparkQA
Copy link

SparkQA commented Oct 9, 2014

QA tests have started for PR 2727 at commit 97626a1.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Oct 10, 2014

QA tests have finished for PR 2727 at commit 97626a1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21557/Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21565/Test FAILed.

@SparkQA
Copy link

SparkQA commented Oct 10, 2014

QA tests have started for PR 2727 at commit 9c7b1a2.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Oct 10, 2014

QA tests have finished for PR 2727 at commit 9c7b1a2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Copy link
Contributor Author

rxin commented Oct 10, 2014

I reverted the compact buffer reuse because it is not safe to do that with JoinedRow.

@rxin
Copy link
Contributor Author

rxin commented Oct 12, 2014

@marmbrus ready to merge?

@asfgit asfgit closed this in 39ccaba Oct 13, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants