Skip to content

Commit

Permalink
fix(plugin): fix XMLFileInputReader should read text node attributes (#…
Browse files Browse the repository at this point in the history
…74)

Resolves: #74
  • Loading branch information
fhussonnois committed Sep 10, 2020
1 parent c83eb37 commit 99288ff
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ public boolean hasNext() {
*/
private static class Node2StructConverter {

private static final String DEFAULT_TEXT_NODE_FIELD_NAME = "value";

/**
* Converts the given {@link Node} object tree into a new {@link TypedStruct} instance.
*
Expand All @@ -212,11 +214,11 @@ private static TypedStruct convertNodeObjectTree(final Node node, final List<Str
Objects.requireNonNull(node, "node cannot be null");

TypedStruct container = TypedStruct.create();
readAllNodeAttributes(node, container);
addAllNodeAttributes(container, node.getAttributes());
for (Node child = node.getFirstChild(); child != null; child = child.getNextSibling()) {
Optional<?> optional = readNodeObject(child, forceArrayFields);
if (optional.isPresent()) {
final Object nodeValue = optional.get();
Object nodeValue = optional.get();
final String nodeName = isTextNode(child) ? determineNodeName(node) : determineNodeName(child);
final boolean isArray = forceArrayFields.contains(nodeName);
container = enrichStructWithObject(container, nodeName, nodeValue, isArray);
Expand Down Expand Up @@ -259,20 +261,32 @@ private static Optional<?> readNodeObject(final Node node, final List<String> fo
}

if (isTextNode(node)) {
return Optional.of(node.getNodeValue());
return readTextNode(node, node.getNodeValue());
}

if (isElementNode(node)) {
Optional<String> childTextContent = peekChildNodeTextContent(node);
if (childTextContent.isPresent()) {
return childTextContent;
return readTextNode(node, childTextContent.get());
} else {
return Optional.of(convertNodeObjectTree(node, forceArrayFields));
}
}
throw new ReaderException("Unsupported node type '" + node.getNodeType() + "'");
}

/**
 * Builds the value to expose for a node that carries text content.
 *
 * <p>When the node has at least one attribute, a new {@link TypedStruct} is created, filled
 * through {@code addAllNodeAttributes} and completed with the text stored under the
 * {@code DEFAULT_TEXT_NODE_FIELD_NAME} field. When the node has no attributes (or its
 * attribute map is {@code null}), the text itself is returned.
 *
 * @param node the node whose attributes are inspected.
 * @param text the text content associated with the node.
 * @return an {@link Optional} wrapping either the raw text or the struct described above.
 */
private static Optional<?> readTextNode(final Node node,
                                        final String text) {
    final NamedNodeMap nodeAttributes = node.getAttributes();
    final boolean hasAttributes = nodeAttributes != null && nodeAttributes.getLength() > 0;
    if (!hasAttributes) {
        // Nothing to attach to the text: expose it as a plain string.
        return Optional.of(text);
    }

    final TypedStruct struct = TypedStruct.create();
    addAllNodeAttributes(struct, nodeAttributes);
    // The text is written after the attributes have been copied into the struct.
    struct.put(DEFAULT_TEXT_NODE_FIELD_NAME, text);
    return Optional.of(struct);
}

private static Optional<String> peekChildNodeTextContent(final Node node) {
if (!node.hasChildNodes()) return Optional.empty();

Expand Down Expand Up @@ -313,15 +327,15 @@ private static boolean isNodeOfType(final Node node, int textNode) {
return node.getNodeType() == textNode;
}

private static void readAllNodeAttributes(final Node node, final TypedStruct values) {
final NamedNodeMap attributes = node.getAttributes();
private static void addAllNodeAttributes(final TypedStruct struct,
final NamedNodeMap attributes) {
if (attributes == null) return;

for (int i = 0; i < attributes.getLength(); i++) {
Node attr = attributes.item(i);
String attrName = determineNodeName(attr);
if (isNotXmlNamespace(attr)) {
values.put(attrName, attr.getNodeValue());
struct.put(attrName, attr.getNodeValue());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import io.streamthoughts.kafka.connect.filepulse.data.Type;
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
import io.streamthoughts.kafka.connect.filepulse.source.FileContext;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import io.streamthoughts.kafka.connect.filepulse.source.SourceMetadata;
Expand Down Expand Up @@ -187,6 +186,20 @@ public void should_read_record_given_valid_force_array_fields() {
Assert.assertEquals(1, records.get(0).value().get("topicPartition").getArray().size());
}

/**
 * Reads {@code TEXT_NODE_TEST_XML_DOCUMENT} and checks that a single record is produced whose
 * {@code ROOT.value} field holds the element text and whose {@code ROOT.attr} field holds the
 * attribute value.
 */
@Test
public void should_read_record_given_single_text_node_with_attrs() throws IOException {
    try (XMLFileInputReader xmlReader = createNewXMLFileInputReader(TEXT_NODE_TEST_XML_DOCUMENT)) {
        xmlReader.configure(new HashMap<String, String>());

        final List<FileRecord<TypedStruct>> collected = new ArrayList<>();
        final FileInputIterator<FileRecord<TypedStruct>> it = xmlReader.newIterator(context);
        // Drain the iterator, flattening every returned batch into a single list.
        it.forEachRemaining(batch -> collected.addAll(batch.collect()));

        Assert.assertEquals(1, collected.size());

        final FileRecord<TypedStruct> first = collected.get(0);
        Assert.assertEquals("dummy text", first.value().find("ROOT.value").getString());
        Assert.assertEquals("dummy attr", first.value().find("ROOT.attr").getString());
    }
}


private XMLFileInputReader createNewXMLFileInputReader(final String xmlDocument) throws IOException {
File file = testFolder.newFile();
Expand All @@ -213,6 +226,8 @@ private static void assertTopicPartitionObject(final TypedStruct struct,
Assert.assertEquals("1", topicPartition.getString("numSegments"));
}

// Root element carrying both an attribute ("attr") and text content; used by
// should_read_record_given_single_text_node_with_attrs.
private static final String TEXT_NODE_TEST_XML_DOCUMENT = "<ROOT attr=\"dummy attr\">dummy text</ROOT>";

// Root element whose text content is preceded by an XML comment.
private static final String COMMENT_TEST_XML_DOCUMENT = "<ROOT><!-- This is a comment -->dummy text</ROOT>";

// Root element whose text is wrapped in a CDATA section surrounded by whitespace (newline and tab).
private static final String CDATA_TEST_XML_DOCUMENT = "<ROOT>\n\t<![CDATA[dummy text]]>\n</ROOT>";
Expand Down

0 comments on commit 99288ff

Please sign in to comment.