diff --git a/flink-table/flink-table-planner-blink/pom.xml b/flink-table/flink-table-planner-blink/pom.xml index ec175a9501525..e9035ec14f252 100644 --- a/flink-table/flink-table-planner-blink/pom.xml +++ b/flink-table/flink-table-planner-blink/pom.xml @@ -280,6 +280,7 @@ under the License. test + org.reflections reflections @@ -418,6 +419,11 @@ under the License. org.apache.flink.table.shaded.org.codehaus --> + + org.reflections + org.apache.flink.table.shaded.org.reflections + + diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java index e8e394a00e1bf..2f41bc896fb4f 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java @@ -21,11 +21,19 @@ import org.apache.flink.api.dag.Transformation; import org.apache.flink.table.data.RowData; import org.apache.flink.table.delegation.Planner; +import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonDeserializer; +import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer; import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor; import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; + import java.util.List; /** @@ -33,9 +41,20 @@ * * @param The type of the elements that result from this node. */ +@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "class") public interface ExecNode { + String FIELD_NAME_ID = "id"; + String FIELD_NAME_DESCRIPTION = "description"; + String FIELD_NAME_INPUT_PROPERTIES = "inputProperties"; + String FIELD_NAME_OUTPUT_TYPE = "outputType"; + + /** Gets the ID of this node. */ + @JsonProperty(value = FIELD_NAME_ID) + int getId(); + /** Returns a string which describes this node. */ + @JsonProperty(value = FIELD_NAME_DESCRIPTION) String getDescription(); /** @@ -46,6 +65,9 @@ public interface ExecNode { * to the JavaDoc of {@link RowData} for more info about mapping of logical types to internal * data structures. */ + @JsonProperty(value = FIELD_NAME_OUTPUT_TYPE) + @JsonSerialize(using = LogicalTypeJsonSerializer.class) + @JsonDeserialize(using = LogicalTypeJsonDeserializer.class) LogicalType getOutputType(); /** @@ -55,6 +77,7 @@ public interface ExecNode { * * @return List of this node's input properties. */ + @JsonProperty(value = FIELD_NAME_INPUT_PROPERTIES) List getInputProperties(); /** @@ -62,6 +85,7 @@ public interface ExecNode { * *
<p>
NOTE: If there are no inputs, returns an empty list, not null. */ + @JsonIgnore List getInputEdges(); /** @@ -71,6 +95,7 @@ * * @param inputEdges the input {@link ExecEdge}s. */ + @JsonIgnore void setInputEdges(List inputEdges); /** diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java index 25cacdcf217f2..c8a7846661cd6 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java @@ -25,6 +25,9 @@ import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; + import java.util.ArrayList; import java.util.List; @@ -36,22 +39,53 @@ * * @param The type of the elements that result from this node. */ +@JsonIgnoreProperties(ignoreUnknown = true) public abstract class ExecNodeBase implements ExecNode { - private final String description; - private final List inputProperties; - private final LogicalType outputType; - private List inputEdges; + /** The unique identifier for each ExecNode in the JSON plan. */ + @JsonIgnore private final int id; + + @JsonIgnore private final String description; + + @JsonIgnore private final LogicalType outputType; + + @JsonIgnore private final List inputProperties; - private transient Transformation transformation; + @JsonIgnore private List inputEdges; + + @JsonIgnore private transient Transformation transformation; + + /** This is used to assign a unique ID to every ExecNode. */ + private static Integer idCounter = 0; + + /** Generates a unique ID for the ExecNode. 
*/ + public static int getNewNodeId() { + idCounter++; + return idCounter; + } + // used for json creator protected ExecNodeBase( - List inputProperties, LogicalType outputType, String description) { + int id, + List inputProperties, + LogicalType outputType, + String description) { + this.id = id; this.inputProperties = checkNotNull(inputProperties); this.outputType = checkNotNull(outputType); this.description = checkNotNull(description); } + protected ExecNodeBase( + List inputProperties, LogicalType outputType, String description) { + this(getNewNodeId(), inputProperties, outputType, description); + } + + @Override + public final int getId() { + return id; + } + @Override public String getDescription() { return description; @@ -67,6 +101,7 @@ public List getInputProperties() { return inputProperties; } + @JsonIgnore @Override public List getInputEdges() { return checkNotNull( @@ -74,6 +109,7 @@ public List getInputEdges() { "inputEdges should not null, please call `setInputEdges(List)` first."); } + @JsonIgnore @Override public void setInputEdges(List inputEdges) { checkNotNull(inputEdges, "inputEdges should not be null."); diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/InputProperty.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/InputProperty.java index 527c06115e022..b7369265123fc 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/InputProperty.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/InputProperty.java @@ -20,8 +20,18 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.operators.Input; +import org.apache.flink.table.planner.plan.nodes.exec.serde.RequiredDistributionJsonDeserializer; +import org.apache.flink.table.planner.plan.nodes.exec.serde.RequiredDistributionJsonSerializer; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; import java.util.Arrays; +import java.util.Objects; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -32,17 +42,53 @@ *
<p>
The input concept is not corresponding to the execution edge, but rather to the {@link Input}. */ @Internal +@JsonIgnoreProperties(ignoreUnknown = true) public class InputProperty { + /** The input does not require any specific data distribution. */ + public static final RequiredDistribution ANY_DISTRIBUTION = + new RequiredDistribution(DistributionType.ANY) {}; + + /** + * The input will read all records for each parallelism of the target node. All records appear + * in each parallelism. + */ + public static final RequiredDistribution BROADCAST_DISTRIBUTION = + new RequiredDistribution(DistributionType.BROADCAST) {}; + + /** The input will read all records, and the parallelism of the target node must be 1. */ + public static final RequiredDistribution SINGLETON_DISTRIBUTION = + new RequiredDistribution(DistributionType.SINGLETON) {}; + + /** + * Returns a place-holder required distribution. + * + *
<p>
Currently {@link InputProperty} is only used for deadlock breakup and multi-input in batch + * mode, so this place-holder is used for {@link ExecNode}s that do not affect those algorithms. + * + *
<p>
We should fill out the detailed {@link InputProperty} for each sub-class of {@link + * ExecNode} in the future. + */ + public static final RequiredDistribution UNKNOWN_DISTRIBUTION = + new RequiredDistribution(DistributionType.UNKNOWN) {}; + public static final InputProperty DEFAULT = InputProperty.builder().build(); + public static final String FIELD_NAME_REQUIRED_DISTRIBUTION = "requiredDistribution"; + public static final String FIELD_NAME_DAM_BEHAVIOR = "damBehavior"; + public static final String FIELD_NAME_PRIORITY = "priority"; + /** * The required input data distribution when the target {@link ExecNode} read data in from the * corresponding input. */ + @JsonProperty(FIELD_NAME_REQUIRED_DISTRIBUTION) + @JsonSerialize(using = RequiredDistributionJsonSerializer.class) + @JsonDeserialize(using = RequiredDistributionJsonDeserializer.class) private final RequiredDistribution requiredDistribution; /** How does the input record trigger the output behavior of the target {@link ExecNode}. */ + @JsonProperty(FIELD_NAME_DAM_BEHAVIOR) private final DamBehavior damBehavior; /** @@ -51,27 +97,54 @@ public class InputProperty { *
<p>
The smaller the integer, the higher the priority. Same integer indicates the same * priority. */ + @JsonProperty(FIELD_NAME_PRIORITY) private final int priority; - private InputProperty( - RequiredDistribution requiredDistribution, DamBehavior damBehavior, int priority) { - this.requiredDistribution = requiredDistribution; - this.damBehavior = damBehavior; + @JsonCreator + public InputProperty( + @JsonProperty(FIELD_NAME_REQUIRED_DISTRIBUTION) + RequiredDistribution requiredDistribution, + @JsonProperty(FIELD_NAME_DAM_BEHAVIOR) DamBehavior damBehavior, + @JsonProperty(FIELD_NAME_PRIORITY) int priority) { + this.requiredDistribution = checkNotNull(requiredDistribution); + this.damBehavior = checkNotNull(damBehavior); this.priority = priority; } + @JsonIgnore public RequiredDistribution getRequiredDistribution() { return requiredDistribution; } + @JsonIgnore public DamBehavior getDamBehavior() { return damBehavior; } + @JsonIgnore public int getPriority() { return priority; } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + InputProperty inputProperty = (InputProperty) o; + return priority == inputProperty.priority + && requiredDistribution.equals(inputProperty.requiredDistribution) + && damBehavior == inputProperty.damBehavior; + } + + @Override + public int hashCode() { + return Objects.hash(requiredDistribution, damBehavior, priority); + } + @Override public String toString() { return "InputProperty{" @@ -132,6 +205,23 @@ public DistributionType getType() { return type; } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RequiredDistribution that = (RequiredDistribution) o; + return type == that.type; + } + + @Override + public int hashCode() { + return Objects.hash(type); + } + @Override public String toString() { return type.name(); @@ -155,39 +245,34 @@ public int[] getKeys() { return keys; } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + HashDistribution that = (HashDistribution) o; + return Arrays.equals(keys, that.keys); + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + Arrays.hashCode(keys); + return result; + } + @Override public String toString() { return "HASH" + Arrays.toString(keys); } } - /** The input does not require any specific data distribution. */ - public static final RequiredDistribution ANY_DISTRIBUTION = - new RequiredDistribution(DistributionType.ANY) {}; - - /** - * The input will read all records for each parallelism of the target node. All records appear - * in each parallelism. - */ - public static final RequiredDistribution BROADCAST_DISTRIBUTION = - new RequiredDistribution(DistributionType.BROADCAST) {}; - - /** The input will read all records, and the parallelism of the target node must be 1. */ - public static final RequiredDistribution SINGLETON_DISTRIBUTION = - new RequiredDistribution(DistributionType.SINGLETON) {}; - - /** - * Returns a place-holder required distribution. - * - *
<p>
Currently {@link InputProperty} is only used for deadlock breakup and multi-input in batch - * mode, so for {@link ExecNode}s not affecting the algorithm we use this place-holder. - * - *
<p>
We should fill out the detailed {@link InputProperty} for each sub-class of {@link - * ExecNode} in the future. - */ - public static final RequiredDistribution UNKNOWN_DISTRIBUTION = - new RequiredDistribution(DistributionType.UNKNOWN) {}; - /** * The input will read the records whose keys hash to a particular hash value. * diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java index 7b49c2814ca00..d34947f992769 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java @@ -26,6 +26,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.RowType; @@ -37,8 +38,8 @@ public class BatchExecTableSourceScan extends CommonExecTableSourceScan implements BatchExecNode { public BatchExecTableSourceScan( - ScanTableSource tableSource, RowType outputType, String description) { - super(tableSource, outputType, description); + DynamicTableSourceSpec tableSourceSpec, RowType outputType, String description) { + super(tableSourceSpec, getNewNodeId(), outputType, description); } @Override diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java index 809c3597f6b9a..9a261a6f88a8c 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java @@ -33,30 +33,44 @@ import org.apache.flink.table.planner.delegation.PlannerBase; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; +import org.apache.flink.table.planner.plan.nodes.exec.serde.DynamicTableSourceSpecJsonDeserializer; +import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec; import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; + import java.util.Collections; /** * Base {@link ExecNode} to read data from an external source defined by a {@link ScanTableSource}. 
*/ public abstract class CommonExecTableSourceScan extends ExecNodeBase { - private final ScanTableSource tableSource; + public static final String FIELD_NAME_SCAN_TABLE_SOURCE = "scanTableSource"; + + @JsonProperty(FIELD_NAME_SCAN_TABLE_SOURCE) + @JsonDeserialize(using = DynamicTableSourceSpecJsonDeserializer.class) + private final DynamicTableSourceSpec tableSourceSpec; protected CommonExecTableSourceScan( - ScanTableSource tableSource, LogicalType outputType, String description) { - super(Collections.emptyList(), outputType, description); - this.tableSource = tableSource; + DynamicTableSourceSpec tableSourceSpec, + int id, + LogicalType outputType, + String description) { + super(id, Collections.emptyList(), outputType, description); + this.tableSourceSpec = tableSourceSpec; } @Override protected Transformation translateToPlanInternal(PlannerBase planner) { final StreamExecutionEnvironment env = planner.getExecEnv(); final String operatorName = getDescription(); - InternalTypeInfo outputTypeInfo = InternalTypeInfo.of((RowType) getOutputType()); + final InternalTypeInfo outputTypeInfo = + InternalTypeInfo.of((RowType) getOutputType()); + final ScanTableSource tableSource = tableSourceSpec.getScanTableSource(); ScanTableSource.ScanRuntimeProvider provider = tableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); if (provider instanceof SourceFunctionProvider) { diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/CatalogTableJsonDeserializer.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/CatalogTableJsonDeserializer.java new file mode 100644 index 0000000000000..532fd19d05bd7 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/CatalogTableJsonDeserializer.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.serde; + +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogTableImpl; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; + +import java.io.IOException; +import java.util.Map; + +/** JSON deserializer for {@link CatalogTable}. 
*/ +public class CatalogTableJsonDeserializer extends StdDeserializer { + private static final long serialVersionUID = 1L; + + public CatalogTableJsonDeserializer() { + super(CatalogTable.class); + } + + @Override + public CatalogTable deserialize(JsonParser jsonParser, DeserializationContext ctx) + throws IOException, JsonProcessingException { + Map catalogProperties = + jsonParser.readValueAs(new TypeReference>() {}); + return CatalogTableImpl.fromProperties(catalogProperties); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/CatalogTableJsonSerializer.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/CatalogTableJsonSerializer.java new file mode 100644 index 0000000000000..29fb8daf31828 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/CatalogTableJsonSerializer.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.serde; + +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogTableImpl; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer; + +import java.io.IOException; +import java.util.Map; + +/** JSON serializer for {@link CatalogTable}. 
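+ * <p>For illustration only (property keys vary by connector and schema): a {@link CatalogTableImpl} is flattened to its string property map, so a hypothetical CSV source table would serialize along the lines of {@code {"connector": "filesystem", "path": "/tmp/orders", "format": "csv", "schema.0.name": "amount", "schema.0.data-type": "BIGINT"}}.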
*/ +public class CatalogTableJsonSerializer extends StdSerializer { + private static final long serialVersionUID = 1L; + + public CatalogTableJsonSerializer() { + super(CatalogTable.class); + } + + @Override + public void serialize( + CatalogTable catalogTable, + JsonGenerator jsonGenerator, + SerializerProvider serializerProvider) + throws IOException { + if (!(catalogTable instanceof CatalogTableImpl)) { + throw new UnsupportedOperationException("Only CatalogTableImpl is supported now."); + } + + jsonGenerator.writeStartObject(); + for (Map.Entry entry : catalogTable.toProperties().entrySet()) { + jsonGenerator.writeStringField(entry.getKey(), entry.getValue()); + } + jsonGenerator.writeEndObject(); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecJsonDeserializer.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecJsonDeserializer.java new file mode 100644 index 0000000000000..3bc15f0001114 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecJsonDeserializer.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.serde; + +import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; + +import java.io.IOException; + +/** JSON deserializer for {@link DynamicTableSourceSpec}. 
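+ * <p>The spec is first materialized with a fresh {@link ObjectMapper} (so this deserializer is not re-entered recursively) and is then enriched with the configuration and class loader taken from the {@link SerdeContext}, which the spec needs in order to later create the actual ScanTableSource.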
*/ +public class DynamicTableSourceSpecJsonDeserializer + extends StdDeserializer { + private static final long serialVersionUID = 1L; + + public DynamicTableSourceSpecJsonDeserializer() { + super(DynamicTableSourceSpec.class); + } + + @Override + public DynamicTableSourceSpec deserialize(JsonParser jsonParser, DeserializationContext ctx) + throws IOException, JsonProcessingException { + JsonNode jsonNode = jsonParser.readValueAsTree(); + // use new ObjectMapper to avoid endless recursive calls + ObjectMapper mapper = new ObjectMapper(); + JsonParser newParser = mapper.getFactory().createParser(jsonNode.toPrettyString()); + DynamicTableSourceSpec dynamicTableSourceSpec = + newParser.readValueAs(DynamicTableSourceSpec.class); + SerdeContext serdeCtx = ((FlinkDeserializationContext) ctx).getSerdeContext(); + dynamicTableSourceSpec.setReadableConfig(serdeCtx.getConfiguration()); + dynamicTableSourceSpec.setClassLoader(serdeCtx.getClassLoader()); + return dynamicTableSourceSpec; + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkDeserializationContext.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkDeserializationContext.java new file mode 100644 index 0000000000000..c5acbd1d6ba1f --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkDeserializationContext.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.serde; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationConfig; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.InjectableValues; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DeserializerFactory; + +/** Custom JSON {@link DeserializationContext} which wraps a {@link SerdeContext}. 
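+ * <p>Installed by {@link JsonSerdeUtil#createObjectMapper(SerdeContext)}; deserializers that need the configuration or class loader downcast the passed-in context and call {@link #getSerdeContext()}, as {@link DynamicTableSourceSpecJsonDeserializer} does.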
*/ +public class FlinkDeserializationContext extends DefaultDeserializationContext { + private static final long serialVersionUID = 1L; + private final SerdeContext serdeCtx; + + public FlinkDeserializationContext(DefaultDeserializationContext src, SerdeContext serdeCtx) { + super(src); + this.serdeCtx = serdeCtx; + } + + protected FlinkDeserializationContext( + FlinkDeserializationContext src, + DeserializationConfig config, + JsonParser jp, + InjectableValues values) { + super(src, config, jp, values); + this.serdeCtx = src.serdeCtx; + } + + protected FlinkDeserializationContext( + FlinkDeserializationContext src, DeserializerFactory factory) { + super(src, factory); + this.serdeCtx = src.serdeCtx; + } + + @Override + public DefaultDeserializationContext with(DeserializerFactory factory) { + return new FlinkDeserializationContext(this, factory); + } + + @Override + public DefaultDeserializationContext createInstance( + DeserializationConfig config, JsonParser p, InjectableValues values) { + return new FlinkDeserializationContext(this, config, p, values); + } + + public SerdeContext getSerdeContext() { + return serdeCtx; + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java new file mode 100644 index 0000000000000..acd2bd3f0aef5 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.serde; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MapperFeature; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerFactory; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext; + +import java.lang.annotation.Annotation; +import java.lang.reflect.Constructor; + +/** A utility class that provides helpers for JSON serialization and deserialization. */ +public class JsonSerdeUtil { + + /** Returns true if any constructor of the given class is annotated with @JsonCreator, false otherwise. 
*/ + public static boolean hasJsonCreatorAnnotation(Class clazz) { + for (Constructor constructor : clazz.getDeclaredConstructors()) { + for (Annotation annotation : constructor.getAnnotations()) { + if (annotation instanceof JsonCreator) { + return true; + } + } + } + return false; + } + + /** Creates an {@link ObjectMapper} whose DeserializationContext wraps a {@link SerdeContext}. */ + public static ObjectMapper createObjectMapper(SerdeContext serdeCtx) { + ObjectMapper mapper = + new ObjectMapper( + null, // JsonFactory + null, // DefaultSerializerProvider + new FlinkDeserializationContext( + new DefaultDeserializationContext.Impl( + BeanDeserializerFactory.instance), + serdeCtx)); + mapper.configure(MapperFeature.USE_GETTERS_AS_SETTERS, false); + return mapper; + } + + private JsonSerdeUtil() {} +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonDeserializer.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonDeserializer.java new file mode 100644 index 0000000000000..f00c4a0cb6265 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonDeserializer.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.nodes.exec.serde; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.types.logical.DistinctType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.table.types.logical.SymbolType; +import org.apache.flink.table.types.logical.TypeInformationRawType; +import org.apache.flink.table.types.logical.utils.LogicalTypeParser; +import org.apache.flink.table.utils.EncodingUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_ATTRIBUTES; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_COMPARISION; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_DESCRIPTION; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_FINAL; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_IDENTIFIER; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_IMPLEMENTATION_CLASS; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_INSTANTIABLE; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_LOGICAL_TYPE; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_NAME; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_NULLABLE; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_SOURCE_TYPE; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_SUPPER_TYPE; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_SYMBOL_CLASS; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_TYPE_INFO; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_TYPE_NAME; + +/** + * JSON deserializer for {@link LogicalType}. refer to {@link LogicalTypeJsonSerializer} for + * serializer. 
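+ * + * <p>Mirroring the serializer, two encodings are handled: most types arrive as the plain string produced by {@code LogicalType#asSerializableString()}, e.g. {@code "BIGINT"} or {@code "VARCHAR(32)"}, while {@link SymbolType}, {@link TypeInformationRawType}, {@link StructuredType} and {@link DistinctType} arrive as JSON objects, e.g. (illustrative) a structured type is an object carrying {@code "type": "STRUCTURED_TYPE"}.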
+ */ +public class LogicalTypeJsonDeserializer extends StdDeserializer { + private static final long serialVersionUID = 1L; + + public LogicalTypeJsonDeserializer() { + super(LogicalType.class); + } + + @Override + public LogicalType deserialize(JsonParser jsonParser, DeserializationContext ctx) + throws IOException, JsonProcessingException { + final JsonNode logicalTypeNode = jsonParser.readValueAsTree(); + SerdeContext serdeCtx = ((FlinkDeserializationContext) ctx).getSerdeContext(); + return deserialize(logicalTypeNode, serdeCtx); + } + + public LogicalType deserialize(JsonNode logicalTypeNode, SerdeContext serdeCtx) { + if (logicalTypeNode.get(FIELD_NAME_SYMBOL_CLASS) != null) { + return deserializeSymbolType(logicalTypeNode); + } else if (logicalTypeNode.get(FIELD_NAME_TYPE_INFO) != null) { + return deserializeTypeInformationRawType(logicalTypeNode, serdeCtx); + } else if (logicalTypeNode.get(FIELD_NAME_TYPE_NAME) != null + && logicalTypeNode + .get(FIELD_NAME_TYPE_NAME) + .asText() + .toUpperCase() + .equals(LogicalTypeRoot.STRUCTURED_TYPE.name())) { + return deserializeStructuredType(logicalTypeNode, serdeCtx); + } else if (logicalTypeNode.get(FIELD_NAME_TYPE_NAME) != null + && logicalTypeNode + .get(FIELD_NAME_TYPE_NAME) + .asText() + .toUpperCase() + .equals(LogicalTypeRoot.DISTINCT_TYPE.name())) { + return deserializeDistinctType(logicalTypeNode, serdeCtx); + } else { + return LogicalTypeParser.parse(logicalTypeNode.asText()); + } + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private SymbolType deserializeSymbolType(JsonNode logicalTypeNode) { + boolean nullable = logicalTypeNode.get(FIELD_NAME_NULLABLE).asBoolean(); + String className = logicalTypeNode.get(FIELD_NAME_SYMBOL_CLASS).asText(); + try { + Class clazz = Class.forName(className); + return new SymbolType(nullable, clazz); + } catch (ClassNotFoundException e) { + throw new TableException("Failed to deserialize symbol type", e); + } + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private TypeInformationRawType deserializeTypeInformationRawType( + JsonNode logicalTypeNode, SerdeContext serdeCtx) { + boolean nullable = logicalTypeNode.get(FIELD_NAME_NULLABLE).asBoolean(); + String typeInfoString = logicalTypeNode.get(FIELD_NAME_TYPE_INFO).asText(); + final TypeInformation typeInfo = + EncodingUtils.decodeStringToObject( + typeInfoString, TypeInformation.class, serdeCtx.getClassLoader()); + return new TypeInformationRawType(nullable, typeInfo); + } + + private StructuredType deserializeStructuredType( + JsonNode logicalTypeNode, SerdeContext serdeCtx) { + StructuredType.Builder builder; + final ObjectIdentifier identifier; + if (logicalTypeNode.get(FIELD_NAME_IDENTIFIER) != null) { + JsonNode identifierNode = logicalTypeNode.get(FIELD_NAME_IDENTIFIER); + identifier = ObjectIdentifierJsonDeserializer.deserialize(identifierNode); + } else { + identifier = null; + } + final Class implementationClass; + if (logicalTypeNode.get(FIELD_NAME_IMPLEMENTATION_CLASS) != null) { + String classString = logicalTypeNode.get(FIELD_NAME_IMPLEMENTATION_CLASS).asText(); + try { + implementationClass = Class.forName(classString, true, serdeCtx.getClassLoader()); + } catch (ClassNotFoundException e) { + throw new TableException(classString + " is not found."); + } + } else { + implementationClass = null; + } + if (identifier != null && implementationClass != null) { + builder = StructuredType.newBuilder(identifier, implementationClass); + } else if (identifier != null) { + builder = StructuredType.newBuilder(identifier); + } else 
if (implementationClass != null) { + builder = StructuredType.newBuilder(implementationClass); + } else { + throw new TableException("This should not happen."); + } + + List attributes = new ArrayList<>(); + for (JsonNode attributeNode : logicalTypeNode.get(FIELD_NAME_ATTRIBUTES)) { + String name = attributeNode.get(FIELD_NAME_NAME).asText(); + LogicalType logicalType = + deserialize(attributeNode.get(FIELD_NAME_LOGICAL_TYPE), serdeCtx); + final String description; + if (attributeNode.get(FIELD_NAME_DESCRIPTION) != null) { + description = attributeNode.get(FIELD_NAME_DESCRIPTION).asText(); + } else { + description = null; + } + attributes.add(new StructuredType.StructuredAttribute(name, logicalType, description)); + } + builder.attributes(attributes); + + boolean nullable = logicalTypeNode.get(FIELD_NAME_NULLABLE).asBoolean(); + builder.setNullable(nullable); + + boolean isFinal = logicalTypeNode.get(FIELD_NAME_FINAL).asBoolean(); + builder.setFinal(isFinal); + + boolean isInstantiable = logicalTypeNode.get(FIELD_NAME_INSTANTIABLE).asBoolean(); + builder.setInstantiable(isInstantiable); + + StructuredType.StructuredComparision comparision = + StructuredType.StructuredComparision.valueOf( + logicalTypeNode.get(FIELD_NAME_COMPARISION).asText().toUpperCase()); + builder.comparision(comparision); + + if (logicalTypeNode.get(FIELD_NAME_SUPPER_TYPE) != null) { + StructuredType supperType = + deserializeStructuredType( + logicalTypeNode.get(FIELD_NAME_SUPPER_TYPE), serdeCtx); + builder.superType(supperType); + } + if (logicalTypeNode.get(FIELD_NAME_DESCRIPTION) != null) { + String description = logicalTypeNode.get(FIELD_NAME_DESCRIPTION).asText(); + builder.description(description); + } + return builder.build(); + } + + private DistinctType deserializeDistinctType(JsonNode logicalTypeNode, SerdeContext serdeCtx) { + JsonNode identifierNode = logicalTypeNode.get(FIELD_NAME_IDENTIFIER); + ObjectIdentifier identifier = ObjectIdentifierJsonDeserializer.deserialize(identifierNode); + LogicalType sourceType = deserialize(logicalTypeNode.get(FIELD_NAME_SOURCE_TYPE), serdeCtx); + DistinctType.Builder builder = DistinctType.newBuilder(identifier, sourceType); + if (logicalTypeNode.get(FIELD_NAME_DESCRIPTION) != null) { + String description = logicalTypeNode.get(FIELD_NAME_DESCRIPTION).asText(); + builder.description(description); + } + return builder.build(); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerializer.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerializer.java new file mode 100644 index 0000000000000..c0588ecbdd3f0 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerializer.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.serde; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.types.logical.DistinctType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.table.types.logical.SymbolType; +import org.apache.flink.table.types.logical.TypeInformationRawType; +import org.apache.flink.table.types.logical.UnresolvedUserDefinedType; +import org.apache.flink.table.utils.EncodingUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer; + +import java.io.IOException; + +/** + * JSON serializer for {@link LogicalType}. refer to {@link LogicalTypeJsonDeserializer} for + * deserializer. + */ +public class LogicalTypeJsonSerializer extends StdSerializer { + private static final long serialVersionUID = 1L; + + public static final String FIELD_NAME_NULLABLE = "nullable"; + // SymbolType's field + public static final String FIELD_NAME_SYMBOL_CLASS = "symbolClass"; + // TypeInformationRawType's field + public static final String FIELD_NAME_TYPE_INFO = "typeInfo"; + // StructuredType's fields + public static final String FIELD_NAME_TYPE_NAME = "type"; + public static final String FIELD_NAME_IDENTIFIER = "identifier"; + public static final String FIELD_NAME_IMPLEMENTATION_CLASS = "implementationClass"; + public static final String FIELD_NAME_ATTRIBUTES = "attributes"; + public static final String FIELD_NAME_NAME = "name"; + public static final String FIELD_NAME_LOGICAL_TYPE = "logicalType"; + public static final String FIELD_NAME_DESCRIPTION = "description"; + public static final String FIELD_NAME_FINAL = "final"; + public static final String FIELD_NAME_INSTANTIABLE = "instantiable"; + public static final String FIELD_NAME_COMPARISION = "comparision"; + public static final String FIELD_NAME_SUPPER_TYPE = "supperType"; + // DistinctType's fields + public static final String FIELD_NAME_SOURCE_TYPE = "sourceType"; + + public LogicalTypeJsonSerializer() { + super(LogicalType.class); + } + + @Override + public void serialize( + LogicalType logicalType, + JsonGenerator jsonGenerator, + SerializerProvider serializerProvider) + throws IOException { + if (logicalType instanceof SymbolType) { + // SymbolType does not support `asSerializableString` + serialize((SymbolType) logicalType, jsonGenerator); + } else if (logicalType instanceof TypeInformationRawType) { + // TypeInformationRawType does not support `asSerializableString` + serialize((TypeInformationRawType) logicalType, jsonGenerator); + } else if (logicalType instanceof StructuredType) { + // StructuredType does not full support `asSerializableString` + serialize((StructuredType) logicalType, jsonGenerator); + } else if (logicalType instanceof DistinctType) { 
+ // DistinctType does not full support `asSerializableString` + serialize((DistinctType) logicalType, jsonGenerator); + } else if (logicalType instanceof UnresolvedUserDefinedType) { + throw new TableException( + "Can not serialize an UnresolvedUserDefinedType instance. \n" + + "It needs to be resolved into a proper user-defined type.\""); + } else { + jsonGenerator.writeObject(logicalType.asSerializableString()); + } + } + + private void serialize(SymbolType symbolType, JsonGenerator jsonGenerator) + throws IOException { + jsonGenerator.writeStartObject(); + jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, symbolType.isNullable()); + jsonGenerator.writeStringField( + FIELD_NAME_SYMBOL_CLASS, symbolType.getDefaultConversion().getName()); + jsonGenerator.writeEndObject(); + } + + private void serialize(TypeInformationRawType rawType, JsonGenerator jsonGenerator) + throws IOException { + jsonGenerator.writeStartObject(); + jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, rawType.isNullable()); + jsonGenerator.writeStringField( + FIELD_NAME_TYPE_INFO, + EncodingUtils.encodeObjectToString(rawType.getTypeInformation())); + jsonGenerator.writeEndObject(); + } + + private void serialize(StructuredType structuredType, JsonGenerator jsonGenerator) + throws IOException { + jsonGenerator.writeStartObject(); + jsonGenerator.writeStringField( + FIELD_NAME_TYPE_NAME, LogicalTypeRoot.STRUCTURED_TYPE.name()); + jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, structuredType.isNullable()); + if (structuredType.getObjectIdentifier().isPresent()) { + jsonGenerator.writeObjectField( + FIELD_NAME_IDENTIFIER, structuredType.getObjectIdentifier().get()); + } + if (structuredType.getImplementationClass().isPresent()) { + jsonGenerator.writeStringField( + FIELD_NAME_IMPLEMENTATION_CLASS, + structuredType.getImplementationClass().get().getName()); + } + jsonGenerator.writeFieldName(FIELD_NAME_ATTRIBUTES); + jsonGenerator.writeStartArray(); + for (StructuredType.StructuredAttribute attribute : structuredType.getAttributes()) { + jsonGenerator.writeStartObject(); + jsonGenerator.writeStringField(FIELD_NAME_NAME, attribute.getName()); + jsonGenerator.writeObjectField(FIELD_NAME_LOGICAL_TYPE, attribute.getType()); + if (attribute.getDescription().isPresent()) { + jsonGenerator.writeStringField( + FIELD_NAME_DESCRIPTION, attribute.getDescription().get()); + } + jsonGenerator.writeEndObject(); + } + jsonGenerator.writeEndArray(); + jsonGenerator.writeBooleanField(FIELD_NAME_FINAL, structuredType.isFinal()); + jsonGenerator.writeBooleanField(FIELD_NAME_INSTANTIABLE, structuredType.isInstantiable()); + jsonGenerator.writeStringField( + FIELD_NAME_COMPARISION, structuredType.getComparision().name()); + if (structuredType.getSuperType().isPresent()) { + jsonGenerator.writeObjectField( + FIELD_NAME_SUPPER_TYPE, structuredType.getSuperType().get()); + } + if (structuredType.getDescription().isPresent()) { + jsonGenerator.writeStringField( + FIELD_NAME_DESCRIPTION, structuredType.getDescription().get()); + } + jsonGenerator.writeEndObject(); + } + + private void serialize(DistinctType distinctType, JsonGenerator jsonGenerator) + throws IOException { + jsonGenerator.writeStartObject(); + jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, LogicalTypeRoot.DISTINCT_TYPE.name()); + Preconditions.checkArgument(distinctType.getObjectIdentifier().isPresent()); + jsonGenerator.writeObjectField( + FIELD_NAME_IDENTIFIER, distinctType.getObjectIdentifier().get()); + jsonGenerator.writeObjectField(FIELD_NAME_SOURCE_TYPE, 
distinctType.getSourceType()); + if (distinctType.getDescription().isPresent()) { + jsonGenerator.writeStringField( + FIELD_NAME_DESCRIPTION, distinctType.getDescription().get()); + } + jsonGenerator.writeEndObject(); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonDeserializer.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonDeserializer.java new file mode 100644 index 0000000000000..1162e7ec880d0 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonDeserializer.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.serde; + +import org.apache.flink.table.catalog.ObjectIdentifier; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; + +import java.io.IOException; + +import static org.apache.flink.table.planner.plan.nodes.exec.serde.ObjectIdentifierJsonSerializer.FIELD_NAME_CATALOG_NAME; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.ObjectIdentifierJsonSerializer.FIELD_NAME_DATABASE_NAME; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.ObjectIdentifierJsonSerializer.FIELD_NAME_TABLE_NAME; + +/** JSON deserializer for {@link ObjectIdentifier}. 
*/ +public class ObjectIdentifierJsonDeserializer extends StdDeserializer { + private static final long serialVersionUID = 1L; + + protected ObjectIdentifierJsonDeserializer() { + super(ObjectIdentifier.class); + } + + @Override + public ObjectIdentifier deserialize(JsonParser jsonParser, DeserializationContext ctx) + throws IOException, JsonProcessingException { + final JsonNode identifierNode = jsonParser.readValueAsTree(); + return deserialize(identifierNode); + } + + public static ObjectIdentifier deserialize(JsonNode identifierNode) { + return ObjectIdentifier.of( + identifierNode.get(FIELD_NAME_CATALOG_NAME).asText(), + identifierNode.get(FIELD_NAME_DATABASE_NAME).asText(), + identifierNode.get(FIELD_NAME_TABLE_NAME).asText()); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonSerializer.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonSerializer.java new file mode 100644 index 0000000000000..6bd5def674f2d --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonSerializer.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.serde; + +import org.apache.flink.table.catalog.ObjectIdentifier; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer; + +import java.io.IOException; + +/** JSON serializer for {@link ObjectIdentifier}. 
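+ * <p>Produces a three-field JSON object, e.g. (illustrative names) {@code {"catalogName": "default_catalog", "databaseName": "default_database", "tableName": "MyTable"}}.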
*/ +public class ObjectIdentifierJsonSerializer extends StdSerializer { + private static final long serialVersionUID = 1L; + + public static final String FIELD_NAME_CATALOG_NAME = "catalogName"; + public static final String FIELD_NAME_DATABASE_NAME = "databaseName"; + public static final String FIELD_NAME_TABLE_NAME = "tableName"; + + protected ObjectIdentifierJsonSerializer() { + super(ObjectIdentifier.class); + } + + @Override + public void serialize( + ObjectIdentifier objectIdentifier, + JsonGenerator jsonGenerator, + SerializerProvider serializerProvider) + throws IOException { + jsonGenerator.writeStartObject(); + jsonGenerator.writeStringField(FIELD_NAME_CATALOG_NAME, objectIdentifier.getCatalogName()); + jsonGenerator.writeStringField( + FIELD_NAME_DATABASE_NAME, objectIdentifier.getDatabaseName()); + jsonGenerator.writeStringField(FIELD_NAME_TABLE_NAME, objectIdentifier.getObjectName()); + jsonGenerator.writeEndObject(); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonDeserializer.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonDeserializer.java new file mode 100644 index 0000000000000..24db15abce57b --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonDeserializer.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.serde; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.DistributionType; +import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.RequiredDistribution; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; + +import java.io.IOException; + +/** JSON deserializer for {@link RequiredDistribution}. 
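+ * <p>Expects a {@code type} field holding a {@link DistributionType} name plus, for hash distributions, a {@code keys} array, e.g. (illustrative) {@code {"type": "HASH", "keys": [0, 2]}} or simply {@code {"type": "SINGLETON"}}.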
+ */
+public class RequiredDistributionJsonDeserializer extends StdDeserializer<RequiredDistribution> {
+    private static final long serialVersionUID = 1L;
+
+    public RequiredDistributionJsonDeserializer() {
+        super(RequiredDistribution.class);
+    }
+
+    @Override
+    public RequiredDistribution deserialize(JsonParser jsonParser, DeserializationContext ctx)
+            throws IOException, JsonProcessingException {
+        JsonNode jsonNode = jsonParser.getCodec().readTree(jsonParser);
+        DistributionType type =
+                DistributionType.valueOf(jsonNode.get("type").asText().toUpperCase());
+        switch (type) {
+            case ANY:
+                return InputProperty.ANY_DISTRIBUTION;
+            case SINGLETON:
+                return InputProperty.SINGLETON_DISTRIBUTION;
+            case BROADCAST:
+                return InputProperty.BROADCAST_DISTRIBUTION;
+            case UNKNOWN:
+                return InputProperty.UNKNOWN_DISTRIBUTION;
+            case HASH:
+                JsonNode keysNode = jsonNode.get("keys");
+                // also reject an empty array, so the check matches the error message below
+                if (keysNode == null || keysNode.size() == 0) {
+                    throw new TableException("Hash distribution requires non-empty hash keys.");
+                }
+                int[] keys = new int[keysNode.size()];
+                for (int i = 0; i < keysNode.size(); ++i) {
+                    keys[i] = keysNode.get(i).asInt();
+                }
+                return InputProperty.hashDistribution(keys);
+            default:
+                throw new TableException("Unsupported distribution type: " + type);
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonSerializer.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonSerializer.java
new file mode 100644
index 0000000000000..df47b8072ae32
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonSerializer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.DistributionType;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.HashDistribution;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.RequiredDistribution;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/** JSON serializer for {@link RequiredDistribution}.
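+ *
+ * <p>A minimal sketch of how this serializer and the matching deserializer could be registered on
+ * an {@code ObjectMapper} (illustrative wiring, following the pattern used by the serde tests):
+ *
+ * <pre>{@code
+ * SimpleModule module = new SimpleModule();
+ * module.addSerializer(new RequiredDistributionJsonSerializer());
+ * module.addDeserializer(RequiredDistribution.class, new RequiredDistributionJsonDeserializer());
+ * mapper.registerModule(module);
+ * }</pre>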
+ */
+public class RequiredDistributionJsonSerializer extends StdSerializer<RequiredDistribution> {
+    private static final long serialVersionUID = 1L;
+
+    public RequiredDistributionJsonSerializer() {
+        super(RequiredDistribution.class);
+    }
+
+    @Override
+    public void serialize(
+            RequiredDistribution requiredDistribution,
+            JsonGenerator jsonGenerator,
+            SerializerProvider serializerProvider)
+            throws IOException {
+        jsonGenerator.writeStartObject();
+        DistributionType type = requiredDistribution.getType();
+        jsonGenerator.writeStringField("type", type.name());
+        switch (type) {
+            case ANY:
+            case SINGLETON:
+            case BROADCAST:
+            case UNKNOWN:
+                // do nothing, the type name is enough
+                break;
+            case HASH:
+                HashDistribution hashDistribution = (HashDistribution) requiredDistribution;
+                jsonGenerator.writeFieldName("keys");
+                jsonGenerator.writeArray(
+                        hashDistribution.getKeys(),
+                        0, // offset
+                        hashDistribution.getKeys().length);
+                break;
+            default:
+                throw new TableException("Unsupported distribution type: " + type);
+        }
+        jsonGenerator.writeEndObject();
+    }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SerdeContext.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SerdeContext.java
new file mode 100644
index 0000000000000..aabacee752357
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SerdeContext.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * A context that allows storing user-defined data during ExecNode serialization and
+ * deserialization.
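+ *
+ * <p>For example (a sketch mirroring how the serde tests construct it):
+ *
+ * <pre>{@code
+ * SerdeContext serdeCtx =
+ *         new SerdeContext(new Configuration(), Thread.currentThread().getContextClassLoader());
+ * }</pre>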
+ */ +public class SerdeContext { + private final Configuration configuration; + private final ClassLoader classLoader; + + public SerdeContext(Configuration configuration, ClassLoader classLoader) { + this.configuration = configuration; + this.classLoader = classLoader; + } + + public Configuration getConfiguration() { + return configuration; + } + + public ClassLoader getClassLoader() { + return classLoader; + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/CatalogTableSpecBase.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/CatalogTableSpecBase.java new file mode 100644 index 0000000000000..5e5d3ad9c3ed9 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/CatalogTableSpecBase.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.spec; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.planner.plan.nodes.exec.serde.CatalogTableJsonDeserializer; +import org.apache.flink.table.planner.plan.nodes.exec.serde.CatalogTableJsonSerializer; +import org.apache.flink.table.planner.plan.nodes.exec.serde.ObjectIdentifierJsonDeserializer; +import org.apache.flink.table.planner.plan.nodes.exec.serde.ObjectIdentifierJsonSerializer; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.util.Objects; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** {@link CatalogTableSpecBase} describes how to serialize/deserialize a catalog table. 
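+ *
+ * <p>A serialized spec is a JSON object with two top-level fields; the nested shapes are
+ * determined by the dedicated serializers and are elided here:
+ *
+ * <pre>{@code
+ * {
+ *   "identifier": { ... },
+ *   "catalogTable": { ... }
+ * }
+ * }</pre>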
*/ +@JsonIgnoreProperties(ignoreUnknown = true) +public class CatalogTableSpecBase { + public static final String FIELD_NAME_IDENTIFIER = "identifier"; + public static final String FIELD_NAME_CATALOG_TABLE = "catalogTable"; + + @JsonProperty(value = FIELD_NAME_IDENTIFIER, required = true) + @JsonSerialize(using = ObjectIdentifierJsonSerializer.class) + @JsonDeserialize(using = ObjectIdentifierJsonDeserializer.class) + protected final ObjectIdentifier objectIdentifier; + + @JsonProperty(value = FIELD_NAME_CATALOG_TABLE, required = true) + @JsonSerialize(using = CatalogTableJsonSerializer.class) + @JsonDeserialize(using = CatalogTableJsonDeserializer.class) + protected final CatalogTable catalogTable; + + @JsonIgnore protected ClassLoader classLoader; + + @JsonIgnore protected ReadableConfig configuration; + + protected CatalogTableSpecBase(ObjectIdentifier objectIdentifier, CatalogTable catalogTable) { + this.objectIdentifier = checkNotNull(objectIdentifier); + this.catalogTable = checkNotNull(catalogTable); + } + + public void setClassLoader(ClassLoader classLoader) { + this.classLoader = classLoader; + } + + public void setReadableConfig(ReadableConfig config) { + this.configuration = config; + } + + @JsonIgnore + public ObjectIdentifier getObjectIdentifier() { + return objectIdentifier; + } + + @JsonIgnore + public CatalogTable getCatalogTable() { + return catalogTable; + } + + @VisibleForTesting + @JsonIgnore + public ClassLoader getClassLoader() { + return classLoader; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CatalogTableSpecBase that = (CatalogTableSpecBase) o; + return objectIdentifier.equals(that.objectIdentifier) + && catalogTable.toProperties().equals(that.catalogTable.toProperties()); + } + + @Override + public int hashCode() { + return Objects.hash(objectIdentifier, catalogTable.toProperties()); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSourceSpec.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSourceSpec.java new file mode 100644 index 0000000000000..0e27ae39cb0bc --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSourceSpec.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.spec;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link DynamicTableSourceSpec} describes how to serialize/deserialize a dynamic table source
+ * and how to create a {@link DynamicTableSource} from the deserialization result.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DynamicTableSourceSpec extends CatalogTableSpecBase {
+
+    @JsonIgnore private DynamicTableSource tableSource;
+
+    @JsonCreator
+    public DynamicTableSourceSpec(
+            @JsonProperty(FIELD_NAME_IDENTIFIER) ObjectIdentifier objectIdentifier,
+            @JsonProperty(FIELD_NAME_CATALOG_TABLE) CatalogTable catalogTable) {
+        super(objectIdentifier, catalogTable);
+    }
+
+    @JsonIgnore
+    private DynamicTableSource getTableSource() {
+        checkNotNull(configuration);
+        if (tableSource == null) {
+            tableSource =
+                    FactoryUtil.createTableSource(
+                            null, // catalog, TODO support create Factory from catalog
+                            objectIdentifier,
+                            catalogTable,
+                            configuration,
+                            classLoader,
+                            // isTemporary, it's always true since the catalog is always null now.
+                            true);
+        }
+        return tableSource;
+    }
+
+    @JsonIgnore
+    public ScanTableSource getScanTableSource() {
+        DynamicTableSource tableSource = getTableSource();
+        if (tableSource instanceof ScanTableSource) {
+            return (ScanTableSource) tableSource;
+        } else {
+            throw new TableException(
+                    String.format(
+                            "%s is not a ScanTableSource. Please check it.",
+                            tableSource.getClass().getName()));
+        }
+    }
+
+    @JsonIgnore
+    public LookupTableSource getLookupTableSource() {
+        DynamicTableSource tableSource = getTableSource();
+        if (tableSource instanceof LookupTableSource) {
+            return (LookupTableSource) tableSource;
+        } else {
+            throw new TableException(
+                    String.format(
+                            "%s is not a LookupTableSource. Please check it.",
+                            tableSource.getClass().getName()));
+        }
+    }
+
+    public void setTableSource(DynamicTableSource tableSource) {
+        this.tableSource = tableSource;
+    }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
index 5c278a8d88bf3..995b0218b0414 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
@@ -25,19 +25,34 @@
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 /**
  * Stream {@link ExecNode} to read data from an external source defined by a {@link
  * ScanTableSource}.
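+ *
+ * <p>The {@code @JsonCreator} constructor below is what lets Jackson restore this node from the
+ * JSON plan; conceptually (a sketch that assumes a properly configured {@code ObjectMapper}):
+ *
+ * <pre>{@code
+ * StreamExecTableSourceScan scan = mapper.readValue(json, StreamExecTableSourceScan.class);
+ * }</pre>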
  */
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class StreamExecTableSourceScan extends CommonExecTableSourceScan
         implements StreamExecNode<RowData> {
 
     public StreamExecTableSourceScan(
-            ScanTableSource tableSource, RowType outputType, String description) {
-        super(tableSource, outputType, description);
+            DynamicTableSourceSpec tableSourceSpec, RowType outputType, String description) {
+        super(tableSourceSpec, getNewNodeId(), outputType, description);
+    }
+
+    @JsonCreator
+    public StreamExecTableSourceScan(
+            @JsonProperty(FIELD_NAME_SCAN_TABLE_SOURCE) DynamicTableSourceSpec tableSourceSpec,
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+        super(tableSourceSpec, id, outputType, description);
     }
 
     @Override
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/ReflectionsUtil.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/ReflectionsUtil.java
new file mode 100644
index 0000000000000..7ea14091c21c2
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/ReflectionsUtil.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils;
+
+import org.reflections.Reflections;
+
+import java.lang.reflect.Modifier;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** A utility class for reflection operations on classes.
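+ *
+ * <p>Typical usage, as in the serde coverage tests:
+ *
+ * <pre>{@code
+ * Set<Class<? extends LogicalType>> subClasses =
+ *         ReflectionsUtil.scanSubClasses("org.apache.flink", LogicalType.class);
+ * }</pre>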
+ */
+public class ReflectionsUtil {
+
+    public static <T> Set<Class<? extends T>> scanSubClasses(
+            String packageName, Class<T> targetClass) {
+        return scanSubClasses(packageName, targetClass, false, false);
+    }
+
+    public static <T> Set<Class<? extends T>> scanSubClasses(
+            String packageName,
+            Class<T> targetClass,
+            boolean includingInterface,
+            boolean includingAbstractClass) {
+        Reflections reflections = new Reflections(packageName);
+        return reflections.getSubTypesOf(targetClass).stream()
+                .filter(
+                        c -> {
+                            if (c.isInterface()) {
+                                return includingInterface;
+                            } else if (Modifier.isAbstract(c.getModifiers())) {
+                                return includingAbstractClass;
+                            } else {
+                                return true;
+                            }
+                        })
+                .collect(Collectors.toSet());
+    }
+
+    private ReflectionsUtil() {}
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalTableSourceScan.scala
index 0de7951f19eb2..b19b392e97bb6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalTableSourceScan.scala
@@ -21,8 +21,10 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan
+import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec
 import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalTableSourceScan
 import org.apache.flink.table.planner.plan.schema.TableSourceTable
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
 
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
@@ -57,8 +59,15 @@ class BatchPhysicalTableSourceScan(
   }
 
   override def translateToExecNode(): ExecNode[_] = {
+    val tableSourceSpec = new DynamicTableSourceSpec(
+      tableSourceTable.tableIdentifier,
+      tableSourceTable.catalogTable)
+    tableSourceSpec.setTableSource(tableSourceTable.tableSource)
+    val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
+    tableSourceSpec.setReadableConfig(tableConfig.getConfiguration)
+
     new BatchExecTableSourceScan(
-      tableSource,
+      tableSourceSpec,
       FlinkTypeFactory.toLogicalRowType(getRowType),
       getRelDetailedDescription)
   }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTableSourceScan.scala
index 99340339cdbfa..22f2a9e10065a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTableSourceScan.scala
@@ -18,11 +18,14 @@
 
 package org.apache.flink.table.planner.plan.nodes.physical.stream
 
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode
+import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan
 import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalTableSourceScan
 import org.apache.flink.table.planner.plan.schema.TableSourceTable
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
 
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
@@ -52,8 +55,15 @@ class StreamPhysicalTableSourceScan(
   }
 
   override def translateToExecNode(): ExecNode[_] = {
+    val tableSourceSpec = new DynamicTableSourceSpec(
+      tableSourceTable.tableIdentifier,
+      tableSourceTable.catalogTable)
+    tableSourceSpec.setTableSource(tableSource)
+    val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
+    tableSourceSpec.setReadableConfig(tableConfig.getConfiguration)
+
     new StreamExecTableSourceScan(
-      tableSource,
+      tableSourceSpec,
       FlinkTypeFactory.toLogicalRowType(getRowType),
       getRelDetailedDescription)
   }
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.java
index 0e732460d09e9..7b8cfe39ddc88 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.java
@@ -59,6 +59,11 @@ public List<ExecNode<?>> getInputNodes() {
         return inputEdges.stream().map(ExecEdge::getSource).collect(Collectors.toList());
     }
 
+    @Override
+    public int getId() {
+        return 0;
+    }
+
     @Override
     public String getDescription() {
         return description;
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
new file mode 100644
index 0000000000000..478bf351a6475
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+
+/** Tests for {@link DynamicTableSourceSpec} serialization and deserialization. */
+@RunWith(Parameterized.class)
+public class DynamicTableSourceSpecSerdeTest {
+
+    @Parameterized.Parameter public DynamicTableSourceSpec spec;
+
+    @Test
+    public void testDynamicTableSourceSpecSerde() throws IOException {
+        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+        SerdeContext serdeCtx = new SerdeContext(new Configuration(), classLoader);
+        ObjectMapper mapper = JsonSerdeUtil.createObjectMapper(serdeCtx);
+        SimpleModule module = new SimpleModule();
+        module.addDeserializer(
+                DynamicTableSourceSpec.class, new DynamicTableSourceSpecJsonDeserializer());
+        mapper.registerModule(module);
+        StringWriter writer = new StringWriter(100);
+        try (JsonGenerator gen = mapper.getFactory().createGenerator(writer)) {
+            gen.writeObject(spec);
+        }
+        String json = writer.toString();
+        DynamicTableSourceSpec actual = mapper.readValue(json, DynamicTableSourceSpec.class);
+        assertEquals(spec, actual);
+        assertSame(classLoader, actual.getClassLoader());
+        assertNotNull(actual.getScanTableSource());
+    }
+
+    @Parameterized.Parameters(name = "{0}")
+    public static List<DynamicTableSourceSpec> testData() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("connector", "filesystem");
+        properties.put("format", "testcsv");
+        properties.put("path", "/tmp");
+        properties.put("schema.0.name", "a");
+        properties.put("schema.0.data-type", "BIGINT");
+
+        DynamicTableSourceSpec spec =
+                new DynamicTableSourceSpec(
+                        ObjectIdentifier.of("default_catalog", "default_db", "MyTable"),
+                        CatalogTableImpl.fromProperties(properties));
+
+        return Collections.singletonList(spec);
+    }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/InputPropertySerdeTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/InputPropertySerdeTest.java
new file mode 100644
index 0000000000000..12c53e46d820f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/InputPropertySerdeTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link InputProperty} serialization and deserialization. */
+@RunWith(Parameterized.class)
+public class InputPropertySerdeTest {
+    @Parameterized.Parameter public InputProperty inputProperty;
+
+    @Test
+    public void testInputPropertySerde() throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        StringWriter writer = new StringWriter(100);
+        try (JsonGenerator gen = mapper.getFactory().createGenerator(writer)) {
+            gen.writeObject(inputProperty);
+        }
+        String json = writer.toString();
+        InputProperty actual = mapper.readValue(json, InputProperty.class);
+        assertEquals(inputProperty, actual);
+    }
+
+    @Parameterized.Parameters(name = "{0}")
+    public static List<InputProperty> testData() {
+        return Arrays.asList(
+                InputProperty.DEFAULT,
+                InputProperty.builder()
+                        .requiredDistribution(InputProperty.hashDistribution(new int[] {0, 1}))
+                        .damBehavior(InputProperty.DamBehavior.BLOCKING)
+                        .priority(0)
+                        .build(),
+                InputProperty.builder()
+                        .requiredDistribution(InputProperty.BROADCAST_DISTRIBUTION)
+                        .damBehavior(InputProperty.DamBehavior.END_INPUT)
+                        .priority(0)
+                        .build(),
+                InputProperty.builder()
+                        .requiredDistribution(InputProperty.SINGLETON_DISTRIBUTION)
+                        .damBehavior(InputProperty.DamBehavior.END_INPUT)
+                        .priority(1)
+                        .build(),
+                InputProperty.builder()
+                        .requiredDistribution(InputProperty.ANY_DISTRIBUTION)
+                        .damBehavior(InputProperty.DamBehavior.PIPELINED)
+                        .priority(2)
+                        .build(),
+                InputProperty.builder()
+                        .requiredDistribution(InputProperty.UNKNOWN_DISTRIBUTION)
+                        .damBehavior(InputProperty.DamBehavior.PIPELINED)
+                        .priority(0)
+                        .build());
+    }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeSerdeCoverageTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeSerdeCoverageTest.java
new file mode 100644
index 0000000000000..224be524eef61
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeSerdeCoverageTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.planner.plan.utils.ReflectionsUtil;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
+
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test to check whether all {@link LogicalType}s have been tested in {@link LogicalTypeSerdeTest}.
+ */
+public class LogicalTypeSerdeCoverageTest {
+
+    @Test
+    public void testLogicalTypeJsonSerdeCoverage() {
+        Set<Class<? extends LogicalType>> allSubClasses =
+                ReflectionsUtil.scanSubClasses("org.apache.flink", LogicalType.class);
+
+        Set<Class<? extends LogicalType>> remainingSubClasses = new HashSet<>(allSubClasses);
+        // An unresolved user-defined type needs to be resolved into a proper user-defined type,
+        // so we ignore it here
+        remainingSubClasses.remove(UnresolvedUserDefinedType.class);
+        Set<Class<? extends LogicalType>> testedSubClasses = new HashSet<>();
+        for (LogicalType logicalType : LogicalTypeSerdeTest.testData()) {
+            testedSubClasses.add(logicalType.getClass());
+        }
+        remainingSubClasses.removeAll(testedSubClasses);
+        assertTrue(
+                String.format(
+                        "[%s] are not tested in LogicalTypeSerdeTest.",
+                        remainingSubClasses.stream()
+                                .map(Class::getSimpleName)
+                                .collect(Collectors.joining(","))),
+                remainingSubClasses.isEmpty());
+    }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeSerdeTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeSerdeTest.java
new file mode 100644
index 0000000000000..0890ec16052e8
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeSerdeTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.table.planner.plan.nodes.exec.serde; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.expressions.TimeIntervalUnit; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BinaryType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DayTimeIntervalType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DistinctType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LegacyTypeInformationType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.NullType; +import org.apache.flink.table.types.logical.RawType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.table.types.logical.SymbolType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampKind; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.TypeInformationRawType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.YearMonthIntervalType; +import org.apache.flink.table.types.logical.ZonedTimestampType; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** Tests for {@link LogicalType} serialization and deserialization. 
+ */
+@RunWith(Parameterized.class)
+public class LogicalTypeSerdeTest {
+
+    @Parameterized.Parameter public LogicalType logicalType;
+
+    @Test
+    public void testLogicalTypeSerde() throws IOException {
+        SerdeContext serdeCtx =
+                new SerdeContext(
+                        new Configuration(), Thread.currentThread().getContextClassLoader());
+        ObjectMapper mapper = JsonSerdeUtil.createObjectMapper(serdeCtx);
+        SimpleModule module = new SimpleModule();
+
+        module.addSerializer(new LogicalTypeJsonSerializer());
+        module.addSerializer(new ObjectIdentifierJsonSerializer());
+        module.addDeserializer(LogicalType.class, new LogicalTypeJsonDeserializer());
+        mapper.registerModule(module);
+        StringWriter writer = new StringWriter(100);
+        try (JsonGenerator gen = mapper.getFactory().createGenerator(writer)) {
+            gen.writeObject(logicalType);
+        }
+        String json = writer.toString();
+        LogicalType actual = mapper.readValue(json, LogicalType.class);
+        assertEquals(logicalType, actual);
+    }
+
+    @Parameterized.Parameters(name = "{0}")
+    public static List<LogicalType> testData() {
+        List<LogicalType> types =
+                Arrays.asList(
+                        new BooleanType(),
+                        new TinyIntType(),
+                        new SmallIntType(),
+                        new IntType(),
+                        new BigIntType(),
+                        new FloatType(),
+                        new DoubleType(),
+                        new DecimalType(10),
+                        new DecimalType(15, 5),
+                        new CharType(),
+                        new CharType(5),
+                        new VarCharType(),
+                        new VarCharType(5),
+                        new BinaryType(),
+                        new BinaryType(100),
+                        new VarBinaryType(),
+                        new VarBinaryType(100),
+                        new DateType(),
+                        new TimeType(),
+                        new TimeType(3),
+                        new TimestampType(),
+                        new TimestampType(3),
+                        new TimestampType(false, TimestampKind.PROCTIME, 3),
+                        new TimestampType(false, TimestampKind.ROWTIME, 3),
+                        new DayTimeIntervalType(DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR),
+                        new DayTimeIntervalType(
+                                false, DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR, 3, 6),
+                        new YearMonthIntervalType(
+                                YearMonthIntervalType.YearMonthResolution.YEAR_TO_MONTH),
+                        new YearMonthIntervalType(
+                                false, YearMonthIntervalType.YearMonthResolution.MONTH, 2),
+                        new ZonedTimestampType(),
+                        new ZonedTimestampType(false, TimestampKind.PROCTIME, 3),
+                        new LocalZonedTimestampType(),
+                        new LocalZonedTimestampType(false, TimestampKind.PROCTIME, 3),
+                        new SymbolType<>(TimeIntervalUnit.class),
+                        new TypeInformationRawType<>(),
+                        new TypeInformationRawType<>(Types.STRING),
+                        new LegacyTypeInformationType<>(LogicalTypeRoot.RAW, Types.STRING),
+                        new ArrayType(new IntType(false)),
+                        new MapType(new BigIntType(), new IntType(false)),
+                        new MultisetType(new IntType(false)),
+                        RowType.of(new BigIntType(), new IntType(false), new VarCharType(200)),
+                        RowType.of(
+                                new LogicalType[] {
+                                    new BigIntType(), new IntType(false), new VarCharType(200)
+                                },
+                                new String[] {"f1", "f2", "f3"}),
+                        StructuredType.newBuilder(
+                                        ObjectIdentifier.of("cat", "db", "structuredType"),
+                                        PojoClass.class)
+                                .attributes(
+                                        Arrays.asList(
+                                                new StructuredType.StructuredAttribute(
+                                                        "f0", new IntType(true)),
+                                                new StructuredType.StructuredAttribute(
+                                                        "f1", new BigIntType(true)),
+                                                new StructuredType.StructuredAttribute(
+                                                        "f2", new VarCharType(200), "desc")))
+                                .comparision(StructuredType.StructuredComparision.FULL)
+                                .setFinal(false)
+                                .setInstantiable(false)
+                                .superType(
+                                        StructuredType.newBuilder(
+                                                        ObjectIdentifier.of(
+                                                                "cat", "db", "structuredType2"))
+                                                .attributes(
+                                                        Collections.singletonList(
+                                                                new StructuredType
+                                                                        .StructuredAttribute(
+                                                                        "f0",
+                                                                        new BigIntType(false))))
+                                                .build())
+                                .description("description for StructuredType")
+                                .build(),
+                        DistinctType.newBuilder(
"distinctType"), + new VarCharType(5)) + .build(), + DistinctType.newBuilder( + ObjectIdentifier.of("cat", "db", "distinctType"), + new VarCharType(false, 5)) + .build(), + new RawType<>( + PojoClass.class, + new KryoSerializer<>(PojoClass.class, new ExecutionConfig()))); + List newTypes = new ArrayList<>(); + // consider nullable + for (LogicalType type : types) { + newTypes.add(type.copy(true)); + newTypes.add(type.copy(false)); + } + // ignore nullable for NullType + newTypes.add(new NullType()); + return newTypes; + } + + /** Testing class. */ + public static class PojoClass { + private final int f0; + private final long f1; + private final String f2; + + public PojoClass(int f0, long f1, String f2) { + this.f0 = f0; + this.f1 = f1; + this.f2 = f2; + } + } +} diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java new file mode 100644 index 0000000000000..f157ebd9dcf01 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil; +import org.apache.flink.table.planner.plan.utils.ReflectionsUtil; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertTrue; + +/** + * Test to check whether all {@link StreamExecNode}s have been implemented json + * serialization/deserialization. 
+ */
+public class JsonSerdeCoverageTest {
+
+    private static final List<String> UNSUPPORTED_JSON_SERDE_CLASSES =
+            Arrays.asList(
+                    "StreamExecDataStreamScan",
+                    "StreamExecLegacyTableSourceScan",
+                    "StreamExecSink",
+                    "StreamExecLegacySink",
+                    "StreamExecChangelogNormalize",
+                    "StreamExecJoin",
+                    "StreamExecIntervalJoin",
+                    "StreamExecLookupJoin",
+                    "StreamExecTemporalJoin",
+                    "StreamExecGroupAggregate",
+                    "StreamExecPythonGroupAggregate",
+                    "StreamExecLocalGroupAggregate",
+                    "StreamExecGlobalGroupAggregate",
+                    "StreamExecIncrementalGroupAggregate",
+                    "StreamExecGroupWindowAggregate",
+                    "StreamExecPythonGroupWindowAggregate",
+                    "StreamExecGroupTableAggregate",
+                    "StreamExecPythonGroupTableAggregate",
+                    "StreamExecOverAggregate",
+                    "StreamExecPythonOverAggregate",
+                    "StreamExecCorrelate",
+                    "StreamExecPythonCorrelate",
+                    "StreamExecRank",
+                    "StreamExecCalc",
+                    "StreamExecPythonCalc",
+                    "StreamExecLimit",
+                    "StreamExecSortLimit",
+                    "StreamExecDropUpdateBefore",
+                    "StreamExecTemporalSort",
+                    "StreamExecSort",
+                    "StreamExecExpand",
+                    "StreamExecMultipleInput",
+                    "StreamExecWatermarkAssigner",
+                    "StreamExecMiniBatchAssigner",
+                    "StreamExecMatch",
+                    "StreamExecUnion",
+                    "StreamExecExchange",
+                    "StreamExecValues",
+                    "StreamExecDeduplicate");
+
+    @SuppressWarnings({"rawtypes"})
+    @Test
+    public void testStreamExecNodeJsonSerdeCoverage() {
+        Set<Class<? extends StreamExecNode>> subClasses =
+                ReflectionsUtil.scanSubClasses("org.apache.flink", StreamExecNode.class);
+
+        List<String> classes = new ArrayList<>();
+        List<String> classesWithoutJsonCreator = new ArrayList<>();
+        List<String> classesWithJsonCreatorInUnsupportedList = new ArrayList<>();
+        for (Class<? extends StreamExecNode> clazz : subClasses) {
+            String className = clazz.getSimpleName();
+            classes.add(className);
+            boolean hasJsonCreator = JsonSerdeUtil.hasJsonCreatorAnnotation(clazz);
+            if (hasJsonCreator && UNSUPPORTED_JSON_SERDE_CLASSES.contains(className)) {
+                classesWithJsonCreatorInUnsupportedList.add(className);
+            }
+            if (!hasJsonCreator && !UNSUPPORTED_JSON_SERDE_CLASSES.contains(className)) {
+                classesWithoutJsonCreator.add(className);
+            }
+        }
+        assertTrue(
+                String.format(
+                        "%s do not support json serialization/deserialization, "
+                                + "please refer to the implementation of the other StreamExecNodes.",
+                        String.join(",", classesWithoutJsonCreator)),
+                classesWithoutJsonCreator.isEmpty());
+        assertTrue(
+                String.format(
+                        "%s support json serialization/deserialization, "
+                                + "but are still in the UNSUPPORTED_JSON_SERDE_CLASSES list. "
+                                + "Please remove them from UNSUPPORTED_JSON_SERDE_CLASSES.",
+                        String.join(",", classesWithJsonCreatorInUnsupportedList)),
+                classesWithJsonCreatorInUnsupportedList.isEmpty());
+        List<String> notExistingClasses =
+                UNSUPPORTED_JSON_SERDE_CLASSES.stream()
+                        .filter(c -> !classes.contains(c))
+                        .collect(Collectors.toList());
+        assertTrue(
+                String.format(
+                        "%s no longer exist, please remove them from UNSUPPORTED_JSON_SERDE_CLASSES.",
+                        String.join(",", notExistingClasses)),
+                notExistingClasses.isEmpty());
+    }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/utils/ReflectionsUtilTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/utils/ReflectionsUtilTest.java
new file mode 100644
index 0000000000000..822442f37be2d
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/utils/ReflectionsUtilTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test for {@link ReflectionsUtil}. */
+@RunWith(Parameterized.class)
+public class ReflectionsUtilTest {
+
+    @Parameterized.Parameter public boolean includingInterface;
+
+    @Parameterized.Parameter(1)
+    public boolean includingAbstractClass;
+
+    @Test
+    public void testScanSubClasses() {
+        Set<Class<? extends TestInterface>> actual =
+                ReflectionsUtil.scanSubClasses(
+                        ReflectionsUtilTest.class.getPackage().getName(),
+                        TestInterface.class,
+                        includingInterface,
+                        includingAbstractClass);
+        Set<Class<? extends TestInterface>> expected = new HashSet<>();
+        expected.add(TestClass1.class);
+        expected.add(TestClass2.class);
+        expected.add(TestClass3.class);
+        if (includingInterface) {
+            expected.add(TestSubInterface.class);
+        }
+        if (includingAbstractClass) {
+            expected.add(TestAbstractClass.class);
+        }
+        assertEquals(expected, actual);
+    }
+
+    @Parameterized.Parameters(name = "includingInterface={0}, includingAbstractClass={1}")
+    public static Object[][] testData() {
+        return new Object[][] {
+            new Object[] {false, false},
+            new Object[] {true, false},
+            new Object[] {false, true},
+            new Object[] {true, true}
+        };
+    }
+
+    /** Testing interface. */
+    public interface TestInterface {}
+
+    /** Testing interface. */
+    public interface TestSubInterface extends TestInterface {}
+
+    /** Testing abstract class. */
+    public abstract static class TestAbstractClass implements TestSubInterface {}
+
+    /** Testing class. */
+    public static class TestClass1 implements TestInterface {}
+
+    /** Testing class. */
+    public static class TestClass2 implements TestSubInterface {}
+
+    /** Testing class.
*/ + public static class TestClass3 extends TestAbstractClass {} +} diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataHandlerConsistencyTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataHandlerConsistencyTest.scala index 327aebb4c8df0..f91c00c1622d6 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataHandlerConsistencyTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataHandlerConsistencyTest.scala @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.plan.metadata import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalCorrelate, BatchPhysicalGroupAggregateBase} +import org.apache.flink.table.planner.plan.utils.ReflectionsUtil import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.core.{Aggregate, Correlate} @@ -105,12 +106,9 @@ class MetadataHandlerConsistencyTest( * @return A list contains all subclasses of [[MetadataHandler]] in flink. */ private def fetchAllExtendedMetadataHandlers: Seq[Class[_ <: MetadataHandler[_]]] = { - val reflections = new Reflections( - new ConfigurationBuilder() - .useParallelExecutor(Runtime.getRuntime.availableProcessors) - .addUrls(ClasspathHelper.forPackage("org.apache.flink.table.planner.plan.cost"))) - reflections.getSubTypesOf(classOf[MetadataHandler[_]]).filter( - mdhClass => !mdhClass.isInterface && !Modifier.isAbstract(mdhClass.getModifiers)).toList + ReflectionsUtil.scanSubClasses( + "org.apache.flink.table.planner.plan.cost", + classOf[MetadataHandler[_]]).toSeq } /** diff --git a/pom.xml b/pom.xml index 0dce4054418c8..0b23135ebac2c 100644 --- a/pom.xml +++ b/pom.xml @@ -1457,6 +1457,7 @@ under the License. flink-table/flink-table-planner/src/test/scala/resources/*.out flink-table/flink-table-planner-blink/src/test/resources/digest/*.out flink-table/flink-table-planner-blink/src/test/resources/explain/*.out + flink-table/flink-table-planner-blink/src/test/resources/jsonplan/* flink-yarn/src/test/resources/krb5.keytab flink-end-to-end-tests/test-scripts/test-data/** flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/config/keystore.jks