From 00c96432afc378fb68aa7ebe9cd115f88c350c5b Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Sun, 23 Jun 2024 23:15:45 +0530 Subject: [PATCH] Materialize scan results correctly when columns are not present in the segments (#16619) Fixes a bug causing maxSubqueryBytes not to work when segments have missing columns. --- .../scan/ScanResultValueFramesIterable.java | 120 ++++++++--- .../ScanResultValueFramesIterableTest.java | 192 +++++++++++++++-- .../sql/calcite/BaseCalciteQueryTest.java | 2 + .../sql/calcite/CalciteSubqueryTest.java | 202 ++++++++++++++++++ 4 files changed, 466 insertions(+), 50 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java index 42f57628461b..2f3b988d34aa 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java @@ -22,6 +22,9 @@ import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import it.unimi.dsi.fastutil.ints.IntList; +import org.apache.druid.error.DruidException; import org.apache.druid.frame.Frame; import org.apache.druid.frame.FrameType; import org.apache.druid.frame.allocation.MemoryAllocatorFactory; @@ -36,6 +39,7 @@ import org.apache.druid.query.FrameSignaturePair; import org.apache.druid.query.IterableRowsCursorHelper; import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import java.io.Closeable; @@ -157,17 +161,41 @@ private static class ScanResultValueFramesIterator implements Iterator currentRows = null; + /** + * Row index pointing to the current row in {@link #currentRows}. 
This is the exact same row that the {@link #currentCursor} + * is also pointing at. Therefore {@link #currentRows} + {@link #currentRowIndex} represent the same information as presented + * by {@link #currentCursor}. + */ + int currentRowIndex = -1; + + /** + * Full row signature of the ScanResultValue, used to extract the rows out of it. + */ + RowSignature currentInputRowSignature = null; + + /** + * Row signature of the ScanResultValue, with columns having unknown (null) types trimmed out. This is used to write + * the rows onto the frame. There's an implicit assumption (that we verify), that null-typed columns only + * contain null values, because the underlying segment didn't have the column. + */ + RowSignature currentOutputRowSignature = null; + + /** + * Columns of the currentRows with missing type information. As we materialize the rows onto the frames, we also + * verify that these columns only contain null values. + */ + IntList nullTypedColumns = null; public ScanResultValueFramesIterator( - Sequence resultSequence, - MemoryAllocatorFactory memoryAllocatorFactory, - boolean useNestedForUnknownTypes, - RowSignature defaultRowSignature, - Function> resultFormatMapper + final Sequence resultSequence, + final MemoryAllocatorFactory memoryAllocatorFactory, + final boolean useNestedForUnknownTypes, + final RowSignature defaultRowSignature, + final Function> resultFormatMapper ) { this.memoryAllocatorFactory = memoryAllocatorFactory; @@ -200,26 +228,35 @@ public FrameSignaturePair next() // start all the processing populateCursor(); boolean firstRowWritten = false; - // While calling populateCursor() repeatedly, currentRowSignature might change.
Therefore we store the signature - // with which we have written the frames - final RowSignature writtenSignature = currentRowSignature; - FrameWriterFactory frameWriterFactory = FrameWriters.makeFrameWriterFactory( + + final FrameWriterFactory frameWriterFactory = FrameWriters.makeFrameWriterFactory( FrameType.COLUMNAR, memoryAllocatorFactory, - currentRowSignature, + currentOutputRowSignature, Collections.emptyList() ); - Frame frame; - try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(new SettableCursorColumnSelectorFactory( - () -> currentCursor, - currentRowSignature - ))) { + final Frame frame; + try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter( + new SettableCursorColumnSelectorFactory(() -> currentCursor, currentInputRowSignature))) { while (populateCursor()) { // Do till we don't have any more rows, or the next row isn't compatible with the current row if (!frameWriter.addSelection()) { // Add the cursor's row to the frame, till the frame is full break; } + + // Check that the columns with the null types are actually null before advancing + final Object[] currentRow = currentRows.get(currentRowIndex); + for (Integer columnNumber : nullTypedColumns) { + if (currentRow[columnNumber] != null) { + throw DruidException.defensive( + "Expected a null value for column [%s]", + frameWriterFactory.signature().getColumnName(columnNumber) + ); + } + } + firstRowWritten = true; currentCursor.advance(); + currentRowIndex++; } if (!firstRowWritten) { @@ -228,7 +265,9 @@ public FrameSignaturePair next() frame = Frame.wrap(frameWriter.toByteArray()); } - return new FrameSignaturePair(frame, writtenSignature); + // While calling populateCursor() repeatedly, currentRowSignature might change. 
Therefore, we store the signature + // with which we have written the frames + return new FrameSignaturePair(frame, frameWriterFactory.signature()); } /** @@ -244,7 +283,7 @@ private boolean done() /** * This is the most important method of this iterator. This determines if two consecutive scan result values can - * be batched or not, populates the value of the {@link #currentCursor} and {@link #currentRowSignature}, + * be batched or not, populates the value of the {@link #currentCursor} and {@link #currentInputRowSignature}, * during the course of the iterator, and facilitates the {@link #next()} *

* Multiple calls to populateCursor, without advancing the {@link #currentCursor} is idempotent. This allows successive @@ -257,7 +296,9 @@ private boolean done() * if (hasNext()) was true before calling the method - * 1. {@link #currentCursor} - Points to the cursor with non-empty value (i.e. isDone()) is false, and the cursor points * to the next row present in the sequence of the scan result values. This row would get materialized to frame - * 2. {@link #currentRowSignature} - Row signature of the row. + * 2. {@link #currentInputRowSignature} - Row signature of the row + * 3. {@link #currentRows} - Points to the group of rows underlying the currentCursor + * 4. {@link #currentRowIndex} - Reset to 0 if we modified the cursor, else untouched *

* Return value - * if (hasNext()) is false before calling the method - returns false @@ -275,25 +316,42 @@ private boolean populateCursor() // At this point, we know that we need to move to the next non-empty cursor, AND it exists, because // done() is not false - ScanResultValue scanResultValue = resultSequenceIterator.next(); + final ScanResultValue scanResultValue = resultSequenceIterator.next(); + final RowSignature rowSignature = scanResultValue.getRowSignature() != null ? scanResultValue.getRowSignature() : defaultRowSignature; - RowSignature modifiedRowSignature = useNestedForUnknownTypes + + final RowSignature modifiedRowSignature = useNestedForUnknownTypes ? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature) : rowSignature; - // currentRowSignature at this time points to the previous row's signature - final boolean compatible = modifiedRowSignature != null - && modifiedRowSignature.equals(currentRowSignature); + final IntList currentNullTypedColumns = new IntArrayList(); + final RowSignature.Builder modifiedTrimmedRowSignatureBuilder = RowSignature.builder(); + + for (int i = 0; i < modifiedRowSignature.size(); ++i) { + ColumnType columnType = modifiedRowSignature.getColumnType(i).orElse(null); + if (columnType == null) { + currentNullTypedColumns.add(i); + } else { + modifiedTrimmedRowSignatureBuilder.add(modifiedRowSignature.getColumnName(i), columnType); + } + } + + final RowSignature modifiedTrimmedRowSignature = modifiedTrimmedRowSignatureBuilder.build(); + + // currentRowSignature at this time points to the previous row's signature. 
We look at the trimmed signature + // because that is the one used to write onto the frames, and if two rows have same trimmed signature, we can + // write both the rows onto the same frame + final boolean compatible = modifiedTrimmedRowSignature.equals(currentOutputRowSignature); final List rows = (List) scanResultValue.getEvents(); - final Iterable formattedRows = Lists.newArrayList(Iterables.transform( + final List formattedRows = Lists.newArrayList(Iterables.transform( rows, (Function) resultFormatMapper.apply(modifiedRowSignature) )); - Pair cursorAndCloseable = IterableRowsCursorHelper.getCursorFromIterable( + final Pair cursorAndCloseable = IterableRowsCursorHelper.getCursorFromIterable( formattedRows, modifiedRowSignature ); @@ -306,7 +364,13 @@ private boolean populateCursor() return populateCursor(); } - currentRowSignature = modifiedRowSignature; + currentInputRowSignature = modifiedRowSignature; + currentOutputRowSignature = modifiedTrimmedRowSignature; + nullTypedColumns = currentNullTypedColumns; + currentRows = formattedRows; + currentRowIndex = 0; + + return compatible; } } diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueFramesIterableTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueFramesIterableTest.java index bdd64c1c8bd8..8ffaa45de797 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueFramesIterableTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueFramesIterableTest.java @@ -21,7 +21,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import org.apache.druid.error.DruidException; import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory; +import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.FrameBasedInlineDataSource; import org.apache.druid.query.FrameSignaturePair; @@ -55,6 
+57,18 @@ public class ScanResultValueFramesIterableTest extends InitializedNullHandlingTe .add("col2", ColumnType.LONG) .build(); + private static final RowSignature SIGNATURE3 = RowSignature.builder() + .add("col1", ColumnType.DOUBLE) + .add("col2", ColumnType.LONG) + .add("col3", null) + .build(); + + private static final RowSignature SIGNATURE4 = RowSignature.builder() + .add("col1", ColumnType.DOUBLE) + .add("col3", null) + .add("col2", ColumnType.LONG) + .build(); + @Test public void testEmptySequence() @@ -97,10 +111,10 @@ public void testBatchingWithHomogenousScanResultValues() Assert.assertEquals(1, frames.size()); QueryToolChestTestHelper.assertArrayResultsEquals( ImmutableList.of( - new Object[]{1L, 1.0D}, - new Object[]{2L, 2.0D}, - new Object[]{1L, 1.0D}, - new Object[]{2L, 2.0D} + new Object[]{1000L, 1100.0D}, + new Object[]{1001L, 1101.0D}, + new Object[]{1000L, 1100.0D}, + new Object[]{1001L, 1101.0D} ), new FrameBasedInlineDataSource(frames, SIGNATURE1).getRowsAsSequence() ); @@ -153,10 +167,10 @@ public void testBatchingWithHomogenousAndEmptyScanResultValues() Assert.assertEquals(1, frames.size()); QueryToolChestTestHelper.assertArrayResultsEquals( ImmutableList.of( - new Object[]{1L, 1.0D}, - new Object[]{2L, 2.0D}, - new Object[]{1L, 1.0D}, - new Object[]{2L, 2.0D} + new Object[]{1000L, 1100.0D}, + new Object[]{1001L, 1101.0D}, + new Object[]{1000L, 1100.0D}, + new Object[]{1001L, 1101.0D} ), new FrameBasedInlineDataSource(frames, SIGNATURE1).getRowsAsSequence() ); @@ -177,17 +191,43 @@ public void testBatchingWithHeterogenousScanResultValues() Assert.assertEquals(2, frames.size()); QueryToolChestTestHelper.assertArrayResultsEquals( ImmutableList.of( - new Object[]{1L, 1.0D}, - new Object[]{2L, 2.0D} + new Object[]{1000L, 1100.0D}, + new Object[]{1001L, 1101.0D} ), - new FrameBasedInlineDataSource(Collections.singletonList(frames.get(0)), SIGNATURE1).getRowsAsSequence() + new FrameBasedInlineDataSource(frames.subList(0, 1), 
SIGNATURE1).getRowsAsSequence() ); QueryToolChestTestHelper.assertArrayResultsEquals( ImmutableList.of( - new Object[]{3.0D, 3L}, - new Object[]{4.0D, 4L} + new Object[]{2000.0D, 2100L}, + new Object[]{2001.0D, 2101L} ), - new FrameBasedInlineDataSource(Collections.singletonList(frames.get(1)), SIGNATURE2).getRowsAsSequence() + new FrameBasedInlineDataSource(frames.subList(1, 2), SIGNATURE2).getRowsAsSequence() + ); + } + + @Test + public void testBatchingWithHeterogenousScanResultValuesAndNullTypes() + { + List frames = Lists.newArrayList( + createIterable( + scanResultValue1(2), + scanResultValue3(2) + ) + ); + Assert.assertEquals(2, frames.size()); + QueryToolChestTestHelper.assertArrayResultsEquals( + ImmutableList.of( + new Object[]{1000L, 1100.0D}, + new Object[]{1001L, 1101.0D} + ), + new FrameBasedInlineDataSource(frames.subList(0, 1), SIGNATURE1).getRowsAsSequence() + ); + QueryToolChestTestHelper.assertArrayResultsEquals( + ImmutableList.of( + new Object[]{3000.0D, 3100L}, + new Object[]{3001.0D, 3101L} + ), + new FrameBasedInlineDataSource(frames.subList(1, 2), SIGNATURE2).getRowsAsSequence() ); } @@ -208,20 +248,85 @@ public void testBatchingWithHeterogenousAndEmptyScanResultValues() Assert.assertEquals(2, frames.size()); QueryToolChestTestHelper.assertArrayResultsEquals( ImmutableList.of( - new Object[]{1L, 1.0D}, - new Object[]{2L, 2.0D} + new Object[]{1000L, 1100.0D}, + new Object[]{1001L, 1101.0D} + ), + new FrameBasedInlineDataSource(frames.subList(0, 1), SIGNATURE1).getRowsAsSequence() + ); + QueryToolChestTestHelper.assertArrayResultsEquals( + ImmutableList.of( + new Object[]{2000.0D, 2100L}, + new Object[]{2001.0D, 2101L} + ), + new FrameBasedInlineDataSource(frames.subList(1, 2), SIGNATURE2).getRowsAsSequence() + ); + } + + @Test + public void testBatchingWithHeterogenousAndEmptyScanResultValuesAndNullTypes() + { + List frames = Lists.newArrayList( + createIterable( + scanResultValue1(0), + scanResultValue2(0), + scanResultValue1(2), + 
scanResultValue1(0), + scanResultValue2(2), + scanResultValue2(0), + scanResultValue2(0) + ) + ); + Assert.assertEquals(2, frames.size()); + QueryToolChestTestHelper.assertArrayResultsEquals( + ImmutableList.of( + new Object[]{1000L, 1100.0D}, + new Object[]{1001L, 1101.0D} + ), + new FrameBasedInlineDataSource(frames.subList(0, 1), SIGNATURE1).getRowsAsSequence() + ); + QueryToolChestTestHelper.assertArrayResultsEquals( + ImmutableList.of( + new Object[]{2000.0D, 2100L}, + new Object[]{2001.0D, 2101L} ), - new FrameBasedInlineDataSource(Collections.singletonList(frames.get(0)), SIGNATURE1).getRowsAsSequence() + new FrameBasedInlineDataSource(frames.subList(1, 2), SIGNATURE2).getRowsAsSequence() ); + } + + @Test + public void testBatchingWithDifferentRowSignaturesButSameTrimmedRowSignature() + { + List frames = Lists.newArrayList( + createIterable( + scanResultValue3(0), + scanResultValue4(0), + scanResultValue3(2), + scanResultValue3(0), + scanResultValue4(2), + scanResultValue4(0), + scanResultValue3(0) + ) + ); + Assert.assertEquals(1, frames.size()); QueryToolChestTestHelper.assertArrayResultsEquals( ImmutableList.of( - new Object[]{3.0D, 3L}, - new Object[]{4.0D, 4L} + new Object[]{3000.0D, 3100L}, + new Object[]{3001.0D, 3101L}, + new Object[]{4000.0D, 4100L}, + new Object[]{4001.0D, 4101L} ), - new FrameBasedInlineDataSource(Collections.singletonList(frames.get(1)), SIGNATURE2).getRowsAsSequence() + new FrameBasedInlineDataSource(frames, SIGNATURE2).getRowsAsSequence() ); } + @Test + public void testExceptionThrownWithMissingType() + { + Sequence frames = Sequences.simple(createIterable(incompleteTypeScanResultValue(1))); + Assert.assertThrows(DruidException.class, frames::toList); + } + + @Test public void testSplitting() { @@ -252,7 +357,9 @@ private static ScanResultValue scanResultValue1(int numRows) return new ScanResultValue( "dummy", ImmutableList.of("col1", "col2"), - IntStream.range(1, 1 + numRows).mapToObj(i -> new Object[]{i, (double) 
i}).collect(Collectors.toList()), + IntStream.range(1000, 1000 + numRows) + .mapToObj(i -> new Object[]{i, (double) i + 100}) + .collect(Collectors.toList()), SIGNATURE1 ); } @@ -263,8 +370,49 @@ private static ScanResultValue scanResultValue2(int numRows) return new ScanResultValue( "dummy", ImmutableList.of("col1", "col2"), - IntStream.range(3, 3 + numRows).mapToObj(i -> new Object[]{(double) i, i}).collect(Collectors.toList()), + IntStream.range(2000, 2000 + numRows) + .mapToObj(i -> new Object[]{(double) i, i + 100}) + .collect(Collectors.toList()), SIGNATURE2 ); } + + // Signature: col1: DOUBLE, col2: LONG, col3: null + private static ScanResultValue scanResultValue3(int numRows) + { + return new ScanResultValue( + "dummy", + ImmutableList.of("col1", "col2", "col3"), + IntStream.range(3000, 3000 + numRows) + .mapToObj(i -> new Object[]{(double) i, i + 100, null}) + .collect(Collectors.toList()), + SIGNATURE3 + ); + } + + // Signature: col1: DOUBLE, col3: null, col2: LONG + private static ScanResultValue scanResultValue4(int numRows) + { + return new ScanResultValue( + "dummy", + ImmutableList.of("col1", "col3", "col2"), + IntStream.range(4000, 4000 + numRows) + .mapToObj(i -> new Object[]{(double) i, null, i + 100}) + .collect(Collectors.toList()), + SIGNATURE4 + ); + } + + // Contains ScanResultValue with incomplete type, and non-null row + private static ScanResultValue incompleteTypeScanResultValue(int numRows) + { + return new ScanResultValue( + "dummy", + ImmutableList.of("col1", "col3", "col2"), + IntStream.range(5000, 5000 + numRows) + .mapToObj(i -> new Object[]{(double) i, i + 100, i + 200}) + .collect(Collectors.toList()), + SIGNATURE4 + ); + } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index dfee7d0e3a22..606710ff53b5 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -256,6 +256,8 @@ public static void setupNullValues() ImmutableMap.builder() .putAll(QUERY_CONTEXT_DEFAULT) .put(QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000") + // Disallows the fallback to row based limiting + .put(QueryContexts.MAX_SUBQUERY_ROWS_KEY, "1") .build(); // Add additional context to the given context map for when the diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java index 808ab5c36630..6269e2a5c8cc 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java @@ -20,7 +20,15 @@ package org.apache.druid.sql.calcite; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Injector; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.MapInputRowParser; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.UOE; @@ -31,6 +39,7 @@ import org.apache.druid.query.JoinDataSource; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryDataSource; +import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.ResourceLimitExceededException; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.aggregation.CountAggregatorFactory; @@ -54,12 +63,21 @@ import org.apache.druid.query.scan.ScanQuery; import 
org.apache.druid.query.topn.DimensionTopNMetricSpec; import org.apache.druid.query.topn.TopNQueryBuilder; +import org.apache.druid.segment.IndexBuilder; +import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.join.JoinType; +import org.apache.druid.segment.join.JoinableFactoryWrapper; +import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker; import org.apache.druid.sql.calcite.expression.DruidExpression; import org.apache.druid.sql.calcite.filtration.Filtration; import org.apache.druid.sql.calcite.util.CalciteTests; +import org.apache.druid.sql.calcite.util.SqlTestFramework; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.LinearShardSpec; import org.hamcrest.CoreMatchers; import org.joda.time.DateTimeZone; import org.joda.time.Period; @@ -67,12 +85,14 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import java.io.File; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.junit.Assert.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -84,6 +104,7 @@ * 1. Where the memory limit is not set. The intermediate results are materialized as inline rows * 2. Where the memory limit is set. 
The intermediate results are materialized as frames */ +@SqlTestFrameworkConfig.ComponentSupplier(CalciteSubqueryTest.SubqueryComponentSupplier.class) public class CalciteSubqueryTest extends BaseCalciteQueryTest { public static Iterable constructorFeeder() @@ -147,6 +168,57 @@ public void testExactCountDistinctUsingSubqueryWithWhereToOuterFilter(String tes ); } + @MethodSource("constructorFeeder") + @ParameterizedTest(name = "{0}") + public void testSubqueryOnDataSourceWithMissingColumnsInSegments(String testName, Map queryContext) + { + if (!queryContext.containsKey(QueryContexts.MAX_SUBQUERY_BYTES_KEY)) { + cannotVectorize(); + } + testQuery( + "SELECT\n" + + " __time,\n" + + " col1,\n" + + " col2,\n" + + " col3,\n" + + " COUNT(*)\n" + + "FROM (SELECT * FROM dsMissingCol LIMIT 10)\n" + + "GROUP BY 1, 2, 3, 4", + queryContext, + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + new QueryDataSource( + newScanQueryBuilder() + .dataSource("dsMissingCol") + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("__time", "col1", "col2", "col3") + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .limit(10) + .build() + ) + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions( + new DefaultDimensionSpec("__time", "d0", ColumnType.LONG), + new DefaultDimensionSpec("col1", "d1", ColumnType.STRING), + new DefaultDimensionSpec("col2", "d2", ColumnType.STRING), + new DefaultDimensionSpec("col3", "d3", ColumnType.STRING) + ) + .setAggregatorSpecs(aggregators( + new CountAggregatorFactory("a0") + )) + .setContext(queryContext) + .build() + ), + ImmutableList.of( + new Object[]{946684800000L, "abc", NullHandling.defaultStringValue(), "def", 1L}, + new Object[]{946684800000L, "foo", "bar", NullHandling.defaultStringValue(), 1L} + ) + ); + } + @MethodSource("constructorFeeder") @ParameterizedTest(name = "{0}") public void testExactCountDistinctOfSemiJoinResult(String testName, Map 
queryContext) @@ -1315,4 +1387,134 @@ public void testSingleValueEmptyInnerAgg(String testName, Map qu ImmutableList.of() ); } + + public static class SubqueryComponentSupplier extends SqlTestFramework.StandardComponentSupplier + { + + private final TempDirProducer tmpDirProducer; + + public SubqueryComponentSupplier(TempDirProducer tempDirProducer) + { + super(tempDirProducer); + this.tmpDirProducer = tempDirProducer; + } + + @Override + public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker( + QueryRunnerFactoryConglomerate conglomerate, + JoinableFactoryWrapper joinableFactory, + Injector injector + ) + { + SpecificSegmentsQuerySegmentWalker walker = + super.createQuerySegmentWalker(conglomerate, joinableFactory, injector); + + final String datasource1 = "dsMissingCol"; + final File tmpFolder = tempDirProducer.newTempFolder(); + + final List> rawRows1 = ImmutableList.of( + ImmutableMap.builder() + .put("t", "2000-01-01") + .put("col1", "foo") + .put("col2", "bar") + .build() + ); + final List rows1 = + rawRows1 + .stream() + .map(mapInputRow -> MapInputRowParser.parse( + new InputRowSchema( + new TimestampSpec("t", "iso", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(ImmutableList.of("col1", "col2")) + ), + null + ), + mapInputRow + )) + .collect(Collectors.toList()); + final QueryableIndex queryableIndex1 = IndexBuilder + .create() + .tmpDir(new File(tmpFolder, datasource1)) + .segmentWriteOutMediumFactory(OnHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema(new IncrementalIndexSchema.Builder() + .withRollup(false) + .withDimensionsSpec( + new DimensionsSpec( + ImmutableList.of( + new StringDimensionSchema("col1"), + new StringDimensionSchema("col2") + ) + ) + ) + .build() + ) + .rows(rows1) + .buildMMappedIndex(); + + final List> rawRows2 = ImmutableList.of( + ImmutableMap.builder() + .put("t", "2000-01-01") + .put("col1", "abc") + .put("col3", "def") + .build() + ); + final List rows2 = + rawRows2 + .stream() + 
.map(mapInputRow -> MapInputRowParser.parse( + new InputRowSchema( + new TimestampSpec("t", "iso", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(ImmutableList.of("col1", "col3")) + ), + null + ), + mapInputRow + )) + .collect(Collectors.toList()); + final QueryableIndex queryableIndex2 = IndexBuilder + .create() + .tmpDir(new File(tmpFolder, datasource1)) + .segmentWriteOutMediumFactory(OnHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema(new IncrementalIndexSchema.Builder() + .withRollup(false) + .withDimensionsSpec( + new DimensionsSpec( + ImmutableList.of( + new StringDimensionSchema("col1"), + new StringDimensionSchema("col3") + ) + ) + ) + .build() + ) + .rows(rows2) + .buildMMappedIndex(); + + walker.add( + DataSegment.builder() + .dataSource(datasource1) + .interval(Intervals.ETERNITY) + .version("1") + .shardSpec(new LinearShardSpec(0)) + .size(0) + .build(), + queryableIndex1 + ); + + walker.add( + DataSegment.builder() + .dataSource(datasource1) + .interval(Intervals.ETERNITY) + .version("1") + .shardSpec(new LinearShardSpec(1)) + .size(0) + .build(), + queryableIndex2 + ); + + return walker; + } + } }