Skip to content

Commit

Permalink
Refactor GroupBy and TopN code to relax the constraint of dimensions …
Browse files Browse the repository at this point in the history
…being comparable (#15559)

The code in the groupBy engine and the topN engine assume that the dimensions are comparable and can call dimA.compareTo(dimB) to sort the dimensions and group them together.
This works well for the primitive dimensions, because they are Comparable, however falls apart when the dimensions can be arrays (or in future scenarios complex columns). In cases when the dimensions are not comparable, Druid resorts to having a wrapper type ComparableStringArray and ComparableList, which is a Comparable, based on the list comparator.
  • Loading branch information
LakshSingla authored Feb 27, 2024
1 parent 51cc729 commit 17e4f3a
Show file tree
Hide file tree
Showing 41 changed files with 900 additions and 1,231 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.msq.querykit.ShuffleSpecFactories;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
import org.apache.druid.query.DimensionComparisonUtils;
import org.apache.druid.query.Query;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
Expand All @@ -45,11 +46,8 @@
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.NoopLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -370,7 +368,7 @@ private static void validateQuery(final GroupByQuery query)
for (final OrderByColumnSpec column : defaultLimitSpec.getColumns()) {
final Optional<ColumnType> type = resultSignature.getColumnType(column.getDimension());

if (!type.isPresent() || !isNaturalComparator(type.get().getType(), column.getDimensionComparator())) {
if (!type.isPresent() || !DimensionComparisonUtils.isNaturalComparator(type.get().getType(), column.getDimensionComparator())) {
throw new ISE(
"Must use natural comparator for column [%s] of type [%s]",
column.getDimension(),
Expand All @@ -381,19 +379,4 @@ private static void validateQuery(final GroupByQuery query)
}
}

/**
* Only allow ordering the queries from the MSQ engine, ignoring the comparator that is set in the query. This
* function checks if it is safe to do so, which is the case if the natural comparator is used for the dimension.
* Since MSQ executes the queries planned by the SQL layer, this is a sanity check as we always add the natural
* comparator for the dimensions there
*/
private static boolean isNaturalComparator(final ValueType type, final StringComparator comparator)
{
if (StringComparators.NATURAL.equals(comparator)) {
return true;
}
return ((type == ValueType.STRING && StringComparators.LEXICOGRAPHIC.equals(comparator))
|| (type.isNumeric() && StringComparators.NUMERIC.equals(comparator)))
&& !type.isArray();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.data.ComparableIntArray;
import org.apache.druid.segment.data.ComparableList;
import org.apache.druid.segment.data.ComparableStringArray;
import org.apache.druid.segment.data.IndexedInts;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -164,10 +162,6 @@ public static List<ByteBuffer> getUtf8ByteBuffersFromStringArraySelector(
for (Object value : (Object[]) row) {
retVal.add(getUtf8ByteBufferFromString((String) value));
}
} else if (row instanceof ComparableStringArray) {
for (String value : ((ComparableStringArray) row).getDelegate()) {
retVal.add(getUtf8ByteBufferFromString(value));
}
} else {
throw new ISE("Unexpected type %s found", row.getClass().getName());
}
Expand Down Expand Up @@ -201,10 +195,6 @@ public static List<? extends Number> getNumericArrayFromObject(Object row)
for (Object value : (Object[]) row) {
retVal.add((Number) value);
}
} else if (row instanceof ComparableList) {
for (Object value : ((ComparableList) row).getDelegate()) {
retVal.add((Number) value);
}
} else if (row instanceof ComparableIntArray) {
for (int value : ((ComparableIntArray) row).getDelegate()) {
retVal.add(value);
Expand Down
10 changes: 0 additions & 10 deletions processing/src/main/java/org/apache/druid/math/expr/ExprEval.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import org.apache.druid.segment.column.TypeStrategies;
import org.apache.druid.segment.column.TypeStrategy;
import org.apache.druid.segment.column.Types;
import org.apache.druid.segment.data.ComparableList;
import org.apache.druid.segment.data.ComparableStringArray;
import org.apache.druid.segment.nested.StructuredData;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -505,14 +503,6 @@ public static ExprEval bestEffortOf(@Nullable Object val)
final List<?> theList = val instanceof List ? ((List<?>) val) : Arrays.asList((Object[]) val);
return bestEffortArray(theList);
}
// handle leaky group by array types
if (val instanceof ComparableStringArray) {
return new ArrayExprEval(ExpressionType.STRING_ARRAY, ((ComparableStringArray) val).getDelegate());
}
if (val instanceof ComparableList) {
return bestEffortArray(((ComparableList) val).getDelegate());
}

// in 'best effort' mode, we couldn't possibly use byte[] as a complex or anything else useful without type
// knowledge, so lets turn it into a base64 encoded string so at least something downstream can use it by decoding
// back into bytes
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query;

import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;

import java.util.Comparator;

/**
* Utility class to compare dimensions
*/
public class DimensionComparisonUtils
{

/**
* Checks if the comparator is the natural comparator for the given type. Natural comparator for a type is the
* comparator that will yield the same results as a.compareTo(b) for objects a, b belonging to that valueType. In such
* cases, it'll be faster to directly use the comparison methods provided by Java or the library, as using the
* StringComparator would be slow.
* MSQ only supports natural comparators
*/
public static boolean isNaturalComparator(final ValueType type, final StringComparator comparator)
{
if (StringComparators.NATURAL.equals(comparator)) {
return true;
}
return ((type == ValueType.STRING && StringComparators.LEXICOGRAPHIC.equals(comparator))
|| (type.isNumeric() && StringComparators.NUMERIC.equals(comparator)))
&& !type.isArray();
}

/**
* Creates a list comparator with custom comparator for the elements. This is used to support weird usecases where the
* list dimensions can have custom comparators, and cannot be handled with
* {@link ColumnCapabilities#getNullableStrategy()}. These queries can only be generated with native queries.
*/
public static class ArrayComparator<T> implements Comparator<Object[]>
{
/**
* Custom element comparator. The comparator should handle null types as well
*/
private final Comparator<T> elementComparator;

public ArrayComparator(Comparator<T> elementComparator)
{
this.elementComparator = elementComparator;
}

@Override
public int compare(Object[] lhs, Object[] rhs)
{
//noinspection ArrayEquality
if (lhs == rhs) {
return 0;
}

if (lhs == null) {
return -1;
}

if (rhs == null) {
return 1;
}

final int minSize = Math.min(lhs.length, rhs.length);

for (int index = 0; index < minSize; ++index) {
Object lhsElement = lhs[index];
Object rhsElement = rhs[index];
int cmp = elementComparator.compare(coerceElement(lhsElement), coerceElement(rhsElement));
if (cmp != 0) {
return cmp;
}
}

if (lhs.length == rhs.length) {
return 0;
} else if (lhs.length < rhs.length) {
return -1;
}
return 1;
}

protected T coerceElement(Object element)
{
return (T) element;
}
}

/**
* Array comparator that converts the elements to their string representation, before comparing the values using
* the provided {@link StringComparator}. It can be used when the user provides weird comparators for their
* string arrays
*/
public static class ArrayComparatorForUnnaturalStringComparator extends ArrayComparator<String>
{
public ArrayComparatorForUnnaturalStringComparator(StringComparator elementComparator)
{
super(elementComparator);
}

@Override
protected String coerceElement(Object element)
{
return String.valueOf(element);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.data.ComparableList;
import org.apache.druid.segment.data.ComparableStringArray;
import org.joda.time.DateTime;
import org.joda.time.Interval;

Expand Down Expand Up @@ -789,14 +787,14 @@ private static int compareDimsForLimitPushDown(
dimCompare = comparator.compare(String.valueOf(lhsObj), String.valueOf(rhsObj));
}
} else if (dimensionType.equals(ColumnType.STRING_ARRAY)) {
final ComparableStringArray lhsArr = DimensionHandlerUtils.convertToComparableStringArray(lhsObj);
final ComparableStringArray rhsArr = DimensionHandlerUtils.convertToComparableStringArray(rhsObj);
dimCompare = Comparators.<Comparable>naturalNullsFirst().compare(lhsArr, rhsArr);
final Object[] lhsArr = DimensionHandlerUtils.coerceToStringArray(lhsObj);
final Object[] rhsArr = DimensionHandlerUtils.coerceToStringArray(rhsObj);
dimCompare = ColumnType.STRING_ARRAY.getNullableStrategy().compare(lhsArr, rhsArr);
} else if (dimensionType.equals(ColumnType.LONG_ARRAY)
|| dimensionType.equals(ColumnType.DOUBLE_ARRAY)) {
final ComparableList lhsArr = DimensionHandlerUtils.convertToList(lhsObj, dimensionType.getElementType().getType());
final ComparableList rhsArr = DimensionHandlerUtils.convertToList(rhsObj, dimensionType.getElementType().getType());
dimCompare = Comparators.<Comparable>naturalNullsFirst().compare(lhsArr, rhsArr);
final Object[] lhsArr = DimensionHandlerUtils.convertToArray(lhsObj, dimensionType.getElementType());
final Object[] rhsArr = DimensionHandlerUtils.convertToArray(rhsObj, dimensionType.getElementType());
dimCompare = dimensionType.getNullableStrategy().compare(lhsArr, rhsArr);
} else {
dimCompare = comparator.compare((String) lhsObj, (String) rhsObj);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@

package org.apache.druid.query.groupby.epinephelinae;

import it.unimi.dsi.fastutil.Hash;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenCustomHashMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.segment.DimensionDictionary;
import org.apache.druid.segment.column.TypeSignature;
import org.apache.druid.segment.column.ValueType;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
Expand All @@ -45,6 +51,10 @@ public static <T> List<T> createDictionary()
/**
* Creates a reverse dictionary (value -> dictionary ID). If a value is not present in the reverse dictionary,
* {@link Object2IntMap#getInt} will return {@link DimensionDictionary#ABSENT_VALUE_ID}.
*
* WARNING: This assumes that the .hashCode and the .equals of the method are implemented correctly. This does not
* apply for primitive array types, which donot consider new Object[]{1L, 2L} = new Object[]{1, 2}. For such objects,
* (especially arrays), a custom hash strategy must be passed.
*/
public static <T> Object2IntMap<T> createReverseDictionary()
{
Expand All @@ -53,6 +63,39 @@ public static <T> Object2IntMap<T> createReverseDictionary()
return m;
}

private static <T> Object2IntMap<T> createReverseDictionary(final Hash.Strategy<T> hashStrategy)
{
final Object2IntOpenCustomHashMap<T> m = new Object2IntOpenCustomHashMap<>(hashStrategy);
m.defaultReturnValue(DimensionDictionary.ABSENT_VALUE_ID);
return m;
}

/**
* Creates a reverse dictionary for arrays of primitive types.
*/
public static Object2IntMap<Object[]> createReverseDictionaryForPrimitiveArray(TypeSignature<ValueType> arrayType)
{
if (!arrayType.isPrimitiveArray()) {
throw DruidException.defensive("Dictionary building function expected an array of a primitive type");
}
return createReverseDictionary(new Hash.Strategy<Object[]>()
{
@Override
public int hashCode(Object[] o)
{
// We don't do a deep comparison, because the array type is primitive, therefore we don't need to incur the extra
// overhead of checking the nestings
return Arrays.hashCode(o);
}

@Override
public boolean equals(Object[] a, Object[] b)
{
return arrayType.getNullableStrategy().compare(a, b) == 0;
}
});
}

/**
* Estimated footprint of a new entry.
*/
Expand Down
Loading

0 comments on commit 17e4f3a

Please sign in to comment.