Commit 3a12e7e

chore: fix checkstyle
fhussonnois committed Mar 16, 2024
1 parent dbe079c commit 3a12e7e
Showing 7 changed files with 77 additions and 44 deletions.
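
The changes fall into three recurring patterns, all visible in the hunks below: wildcard imports are expanded into explicit single-type imports, static imports are moved above the regular imports, and over-long lines (a Javadoc sentence, a ternary, and a few chained calls) are wrapped. These patterns match Checkstyle's standard AvoidStarImport, CustomImportOrder and LineLength checks, though the project's checkstyle.xml is not part of this commit; behavior is unchanged.
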
File 1/7: AliyunOSSParquetFileInputReader.java

@@ -24,7 +24,8 @@
 import java.net.URI;
 
 /**
- * The {@code AliyunOSSParquetFileInputReader} can be used to created records from a parquet file loaded from Aliyun OSS.
+ * The {@code AliyunOSSParquetFileInputReader} can be used to
+ * created records from a parquet file loaded from Aliyun OSS.
  */
 public class AliyunOSSParquetFileInputReader extends BaseAliyunOSSInputReader {
 
File 2/7: AmazonS3ParquetInputReaderTest.java

@@ -15,6 +15,11 @@
  */
 package io.streamthoughts.kafka.connect.filepulse.fs.reader;
 
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
 import io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3Storage;
 import io.streamthoughts.kafka.connect.filepulse.fs.BaseAmazonS3Test;
@@ -32,11 +37,6 @@
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 public class AmazonS3ParquetInputReaderTest extends BaseAmazonS3Test {
 
     private static final String FILE_NAME = "src/test/resources/test.snappy.parquet";
File 3/7

@@ -34,7 +34,9 @@
 import org.apache.parquet.example.data.simple.SimpleGroup;
 import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.io.*;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
 import org.apache.parquet.schema.MessageType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
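The explicit imports above are the machinery of the standard parquet-example read loop. The reader class hosting them is collapsed in this diff, so the following is orientation only: a minimal sketch of that loop, with the method name and InputFile parameter assumed rather than taken from the project.

    // Sketch: canonical parquet-example read loop (not the project's code).
    // Assumes org.apache.parquet.io.InputFile, org.apache.parquet.column.page.PageReadStore
    // and org.apache.parquet.example.data.Group, plus the imports shown in the hunk above.
    static void readAllRecords(org.apache.parquet.io.InputFile inputFile) throws java.io.IOException {
        try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) {
            MessageType schema = reader.getFooter().getFileMetaData().getSchema();
            MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
            org.apache.parquet.column.page.PageReadStore pages;
            while ((pages = reader.readNextRowGroup()) != null) {
                RecordReader<org.apache.parquet.example.data.Group> records =
                        columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
                for (long i = 0, n = pages.getRowCount(); i < n; i++) {
                    // Each record materializes as a SimpleGroup, ready for conversion
                    // (see ParquetTypedStructConverter in file 5/7).
                    org.apache.parquet.example.data.Group record = records.read();
                }
            }
        }
    }

Each row group is decoded through MessageColumnIO with a GroupRecordConverter, which is exactly what the three formerly wildcarded org.apache.parquet.io types provide.
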
File 4/7

@@ -15,8 +15,9 @@
  */
 package io.streamthoughts.kafka.connect.filepulse.fs.reader.parquet;
 
-import java.io.*;
-
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
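Pairing java.io.ByteArrayInputStream with Hadoop's Seekable and PositionedReadable is the usual way to serve in-memory bytes to parquet's Hadoop-facing reader through FSDataInputStream. The class body is collapsed in this diff, so the following is only a sketch of that adapter pattern; the class name and the byte-array assumption are hypothetical.

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.PositionedReadable;
    import org.apache.hadoop.fs.Seekable;

    // Hypothetical adapter: an in-memory stream usable with FSDataInputStream,
    // which requires its wrapped stream to implement Seekable and PositionedReadable.
    class SeekableByteArrayInputStream extends ByteArrayInputStream
            implements Seekable, PositionedReadable {

        SeekableByteArrayInputStream(final byte[] data) {
            super(data);
        }

        @Override
        public void seek(final long newPos) {
            this.pos = (int) newPos; // 'pos' is protected in ByteArrayInputStream
        }

        @Override
        public long getPos() {
            return pos;
        }

        @Override
        public boolean seekToNewSource(final long targetPos) {
            return false; // single in-memory source, nothing to fail over to
        }

        @Override
        public int read(final long position, final byte[] buffer, final int offset, final int length) {
            final long saved = getPos();
            seek(position);
            final int n = read(buffer, offset, length);
            seek(saved);
            return n;
        }

        @Override
        public void readFully(final long position, final byte[] buffer, final int offset, final int length)
                throws IOException {
            // One read is exhaustive for an in-memory buffer, so no retry loop is needed.
            if (read(position, buffer, offset, length) < length) {
                throw new IOException("Reached end of stream before reading fully");
            }
        }

        @Override
        public void readFully(final long position, final byte[] buffer) throws IOException {
            readFully(position, buffer, 0, buffer.length);
        }
    }

    // Usage: new FSDataInputStream(new SeekableByteArrayInputStream(bytes))
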
File 5/7: ParquetTypedStructConverter.java

@@ -15,14 +15,21 @@
  */
 package io.streamthoughts.kafka.connect.filepulse.fs.reader.parquet;
 
-import static io.streamthoughts.kafka.connect.filepulse.data.TypedValue.*;
+import static io.streamthoughts.kafka.connect.filepulse.data.TypedValue.bool;
+import static io.streamthoughts.kafka.connect.filepulse.data.TypedValue.float32;
+import static io.streamthoughts.kafka.connect.filepulse.data.TypedValue.float64;
+import static io.streamthoughts.kafka.connect.filepulse.data.TypedValue.int32;
+import static io.streamthoughts.kafka.connect.filepulse.data.TypedValue.int64;
+import static io.streamthoughts.kafka.connect.filepulse.data.TypedValue.string;
 
 import io.streamthoughts.kafka.connect.filepulse.data.Type;
 import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
 import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
-
-import java.util.*;
-
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import org.apache.commons.lang3.function.TriFunction;
 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.example.data.simple.SimpleGroup;
@@ -66,7 +73,9 @@ public static TypedStruct fromParquetFileReader(final SimpleGroup simpleGroup) {
         GroupType group = simpleGroup.getType();
         for (int i = 0; i < group.getFieldCount(); i++) {
             org.apache.parquet.schema.Type field = group.getType(i);
-            String filedType = field instanceof PrimitiveType ? field.asPrimitiveType().getPrimitiveTypeName().name() : field.getLogicalTypeAnnotation().toString();
+            String filedType = field instanceof PrimitiveType ?
+                    field.asPrimitiveType().getPrimitiveTypeName().name() :
+                    field.getLogicalTypeAnnotation().toString();
             struct.put(field.getName(), getTypedValueFromSimpleGroup(filedType, simpleGroup, i));
         }
         return struct;
@@ -91,7 +100,10 @@ private static TypedValue list(String fieldName, SimpleGroup simpleGroup, int i)
             //Get a group of a list element
             Group subGroup = group.getGroup(0, k);
             //Get the name of the field type of the list element
-            String fieldTypeString = subGroup.getType().getType(0).asPrimitiveType().getPrimitiveTypeName().name();
+            String fieldTypeString = subGroup
+                    .getType()
+                    .getType(0)
+                    .asPrimitiveType().getPrimitiveTypeName().name();
             //Convert list element into TypedValue
             TypedValue value = getTypedValueFromSimpleGroup(fieldTypeString, (SimpleGroup) subGroup, 0);
             //Get the array element type, which will be the list type
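For context on the converter's target model: each statically imported factory (bool, int32, float64, string, and so on) wraps a plain Java value in a TypedValue, and the structs assembled above are plain TypedStruct instances. A minimal hand-built example follows, with the converter file's imports in scope; TypedStruct.create() and the fluent put are assumed from the FilePulse data API, since only put(String, TypedValue) and getString() actually appear in this diff.

    // Sketch only; create() and the chained put(...) are assumed API.
    static TypedStruct sample() {
        return TypedStruct.create()
                .put("enabled", bool(true))
                .put("count", int32(42))
                .put("name", string("parquet"));
    }
    // sample().get("name").getString() would then return "parquet".
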
File 6/7: ParquetTypedStructConverterTest.java

@@ -15,9 +15,16 @@
  */
 package io.streamthoughts.kafka.connect.filepulse.fs.reader.parquet;
 
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.parquet.example.data.simple.SimpleGroup;
 import org.apache.parquet.schema.*;
 import org.junit.Assert;
@@ -37,19 +44,19 @@ public class ParquetTypedStructConverterTest {
 
     @Test
     public void check_string_value_converter() {
-        var stringValue = new PrimitiveType(Type.Repetition.REPEATED, PrimitiveType.PrimitiveTypeName.BINARY, "test");
+        var stringValue = new PrimitiveType(REPEATED, BINARY, "test");
         listType.add(stringValue);
-        simpleGroup = new SimpleGroup(new GroupType(Type.Repetition.REPEATED, "name", listType));
+        simpleGroup = new SimpleGroup(new GroupType(REPEATED, "name", listType));
         simpleGroup.add(0, STRING_VALUE);
         var typedStruct = ParquetTypedStructConverter.fromParquetFileReader(simpleGroup);
 
         Assert.assertEquals(STRING_VALUE, typedStruct.get("test").getString());
     }
     @Test
     public void check_int_value_converter() {
-        var integerValue = new PrimitiveType(Type.Repetition.REPEATED, PrimitiveType.PrimitiveTypeName.INT32, "integer");
+        var integerValue = new PrimitiveType(REPEATED, INT32, "integer");
         listType.add(0, integerValue);
-        simpleGroup = new SimpleGroup(new GroupType(Type.Repetition.REPEATED, "name", listType));
+        simpleGroup = new SimpleGroup(new GroupType(REPEATED, "name", listType));
         simpleGroup.add(0, INT_VALUE);
         var typedStruct = ParquetTypedStructConverter.fromParquetFileReader(simpleGroup);
 
@@ -58,9 +65,9 @@ public void check_int_value_converter() {
 
     @Test
     public void check_double_value_converter() {
-        var doubleValue = new PrimitiveType(Type.Repetition.REPEATED, PrimitiveType.PrimitiveTypeName.DOUBLE, "double");
+        var doubleValue = new PrimitiveType(REPEATED, DOUBLE, "double");
         listType.add(0, doubleValue);
-        simpleGroup = new SimpleGroup(new GroupType(Type.Repetition.REPEATED, "name", listType));
+        simpleGroup = new SimpleGroup(new GroupType(REPEATED, "name", listType));
         simpleGroup.add(0, DOUBLE_VALUE);
         var typedStruct = ParquetTypedStructConverter.fromParquetFileReader(simpleGroup);
 
@@ -69,9 +76,9 @@ public void check_double_value_converter() {
 
     @Test
     public void check_long_value_converter() {
-        var longValue = new PrimitiveType(Type.Repetition.REPEATED, PrimitiveType.PrimitiveTypeName.INT64, "long");
+        var longValue = new PrimitiveType(REPEATED, INT64, "long");
         listType.add(0, longValue);
-        simpleGroup = new SimpleGroup(new GroupType(Type.Repetition.REPEATED, "name", listType));
+        simpleGroup = new SimpleGroup(new GroupType(REPEATED, "name", listType));
         simpleGroup.add(0, LONG_VALUE);
         var typedStruct = ParquetTypedStructConverter.fromParquetFileReader(simpleGroup);
 
@@ -80,9 +87,9 @@ public void check_long_value_converter() {
 
     @Test
     public void check_boolean_value_converter() {
-        var booleanValue = new PrimitiveType(Type.Repetition.REPEATED, PrimitiveType.PrimitiveTypeName.BOOLEAN, "boolean");
+        var booleanValue = new PrimitiveType(REPEATED, BOOLEAN, "boolean");
         listType.add(0, booleanValue);
-        simpleGroup = new SimpleGroup(new GroupType(Type.Repetition.REPEATED, "name", listType));
+        simpleGroup = new SimpleGroup(new GroupType(REPEATED, "name", listType));
         simpleGroup.add(0, BOOLEAN_VALUE);
         var typedStruct = ParquetTypedStructConverter.fromParquetFileReader(simpleGroup);
 
@@ -91,9 +98,9 @@ public void check_boolean_value_converter() {
 
     @Test
     public void check_float_value_converter() {
-        var floatValue = new PrimitiveType(Type.Repetition.REPEATED, PrimitiveType.PrimitiveTypeName.FLOAT, "float");
+        var floatValue = new PrimitiveType(REPEATED, FLOAT, "float");
        listType.add(0, floatValue);
-        simpleGroup = new SimpleGroup(new GroupType(Type.Repetition.REPEATED, "name", listType));
+        simpleGroup = new SimpleGroup(new GroupType(REPEATED, "name", listType));
         simpleGroup.add(0, FLOAT_VALUE);
         var typedStruct = ParquetTypedStructConverter.fromParquetFileReader(simpleGroup);
 
@@ -103,7 +110,7 @@ public void check_float_value_converter() {
     @Test
     public void check_array_value_converter() {
         listType.add(0, generateArray());
-        simpleGroup = new SimpleGroup(new GroupType(Type.Repetition.REPEATED, "name", listType));
+        simpleGroup = new SimpleGroup(new GroupType(REPEATED, "name", listType));
         simpleGroup.add(0, baseArraySimpleGroup);
 
         var typedStruct = ParquetTypedStructConverter.fromParquetFileReader(simpleGroup);
@@ -114,7 +121,7 @@ public void check_array_value_converter() {
     @Test
     public void check_array_value_converter_when_array_is_empty() {
         listType.add(0, generateEmptyArray());
-        simpleGroup = new SimpleGroup(new GroupType(Type.Repetition.REPEATED, "name", listType));
+        simpleGroup = new SimpleGroup(new GroupType(REPEATED, "name", listType));
         simpleGroup.add(0, baseArraySimpleGroup);
 
         var typedStruct = ParquetTypedStructConverter.fromParquetFileReader(simpleGroup);
@@ -125,15 +132,15 @@ public void check_array_value_converter_when_array_is_empty() {
     @Test
     public void check_value_converter_when_repetition_count_equals_0() {
         listType.add(0, generateFieldRepetitionCountEmpty());
-        simpleGroup = new SimpleGroup(new GroupType(Type.Repetition.REPEATED, "name", listType));
+        simpleGroup = new SimpleGroup(new GroupType(REPEATED, "name", listType));
 
         var typedStruct = ParquetTypedStructConverter.fromParquetFileReader(simpleGroup);
 
         Assert.assertTrue(typedStruct.get("REPLICATION_EMPTY").isNull());
     }
 
     private GroupType generateArray() {
-        var elementList = new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.INT32, "element");
+        var elementList = new PrimitiveType(Type.Repetition.OPTIONAL, INT32, "element");
 
         var dataList = new GroupType(Type.Repetition.OPTIONAL, "LIST", elementList);
 
@@ -147,20 +154,20 @@ private GroupType generateArray() {
         baseArraySimpleGroup = new SimpleGroup(schemaGroup);
         baseArraySimpleGroup.add(0, dataGroup1);
         baseArraySimpleGroup.add(0, dataGroup2);
-        return ConversionPatterns.listOfElements(Type.Repetition.REPEATED, "LIST", schemaGroup);
+        return ConversionPatterns.listOfElements(REPEATED, "LIST", schemaGroup);
     }
 
     private GroupType generateEmptyArray() {
-        var elementList = new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.INT32, "element");
+        var elementList = new PrimitiveType(Type.Repetition.OPTIONAL, INT32, "element");
         var dataList = new GroupType(Type.Repetition.OPTIONAL, "LIST", elementList);
 
         var schemaGroup = new GroupType(Type.Repetition.OPTIONAL, "element", List.of(dataList));
         baseArraySimpleGroup = new SimpleGroup(schemaGroup);
-        return ConversionPatterns.listOfElements(Type.Repetition.REPEATED, "EMPTY_LIST", schemaGroup);
+        return ConversionPatterns.listOfElements(REPEATED, "EMPTY_LIST", schemaGroup);
     }
 
     private GroupType generateFieldRepetitionCountEmpty() {
         var schemaGroup = new GroupType(Type.Repetition.OPTIONAL, "element");
-        return ConversionPatterns.listOfElements(Type.Repetition.REPEATED, "REPLICATION_EMPTY", schemaGroup);
+        return ConversionPatterns.listOfElements(REPEATED, "REPLICATION_EMPTY", schemaGroup);
     }
 }
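
A side note on the helpers at the end of this test: ConversionPatterns.listOfElements builds parquet's standard three-level LIST shape (an outer LIST-annotated group, a repeated inner group named list, then the element), which is the same structure the converter's list(...) method walks via group.getGroup(0, k). A standalone sketch that prints that shape; parquet-mr requires the element type to be named "element", which is why the test uses that name throughout.

    import org.apache.parquet.schema.ConversionPatterns;
    import org.apache.parquet.schema.GroupType;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.Type;

    public class ListShapeDemo {
        public static void main(String[] args) {
            PrimitiveType element = new PrimitiveType(
                    Type.Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.INT32, "element");
            GroupType ids = ConversionPatterns.listOfElements(
                    Type.Repetition.REPEATED, "ids", element);
            // Prints the LIST-annotated group wrapping: repeated group list { optional int32 element; }
            System.out.println(ids);
        }
    }
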
File 7/7: DefaultFileSystemMonitorTest.java

@@ -15,13 +15,30 @@
  */
 package io.streamthoughts.kafka.connect.filepulse.fs;
 
-import io.streamthoughts.kafka.connect.filepulse.source.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import io.streamthoughts.kafka.connect.filepulse.source.FileObject;
+import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
+import io.streamthoughts.kafka.connect.filepulse.source.FileObjectOffset;
+import io.streamthoughts.kafka.connect.filepulse.source.FileObjectStatus;
+import io.streamthoughts.kafka.connect.filepulse.source.LocalFileObjectMeta;
+import io.streamthoughts.kafka.connect.filepulse.source.SourceOffsetPolicy;
 import io.streamthoughts.kafka.connect.filepulse.state.InMemoryFileObjectStateBackingStore;
 import io.streamthoughts.kafka.connect.filepulse.storage.KafkaStateBackingStore;
 import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
 import io.streamthoughts.kafka.connect.filepulse.storage.StateSnapshot;
 import io.streamthoughts.kafka.connect.filepulse.utils.MockFileCleaner;
 import io.streamthoughts.kafka.connect.filepulse.utils.TemporaryFileInput;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.source.SourceTaskContext;
 import org.junit.Assert;
@@ -32,13 +49,6 @@
 import org.junit.rules.TestRule;
 import org.mockito.Mockito;
 
-import java.io.File;
-import java.util.*;
-import java.util.stream.Collectors;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
 public class DefaultFileSystemMonitorTest {
 
     private static final SourceOffsetPolicy OFFSET_MANAGER = new SourceOffsetPolicy() {
