Fix incorrect comparator usage in FinalizingFieldAccessPostAggregator #16555

Open · wants to merge 11 commits into master
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.aggregation;

import org.apache.druid.error.DruidException;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.ObjectStrategy;
import org.joda.time.DateTime;

import java.nio.ByteBuffer;

public class DateTimeSerde extends BasicComplexMetricSerde<DateTime>
{
public static final ColumnType TYPE = ColumnType.ofComplex("dateTime");

public DateTimeSerde()
{
super(TYPE, new DateTimeObjectStrategy());
}

static class DateTimeObjectStrategy implements ObjectStrategy<DateTime>
{

@Override
public int compare(DateTime o1, DateTime o2)
{
return o1.compareTo(o2);
}

@Override
public Class<? extends DateTime> getClazz()
{
return DateTime.class;
}

@Override
public DateTime fromByteBuffer(ByteBuffer buffer, int numBytes)
{
throw DruidException.defensive("not supported");
}

@Override
public byte[] toBytes(DateTime val)
{
throw DruidException.defensive("not supported");
}
}
}
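
Not part of the diff, but a quick sketch of what the new strategy's comparator provides: chronological ordering of finalized values. DateTimeObjectStrategy is package-private, so this hypothetical snippet assumes caller code in org.apache.druid.query.aggregation.

static void comparatorSketch()
{
  // compare delegates to DateTime#compareTo, i.e. chronological order
  ObjectStrategy<DateTime> strategy = new DateTimeSerde.DateTimeObjectStrategy();
  DateTime earlier = DateTime.parse("2000-01-01T00:00:00Z");
  DateTime later = DateTime.parse("2001-01-01T00:00:00Z");
  assert strategy.compare(earlier, later) < 0;
}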
@@ -37,7 +37,7 @@

public abstract class TimestampAggregatorFactory extends AggregatorFactory
{
-public static final ColumnType FINALIZED_TYPE = ColumnType.ofComplex("dateTime");
+public static final ColumnType FINALIZED_TYPE = DateTimeSerde.TYPE;
final String name;
@Nullable
final String fieldName;
@@ -22,8 +22,10 @@
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Binder;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.segment.serde.ComplexMetrics;

import java.util.Collections;
import java.util.List;
@@ -45,6 +47,13 @@ public List<? extends Module> getJacksonModules()
@Override
public void configure(Binder binder)
{
registerSerde();

}

@VisibleForTesting
public static void registerSerde()
{
ComplexMetrics.registerSerde(DateTimeSerde.TYPE.getComplexTypeName(), new DateTimeSerde());
}
}
@@ -36,6 +36,10 @@

public class TimestampMinMaxAggregatorFactoryTest
{
{
TimestampMinMaxModule.registerSerde();
}

private static final ObjectMapper JSON_MAPPER = TestHelper.makeJsonMapper();

@Test
@@ -73,49 +77,48 @@ public void testSerde() throws JsonProcessingException
public void testEqualsAndHashcode()
{
EqualsVerifier.forClass(TimestampMinAggregatorFactory.class)
.withNonnullFields("name", "comparator", "initValue")
.withIgnoredFields("timestampSpec")
.usingGetClass()
.verify();
EqualsVerifier.forClass(TimestampMaxAggregatorFactory.class)
.withNonnullFields("name", "comparator", "initValue")
.withIgnoredFields("timestampSpec")
.usingGetClass()
.verify();
}

@Test
public void testResultArraySignature()
{
final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("dummy")
.intervals("2000/3000")
.granularity(Granularities.HOUR)
.aggregators(
new CountAggregatorFactory("count"),
new TimestampMaxAggregatorFactory("timeMax", "__time", null),
new TimestampMinAggregatorFactory("timeMin", "__time", null)
)
.postAggregators(
new FieldAccessPostAggregator("timeMax-access", "timeMax"),
new FinalizingFieldAccessPostAggregator("timeMax-finalize", "timeMax"),
new FieldAccessPostAggregator("timeMin-access", "timeMin"),
new FinalizingFieldAccessPostAggregator("timeMin-finalize", "timeMin")
)
.build();

Assert.assertEquals(
RowSignature.builder()
.addTimeColumn()
.add("count", ColumnType.LONG)
.add("timeMax", null)
.add("timeMin", null)
.add("timeMax-access", ColumnType.LONG)
.add("timeMax-finalize", TimestampAggregatorFactory.FINALIZED_TYPE)
.add("timeMin-access", ColumnType.LONG)
.add("timeMin-finalize", TimestampAggregatorFactory.FINALIZED_TYPE)
.build(),
new TimeseriesQueryQueryToolChest().resultArraySignature(query)
);
}
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Floats;
import org.apache.druid.segment.column.ColumnType;

import javax.annotation.Nullable;

@@ -62,6 +63,7 @@ public class ApproximateHistogram
// use sign bit to indicate approximate bin and remaining bits for bin count
private static final long APPROX_FLAG_BIT = Long.MIN_VALUE;
private static final long COUNT_BITS = Long.MAX_VALUE;
public static final ColumnType TYPE = ColumnType.ofComplex("approximateHistogram");
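
// To make the bit layout above concrete, two hypothetical helpers (not part of
// this diff): the sign bit marks a bin as approximate, the low 63 bits carry
// the count.
private static boolean isApproximate(long bin)
{
  return (bin & APPROX_FLAG_BIT) != 0;
}

private static long binCountOf(long bin)
{
  return bin & COUNT_BITS;
}
// e.g. isApproximate(APPROX_FLAG_BIT | 42L) is true; binCountOf(APPROX_FLAG_BIT | 42L) is 42.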

@Override
public boolean equals(Object o)
@@ -840,20 +842,20 @@ protected int ruleCombineBins(

/**
* mergeBins performs the given number of bin merge operations on the given histogram
*
* It repeatedly merges the two closest bins until it has performed the requested number of merge operations.
* Merges are done in-place and unused bins have unknown state.
*
* next / prev maintains a doubly-linked list of valid bin indices into the mergedBins array.
*
* Fast operation is achieved by building a min-heap of the deltas as opposed to repeatedly
* scanning the array of deltas to find the minimum. A reverse index into the heap is maintained
* to allow deleting and updating of specific deltas.
*
* next and prev arrays are used to maintain indices to the previous / next valid bin from a given bin index.
*
* Its effect is equivalent to running the following code:
*
* <pre>
* ApproximateHistogram merged = new ApproximateHistogram(mergedBinCount, mergedPositions, mergedBins);
*
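
// A simplified, hypothetical sketch of the merge strategy described above (not
// part of this diff): it rescans for the smallest adjacent-position delta
// instead of maintaining the min-heap plus reverse index, and it ignores the
// APPROX_FLAG_BIT bookkeeping on bin counts.
static int mergeBinsNaive(double[] positions, long[] bins, int binCount, int numMerge)
{
  for (int merges = 0; merges < numMerge && binCount > 1; ++merges) {
    // find the pair of adjacent bins whose positions are closest
    int min = 0;
    for (int i = 1; i < binCount - 1; ++i) {
      if (positions[i + 1] - positions[i] < positions[min + 1] - positions[min]) {
        min = i;
      }
    }
    // merge bin (min + 1) into bin min: count-weighted average position, summed count
    final long leftCount = bins[min];
    final long rightCount = bins[min + 1];
    positions[min] = (positions[min] * leftCount + positions[min + 1] * rightCount) / (leftCount + rightCount);
    bins[min] = leftCount + rightCount;
    // shift the tail left so the arrays stay dense
    System.arraycopy(positions, min + 2, positions, min + 1, binCount - min - 2);
    System.arraycopy(bins, min + 2, bins, min + 1, binCount - min - 2);
    --binCount;
  }
  return binCount;
}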
@@ -1198,7 +1200,7 @@ public void toBytes(ByteBuffer buf)

/**
* Writes the dense representation of this ApproximateHistogram object to the given byte-buffer
*
* Requires 16 + 12 * size bytes of storage
*
* @param buf ByteBuffer to write the ApproximateHistogram to
@@ -1219,7 +1221,7 @@ public void toBytesDense(ByteBuffer buf)

/**
* Writes the sparse representation of this ApproximateHistogram object to the given byte-buffer
*
* Requires 16 + 12 * binCount bytes of storage
*
* @param buf ByteBuffer to write the ApproximateHistogram to
@@ -1241,7 +1243,7 @@ public void toBytesSparse(ByteBuffer buf)
/**
* Returns a compact byte-buffer representation of this ApproximateHistogram object
* storing actual values as opposed to histogram bins
*
* Requires 3 + 4 * count bytes of storage with count &lt;= 127
*
* @param buf ByteBuffer to write the ApproximateHistogram to
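
// A concrete reading of the storage formulas documented above (hypothetical
// helpers, not part of this diff):
static int denseStorageBytes(int size)      { return 16 + 12 * size; }      // toBytesDense
static int sparseStorageBytes(int binCount) { return 16 + 12 * binCount; }  // toBytesSparse
static int compactStorageBytes(int count)   { return 3 + 4 * count; }       // compact values form, count <= 127
// e.g. a histogram with 50 filled bins serializes sparsely to 16 + 12 * 50 = 616 bytes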
@@ -31,7 +31,6 @@
import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.HistogramAggregatorFactory;
import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
@@ -43,6 +42,7 @@
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;

import javax.annotation.Nullable;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Comparator;
@@ -52,7 +52,7 @@
@JsonTypeName("approxHistogram")
public class ApproximateHistogramAggregatorFactory extends AggregatorFactory
{
-public static final ColumnType TYPE = ColumnType.ofComplex("approximateHistogram");
+public static final ColumnType TYPE = ApproximateHistogram.TYPE;
protected final String name;
protected final String fieldName;

@@ -322,7 +322,7 @@ public ColumnType getIntermediateType()
@Override
public ColumnType getResultType()
{
-return finalizeAsBase64Binary ? TYPE : HistogramAggregatorFactory.TYPE;
+return finalizeAsBase64Binary ? ApproximateHistogram.TYPE : Histogram.TYPE;
}

@Override
@@ -68,5 +68,6 @@ public static void registerSerde()
{
ComplexMetrics.registerSerde(ApproximateHistogramFoldingSerde.TYPE_NAME, new ApproximateHistogramFoldingSerde());
ComplexMetrics.registerSerde(FixedBucketsHistogramAggregator.TYPE_NAME, new FixedBucketsHistogramSerde());
ComplexMetrics.registerSerde(Histogram.TYPE.getComplexTypeName(), new HistogramSerde2());
}
}
@@ -25,7 +25,6 @@
import com.google.common.collect.Sets;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.HistogramAggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.PostAggregatorIds;
import org.apache.druid.query.cache.CacheKeyBuilder;
@@ -78,7 +77,7 @@ public Object compute(Map<String, Object> values)
@Override
public ColumnType getType(ColumnInspector signature)
{
-return HistogramAggregatorFactory.TYPE;
+return Histogram.TYPE;
}

@Override
@@ -24,7 +24,6 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.HistogramAggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.PostAggregatorIds;
import org.apache.druid.query.cache.CacheKeyBuilder;
@@ -71,7 +70,7 @@ public Object compute(Map<String, Object> values)
@Override
public ColumnType getType(ColumnInspector signature)
{
-return HistogramAggregatorFactory.TYPE;
+return Histogram.TYPE;
}

@Override
@@ -25,7 +25,6 @@
import com.google.common.collect.Sets;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.HistogramAggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.PostAggregatorIds;
import org.apache.druid.query.cache.CacheKeyBuilder;
@@ -75,7 +74,7 @@ public Object compute(Map<String, Object> values)
@Override
public ColumnType getType(ColumnInspector signature)
{
-return HistogramAggregatorFactory.TYPE;
+return Histogram.TYPE;
}

@Override
@@ -20,19 +20,22 @@
package org.apache.druid.query.aggregation.histogram;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.segment.column.ColumnType;

import java.util.Arrays;

public class Histogram
{
public static final ColumnType TYPE = ColumnType.ofComplex("histogram2");

double[] breaks;
double[] counts;

public Histogram(float[] breaks, double[] counts)
{
double[] retVal = new double[breaks.length];
for (int i = 0; i < breaks.length; ++i) {
-retVal[i] = (double) breaks[i];
+retVal[i] = breaks[i];
}

this.breaks = retVal;
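
// Hypothetical usage of the constructor above (values arbitrary); by histogram
// convention, breaks holds the N + 1 bin edges that bound N counts:
//   Histogram h = new Histogram(new float[]{0f, 10f, 20f}, new double[]{3.0, 7.0});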