Skip to content

Commit

Permalink
[FLINK-21808][table-planner-blink] Add RelNode for SORT/DISTRIBUTE/CL…
Browse files Browse the repository at this point in the history
…USTER BY

This closes apache#15253
  • Loading branch information
lirui-apache authored and wuchong committed Apr 1, 2021
1 parent c912c1e commit abd7054
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.nodes.hive;

import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttle;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.SingleRel;

import java.util.List;

/**
 * Logical relational node that records how the rows of its single input are expected to be
 * distributed and sorted, mirroring Hive's SORT BY, DISTRIBUTE BY, and CLUSTER BY semantics.
 *
 * <p>The constructor is private; instances are built through
 * {@link #create(RelNode, RelCollation, List)} or {@link #copy(RelTraitSet, List)}.
 */
public class LogicalDistribution extends SingleRel {

    /** Sort collation carried by this node. */
    private final RelCollation collation;

    /** Distribution keys carried by this node. */
    private final List<Integer> distKeys;

    private LogicalDistribution(
            RelOptCluster cluster,
            RelTraitSet traits,
            RelNode child,
            RelCollation collation,
            List<Integer> distKeys) {
        super(cluster, traits, child);
        this.collation = collation;
        this.distKeys = distKeys;
    }

    /**
     * Builds a {@code LogicalDistribution} on top of {@code input}.
     *
     * <p>The collation is canonized through {@link RelCollationTraitDef} first; the canonized
     * instance is both stored on the node and placed into the node's trait set, which is derived
     * from the input's traits with the convention replaced by {@link Convention#NONE}.
     *
     * @param input the node whose output is being distributed / sorted
     * @param collation the requested sort collation
     * @param distKeys the requested distribution keys
     * @return the new node, created in the cluster of {@code input}
     */
    public static LogicalDistribution create(
            RelNode input, RelCollation collation, List<Integer> distKeys) {
        final RelOptCluster cluster = input.getCluster();
        final RelCollation canonized = RelCollationTraitDef.INSTANCE.canonize(collation);
        final RelTraitSet nodeTraits =
                input.getTraitSet().replace(Convention.NONE).replace(canonized);
        return new LogicalDistribution(cluster, nodeTraits, input, canonized, distKeys);
    }

    /** Returns the distribution keys of this node. */
    public List<Integer> getDistKeys() {
        return distKeys;
    }

    /** Returns the sort collation of this node. */
    public RelCollation getCollation() {
        return collation;
    }

    /**
     * Creates a copy with the given traits and input; collation and distribution keys are
     * carried over unchanged. Only the first element of {@code inputs} is used.
     */
    @Override
    public LogicalDistribution copy(RelTraitSet traitSet, List<RelNode> inputs) {
        final RelNode replacement = inputs.get(0);
        return new LogicalDistribution(getCluster(), traitSet, replacement, collation, distKeys);
    }

    /** Hands this node to the shuttle's {@code visit} method and returns its result. */
    @Override
    public RelNode accept(RelShuttle shuttle) {
        return shuttle.visit(this);
    }

    /**
     * Writes the terms of the parent class followed by the {@code collation} and {@code dist}
     * items, and returns the writer that was passed in.
     */
    @Override
    public RelWriter explainTerms(RelWriter pw) {
        super.explainTerms(pw);
        pw.item("collation", collation);
        pw.item("dist", distKeys);
        return pw;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType
import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil
import org.apache.flink.table.planner.plan.utils.WindowUtil.groupingContainsWindowStartEnd
import org.apache.flink.table.types.logical.TimestampType

import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core._
import org.apache.calcite.rel.hint.RelHint
Expand All @@ -37,9 +36,9 @@ import org.apache.calcite.rex._
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.sql.fun.SqlStdOperatorTable.FINAL
import org.apache.flink.table.planner.plan.nodes.hive.LogicalDistribution

import java.util.{Collections => JCollections}

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable
Expand Down Expand Up @@ -200,6 +199,10 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
sink.catalogTable,
sink.staticPartitions)

case distribution: LogicalDistribution =>
val newInput = distribution.getInput.accept(this)
distribution.copy(distribution.getTraitSet, JCollections.singletonList(newInput))

case _ =>
throw new TableException(s"Unsupported logical operator: ${other.getClass.getSimpleName}")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.nodes.logical

import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.hive.LogicalDistribution

import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelTraitSet}
import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode, RelWriter, SingleRel}
import org.apache.calcite.rel.convert.ConverterRule

import java.util.{List => JList}

/**
  * A FlinkLogicalRel to represent Hive's SORT BY, DISTRIBUTE BY, and CLUSTER BY semantics.
  *
  * @param cluster   cluster this node belongs to
  * @param traits    trait set of this node
  * @param child     the single input of this node
  * @param collation sort collation carried by this node
  * @param distKeys  distribution keys carried by this node
  */
class FlinkLogicalDistribution(
    cluster: RelOptCluster,
    traits: RelTraitSet,
    child: RelNode,
    val collation: RelCollation,
    val distKeys: JList[Integer])
  extends SingleRel(cluster, traits, child)
  with FlinkLogicalRel {

  /**
    * Creates a copy with the given traits and input; collation and distribution keys are
    * carried over unchanged. Only the first element of `inputs` is used.
    */
  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode =
    new FlinkLogicalDistribution(getCluster, traitSet, inputs.get(0), collation, distKeys)

  /**
    * Adds the collation and the distribution keys to the explained terms so that plan output
    * shows what this node sorts and distributes by. The item names match the ones written by
    * [[org.apache.flink.table.planner.plan.nodes.hive.LogicalDistribution]].
    */
  override def explainTerms(pw: RelWriter): RelWriter =
    super.explainTerms(pw)
      .item("collation", collation)
      .item("dist", distKeys)
}

/**
  * Converter rule that turns a [[LogicalDistribution]] in [[Convention.NONE]] into a
  * [[FlinkLogicalDistribution]] in [[FlinkConventions.LOGICAL]].
  */
class FlinkLogicalDistributionBatchConverter
  extends ConverterRule(
    classOf[LogicalDistribution],
    Convention.NONE,
    FlinkConventions.LOGICAL,
    "FlinkLogicalDistributionBatchConverter") {

  /**
    * Converts the input of the matched node to the logical convention and rebuilds the node
    * on top of it with the same collation and distribution keys.
    */
  override def convert(rel: RelNode): RelNode = {
    val source = rel.asInstanceOf[LogicalDistribution]
    val logicalInput = RelOptRule.convert(source.getInput, FlinkConventions.LOGICAL)
    val sortCollation = source.getCollation
    val keys = source.getDistKeys
    FlinkLogicalDistribution.create(logicalInput, sortCollation, keys)
  }
}

/** Factory and converter rule for [[FlinkLogicalDistribution]]. */
object FlinkLogicalDistribution {
  val BATCH_CONVERTER: RelOptRule = new FlinkLogicalDistributionBatchConverter()

  /**
    * Builds a [[FlinkLogicalDistribution]] on top of `input`.
    *
    * The trait set starts from the cluster's traits for [[FlinkConventions.LOGICAL]] and gets
    * the canonized collation plus a distribution trait: [[FlinkRelDistribution.ANY]] when
    * `distKeys` is empty, otherwise a hash distribution over `distKeys`. The node itself keeps
    * the collation exactly as it was passed in.
    *
    * @param input     the node whose output is being distributed / sorted
    * @param collation the requested sort collation
    * @param distKeys  the requested distribution keys, possibly empty
    */
  def create(
      input: RelNode,
      collation: RelCollation,
      distKeys: JList[Integer]): FlinkLogicalDistribution = {
    val cluster = input.getCluster
    val canonized = RelCollationTraitDef.INSTANCE.canonize(collation)
    val distributionTrait =
      if (distKeys.isEmpty) FlinkRelDistribution.ANY else FlinkRelDistribution.hash(distKeys)
    val logicalTraits = cluster
      .traitSetOf(FlinkConventions.LOGICAL)
      .replace(canonized)
      .replace(distributionTrait)
    new FlinkLogicalDistribution(cluster, logicalTraits, input, collation, distKeys)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,8 @@ object FlinkBatchRuleSets {
FlinkLogicalWindowAggregate.CONVERTER,
FlinkLogicalSnapshot.CONVERTER,
FlinkLogicalSink.CONVERTER,
FlinkLogicalLegacySink.CONVERTER
FlinkLogicalLegacySink.CONVERTER,
FlinkLogicalDistribution.BATCH_CONVERTER
)

/**
Expand Down Expand Up @@ -435,7 +436,9 @@ object FlinkBatchRuleSets {
BatchPhysicalPythonCorrelateRule.INSTANCE,
// sink
BatchPhysicalSinkRule.INSTANCE,
BatchPhysicalLegacySinkRule.INSTANCE
BatchPhysicalLegacySinkRule.INSTANCE,
// hive distribution
BatchPhysicalDistributionRule.INSTANCE
)

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.rules.physical.batch

import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistributionTraitDef
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSort
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalDistribution
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSort

/**
  * Rule that matches [[FlinkLogicalDistribution]] and converts it to the batch physical
  * convention.
  *
  * The input is asked to provide the distribution trait found on the logical node. If the
  * node's collation has no field collations, the converted input is returned directly;
  * otherwise a [[BatchPhysicalSort]] with that collation is placed on top of it.
  */
class BatchPhysicalDistributionRule
  extends ConverterRule(
    classOf[FlinkLogicalDistribution],
    FlinkConventions.LOGICAL,
    FlinkConventions.BATCH_PHYSICAL,
    "BatchExecDistributionRule") {

  override def convert(rel: RelNode): RelNode = {
    val node = rel.asInstanceOf[FlinkLogicalDistribution]
    val nodeTraits = node.getTraitSet
    val requiredDistribution = nodeTraits.getTrait(FlinkRelDistributionTraitDef.INSTANCE)

    // Request the node's distribution from the input, in the batch physical convention.
    val child = node.getInput
    val childTraits = child.getTraitSet
      .replace(requiredDistribution)
      .replace(FlinkConventions.BATCH_PHYSICAL)
    val physicalChild = RelOptRule.convert(child, childTraits)

    val sortCollation = node.collation
    if (!sortCollation.getFieldCollations.isEmpty) {
      // The sort keeps the logical node's traits, switched to the physical convention.
      new BatchPhysicalSort(
        node.getCluster,
        nodeTraits.replace(FlinkConventions.BATCH_PHYSICAL),
        physicalChild,
        sortCollation)
    } else {
      // Nothing to sort by: the redistributed input is the result.
      physicalChild
    }
  }

}

/** Holds the shared instance of [[BatchPhysicalDistributionRule]]. */
object BatchPhysicalDistributionRule {
  val INSTANCE: RelOptRule = new BatchPhysicalDistributionRule()
}

0 comments on commit abd7054

Please sign in to comment.