Skip to content

Commit

Permalink
Add query context parameter to remove null bytes when writing frames (#…
Browse files Browse the repository at this point in the history
…16579)

MSQ cannot process null bytes in string fields, and the current workaround is to remove them using the REPLACE function. A 'removeNullBytes' context parameter has been added, which sanitizes the input string fields by removing these null bytes.
  • Loading branch information
LakshSingla committed Jun 26, 2024
1 parent d9bd022 commit 71b3b5a
Show file tree
Hide file tree
Showing 49 changed files with 470 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -351,11 +351,11 @@ public void mergeChannels(Blackhole blackhole)
channels.stream().map(BlockingQueueFrameChannel::readable).collect(Collectors.toList()),
frameReader,
outputChannel.writable(),
FrameWriters.makeFrameWriterFactory(
FrameType.ROW_BASED,
FrameWriters.makeRowBasedFrameWriterFactory(
new ArenaMemoryAllocatorFactory(1_000_000),
signature,
sortKey
sortKey,
false
),
sortKey,
null,
Expand Down
1 change: 1 addition & 0 deletions docs/multi-stage-query/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ The following table lists the context parameters for the MSQ task engine:
| `skipTypeVerification` | INSERT or REPLACE<br /><br />During query validation, Druid validates that [string arrays](../querying/arrays.md) and [multi-value dimensions](../querying/multi-value-dimensions.md) are not mixed in the same column. If you are intentionally migrating from one to the other, use this context parameter to disable type validation.<br /><br />Provide the column list as comma-separated values or as a JSON array in string form.| empty list |
| `failOnEmptyInsert` | INSERT or REPLACE<br /><br /> When set to false (the default), an INSERT query generating no output rows will be no-op, and a REPLACE query generating no output rows will delete all data that matches the OVERWRITE clause. When set to true, an ingest query generating no output rows will throw an `InsertCannotBeEmpty` fault. | `false` |
| `storeCompactionState` | REPLACE<br /><br /> When set to true, a REPLACE query stores as part of each segment's metadata a `lastCompactionState` field that captures the various specs used to create the segment. Future compaction jobs skip segments whose `lastCompactionState` matches the desired compaction state. Works the same as [`storeCompactionState`](../ingestion/tasks.md#context-parameters) task context flag. | `false` |
| `removeNullBytes` | SELECT, INSERT or REPLACE<br /><br /> The MSQ engine cannot process null bytes in strings and throws `InvalidNullByteFault` if it encounters them in the source data. If the parameter is set to true, the MSQ engine will remove the null bytes in string fields when reading the data. | `false` |

## Joins

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2914,7 +2914,8 @@ private void startQueryResultsReader()
inputChannelFactory,
() -> ArenaMemoryAllocator.createOnHeap(5_000_000),
resultReaderExec,
cancellationId
cancellationId,
MultiStageQueryContext.removeNullBytes(querySpec.getQuery().context())
);

resultsChannel = ReadableConcatFrameChannel.open(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import com.google.common.util.concurrent.SettableFuture;
import it.unimi.dsi.fastutil.bytes.ByteArrays;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory;
import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
Expand Down Expand Up @@ -184,6 +183,8 @@ public class WorkerImpl implements Worker
private final ByteTracker intermediateSuperSorterLocalStorageTracker;
private final boolean durableStageStorageEnabled;
private final WorkerStorageParameters workerStorageParameters;
private final boolean isRemoveNullBytes;

/**
* Only set for select jobs.
*/
Expand Down Expand Up @@ -229,6 +230,7 @@ public WorkerImpl(MSQWorkerTask task, WorkerContext context, WorkerStorageParame
QueryContext queryContext = QueryContext.of(task.getContext());
this.durableStageStorageEnabled = MultiStageQueryContext.isDurableStorageEnabled(queryContext);
this.selectDestination = MultiStageQueryContext.getSelectDestinationOrNull(queryContext);
this.isRemoveNullBytes = MultiStageQueryContext.removeNullBytes(queryContext);
this.workerStorageParameters = workerStorageParameters;

long maxBytes = workerStorageParameters.isIntermediateStorageLimitConfigured()
Expand Down Expand Up @@ -1112,7 +1114,8 @@ private void makeInputSliceReader()
inputChannelFactory,
() -> ArenaMemoryAllocator.createOnHeap(frameContext.memoryParameters().getStandardFrameSize()),
exec,
cancellationId
cancellationId,
MultiStageQueryContext.removeNullBytes(QueryContext.of(task.getContext()))
);

inputSliceReader = new MapInputSliceReader(
Expand Down Expand Up @@ -1206,7 +1209,8 @@ private <FactoryType extends FrameProcessorFactory<ProcessorReturnType, ManagerR
frameContext,
parallelism,
counterTracker,
e -> warningPublisher.publishException(kernel.getStageDefinition().getStageNumber(), e)
e -> warningPublisher.publishException(kernel.getStageDefinition().getStageNumber(), e),
isRemoveNullBytes
);

final ProcessorManager<ProcessorReturnType, ManagerReturnType> processorManager = processors.getProcessorManager();
Expand Down Expand Up @@ -1543,7 +1547,8 @@ public void globalSort(
memoryParameters.getSuperSorterMaxChannelsPerProcessor(),
-1,
cancellationId,
counterTracker.sortProgress()
counterTracker.sortProgress(),
isRemoveNullBytes
);

return FutureUtils.transform(
Expand Down Expand Up @@ -1575,11 +1580,11 @@ public void hashPartition(final OutputChannelFactory outputChannelFactory)
outputChannels.stream().map(OutputChannel::getWritableChannel).collect(Collectors.toList()),
kernel.getStageDefinition().getFrameReader(),
kernel.getStageDefinition().getClusterBy().getColumns().size(),
FrameWriters.makeFrameWriterFactory(
FrameType.ROW_BASED,
FrameWriters.makeRowBasedFrameWriterFactory(
new ArenaMemoryAllocatorFactory(frameContext.memoryParameters().getStandardFrameSize()),
kernel.getStageDefinition().getSignature(),
kernel.getStageDefinition().getSortKey()
kernel.getStageDefinition().getSortKey(),
isRemoveNullBytes
)
);

Expand Down Expand Up @@ -1672,7 +1677,8 @@ public OutputChannel openNilChannel(int expectedZero)
// Tracker is not actually tracked, since it doesn't quite fit into the way we report counters.
// There's a single SuperSorterProgressTrackerCounter per worker, but workers that do local
// sorting have a SuperSorter per partition.
new SuperSorterProgressTracker()
new SuperSorterProgressTracker(),
isRemoveNullBytes
);

return FutureUtils.transform(sorter.run(), r -> Iterables.getOnlyElement(r.getAllChannels()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,12 +263,14 @@ public static Map<String, Object> makeTaskContext(
{
final ImmutableMap.Builder<String, Object> taskContextOverridesBuilder = ImmutableMap.builder();
final long maxParseExceptions = MultiStageQueryContext.getMaxParseExceptions(querySpec.getQuery().context());
final boolean removeNullBytes = MultiStageQueryContext.removeNullBytes(querySpec.getQuery().context());

taskContextOverridesBuilder
.put(MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, queryKernelConfig.isDurableStorage())
.put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, maxParseExceptions)
.put(MultiStageQueryContext.CTX_IS_REINDEX, MSQControllerTask.isReplaceInputDataSourceTask(querySpec))
.put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, queryKernelConfig.getMaxConcurrentStages());
.put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, queryKernelConfig.getMaxConcurrentStages())
.put(MultiStageQueryContext.CTX_REMOVE_NULL_BYTES, removeNullBytes);

// Put the lookup loading info in the task context to facilitate selective loading of lookups.
if (controllerTaskContext.get(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.druid.msq.indexing;

import com.google.common.collect.Iterables;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.MemoryAllocator;
import org.apache.druid.frame.allocation.SingleMemoryAllocatorFactory;
import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
Expand Down Expand Up @@ -58,14 +57,16 @@ public class InputChannelsImpl implements InputChannels
private final FrameProcessorExecutor exec;
private final String cancellationId;
private final Map<StagePartition, ReadablePartition> readablePartitionMap;
private final boolean removeNullBytes;

public InputChannelsImpl(
final QueryDefinition queryDefinition,
final ReadablePartitions readablePartitions,
final InputChannelFactory channelFactory,
final Supplier<MemoryAllocator> allocatorMaker,
final FrameProcessorExecutor exec,
final String cancellationId
final String cancellationId,
final boolean removeNullBytes
)
{
this.queryDefinition = queryDefinition;
Expand All @@ -74,6 +75,7 @@ public InputChannelsImpl(
this.allocatorMaker = allocatorMaker;
this.exec = exec;
this.cancellationId = cancellationId;
this.removeNullBytes = removeNullBytes;

for (final ReadablePartition readablePartition : readablePartitions) {
readablePartitionMap.put(
Expand Down Expand Up @@ -128,13 +130,13 @@ private ReadableFrameChannel openSorted(
channels,
stageDefinition.getFrameReader(),
queueChannel.writable(),
FrameWriters.makeFrameWriterFactory(
FrameType.ROW_BASED,
FrameWriters.makeRowBasedFrameWriterFactory(
new SingleMemoryAllocatorFactory(allocatorMaker.get()),
stageDefinition.getFrameReader().signature(),

// No sortColumns, because FrameChannelMerger generates frames that are sorted all on its own
Collections.emptyList()
Collections.emptyList(),
removeNullBytes
),
stageDefinition.getSortKey(),
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public InvalidNullByteFault(
super(
CODE,
"Invalid null byte at source[%s], rowNumber[%d], column[%s], value[%s], position[%d]. "
+ "Consider sanitizing the input string column using REPLACE(\"%s\", U&'\\0000', '') AS %s",
+ "Consider sanitizing the input string column using \"REPLACE(\"%s\", U&'\\0000', '') AS %s\" or setting 'removeNullBytes' "
+ "to true in the query context.",
source,
rowNumber,
column,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ public ProcessorsAndChannels<DataSegment, Set<DataSegment>> makeProcessors(
FrameContext frameContext,
int maxOutstandingProcessors,
CounterTracker counters,
Consumer<Throwable> warningPublisher
Consumer<Throwable> warningPublisher,
boolean removeNullBytes
)
{
if (extra == null || extra.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ ProcessorsAndChannels<T, R> makeProcessors(
FrameContext frameContext,
int maxOutstandingProcessors,
CounterTracker counters,
Consumer<Throwable> warningPublisher
Consumer<Throwable> warningPublisher,
boolean removeNullBytes
) throws IOException;

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import it.unimi.dsi.fastutil.ints.IntAVLTreeSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import it.unimi.dsi.fastutil.ints.IntSets;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.MemoryAllocator;
import org.apache.druid.frame.allocation.MemoryAllocatorFactory;
import org.apache.druid.frame.allocation.SingleMemoryAllocatorFactory;
Expand Down Expand Up @@ -355,17 +354,17 @@ public ClusterByStatisticsCollector createResultKeyStatisticsCollector(final int
*
* Calls {@link MemoryAllocatorFactory#newAllocator()} for each frame.
*/
public FrameWriterFactory createFrameWriterFactory(final MemoryAllocatorFactory memoryAllocatorFactory)
public FrameWriterFactory createFrameWriterFactory(final MemoryAllocatorFactory memoryAllocatorFactory, final boolean removeNullBytes)
{
return FrameWriters.makeFrameWriterFactory(
FrameType.ROW_BASED,
return FrameWriters.makeRowBasedFrameWriterFactory(
memoryAllocatorFactory,
signature,

// Main processor does not sort when there is a hash going on, even if isSort = true. This is because
// FrameChannelHashPartitioner is expected to be attached to the processor and do the sorting. We don't
// want to double-sort.
doesShuffle() && !shuffleSpec.kind().isHash() ? getClusterBy().getColumns() : Collections.emptyList()
doesShuffle() && !shuffleSpec.kind().isHash() ? getClusterBy().getColumns() : Collections.emptyList(),
removeNullBytes
);
}

Expand All @@ -374,9 +373,9 @@ public FrameWriterFactory createFrameWriterFactory(final MemoryAllocatorFactory
*
* Re-uses the same {@link MemoryAllocator} for each frame.
*/
public FrameWriterFactory createFrameWriterFactory(final MemoryAllocator allocator)
public FrameWriterFactory createFrameWriterFactory(final MemoryAllocator allocator, final boolean removeNullBytes)
{
return createFrameWriterFactory(new SingleMemoryAllocatorFactory(allocator));
return createFrameWriterFactory(new SingleMemoryAllocatorFactory(allocator), removeNullBytes);
}

public FrameReader getFrameReader()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
FrameContext frameContext,
int maxOutstandingProcessors,
CounterTracker counters,
Consumer<Throwable> warningPublisher
Consumer<Throwable> warningPublisher,
final boolean removeNullBytes
) throws IOException
{
// BaseLeafFrameProcessorFactory is used for native Druid queries, where the following input cases can happen:
Expand Down Expand Up @@ -125,7 +126,7 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
final OutputChannel outputChannel = outputChannelFactory.openChannel(0 /* Partition number doesn't matter */);
outputChannels.add(outputChannel);
channelQueue.add(outputChannel.getWritableChannel());
frameWriterFactoryQueue.add(stageDefinition.createFrameWriterFactory(outputChannel.getFrameMemoryAllocator()));
frameWriterFactoryQueue.add(stageDefinition.createFrameWriterFactory(outputChannel.getFrameMemoryAllocator(), removeNullBytes));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
FrameContext frameContext,
int maxOutstandingProcessors,
CounterTracker counters,
Consumer<Throwable> warningPublisher
Consumer<Throwable> warningPublisher,
final boolean removeNullBytes
)
{
// Expecting a single input slice from some prior stage.
Expand Down Expand Up @@ -152,7 +153,7 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
query,
readableInput.getChannel(),
outputChannel.getWritableChannel(),
stageDefinition.createFrameWriterFactory(outputChannel.getFrameMemoryAllocator()),
stageDefinition.createFrameWriterFactory(outputChannel.getFrameMemoryAllocator(), removeNullBytes),
readableInput.getChannelFrameReader(),
frameContext.jsonMapper(),
operatorList,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
FrameContext frameContext,
int maxOutstandingProcessors,
CounterTracker counters,
Consumer<Throwable> warningPublisher
Consumer<Throwable> warningPublisher,
boolean removeNullBytes
) throws IOException
{
if (workerNumber > 0) {
Expand Down Expand Up @@ -126,7 +127,7 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
ReadableConcatFrameChannel.open(Iterators.transform(readableInputs.iterator(), ReadableInput::getChannel)),
outputChannel.getWritableChannel(),
readableInputs.frameReader(),
stageDefinition.createFrameWriterFactory(HeapMemoryAllocator.unlimited()),
stageDefinition.createFrameWriterFactory(HeapMemoryAllocator.unlimited(), removeNullBytes),
offset,
// Limit processor will add limit + offset at various points; must avoid overflow
limit == null ? Long.MAX_VALUE - offset : limit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
FrameContext frameContext,
int maxOutstandingProcessors,
CounterTracker counters,
Consumer<Throwable> warningPublisher
Consumer<Throwable> warningPublisher,
boolean removeNullBytes
) throws IOException
{
if (inputSlices.size() != 2 || !inputSlices.stream().allMatch(slice -> slice instanceof StageInputSlice)) {
Expand Down Expand Up @@ -180,7 +181,7 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
readableInputs.get(LEFT),
readableInputs.get(RIGHT),
outputChannel.getWritableChannel(),
stageDefinition.createFrameWriterFactory(outputChannel.getFrameMemoryAllocator()),
stageDefinition.createFrameWriterFactory(outputChannel.getFrameMemoryAllocator(), removeNullBytes),
rightPrefix,
keyColumns,
requiredNonNullKeyParts,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
FrameContext frameContext,
int maxOutstandingProcessors,
CounterTracker counters,
Consumer<Throwable> warningPublisher
Consumer<Throwable> warningPublisher,
boolean removeNullBytes
)
{
// Expecting a single input slice from some prior stage.
Expand Down Expand Up @@ -116,7 +117,7 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
engine,
readableInput.getChannel(),
outputChannel.getWritableChannel(),
stageDefinition.createFrameWriterFactory(outputChannel.getFrameMemoryAllocator()),
stageDefinition.createFrameWriterFactory(outputChannel.getFrameMemoryAllocator(), removeNullBytes),
readableInput.getChannelFrameReader(),
frameContext.jsonMapper()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ public ProcessorsAndChannels<Object, Object> makeProcessors(
FrameContext frameContext,
int maxOutstandingProcessors,
CounterTracker counters,
Consumer<Throwable> warningPublisher
Consumer<Throwable> warningPublisher,
boolean removeNullBytes
)
{
final StageInputSlice slice = (StageInputSlice) CollectionUtils.getOnlyElement(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
FrameContext frameContext,
int maxOutstandingProcessors,
CounterTracker counters,
Consumer<Throwable> warningPublisher
Consumer<Throwable> warningPublisher,
boolean removeNullBytes
)
{
// Expecting a single input slice from some prior stage.
Expand Down
Loading

0 comments on commit 71b3b5a

Please sign in to comment.