WIP: Low shuffle merge implementation. #10753


liurenjie1024
Collaborator

No description provided.

@liurenjie1024 marked this pull request as draft on April 30, 2024 10:14
@sameerz added the "performance" label (A performance related task/issue) on May 1, 2024
Member

The copyright headers should say 2024. This comment applies to the other files as well.

Comment on lines +57 to +64
@JsonDeserialize(contentAs = classOf[java.lang.Long])
rows: Option[Long] = None,
@JsonDeserialize(contentAs = classOf[java.lang.Long])
files: Option[Long] = None,
@JsonDeserialize(contentAs = classOf[java.lang.Long])
bytes: Option[Long] = None,
@JsonDeserialize(contentAs = classOf[java.lang.Long])
partitions: Option[Long] = None)
Member

Why was this reformatted? It doesn't match the project coding style, and the same applies elsewhere in this file. Reformatting the file makes it much harder to see the actual changes. The reformat is unrelated to this PR; please revert it.

- val newWrittenFiles = withStatusCode("DELTA", "Writing merged data") {
-   writeAllChanges(spark, deltaTxn, filesToRewrite)
- }
+ val newWrittenFiles = lowShuffleMerge(spark, deltaTxn, filesToRewrite)
Member

There should be a config to control whether low shuffle merge is performed or not, especially on Delta Lake versions that do not have a low shuffle merge implementation (like OSS Delta Lake).
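
The gate requested above could look roughly like the following. This is a minimal sketch in plain Scala with no Spark or Delta dependencies; the config key, the `MergeStrategyGate` object, and the `platformSupportsLowShuffle` parameter are all hypothetical names chosen for illustration, not anything defined in this PR.

```scala
// Hypothetical sketch: none of these names come from the PR or the plugin.
object MergeStrategyGate {
  // Assumed config key, for illustration only.
  val LowShuffleMergeEnabledKey: String = "example.delta.lowShuffleMerge.enabled"

  sealed trait MergeStrategy
  case object LowShuffle extends MergeStrategy
  case object WriteAllChanges extends MergeStrategy

  // Use low shuffle merge only when the flag is explicitly set to "true"
  // AND the Delta Lake flavor in use has a low shuffle implementation.
  // Anything else falls back to the existing write-all-changes path.
  def chooseStrategy(
      conf: Map[String, String],
      platformSupportsLowShuffle: Boolean): MergeStrategy = {
    val enabled = conf.get(LowShuffleMergeEnabledKey)
      .exists(_.trim.equalsIgnoreCase("true"))
    if (enabled && platformSupportsLowShuffle) LowShuffle else WriteAllChanges
  }
}
```

Defaulting to the existing path when the key is absent keeps behavior unchanged for users who never set the flag; whether the real default should be on or off is a decision for the maintainers.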

private def lowShuffleMerge(spark: SparkSession,
    deltaTxn: OptimisticTransaction,
    filesToRewrite: Seq[AddFile]): Seq[FileAction] = {
  val executor = new LowShuffleMergeExecutor(spark, deltaTxn, filesToRewrite, this)
Member

We may want to consider using a GpuLowShuffleMergeIntoCommand separate from GpuMergeIntoCommand (both leveraging common code as appropriate) so this can appear in the SQL UI and make it clear when we're using low shuffle vs. not.
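
The split suggested above might be structured along these lines. This is only a sketch in plain Scala: `MergeCommandBase` and both `...Sketch` classes are hypothetical stand-ins for the real command classes, and files are modeled as plain strings rather than Delta file actions. The point is that shared steps live in one place while each variant reports its own node name.

```scala
// Hypothetical sketch: stand-ins for the real merge command classes.
trait MergeCommandBase {
  // Name a UI would display for this node; differs per variant.
  def nodeName: String

  // Shared step reused by both variants (here: de-duplicate and order).
  protected def selectFilesToRewrite(touched: Seq[String]): Seq[String] =
    touched.distinct.sorted

  // Variant-specific step: how the selected files are rewritten.
  protected def writeChanges(files: Seq[String]): Seq[String]

  // Common driver: shared selection followed by the variant's write.
  final def run(touched: Seq[String]): Seq[String] =
    writeChanges(selectFilesToRewrite(touched))
}

final class MergeIntoCommandSketch extends MergeCommandBase {
  override val nodeName: String = "MergeIntoCommandSketch"
  override protected def writeChanges(files: Seq[String]): Seq[String] =
    files.map(f => s"rewritten:$f")
}

final class LowShuffleMergeIntoCommandSketch extends MergeCommandBase {
  override val nodeName: String = "LowShuffleMergeIntoCommandSketch"
  override protected def writeChanges(files: Seq[String]): Seq[String] =
    files.map(f => s"low-shuffle:$f")
}
```

Because each class carries a distinct `nodeName`, anything that displays the node by name can tell the two paths apart without inspecting configuration.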

Collaborator

@razajafri left a comment

Please revert the reformatting so reviewers can see the actual changes.

@liurenjie1024
Collaborator Author

Closed by #10786
