feat(plugin): add parquet file reader
add support for reading Parquet files

#215
Marky110 authored and fhussonnois committed Mar 1, 2024
1 parent 13fac18 commit 89729eb
Showing 19 changed files with 1,192 additions and 24 deletions.
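All of the storage-specific readers in this commit delegate to a new ParquetFileInputIterator, which is not shown in this excerpt. As a rough, hypothetical sketch of the mechanics it builds on, the parquet-hadoop API added to the commons-fs module (see the pom.xml change below) can read rows as follows, assuming the object's stream is first spooled to a seekable local file, since Parquet must read its footer via random access:

/*
 * Hypothetical sketch, not the committed implementation: reads every row
 * of a Parquet stream using parquet-hadoop's example Group API.
 */
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

public class ParquetReadSketch {

    public static void readAll(final InputStream stream) throws Exception {
        // Parquet footers are read first, so the input must be seekable:
        // spool the remote stream to a temporary local file.
        final java.nio.file.Path tmp = Files.createTempFile("filepulse-", ".parquet");
        tmp.toFile().deleteOnExit();
        Files.copy(stream, tmp, StandardCopyOption.REPLACE_EXISTING);

        try (ParquetReader<Group> reader =
                     ParquetReader.builder(new GroupReadSupport(), new Path(tmp.toString())).build()) {
            Group row;
            while ((row = reader.read()) != null) {
                System.out.println(row); // one Group per Parquet row
            }
        }
    }
}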
AliyunOSSParquetFileInputReader.java (new file)
@@ -0,0 +1,47 @@
/*
* Copyright 2023 StreamThoughts.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.kafka.connect.filepulse.fs.reader;

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.fs.reader.parquet.ParquetFileInputIterator;
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
import io.streamthoughts.kafka.connect.filepulse.reader.ReaderException;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import java.net.URI;

/**
 * The {@code AliyunOSSParquetFileInputReader} can be used to create records from a Parquet file loaded from Aliyun OSS.
 */
public class AliyunOSSParquetFileInputReader extends BaseAliyunOSSInputReader {

    /**
     * {@inheritDoc}
     */
    @Override
    protected FileInputIterator<FileRecord<TypedStruct>> newIterator(final URI objectURI,
                                                                     final IteratorManager iteratorManager) {
        try {
            final FileObjectMeta metadata = storage().getObjectMetadata(objectURI);
            return new ParquetFileInputIterator(
                    metadata,
                    iteratorManager,
                    storage().getInputStream(objectURI)
            );
        } catch (Exception e) {
            throw new ReaderException("Failed to create ParquetFileInputIterator for: " + objectURI, e);
        }
    }
}
AmazonS3ParquetFileInputReader.java (new file)
@@ -0,0 +1,49 @@
/*
* Copyright 2019-2021 StreamThoughts.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.kafka.connect.filepulse.fs.reader;

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.fs.reader.parquet.ParquetFileInputIterator;
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
import io.streamthoughts.kafka.connect.filepulse.reader.ReaderException;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import java.net.URI;

/**
 * The {@code AmazonS3ParquetFileInputReader} can be used to create records from a Parquet file loaded from Amazon S3.
 */
public class AmazonS3ParquetFileInputReader extends BaseAmazonS3InputReader {

    /**
     * {@inheritDoc}
     */
    @Override
    protected FileInputIterator<FileRecord<TypedStruct>> newIterator(final URI objectURI,
                                                                     final IteratorManager iteratorManager) {
        try {
            final FileObjectMeta metadata = storage().getObjectMetadata(objectURI);
            return new ParquetFileInputIterator(
                    metadata,
                    iteratorManager,
                    storage().getInputStream(objectURI)
            );
        } catch (Exception e) {
            throw new ReaderException("Failed to create ParquetFileInputIterator for: " + objectURI, e);
        }
    }
}
AmazonS3ParquetInputReaderTest.java (new file)
@@ -0,0 +1,89 @@
/*
* Copyright 2019-2021 StreamThoughts.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.kafka.connect.filepulse.fs.reader;

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3Storage;
import io.streamthoughts.kafka.connect.filepulse.fs.BaseAmazonS3Test;
import io.streamthoughts.kafka.connect.filepulse.fs.S3BucketKey;
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
import io.streamthoughts.kafka.connect.filepulse.reader.ReaderException;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import io.streamthoughts.kafka.connect.filepulse.source.GenericFileObjectMeta;
import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class AmazonS3ParquetInputReaderTest extends BaseAmazonS3Test {

    private static final String FILE_NAME = "src/test/resources/test.snappy.parquet";

    private File objectFile;

    private AmazonS3ParquetFileInputReader reader;

    @Before
    public void setUp() throws Exception {
        super.setUp();
        objectFile = new File(FILE_NAME);
        reader = new AmazonS3ParquetFileInputReader();
        reader.setStorage(new AmazonS3Storage(client));
        reader.configure(unmodifiableCommonsProperties);
    }

    @Override
    public void tearDown() throws Exception {
        super.tearDown();
        reader.close();
    }

    @Test
    public void should_read_all_lines() {
        client.createBucket(S3_TEST_BUCKET);
        client.putObject(S3_TEST_BUCKET, "my_key", objectFile);

        final GenericFileObjectMeta meta = new GenericFileObjectMeta.Builder()
                .withUri(new S3BucketKey(S3_TEST_BUCKET, "my_key").toURI())
                .build();

        final FileInputIterator<FileRecord<TypedStruct>> iterator = reader.newIterator(meta.uri());
        List<FileRecord<TypedStruct>> results = new ArrayList<>();
        while (iterator.hasNext()) {
            final RecordsIterable<FileRecord<TypedStruct>> next = iterator.next();
            results.addAll(next.collect());
        }
        Assert.assertEquals(4, results.size());
    }

    @Test
    public void should_throw_reader_exception() {
        try (AmazonS3ParquetFileInputReader reader = mock(AmazonS3ParquetFileInputReader.class)) {
            when(reader.newIterator(any())).thenThrow(new ReaderException("exception"));

            assertThrows(ReaderException.class, () -> reader.newIterator(new URI("test")));
        }
    }
}
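For context, a fixture like test.snappy.parquet can be generated with parquet-hadoop's example writer. The sketch below uses a made-up schema and values; the committed fixture's actual schema is not visible in this diff, beyond the test's expectation of 4 records:

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ParquetFixtureWriterSketch {

    public static void main(final String[] args) throws Exception {
        // Made-up schema for illustration only.
        final MessageType schema = MessageTypeParser.parseMessageType(
                "message record { required binary name (UTF8); required int32 age; }");

        final SimpleGroupFactory groups = new SimpleGroupFactory(schema);
        try (ParquetWriter<Group> writer = ExampleParquetWriter
                .builder(new Path("test.snappy.parquet"))
                .withType(schema)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build()) {
            // The test above expects exactly 4 rows.
            for (int i = 0; i < 4; i++) {
                writer.write(groups.newGroup().append("name", "user-" + i).append("age", 20 + i));
            }
        }
    }
}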
test.snappy.parquet (new binary file, not shown)

AzureBlobStorageParquetFileInputReader.java (new file)
@@ -0,0 +1,42 @@
/*
* Copyright 2019-2021 StreamThoughts.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.kafka.connect.filepulse.fs.reader;

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.fs.reader.parquet.ParquetFileInputIterator;
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
import io.streamthoughts.kafka.connect.filepulse.reader.ReaderException;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import java.net.URI;

/**
 * The {@code AzureBlobStorageParquetFileInputReader} can be used to create records from a Parquet file loaded from Azure Blob Storage.
 */
public class AzureBlobStorageParquetFileInputReader extends AzureBlobStorageInputReader {

    /**
     * {@inheritDoc}
     */
    @Override
    protected FileInputIterator<FileRecord<TypedStruct>> newIterator(final URI objectURI,
                                                                     final IteratorManager iteratorManager) {
        try {
            final FileObjectMeta metadata = storage.getObjectMetadata(objectURI);
            return new ParquetFileInputIterator(
                    metadata,
                    iteratorManager,
                    storage().getInputStream(objectURI)
            );
        } catch (Exception e) {
            throw new ReaderException("Failed to create ParquetFileInputIterator for: " + objectURI, e);
        }
    }
}
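Apart from their base class, the three readers above are practically identical: fetch the object's metadata, open its stream, wrap both in a ParquetFileInputIterator, and convert any failure into a ReaderException. A hypothetical shared helper capturing that pattern, assuming the commons-fs Storage interface exposes getObjectMetadata(URI) and getInputStream(URI):

// Hypothetical helper, not part of this commit, illustrating the factory
// logic the three readers duplicate.
static FileInputIterator<FileRecord<TypedStruct>> newParquetIterator(final Storage storage,
                                                                     final URI objectURI,
                                                                     final IteratorManager iteratorManager) {
    try {
        return new ParquetFileInputIterator(
                storage.getObjectMetadata(objectURI),
                iteratorManager,
                storage.getInputStream(objectURI)
        );
    } catch (Exception e) {
        throw new ReaderException("Failed to create ParquetFileInputIterator for: " + objectURI, e);
    }
}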
10 changes: 8 additions & 2 deletions connect-file-pulse-filesystems/filepulse-commons-fs/pom.xml
@@ -1,4 +1,3 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2023 StreamThoughts
@@ -23,7 +22,6 @@ limitations under the License.
        <version>2.14.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>kafka-connect-filepulse-commons-fs</artifactId>
    <name>Kafka Connect Source File Pulse Common FS</name>

@@ -37,6 +35,14 @@ limitations under the License.
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-hadoop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
        </dependency>
        <dependency>
            <groupId>net.sf.saxon</groupId>
            <artifactId>Saxon-HE</artifactId>
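The hadoop-common dependency comes along because parquet-hadoop's API is built on Hadoop's Configuration and Path abstractions even for purely local reads. A minimal sketch of that bridge, assuming default (local filesystem) configuration:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;

public class HadoopBridgeSketch {

    // Adapts a Hadoop Path (local or remote) to Parquet's InputFile
    // abstraction; an empty Configuration defaults to the local filesystem.
    public static InputFile open(final String location) throws IOException {
        return HadoopInputFile.fromPath(new Path(location), new Configuration());
    }
}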
(remaining changed files in this commit are not shown)
