
Merge remote-tracking branch 'apache/master' into fix-finalize-field-postagg
kgyrtkirk committed Jun 10, 2024
2 parents f0a1112 + 8e11adf commit 99d2977
Showing 329 changed files with 50,715 additions and 5,614 deletions.
6 changes: 6 additions & 0 deletions benchmarks/pom.xml
@@ -211,6 +211,12 @@
       <version>${project.parent.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.druid.extensions</groupId>
+      <artifactId>druid-multi-stage-query</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <properties>
@@ -305,6 +305,12 @@ public Optional<LookupExtractorFactoryContainer> get(String lookupName)
       return Optional.empty();
     }
   }
+
+  @Override
+  public String getCanonicalLookupName(String lookupName)
+  {
+    return lookupName;
+  }
 }
 )
 )
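A minimal caller-side sketch of how the new method composes with get(). The Provider interface below is a hypothetical stand-in, since the diff does not name the enclosing provider type; only the two method signatures come from the change, and the import path for LookupExtractorFactoryContainer is an assumption.

import java.util.Optional;

import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;

class CanonicalLookupSketch
{
  // Hypothetical stand-in for the provider type implemented anonymously above.
  interface Provider
  {
    Optional<LookupExtractorFactoryContainer> get(String lookupName);

    String getCanonicalLookupName(String lookupName);
  }

  // Resolve through the canonical name so any alias maps to a single
  // underlying lookup; the implementation in the diff above is the
  // identity mapping, i.e. every name is already canonical.
  static Optional<LookupExtractorFactoryContainer> resolve(Provider provider, String lookupName)
  {
    return provider.get(provider.getCanonicalLookupName(lookupName));
  }
}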
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.benchmark;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.key.KeyOrder;
import org.apache.druid.frame.key.KeyTestUtils;
import org.apache.druid.frame.key.RowKey;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
import org.apache.druid.msq.statistics.serde.ClusterByStatisticsSnapshotSerde;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.stream.LongStream;

@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 3)
@Measurement(iterations = 5)
public class MsqSketchesBenchmark extends InitializedNullHandlingTest
{
private static final int MAX_BYTES = 1_000_000_000;
private static final int MAX_BUCKETS = 10_000;

private static final RowSignature SIGNATURE = RowSignature.builder()
.add("x", ColumnType.LONG)
.add("y", ColumnType.LONG)
.add("z", ColumnType.STRING)
.build();

// Cluster on x, y, z; the trailing constructor argument (1) buckets on the leading key column, x.
private static final ClusterBy CLUSTER_BY_XYZ_BUCKET_BY_X = new ClusterBy(
ImmutableList.of(
new KeyColumn("x", KeyOrder.ASCENDING),
new KeyColumn("y", KeyOrder.ASCENDING),
new KeyColumn("z", KeyOrder.ASCENDING)
),
1
);

@Param({"1", "1000"})
private long numBuckets;

@Param({"100000", "1000000"})
private long numRows;

@Param({"true", "false"})
private boolean aggregate;

private ObjectMapper jsonMapper;
private ClusterByStatisticsSnapshot snapshot;

@Setup(Level.Trial)
public void setup()
{
jsonMapper = TestHelper.makeJsonMapper();
final Iterable<RowKey> keys = () ->
LongStream.range(0, numRows)
.mapToObj(n -> createKey(numBuckets, n))
.iterator();

ClusterByStatisticsCollectorImpl collector = makeCollector(aggregate);
keys.forEach(k -> collector.add(k, 1));
snapshot = collector.snapshot();
}

// Measures a full JSON round-trip of the snapshot through Jackson.
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void benchmarkJacksonSketch(Blackhole blackhole) throws IOException
{
final byte[] serializedSnapshot = jsonMapper.writeValueAsBytes(snapshot);

final ClusterByStatisticsSnapshot deserializedSnapshot = jsonMapper.readValue(
serializedSnapshot,
ClusterByStatisticsSnapshot.class
);
blackhole.consume(deserializedSnapshot);
}

// Measures a full round-trip through the byte-oriented (octet) snapshot serde.
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void benchmarkOctetSketch(Blackhole blackhole) throws IOException
{
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ClusterByStatisticsSnapshotSerde.serialize(byteArrayOutputStream, snapshot);
final ByteBuffer serializedSnapshot = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
final ClusterByStatisticsSnapshot deserializedSnapshot = ClusterByStatisticsSnapshotSerde.deserialize(serializedSnapshot);
blackhole.consume(deserializedSnapshot);
}

private ClusterByStatisticsCollectorImpl makeCollector(final boolean aggregate)
{
return (ClusterByStatisticsCollectorImpl) ClusterByStatisticsCollectorImpl.create(
    CLUSTER_BY_XYZ_BUCKET_BY_X,
    SIGNATURE,
    MAX_BYTES,
    MAX_BUCKETS,
    aggregate,
    false
);
}

private static RowKey createKey(final long numBuckets, final long keyNo)
{
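// Spread keys across numBuckets bucket values; y cycles through five values
// and z pads each key with a fixed 67-character string.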
final Object[] key = new Object[3];
key[0] = keyNo % numBuckets;
key[1] = keyNo % 5;
key[2] = StringUtils.repeat("*", 67);
return KeyTestUtils.createKey(
    KeyTestUtils.createKeySignature(CLUSTER_BY_XYZ_BUCKET_BY_X.getColumns(), SIGNATURE),
    key
);
}
}
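A minimal sketch for invoking the new benchmark directly, assuming only the standard JMH runner API (the org.openjdk.jmh.runner package shipped alongside the annotations used above); the runner class and the pinned numRows value are illustrative, not part of the change.

package org.apache.druid.benchmark;

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class MsqSketchesBenchmarkRunner
{
  public static void main(String[] args) throws RunnerException
  {
    final Options options = new OptionsBuilder()
        // Match the benchmark class by its simple name.
        .include(MsqSketchesBenchmark.class.getSimpleName())
        // Optionally pin one @Param axis for a quicker local run.
        .param("numRows", "100000")
        .build();
    new Runner(options).run();
  }
}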
70 changes: 14 additions & 56 deletions docs/api-reference/supervisor-api.md
@@ -231,10 +231,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
 "ioConfig": {
   "topic": "social_media",
   "inputFormat": {
-    "type": "json",
-    "keepNullColumns": false,
-    "assumeNewlineDelimited": false,
-    "useJsonNodeReader": false
+    "type": "json"
   },
   "replicas": 1,
   "taskCount": 1,
@@ -429,10 +426,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
 "ioConfig": {
   "topic": "social_media",
   "inputFormat": {
-    "type": "json",
-    "keepNullColumns": false,
-    "assumeNewlineDelimited": false,
-    "useJsonNodeReader": false
+    "type": "json"
   },
   "replicas": 1,
   "taskCount": 1,
@@ -537,10 +531,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
 "ioConfig": {
   "topic": "social_media",
   "inputFormat": {
-    "type": "json",
-    "keepNullColumns": false,
-    "assumeNewlineDelimited": false,
-    "useJsonNodeReader": false
+    "type": "json"
   },
   "replicas": 1,
   "taskCount": 1,
@@ -735,10 +726,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
 "ioConfig": {
   "topic": "social_media",
   "inputFormat": {
-    "type": "json",
-    "keepNullColumns": false,
-    "assumeNewlineDelimited": false,
-    "useJsonNodeReader": false
+    "type": "json"
   },
   "replicas": 1,
   "taskCount": 1,
@@ -973,10 +961,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
 "ioConfig": {
   "topic": "social_media",
   "inputFormat": {
-    "type": "json",
-    "keepNullColumns": false,
-    "assumeNewlineDelimited": false,
-    "useJsonNodeReader": false
+    "type": "json"
   },
   "replicas": 1,
   "taskCount": 1,
@@ -1171,10 +1156,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
 "ioConfig": {
   "topic": "social_media",
   "inputFormat": {
-    "type": "json",
-    "keepNullColumns": false,
-    "assumeNewlineDelimited": false,
-    "useJsonNodeReader": false
+    "type": "json"
   },
   "replicas": 1,
   "taskCount": 1,
@@ -1610,10 +1592,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
 "ioConfig": {
   "topic": "social_media",
   "inputFormat": {
-    "type": "json",
-    "keepNullColumns": false,
-    "assumeNewlineDelimited": false,
-    "useJsonNodeReader": false
+    "type": "json"
   },
   "replicas": 1,
   "taskCount": 1,
@@ -1808,10 +1787,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
 "ioConfig": {
   "topic": "social_media",
   "inputFormat": {
-    "type": "json",
-    "keepNullColumns": false,
-    "assumeNewlineDelimited": false,
-    "useJsonNodeReader": false
+    "type": "json"
   },
   "replicas": 1,
   "taskCount": 1,
@@ -1978,10 +1954,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
 "ioConfig": {
   "topic": "social_media",
   "inputFormat": {
-    "type": "json",
-    "keepNullColumns": false,
-    "assumeNewlineDelimited": false,
-    "useJsonNodeReader": false
+    "type": "json"
   },
   "replicas": 1,
   "taskCount": 1,
@@ -2176,10 +2149,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
 "ioConfig": {
   "topic": "social_media",
   "inputFormat": {
-    "type": "json",
-    "keepNullColumns": false,
-    "assumeNewlineDelimited": false,
-    "useJsonNodeReader": false
+    "type": "json"
   },
   "replicas": 1,
   "taskCount": 1,
@@ -2534,10 +2504,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
 "ioConfig": {
   "topic": "social_media",
   "inputFormat": {
-    "type": "json",
-    "keepNullColumns": false,
-    "assumeNewlineDelimited": false,
-    "useJsonNodeReader": false
+    "type": "json"
   },
   "replicas": 1,
   "taskCount": 1,
@@ -2732,10 +2699,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
 "ioConfig": {
   "topic": "social_media",
   "inputFormat": {
-    "type": "json",
-    "keepNullColumns": false,
-    "assumeNewlineDelimited": false,
-    "useJsonNodeReader": false
+    "type": "json"
   },
   "replicas": 1,
   "taskCount": 1,
@@ -2958,10 +2922,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
 "ioConfig": {
   "topic": "social_media",
   "inputFormat": {
-    "type": "json",
-    "keepNullColumns": false,
-    "assumeNewlineDelimited": false,
-    "useJsonNodeReader": false
+    "type": "json"
   },
   "replicas": 1,
   "taskCount": 1,
@@ -3156,10 +3117,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
 "ioConfig": {
   "topic": "social_media",
   "inputFormat": {
-    "type": "json",
-    "keepNullColumns": false,
-    "assumeNewlineDelimited": false,
-    "useJsonNodeReader": false
+    "type": "json"
   },
   "replicas": 1,
   "taskCount": 1,
5 changes: 1 addition & 4 deletions docs/api-reference/tasks-api.md
@@ -749,10 +749,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
   "filter": "wikiticker-2015-09-12-sampled.json.gz"
 },
 "inputFormat": {
-  "type": "json",
-  "keepNullColumns": false,
-  "assumeNewlineDelimited": false,
-  "useJsonNodeReader": false
+  "type": "json"
 },
 "appendToExisting": false,
 "dropExisting": false
4 changes: 2 additions & 2 deletions docs/configuration/extensions.md
@@ -44,11 +44,11 @@ Core extensions are maintained by Druid committers.
 |druid-google-extensions|Google Cloud Storage deep storage.|[link](../development/extensions-core/google.md)|
 |druid-hdfs-storage|HDFS deep storage.|[link](../development/extensions-core/hdfs.md)|
 |druid-histogram|Approximate histograms and quantiles aggregator. Deprecated, please use the [DataSketches quantiles aggregator](../development/extensions-core/datasketches-quantiles.md) from the `druid-datasketches` extension instead.|[link](../development/extensions-core/approximate-histograms.md)|
-|druid-kafka-extraction-namespace|Apache Kafka-based namespaced lookup. Requires the namespace lookup extension.|[link](../development/extensions-core/kafka-extraction-namespace.md)|
+|druid-kafka-extraction-namespace|Apache Kafka-based namespaced lookup. Requires the namespace lookup extension.|[link](../querying/kafka-extraction-namespace.md)|
 |druid-kafka-indexing-service|Supervised exactly-once Apache Kafka ingestion for the indexing service.|[link](../ingestion/kafka-ingestion.md)|
 |druid-kinesis-indexing-service|Supervised exactly-once Kinesis ingestion for the indexing service.|[link](../ingestion/kinesis-ingestion.md)|
 |druid-kerberos|Kerberos authentication for Druid processes.|[link](../development/extensions-core/druid-kerberos.md)|
-|druid-lookups-cached-global|A module for [lookups](../querying/lookups.md) providing jvm-global eager caching for lookups. It provides JDBC and URI implementations for fetching lookup data.|[link](../development/extensions-core/lookups-cached-global.md)|
+|druid-lookups-cached-global|A module for [lookups](../querying/lookups.md) providing jvm-global eager caching for lookups. It provides JDBC and URI implementations for fetching lookup data.|[link](../querying/lookups-cached-global.md)|
 |druid-lookups-cached-single|Per-lookup caching module for use cases where a lookup needs to be isolated from the global pool of lookups.|[link](../development/extensions-core/druid-lookups.md)|
 |druid-multi-stage-query|Support for the multi-stage query architecture for Apache Druid and the multi-stage query task engine.|[link](../multi-stage-query/index.md)|
 |druid-orc-extensions|Support for data in the Apache ORC data format.|[link](../development/extensions-core/orc.md)|
4 changes: 3 additions & 1 deletion docs/configuration/index.md
@@ -394,6 +394,7 @@ Metric monitoring is an essential part of Druid operations. The following monitors are available:
 |`org.apache.druid.java.util.metrics.JvmThreadsMonitor`|Reports thread statistics for the JVM, such as the number of total, daemon, started, and died threads.|
 |`org.apache.druid.java.util.metrics.CgroupCpuMonitor`|Reports CPU shares and quotas as per the `cpu` cgroup.|
 |`org.apache.druid.java.util.metrics.CgroupCpuSetMonitor`|Reports CPU core/HT and memory node allocations as per the `cpuset` cgroup.|
+|`org.apache.druid.java.util.metrics.CgroupDiskMonitor`|Reports disk statistics as per the `blkio` cgroup.|
 |`org.apache.druid.java.util.metrics.CgroupMemoryMonitor`|Reports memory statistics as per the `memory` cgroup.|
 |`org.apache.druid.server.metrics.EventReceiverFirehoseMonitor`|Reports how many events have been queued in the EventReceiverFirehose.|
 |`org.apache.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics on Historical services. Available only on Historical services.|
@@ -626,7 +627,7 @@ the [HTTP input source](../ingestion/input-sources.md#http-input-source).
 
 You can use the following properties to specify permissible JDBC options for:
 - [SQL input source](../ingestion/input-sources.md#sql-input-source)
-- [globally cached JDBC lookups](../development/extensions-core/lookups-cached-global.md#jdbc-lookup)
+- [globally cached JDBC lookups](../querying/lookups-cached-global.md#jdbc-lookup)
 - [JDBC Data Fetcher for per-lookup caching](../development/extensions-core/druid-lookups.md#data-fetcher-layer).
 
 These properties do not apply to metadata storage connections.
@@ -1125,6 +1126,7 @@ These Overlord static configurations can be defined in the `overlord/runtime.properties` file.
 |`druid.indexer.queue.startDelay`|Sleep this long before starting Overlord queue management. This can be useful to give a cluster time to re-orient itself (for example, after a widespread network issue).|`PT1M`|
 |`druid.indexer.queue.restartDelay`|Sleep this long when Overlord queue management throws an exception before trying again.|`PT30S`|
 |`druid.indexer.queue.storageSyncRate`|Sync Overlord state this often with an underlying task persistence mechanism.|`PT1M`|
+|`druid.indexer.queue.maxTaskPayloadSize`|Maximum allowed size in bytes of a single task payload accepted by the Overlord.|none (allow all task payload sizes)|
 
 The following configs only apply if the Overlord is running in remote mode. For a description of local vs. remote mode, see [Overlord service](../design/overlord.md).
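As a sketch, the new Overlord setting and the CgroupDiskMonitor added above might be wired up as follows; the monitor list and the 10 MiB cap are illustrative values, not defaults taken from this change:

# Illustrative runtime.properties fragment; values are assumptions.
druid.monitoring.monitors=["org.apache.druid.java.util.metrics.CgroupDiskMonitor"]
# Reject task payloads larger than 10 MiB; when unset, payloads of any size are accepted.
druid.indexer.queue.maxTaskPayloadSize=10485760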

(The remaining changed files in this commit are not shown.)
