-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Window function on msq #15470
Window function on msq #15470
Changes from 1 commit
4ef900d
5e8dab1
9ea01fc
9c4ac74
54f9ac3
d6cef47
40a18f1
3ecc96a
7e34aa8
f5a1f59
ab6e317
f1efec3
1dae450
ccfe473
500f54f
98f4ba5
ee61333
8f8bfdc
ec1f164
465598a
d490d78
836b693
aee40ba
3f3d1b0
aa1b753
634d5bc
bd5f27b
5338b76
b688755
ceff35d
fc2e2b6
751e947
d7840a3
681ee1b
b487d2a
c3baa1d
9768da2
584fa8f
0813cc8
d3755e1
c0c74a0
60c6290
91edf9e
399a78c
806c801
04424d7
f0946b1
ea7882c
83c96b9
cfca6a5
c3e2c29
1464dae
520ab4e
16b75ce
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,9 +27,9 @@ | |
import org.apache.druid.frame.channel.ReadableFrameChannel; | ||
import org.apache.druid.frame.channel.WritableFrameChannel; | ||
import org.apache.druid.frame.processor.FrameProcessors; | ||
import org.apache.druid.frame.processor.FrameRowTooLargeException; | ||
import org.apache.druid.frame.processor.ReturnOrAwait; | ||
import org.apache.druid.frame.read.FrameReader; | ||
import org.apache.druid.frame.util.SettableLongVirtualColumn; | ||
import org.apache.druid.frame.write.FrameWriter; | ||
import org.apache.druid.frame.write.FrameWriterFactory; | ||
import org.apache.druid.java.util.common.Unit; | ||
|
@@ -52,19 +52,15 @@ | |
import org.apache.druid.segment.ColumnSelectorFactory; | ||
import org.apache.druid.segment.ColumnValueSelector; | ||
import org.apache.druid.segment.Cursor; | ||
import org.apache.druid.segment.Segment; | ||
import org.apache.druid.segment.SegmentReference; | ||
import org.apache.druid.segment.SimpleAscendingOffset; | ||
import org.apache.druid.segment.SimpleSettableOffset; | ||
import org.apache.druid.segment.VirtualColumn; | ||
import org.apache.druid.segment.VirtualColumns; | ||
import org.apache.druid.segment.column.RowSignature; | ||
|
||
import javax.annotation.Nullable; | ||
import java.io.Closeable; | ||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.function.Function; | ||
import java.util.function.Supplier; | ||
|
@@ -77,26 +73,17 @@ public class WindowOperatorQueryFrameProcessor extends BaseLeafFrameProcessor | |
|
||
private final List<OperatorFactory> operatorFactoryList; | ||
private final ObjectMapper jsonMapper; | ||
private final SettableLongVirtualColumn partitionBoostVirtualColumn; | ||
private final VirtualColumns frameWriterVirtualColumns; | ||
private final Closer closer = Closer.create(); | ||
private final RowSignature outputStageSignature; | ||
private final SimpleSettableOffset cursorOffset = new SimpleAscendingOffset(Integer.MAX_VALUE); | ||
|
||
private Segment segment; | ||
|
||
private final ArrayList<RowsAndColumns> frameRowsAndCols; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. frameRowsAndCols who clears this array list, I was expecting after we add stuff to the result, the frameRowsAndCols can be cleared no ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes it is being cleared once the result is written There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems to be cleared after we are done writing results to the frames which seems suspect. |
||
private final ArrayList<RowsAndColumns> resultRowAndCols; | ||
ArrayList<ResultRow> objectsOfASingleRac; | ||
somu-imply marked this conversation as resolved.
Show resolved
Hide resolved
|
||
private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed | ||
private Cursor frameCursor = null; | ||
private Supplier<ResultRow> rowSupplierFromFrameCursor; | ||
private ResultRow outputRow = null; | ||
private FrameWriter frameWriter = null; | ||
|
||
private ArrayList<RowsAndColumns> frameRowsAndCols; | ||
|
||
private ArrayList<RowsAndColumns> resultRowAndCols; | ||
|
||
ArrayList<ResultRow> objectsOfASingleRac; | ||
|
||
public WindowOperatorQueryFrameProcessor( | ||
final WindowOperatorQuery query, | ||
final List<OperatorFactory> operatorFactoryList, | ||
|
@@ -116,21 +103,8 @@ public WindowOperatorQueryFrameProcessor( | |
); | ||
this.query = query; | ||
this.jsonMapper = jsonMapper; | ||
this.partitionBoostVirtualColumn = new SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN); | ||
this.operatorFactoryList = operatorFactoryList; | ||
this.outputStageSignature = rowSignature; | ||
|
||
final List<VirtualColumn> frameWriterVirtualColumns = new ArrayList<>(); | ||
frameWriterVirtualColumns.add(partitionBoostVirtualColumn); | ||
|
||
final VirtualColumn segmentGranularityVirtualColumn = | ||
QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query); | ||
|
||
if (segmentGranularityVirtualColumn != null) { | ||
frameWriterVirtualColumns.add(segmentGranularityVirtualColumn); | ||
} | ||
|
||
this.frameWriterVirtualColumns = VirtualColumns.create(frameWriterVirtualColumns); | ||
this.frameRowsAndCols = new ArrayList<>(); | ||
this.resultRowAndCols = new ArrayList<>(); | ||
this.objectsOfASingleRac = new ArrayList<>(); | ||
|
@@ -191,7 +165,7 @@ protected ReturnOrAwait<Unit> runWithInputChannel(ReadableFrameChannel inputChan | |
* | ||
* | ||
* Frame 2 -> rac2 | ||
* 3, 1 | ||
* 3, 1 --> key changed | ||
* 3, 2 | ||
* 3, 3 | ||
* 3, 4 | ||
|
@@ -200,8 +174,7 @@ protected ReturnOrAwait<Unit> runWithInputChannel(ReadableFrameChannel inputChan | |
* | ||
* 3, 5 | ||
* 3, 6 | ||
* --------------- | ||
* 4, 1 | ||
* 4, 1 --> key changed | ||
* 4, 2 | ||
* | ||
* In case of empty OVER clause, all these racs need to be added to a single rows and columns | ||
|
@@ -240,7 +213,6 @@ protected ReturnOrAwait<Unit> runWithInputChannel(ReadableFrameChannel inputChan | |
// eagerly validate presence of empty OVER() clause | ||
boolean status = checkEagerlyForEmptyWindow(operatorFactoryList); | ||
somu-imply marked this conversation as resolved.
Show resolved
Hide resolved
|
||
List<Integer> partitionColsIndex = null; | ||
//status = true; | ||
if (status) { | ||
// if OVER() found | ||
// convert each frame to rac | ||
|
@@ -250,7 +222,7 @@ protected ReturnOrAwait<Unit> runWithInputChannel(ReadableFrameChannel inputChan | |
final Frame frame = inputChannel.read(); | ||
convertRowFrameToRowsAndColumns(frame, inputFrameReader.signature()); | ||
} else if (inputChannel.isFinished()) { | ||
runAllOpsOnRowsAndColumns(); | ||
runAllOpsOnMultipleRac(frameRowsAndCols); | ||
return ReturnOrAwait.returnObject(Unit.instance()); | ||
} else { | ||
return ReturnOrAwait.awaitAll(inputChannels().size()); | ||
|
@@ -287,12 +259,12 @@ protected ReturnOrAwait<Unit> runWithInputChannel(ReadableFrameChannel inputChan | |
// reaached end of channel | ||
// if there is data remaining | ||
// write it into a rac | ||
// and run operators on it | ||
if (!objectsOfASingleRac.isEmpty()) { | ||
RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromResultRow( | ||
objectsOfASingleRac, | ||
inputFrameReader.signature() | ||
); | ||
// run operators on this rac | ||
runAllOpsOnSingleRac(rac); | ||
objectsOfASingleRac.clear(); | ||
} | ||
|
@@ -308,15 +280,18 @@ protected ReturnOrAwait<Unit> runWithInputChannel(ReadableFrameChannel inputChan | |
outputRow = currentRow.copy(); | ||
somu-imply marked this conversation as resolved.
Show resolved
Hide resolved
|
||
objectsOfASingleRac.add(currentRow); | ||
} else if (comparePartitionKeys(outputRow, currentRow, partitionColsIndex)) { | ||
// if they have the same partition key | ||
// keep adding them | ||
objectsOfASingleRac.add(currentRow); | ||
|
||
} else { | ||
// create the rac from the rows you have seen before | ||
// key change noted | ||
// create rac from the rows seen before | ||
// run the operators on this rows and columns | ||
// clean up the object to hold the new row only | ||
RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromResultRow( | ||
objectsOfASingleRac, | ||
inputFrameReader.signature() | ||
); | ||
// run operators on this rac | ||
runAllOpsOnSingleRac(rac); | ||
objectsOfASingleRac.clear(); | ||
outputRow = currentRow.copy(); | ||
|
@@ -336,7 +311,7 @@ private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List<Intege | |
} else { | ||
int match = 0; | ||
for (int i : partitionIndices) { | ||
if (row1.get(i).equals(row2.get(i))) { | ||
if (Objects.equals(row1.get(i), row2.get(i))) { | ||
match++; | ||
} | ||
} | ||
|
@@ -369,19 +344,9 @@ private boolean checkEagerlyForEmptyWindow(List<OperatorFactory> operatorFactory | |
return false; | ||
} | ||
|
||
private void runAllOpsOnSingleRac(RowsAndColumns singleRac) | ||
|
||
private void runOperatorsAfterThis(Operator op) | ||
{ | ||
Operator op = new Operator() | ||
{ | ||
@Nullable | ||
@Override | ||
public Closeable goOrContinue(Closeable continuationObject, Receiver receiver) | ||
{ | ||
receiver.push(singleRac); | ||
receiver.completed(); | ||
return null; | ||
} | ||
}; | ||
for (OperatorFactory of : operatorFactoryList) { | ||
op = of.wrap(op); | ||
} | ||
|
@@ -408,49 +373,39 @@ public void completed() | |
} | ||
} | ||
}); | ||
|
||
} | ||
|
||
private void runAllOpsOnRowsAndColumns() | ||
private void runAllOpsOnMultipleRac(ArrayList<RowsAndColumns> listOfRacs) | ||
{ | ||
Operator op = new Operator() | ||
{ | ||
@Nullable | ||
@Override | ||
public Closeable goOrContinue(Closeable continuationObject, Receiver receiver) | ||
{ | ||
RowsAndColumns rac = new ConcatRowsAndColumns(frameRowsAndCols); | ||
RowsAndColumns rac = new ConcatRowsAndColumns(listOfRacs); | ||
receiver.push(rac); | ||
receiver.completed(); | ||
return null; | ||
} | ||
}; | ||
for (OperatorFactory of : operatorFactoryList) { | ||
op = of.wrap(op); | ||
} | ||
Operator.go(op, new Operator.Receiver() | ||
{ | ||
@Override | ||
public Operator.Signal push(RowsAndColumns rac) | ||
{ | ||
resultRowAndCols.add(rac); | ||
return Operator.Signal.GO; | ||
} | ||
runOperatorsAfterThis(op); | ||
} | ||
|
||
private void runAllOpsOnSingleRac(RowsAndColumns singleRac) | ||
{ | ||
Operator op = new Operator() | ||
{ | ||
@Nullable | ||
@Override | ||
public void completed() | ||
public Closeable goOrContinue(Closeable continuationObject, Receiver receiver) | ||
{ | ||
try { | ||
flushAllRowsAndCols(resultRowAndCols); | ||
} | ||
catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
finally { | ||
resultRowAndCols.clear(); | ||
} | ||
receiver.push(singleRac); | ||
receiver.completed(); | ||
return null; | ||
} | ||
}); | ||
}; | ||
runOperatorsAfterThis(op); | ||
} | ||
|
||
private void convertRowFrameToRowsAndColumns(Frame frame, RowSignature signature) | ||
|
@@ -470,47 +425,29 @@ private void convertRowFrameToRowsAndColumns(Frame frame, RowSignature signature | |
|
||
private void flushAllRowsAndCols(ArrayList<RowsAndColumns> resultRowAndCols) throws IOException | ||
{ | ||
// add bells and whistles on framewriter row size etc | ||
RowsAndColumns rac = new ConcatRowsAndColumns(resultRowAndCols); | ||
rac.getColumnNames(); | ||
AtomicInteger rowId = new AtomicInteger(0); | ||
createFrameWriterIfNeeded(rac, rowId); | ||
materializeRacToRowFrames(rac, outputStageSignature, rowId); | ||
try { | ||
flushFrameWriter(); | ||
} | ||
catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
writeRacToFrame(rac, outputStageSignature, rowId); | ||
} | ||
|
||
public void materializeRacToRowFrames(RowsAndColumns rac, RowSignature outputSignature, AtomicInteger rowId) | ||
public void writeRacToFrame(RowsAndColumns rac, RowSignature outputSignature, AtomicInteger rowId) throws IOException | ||
|
||
{ | ||
final int numRows = rac.numRows(); | ||
|
||
|
||
// 3 rac | ||
/** | ||
* r1 r2 r3 | ||
* m1, m2, m3 | ||
* 1, 2, 3 | ||
* 1, 4, 5 | ||
* 2, 2, 3 | ||
* 2, 4, 5 | ||
* 3, 2, 3 | ||
* 3, 4, 5 | ||
* | ||
* 3 rac | ||
* | ||
* each rac1, rac2, rac3 2 rows each | ||
* 1, 2, 3, | ||
* 1, 4, 5 | ||
* | ||
*/ | ||
rowId.set(0); | ||
for (; rowId.get() < numRows; rowId.incrementAndGet()) { | ||
frameWriter.addSelection(); | ||
while (rowId.get() < numRows) { | ||
final boolean didAddToFrame = frameWriter.addSelection(); | ||
if (didAddToFrame) { | ||
rowId.incrementAndGet(); | ||
} else if (frameWriter.getNumRows() == 0) { | ||
throw new FrameRowTooLargeException(currentAllocatorCapacity); | ||
} else { | ||
flushFrameWriter(); | ||
return; | ||
} | ||
} | ||
flushFrameWriter(); | ||
} | ||
|
||
private void createFrameWriterIfNeeded(RowsAndColumns rac, AtomicInteger rowId) | ||
|
@@ -524,6 +461,14 @@ private void createFrameWriterIfNeeded(RowsAndColumns rac, AtomicInteger rowId) | |
} | ||
} | ||
|
||
@Override | ||
public void cleanup() throws IOException | ||
{ | ||
closer.register(frameWriter); | ||
closer.register(super::cleanup); | ||
closer.close(); | ||
} | ||
|
||
private long flushFrameWriter() throws IOException | ||
{ | ||
if (frameWriter != null && frameWriter.getNumRows() > 0) { | ||
|
@@ -537,7 +482,6 @@ private long flushFrameWriter() throws IOException | |
frameWriter.close(); | ||
frameWriter = null; | ||
} | ||
|
||
return 0; | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can the object mapper be declared static ?