Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix incorrect comparator usage in FinalizingFieldAccessPostAggregator #16555

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Prev Previous commit
Next Next commit
add serde for histogram/histogramvisual
  • Loading branch information
kgyrtkirk committed Jun 7, 2024
commit 32b3978b6e1a2986e1693ae4b6765f91689d7e2a
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ public String getTypeName()
}

@Override
public ComplexMetricExtractor getExtractor()
public ComplexMetricExtractor<ApproximateHistogram> getExtractor()
{
return new ComplexMetricExtractor()
return new ComplexMetricExtractor<ApproximateHistogram>()
{
@Override
public Class<ApproximateHistogram> extractedClass()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@
import org.apache.druid.query.aggregation.FloatMinAggregatorFactory;
import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
import org.apache.druid.query.aggregation.GroupingAggregatorFactory;
import org.apache.druid.query.aggregation.Histogram;
import org.apache.druid.query.aggregation.HistogramAggregatorFactory;
import org.apache.druid.query.aggregation.HistogramSerde;
import org.apache.druid.query.aggregation.HistogramVisual;
import org.apache.druid.query.aggregation.HistogramVisualSerde;
import org.apache.druid.query.aggregation.JavaScriptAggregatorFactory;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
Expand Down Expand Up @@ -103,6 +107,10 @@ public AggregatorsModule()
new SerializablePairLongLongComplexMetricSerde()
);

ComplexMetrics.registerSerde(Histogram.TYPE.getComplexTypeName(), new HistogramSerde());
ComplexMetrics.registerSerde(HistogramVisual.TYPE.getComplexTypeName(), new HistogramVisualSerde());


setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.aggregation;

import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;

import java.nio.ByteBuffer;

public abstract class BasicComplexMetricSerde<T> extends ComplexMetricSerde
{
private final ColumnType type;
private final ObjectStrategy<T> objectStrategy;

public BasicComplexMetricSerde(ColumnType type, ObjectStrategy<T> objectStrategy)
{
this.type = type;
this.objectStrategy = objectStrategy;
}

@Override
public final String getTypeName()
{
return type.getComplexTypeName();
}

@Override
public final ComplexMetricExtractor<T> getExtractor()
{
return new ObjectStrategyBasedComplexMetricExtractor<T>(objectStrategy);
}

@Override
public final void deserializeColumn(ByteBuffer byteBuffer, ColumnBuilder columnBuilder)
{
final GenericIndexed column = GenericIndexed.read(byteBuffer, getObjectStrategy(), columnBuilder.getFileMapper());
columnBuilder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column));
}

@Override
public ObjectStrategy<T> getObjectStrategy()
{
return objectStrategy;
}

static class ObjectStrategyBasedComplexMetricExtractor<T> implements ComplexMetricExtractor<T>
{
private final ObjectStrategy<T> objectStrategy;
private final Class<T> clazz;

public ObjectStrategyBasedComplexMetricExtractor(ObjectStrategy<T> objectStrategy)
{
this.objectStrategy = objectStrategy;
this.clazz = (Class<T>) objectStrategy.getClazz();
}

@Override
public Class<T> extractedClass()
{
return clazz;
}

@Override
public T extractValue(InputRow inputRow, String metricName)
{
Object rawValue = inputRow.getRaw(metricName);
if (clazz.isInstance(rawValue)) {
return (T) rawValue;
}
if (rawValue instanceof byte[]) {
return fromBytes((byte[]) rawValue);
}
throw new ISE(
"Object is of type[%s] that can not deserialized to type[%s].",
rawValue.getClass().getName(),
Histogram.class.getName()
);
}

private T fromBytes(byte[] rawValue)
{
ByteBuffer bb = ByteBuffer.wrap(rawValue);
return objectStrategy.fromByteBuffer(bb, bb.limit());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@

import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Preconditions;
import org.apache.druid.segment.column.ColumnType;

import java.nio.ByteBuffer;
import java.util.Arrays;

public class Histogram
{
public static final ColumnType TYPE = ColumnType.ofComplex("histogram");

public float[] breaks;
public long[] bins;
public transient long count;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@

public class HistogramAggregatorFactory extends AggregatorFactory
{
public static final ColumnType TYPE = ColumnType.ofComplex("histogram");
// todo: this isn't registered with serde, is it a lie? should we just report null as the type name? or register a serde?
public static final ColumnType TYPE_VISUAL = ColumnType.ofComplex("histogramVisual");

private final String name;
private final String fieldName;
private final List<Float> breaksList;
Expand Down Expand Up @@ -204,7 +200,7 @@ public byte[] getCacheKey()
@Override
public ColumnType getIntermediateType()
{
return TYPE;
return Histogram.TYPE;
}

/**
Expand All @@ -213,7 +209,7 @@ public ColumnType getIntermediateType()
@Override
public ColumnType getResultType()
{
return TYPE_VISUAL;
return HistogramVisual.TYPE;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.aggregation;

import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;

import java.nio.ByteBuffer;

public class HistogramSerde extends ComplexMetricSerde
{

@Override
public String getTypeName()
{
return Histogram.TYPE.getComplexTypeName();
}

@Override
public ComplexMetricExtractor<Histogram> getExtractor()
{
return new ComplexMetricExtractor<Histogram>()
{
@Override
public Class<? extends Histogram> extractedClass()
{
return Histogram.class;
}

@Override
public Histogram extractValue(InputRow inputRow, String metricName)
{
Object rawValue = inputRow.getRaw(metricName);
if (rawValue instanceof Histogram) {
return (Histogram) rawValue;
}
if (rawValue instanceof byte[]) {
return fromBytes((byte[]) rawValue);
}
throw new ISE(
"Object is of type[%s] that can not deserialized to type[%s].",
rawValue.getClass().getName(),
Histogram.class.getName()
);
}
};
}

@Override
public void deserializeColumn(ByteBuffer byteBuffer, ColumnBuilder columnBuilder)
{
final GenericIndexed column = GenericIndexed.read(byteBuffer, getObjectStrategy(), columnBuilder.getFileMapper());
columnBuilder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column));
}

@Override
public ObjectStrategy<Histogram> getObjectStrategy()
{
return new ObjectStrategy<Histogram>()
{

@Override
public int compare(Histogram o1, Histogram o2)
{
return 0;
}

@Override
public Class<? extends Histogram> getClazz()
{
return Histogram.class;
}

@Override
public Histogram fromByteBuffer(ByteBuffer buffer, int numBytes)
{
return Histogram.fromBytes(buffer);
}

@Override
public byte[] toBytes(Histogram val)
{
return val.toBytes();
}
};
}

protected Histogram fromBytes(byte[] rawValue)
{
ByteBuffer bb = ByteBuffer.wrap(rawValue);
return getObjectStrategy().fromByteBuffer(bb, bb.limit());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.segment.column.ColumnType;

import java.util.Arrays;

public class HistogramVisual
{
public static final ColumnType TYPE = ColumnType.ofComplex("histogramVisual");

@JsonProperty public final double[] breaks;
@JsonProperty
public final double[] counts;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.aggregation;

import org.apache.druid.error.DruidException;
import org.apache.druid.segment.data.ObjectStrategy;
import java.nio.ByteBuffer;

public class HistogramVisualSerde extends BasicComplexMetricSerde
{
public HistogramVisualSerde()
{
super(HistogramVisual.TYPE, new HistogramVisualSerdeObjectStrategy());
}

static class HistogramVisualSerdeObjectStrategy implements ObjectStrategy<HistogramVisual>
{

@Override
public int compare(HistogramVisual o1, HistogramVisual o2)
{
return 0;
}

@Override
public Class<? extends HistogramVisual> getClazz()
{
return HistogramVisual.class;
}

@Override
public HistogramVisual fromByteBuffer(ByteBuffer buffer, int numBytes)
{
throw DruidException.defensive("not implemeneted");
}

@Override
public byte[] toBytes(HistogramVisual val)
{
throw DruidException.defensive("not implemeneted");
}
}
}
Loading