
[FEA] [Databricks 12.2] Support GpuMergeIntoCommand notMatchedBySourceClauses on GPU #8415

Open
andygrove opened this issue May 26, 2023 · 1 comment
Labels: feature request

Comments


andygrove commented May 26, 2023

Is your feature request related to a problem? Please describe.
PR #8282 added basic support for Databricks 12.2 but did not add support for notMatchedBySourceClauses in GpuMergeIntoCommand; merges that use these clauses currently fall back to the CPU.

Describe the solution you'd like
We should support notMatchedBySourceClauses on GPU.

Describe alternatives you've considered

Additional context
See the discussion at #8282 (comment) for more details.
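
For reference, a merge that exercises notMatchedBySourceClauses looks roughly like the sketch below (the target and source table names and their columns are illustrative, and spark is assumed to be an active SparkSession):

  // Illustrative only: a Delta MERGE whose WHEN NOT MATCHED BY SOURCE clause
  // ends up in notMatchedBySourceClauses and currently forces the whole
  // command back onto the CPU.
  spark.sql("""
    MERGE INTO target t
    USING source s
    ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET t.value = s.value
    WHEN NOT MATCHED THEN INSERT (id, value) VALUES (s.id, s.value)
    WHEN NOT MATCHED BY SOURCE THEN DELETE
  """)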


andygrove commented Jun 14, 2023

Please ignore. This was unrelated to this issue and is now resolved.

One challenge in implementing this is that we replace JoinedRowProcessor with a RapidsProcessDeltaMergeJoin logical operator that involves a non-deterministic expression (monotonically_increasing_id). Spark's CheckAnalysis rule only allows non-deterministic expressions in a hard-coded set of operators, so analysis fails with the following error:

23/06/14 16:55:53 ERROR GpuMergeIntoCommand: Fatal error in MERGE with materialized source in attempt 1.
org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in Project, Filter, Aggregate or Window, found:
a,b,c,_target_row_id_,_row_dropped_,_incr_row_count_,_change_type,(_source_row_present_ IS NULL),(_target_row_present_ IS NULL),true,a,b,c,_target_row_id_,true,(UDF() AND UDF()),CAST(NULL AS STRING),a,b,c,_target_row_id_,false,true,'delete',a,b,c,_target_row_id_,false,UDF(),CAST(NULL AS STRING),a,b,c,_target_row_id_,true,true,CAST(NULL AS STRING)
in operator RapidsProcessDeltaMergeJoin [a#304727, b#304728, c#304729, _target_row_id_#304730L, _row_dropped_#304731, _incr_row_count_#304732, _change_type#304733], isnull(_source_row_present_#304699), isnull(_target_row_present_#304702), [true], [[[a#304425, b#304426, c#304427, _target_row_id_#304707L, true, (UDF() AND UDF()), null], [a#304425, b#304426, c#304427, _target_row_id_#304707L, false, true, delete]]], [a#304425, b#304426, c#304427, _target_row_id_#304707L, false, UDF(), null], [a#304425, b#304426, c#304427, _target_row_id_#304707L, true, true, null].; line 1 pos 0;

Here is the Spark code from CheckAnalysis:

          case o if o.expressions.exists(!_.deterministic) &&
            !o.isInstanceOf[Project] && !o.isInstanceOf[Filter] &&
            !o.isInstanceOf[Aggregate] && !o.isInstanceOf[Window] &&
            // Lateral join is checked in checkSubqueryExpression.
            !o.isInstanceOf[LateralJoin] =>
            // The rule above is used to check Aggregate operator.
            o.failAnalysis(
              errorClass = "_LEGACY_ERROR_TEMP_2439",
              messageParameters = Map(
                "sqlExprs" -> o.expressions.map(_.sql).mkString(","),
                "operator" -> operator.simpleString(SQLConf.get.maxToStringFields)))

For now, we avoid replacing the operator if notMatchedBySourceClauses is non-empty.
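
The guard is roughly of the following shape (a sketch only; the method name and the Seq[Any] parameter type are placeholders to keep it self-contained, not the plugin's exact code):

  // Sketch only: decide whether the merge join can be replaced on the GPU.
  // The real clauses are Delta's NOT MATCHED BY SOURCE clause objects.
  def canReplaceMergeJoinOnGpu(notMatchedBySourceClauses: Seq[Any]): Boolean =
    notMatchedBySourceClauses.isEmpty

  // Only swap JoinedRowProcessor for RapidsProcessDeltaMergeJoin when this
  // returns true; otherwise the whole MERGE falls back to the CPU.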
