Support csv input format in Kafka ingestion with header (#16630)
* Support ListBasedInputRow in Kafka ingestion with header
* Fix up buildBlendedEventMap
* Add new test for KafkaInputFormat with csv value and headers
* Do not use forbidden APIs
* Move utility method to TestUtils
kfaraz committed Jun 25, 2024
1 parent 37a50e6 commit f1043d2
Showing 3 changed files with 195 additions and 227 deletions.
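In short: KafkaInputReader previously cast each parsed value row to MapBasedInputRow and read its backing event map directly, so value formats that produce other row types (such as the list-based rows emitted by CSV and delimited formats) failed with a ParseException. The diff below drops the cast and reads rows through the generic InputRow interface instead: r.getDimensions() supplies the key set and r::getRaw supplies the values. Here is a minimal sketch of why that approach is format-agnostic; Row, MapRow, ListRow, and BlendDemo are simplified stand-ins for illustration, not Druid's actual classes.

// Minimal sketch with stand-in types, not Druid's actual classes.
import java.util.List;
import java.util.Map;
import java.util.function.Function;

interface Row {
  List<String> getDimensions();
  Object getRaw(String dimension); // every row type can answer this
}

// Map-backed row, as JSON-style value formats produce.
class MapRow implements Row {
  private final Map<String, Object> event;
  MapRow(Map<String, Object> event) { this.event = event; }
  @Override public List<String> getDimensions() { return List.copyOf(event.keySet()); }
  @Override public Object getRaw(String d) { return event.get(d); }
}

// List-backed row, as CSV/delimited value formats produce.
class ListRow implements Row {
  private final List<String> columns;
  private final List<Object> values;
  ListRow(List<String> columns, List<Object> values) { this.columns = columns; this.values = values; }
  @Override public List<String> getDimensions() { return columns; }
  @Override public Object getRaw(String d) {
    final int i = columns.indexOf(d);
    return i < 0 ? null : values.get(i);
  }
}

public class BlendDemo {
  // The old code needed `(MapBasedInputRow) r` to reach the backing map; reading
  // through a Function<String, Object> accessor works for any Row, no cast needed.
  static Object readBlended(Function<String, Object> row, Map<String, Object> headers, String key) {
    final Object v = row.apply(key);
    return v != null ? v : headers.get(key);
  }

  public static void main(String[] args) {
    final Map<String, Object> headers = Map.of("kafka.topic", "wiki");
    final Row jsonRow = new MapRow(Map.of("page", "Main_Page"));
    final Row csvRow = new ListRow(List.of("page", "user"), List.<Object>of("Main_Page", "alice"));
    System.out.println(readBlended(jsonRow::getRaw, headers, "page"));       // Main_Page
    System.out.println(readBlended(csvRow::getRaw, headers, "user"));        // alice
    System.out.println(readBlended(csvRow::getRaw, headers, "kafka.topic")); // wiki (header fallback)
  }
}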
@@ -31,7 +31,6 @@
 import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import org.apache.druid.java.util.common.parsers.ParseException;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
@@ -171,21 +170,8 @@ private CloseableIterator<InputRow> buildBlendedRows(
   {
     return valueParser.read().map(
         r -> {
-          final MapBasedInputRow valueRow;
-          try {
-            // Return type for the value parser should be of type MapBasedInputRow
-            // Parsers returning other types are not compatible currently.
-            valueRow = (MapBasedInputRow) r;
-          }
-          catch (ClassCastException e) {
-            throw new ParseException(
-                null,
-                "Unsupported input format in valueFormat. KafkaInputFormat only supports input format that return MapBasedInputRow rows"
-            );
-          }
-
-          final Map<String, Object> event = buildBlendedEventMap(valueRow.getEvent(), headerKeyList);
-          final HashSet<String> newDimensions = new HashSet<>(valueRow.getDimensions());
+          final HashSet<String> newDimensions = new HashSet<>(r.getDimensions());
+          final Map<String, Object> event = buildBlendedEventMap(r::getRaw, newDimensions, headerKeyList);
           newDimensions.addAll(headerKeyList.keySet());
           // Remove the dummy timestamp added in KafkaInputFormat
           newDimensions.remove(KafkaInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING);
@@ -244,25 +230,18 @@ private CloseableIterator<InputRowListPlusRawValues> buildBlendedRowsSample(
           }
           List<InputRow> newInputRows = Lists.newArrayListWithCapacity(rowAndValues.getInputRows().size());
           List<Map<String, Object>> newRawRows = Lists.newArrayListWithCapacity(rowAndValues.getRawValues().size());
-          ParseException parseException = null;
 
           for (Map<String, Object> raw : rowAndValues.getRawValuesList()) {
-            newRawRows.add(buildBlendedEventMap(raw, headerKeyList));
+            newRawRows.add(buildBlendedEventMap(raw::get, raw.keySet(), headerKeyList));
           }
           for (InputRow r : rowAndValues.getInputRows()) {
-            MapBasedInputRow valueRow = null;
-            try {
-              valueRow = (MapBasedInputRow) r;
-            }
-            catch (ClassCastException e) {
-              parseException = new ParseException(
-                  null,
-                  "Unsupported input format in valueFormat. KafkaInputFormat only supports input format that return MapBasedInputRow rows"
-              );
-            }
-            if (valueRow != null) {
-              final Map<String, Object> event = buildBlendedEventMap(valueRow.getEvent(), headerKeyList);
-              final HashSet<String> newDimensions = new HashSet<>(valueRow.getDimensions());
+            if (r != null) {
+              final HashSet<String> newDimensions = new HashSet<>(r.getDimensions());
+              final Map<String, Object> event = buildBlendedEventMap(
+                  r::getRaw,
+                  newDimensions,
+                  headerKeyList
+              );
               newDimensions.addAll(headerKeyList.keySet());
               // Remove the dummy timestamp added in KafkaInputFormat
               newDimensions.remove(KafkaInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING);
@@ -279,7 +258,7 @@ private CloseableIterator<InputRowListPlusRawValues> buildBlendedRowsSample(
               );
             }
           }
-          return InputRowListPlusRawValues.ofList(newRawRows, newInputRows, parseException);
+          return InputRowListPlusRawValues.ofList(newRawRows, newInputRows, null);
         }
     );
   }
@@ -302,22 +281,31 @@ private List<InputRow> buildInputRowsForMap(Map<String, Object> headerKeyList)
   /**
    * Builds a map that blends two {@link Map}, presenting the combined keyset of both maps, and preferring to read
    * from the first map and falling back to the second map if the value is not present.
-   *
+   * <p>
    * This strategy is used rather than just copying the values of the keyset into a new map so that any 'flattening'
    * machinery (such as {@link Map} created by {@link org.apache.druid.java.util.common.parsers.ObjectFlatteners}) is
    * still in place to be lazily evaluated instead of eagerly copying.
    */
-  private static Map<String, Object> buildBlendedEventMap(Map<String, Object> map, Map<String, Object> fallback)
+  private static Map<String, Object> buildBlendedEventMap(
+      Function<String, Object> getRowValue,
+      Set<String> rowDimensions,
+      Map<String, Object> fallback
+  )
   {
     final Set<String> keySet = new HashSet<>(fallback.keySet());
-    keySet.addAll(map.keySet());
+    keySet.addAll(rowDimensions);
 
     return new AbstractMap<String, Object>()
     {
       @Override
       public Object get(Object key)
       {
-        return map.getOrDefault((String) key, fallback.get(key));
+        final String skey = (String) key;
+        final Object val = getRowValue.apply(skey);
+        if (val == null) {
+          return fallback.get(skey);
+        }
+        return val;
       }
 
       @Override
[Diff truncated; the remaining changed files are not shown.]
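The last hunk is the heart of the refactor: buildBlendedEventMap no longer takes the row's backing Map but a value accessor plus the row's dimension names, and it returns a lazy view that prefers row values and falls back to the Kafka header map. The following self-contained sketch shows that pattern; it is simplified from the diff above, and the eager entrySet() is this sketch's own shortcut, not necessarily how the real class implements it.

// Sketch of the lazy "blended map" pattern from the final hunk: expose the union
// of keys, resolve each value on demand, prefer the row, fall back to the headers.
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class BlendedMapSketch {
  static Map<String, Object> buildBlendedEventMap(
      Function<String, Object> getRowValue,
      Set<String> rowDimensions,
      Map<String, Object> fallback
  )
  {
    final Set<String> keySet = new HashSet<>(fallback.keySet());
    keySet.addAll(rowDimensions);

    return new AbstractMap<String, Object>()
    {
      @Override
      public Object get(Object key)
      {
        // Values are resolved lazily: nothing is copied out of the row, so any
        // flattening machinery behind getRowValue runs only on demand.
        final Object val = getRowValue.apply((String) key);
        return val == null ? fallback.get(key) : val;
      }

      @Override
      public Set<Entry<String, Object>> entrySet()
      {
        // AbstractMap requires entrySet(); this eager version is a shortcut
        // for the sketch only.
        final Map<String, Object> entries = new HashMap<>();
        for (String k : keySet) {
          entries.put(k, get(k));
        }
        return entries.entrySet();
      }
    };
  }

  public static void main(String[] args) {
    final Map<String, Object> row = Map.of("page", "Main_Page");
    final Map<String, Object> headers = Map.of("kafka.topic", "wiki", "page", "fromHeader");
    final Map<String, Object> blended = buildBlendedEventMap(row::get, row.keySet(), headers);
    System.out.println(blended.get("page"));        // Main_Page (row value wins)
    System.out.println(blended.get("kafka.topic")); // wiki (falls back to header)
    System.out.println(blended.keySet());           // union of row and header keys
  }
}

Keeping get() as a delegating lookup rather than copying values into a new map is exactly the rationale stated in the javadoc above: any lazy 'flattening' machinery behind the accessor stays in place.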
