Skip to content

Commit

Permalink
Add columnMappings to explain plan output (#14187)
Browse files Browse the repository at this point in the history
* Add columnMappings to explain plan output

* * fix checkstyle
* add tests

* * improve test coverage

* * temporarily remove unit-test need to run ITs

* * depend on build

* * temporarily lower unit test threshold

* * add back dependency on unit-tests

* * add license headers

* * fix header order

* * review comments

* * fix intellij inspection errors

* * revert code coverage change
  • Loading branch information
zachjsh committed May 4, 2023
1 parent edfd46e commit 48cde23
Show file tree
Hide file tree
Showing 23 changed files with 201 additions and 37 deletions.
10 changes: 10 additions & 0 deletions docs/querying/sql-translation.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,16 @@ The EXPLAIN PLAN statement returns the following result with plan, resources, an
"name": "a0",
"type": "LONG"
}
],
"columnMappings": [
{
"queryColumn": "d0",
"outputColumn": "channel"
},
{
"queryColumn": "a0",
"outputColumn": "EXPR$1"
}
]
}
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.counters.CounterSnapshots;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.indexing.ColumnMapping;
import org.apache.druid.msq.indexing.ColumnMappings;
import org.apache.druid.msq.indexing.DataSourceMSQDestination;
import org.apache.druid.msq.indexing.InputChannelFactory;
import org.apache.druid.msq.indexing.InputChannelsImpl;
Expand Down Expand Up @@ -185,6 +183,8 @@
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.DruidNode;
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.run.SqlResults;
import org.apache.druid.timeline.DataSegment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.base.Preconditions;
import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
import org.apache.druid.query.Query;
import org.apache.druid.sql.calcite.planner.ColumnMappings;

import javax.annotation.Nullable;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.druid.segment.realtime.appenderator.Appenderators;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Period;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.joda.time.DateTimeZone;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -72,7 +73,7 @@ public class QueryKitUtils
/**
* Enables QueryKit-generated processors to understand which output column will be mapped to
* {@link org.apache.druid.segment.column.ColumnHolder#TIME_COLUMN_NAME}. Necessary because {@link QueryKit}
* does not get direct access to {@link org.apache.druid.msq.indexing.ColumnMappings}.
* does not get direct access to {@link ColumnMappings}.
*/
public static final String CTX_TIME_COLUMN_NAME = "__timeColumn";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.msq.exec.MSQTasks;
import org.apache.druid.msq.indexing.ColumnMapping;
import org.apache.druid.msq.indexing.ColumnMappings;
import org.apache.druid.msq.indexing.DataSourceMSQDestination;
import org.apache.druid.msq.indexing.MSQControllerTask;
import org.apache.druid.msq.indexing.MSQDestination;
Expand All @@ -54,7 +52,10 @@
import org.apache.druid.server.QueryResponse;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.QueryUtils;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.rel.Grouping;
import org.apache.druid.sql.calcite.run.QueryMaker;
Expand Down Expand Up @@ -173,11 +174,10 @@ public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
finalizeAggregations ? null /* Not needed */ : buildAggregationIntermediateTypeMap(druidQuery);

final List<SqlTypeName> sqlTypeNames = new ArrayList<>();
final List<ColumnMapping> columnMappings = new ArrayList<>();
final List<ColumnMapping> columnMappings = QueryUtils.buildColumnMappings(fieldMapping, druidQuery);

for (final Pair<Integer, String> entry : fieldMapping) {
final String queryColumn = druidQuery.getOutputRowSignature().getColumnName(entry.getKey());
final String outputColumns = entry.getValue();

final SqlTypeName sqlTypeName;

Expand All @@ -189,7 +189,6 @@ public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
}

sqlTypeNames.add(sqlTypeName);
columnMappings.add(new ColumnMapping(queryColumn, outputColumns));
}

final MSQDestination destination;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.msq.indexing.ColumnMapping;
import org.apache.druid.msq.indexing.ColumnMappings;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.MSQTuningConfig;
import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
Expand Down Expand Up @@ -67,6 +65,8 @@
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.calcite.planner.JoinAlgorithm;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.UnsupportedSQLQueryException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.msq.indexing;

import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.junit.Test;

public class ColumnMappingTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.druid.query.Druids;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.junit.Assert;
import org.junit.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.indexing.ColumnMapping;
import org.apache.druid.msq.indexing.ColumnMappings;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.MSQTuningConfig;
import org.apache.druid.msq.test.MSQTestBase;
Expand All @@ -42,6 +40,8 @@
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.junit.Assert;
import org.junit.Before;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.druid.msq.indexing;
package org.apache.druid.sql.calcite.planner;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.druid.msq.indexing;
package org.apache.druid.sql.calcite.planner;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
Expand All @@ -37,10 +37,10 @@
import java.util.stream.Collectors;

/**
* Maps column names from {@link MSQSpec#getQuery()} to output names desired by the user, in the order
* Maps column names from the query to output names desired by the user, in the order
* desired by the user.
*
* The {@link MSQSpec#getQuery()} is translated by {@link org.apache.druid.msq.querykit.QueryKit} into
* The query is translated by {@link org.apache.druid.msq.querykit.QueryKit} into
* a {@link org.apache.druid.msq.kernel.QueryDefinition}. So, this class also represents mappings from
* {@link org.apache.druid.msq.kernel.QueryDefinition#getFinalStageDefinition()} into the output names desired
* by the user.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ private PlannerResult planWithBindableConvention()
handlerContext.hook().captureBindableRel(bindableRel);
PlannerContext plannerContext = handlerContext.plannerContext();
if (explain != null) {
return planExplanation(bindableRel, false);
return planExplanation(rootQueryRel, bindableRel, false);
} else {
final BindableRel theRel = bindableRel;
final DataContext dataContext = plannerContext.createDataContext(
Expand Down Expand Up @@ -352,9 +352,10 @@ public void cleanup(QueryHandler.EnumeratorIterator<Object[]> iterFromMake)
}

/**
* Construct a {@link PlannerResult} for an 'explain' query from a {@link RelNode}
* Construct a {@link PlannerResult} for an 'explain' query from a {@link RelNode} and root {@link RelRoot}
*/
protected PlannerResult planExplanation(
final RelRoot relRoot,
final RelNode rel,
final boolean isDruidConventionExplanation
)
Expand All @@ -368,7 +369,7 @@ protected PlannerResult planExplanation(
if (plannerContext.getPlannerConfig().isUseNativeQueryExplain()) {
DruidRel<?> druidRel = (DruidRel<?>) rel;
try {
explanation = explainSqlPlanAsNativeQueries(druidRel);
explanation = explainSqlPlanAsNativeQueries(relRoot, druidRel);
}
catch (Exception ex) {
log.warn(ex, "Unable to translate to a native Druid query. Resorting to legacy Druid explain plan.");
Expand Down Expand Up @@ -407,11 +408,12 @@ protected PlannerResult planExplanation(
* and not indicative of the native Druid Queries which will get executed.
* This method assumes that the Planner has converted the RelNodes to DruidRels, and thereby we can implicitly cast it
*
* @param relRoot The rel root.
* @param rel Instance of the root {@link DruidRel} which is formed by running the planner transformations on it
* @return A string representing an array of native queries that correspond to the given SQL query, in JSON format
* @throws JsonProcessingException
*/
private String explainSqlPlanAsNativeQueries(DruidRel<?> rel) throws JsonProcessingException
private String explainSqlPlanAsNativeQueries(final RelRoot relRoot, DruidRel<?> rel) throws JsonProcessingException
{
ObjectMapper jsonMapper = handlerContext.jsonMapper();
List<DruidQuery> druidQueryList;
Expand All @@ -431,6 +433,9 @@ private String explainSqlPlanAsNativeQueries(DruidRel<?> rel) throws JsonProcess
ObjectNode objectNode = jsonMapper.createObjectNode();
objectNode.put("query", jsonMapper.convertValue(nativeQuery, ObjectNode.class));
objectNode.put("signature", jsonMapper.convertValue(druidQuery.getOutputRowSignature(), ArrayNode.class));
objectNode.put(
"columnMappings",
jsonMapper.convertValue(QueryUtils.buildColumnMappings(relRoot.fields, druidQuery), ArrayNode.class));
nativeQueriesArrayNode.add(objectNode);
}

Expand Down Expand Up @@ -517,7 +522,7 @@ protected PlannerResult planWithDruidConvention() throws ValidationException
handlerContext.hook().captureDruidRel(druidRel);

if (explain != null) {
return planExplanation(druidRel, true);
return planExplanation(possiblyLimitedRoot, druidRel, true);
} else {
// Compute row type.
final RelDataType rowType = prepareResult.getReturnedRowType();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.sql.calcite.planner;

import org.apache.calcite.util.Pair;
import org.apache.druid.sql.calcite.rel.DruidQuery;

import java.util.ArrayList;
import java.util.List;

/**
* Utility class for queries
*/
public class QueryUtils
{

private QueryUtils()
{
}

/**
* Builds the mappings for queryColumn to outputColumn
* @param fieldMapping The field mappings
* @param druidQuery The Druid query
* @return Mappings for queryColumn to outputColumn
*/
public static List<ColumnMapping> buildColumnMappings(
final List<Pair<Integer, String>> fieldMapping,
final DruidQuery druidQuery
)
{
final List<ColumnMapping> columnMappings = new ArrayList<>();
for (final Pair<Integer, String> entry : fieldMapping) {
final String queryColumn = druidQuery.getOutputRowSignature().getColumnName(entry.getKey());
final String outputColumn = entry.getValue();
columnMappings.add(new ColumnMapping(queryColumn, outputColumn));
}

return columnMappings;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ public void testExplainSelectCount() throws SQLException
ImmutableMap.of(
"PLAN",
StringUtils.format(
"[{\"query\":{\"queryType\":\"timeseries\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"granularity\":{\"type\":\"all\"},\"aggregations\":[{\"type\":\"count\",\"name\":\"a0\"}],\"context\":{\"sqlQueryId\":\"%s\",\"sqlStringifyArrays\":false,\"sqlTimeZone\":\"America/Los_Angeles\"}},\"signature\":[{\"name\":\"a0\",\"type\":\"LONG\"}]}]",
"[{\"query\":{\"queryType\":\"timeseries\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"granularity\":{\"type\":\"all\"},\"aggregations\":[{\"type\":\"count\",\"name\":\"a0\"}],\"context\":{\"sqlQueryId\":\"%s\",\"sqlStringifyArrays\":false,\"sqlTimeZone\":\"America/Los_Angeles\"}},\"signature\":[{\"name\":\"a0\",\"type\":\"LONG\"}],\"columnMappings\":[{\"queryColumn\":\"a0\",\"outputColumn\":\"cnt\"}]}]",
DUMMY_SQL_QUERY_ID
),
"RESOURCES",
Expand Down
Loading

0 comments on commit 48cde23

Please sign in to comment.