Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fast-avro] FastGenericDatumReader forwards setSchema() to coldDeserializer #534

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"type": "record",
"name": "SimpleTestRecord",
"namespace": "com.linkedin.avro.fastserde.generated.avro",
"doc": "Used in tests of fast-serde to verify writing records by DataFileWriter and reading by DataFileReader/DataFileStream",
"fields": [
{
"name": "text",
"type": "string",
"default": ""
},
{
"name": "fiveBytes",
"type": {
"name": "Fixed5",
"type": "fixed",
"size": 5
},
"default": "Fizyk"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package com.linkedin.avro.fastserde.file;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.DatumReader;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import com.linkedin.avro.fastserde.FastGenericDatumReader;
import com.linkedin.avro.fastserde.FastSpecificDatumReader;
import com.linkedin.avro.fastserde.FastSpecificDatumWriter;
import com.linkedin.avro.fastserde.generated.avro.Fixed5;
import com.linkedin.avro.fastserde.generated.avro.SimpleTestRecord;
import com.linkedin.avroutil1.compatibility.AvroRecordUtil;

public class FastSerdeWithDataFileStreamTest {

@DataProvider
private Object[][] dataFileStreamDeserializationTestCases() {
Schema readerSchema = SimpleTestRecord.SCHEMA$;
return new Object[][]{
new Object[]{11, new FastSpecificDatumReader<>(null, readerSchema)},
new Object[]{12, new FastGenericDatumReader<GenericRecord>(null, readerSchema)},
};
}

@Test(groups = "deserializationTest", dataProvider = "dataFileStreamDeserializationTestCases")
<D extends IndexedRecord> void dataFileStreamShouldReadDataUsingSpecificReader(int recordsToWrite,
DatumReader<D> datumReader) throws IOException {
// given: records to be written to one file
List<SimpleTestRecord> records = new ArrayList<>(recordsToWrite);
for (byte i = 0; i < recordsToWrite; i++) {
Fixed5 fiveBytes = new Fixed5();
fiveBytes.bytes(new byte[]{'K', 'r', 'i', 's', i});

SimpleTestRecord simpleTestRecord = new SimpleTestRecord();
AvroRecordUtil.setField(simpleTestRecord, "fiveBytes", fiveBytes);
AvroRecordUtil.setField(simpleTestRecord, "text", "text-" + i);

records.add(simpleTestRecord);
}

// given: bytes array representing content of persistent file with schema and multiple records
byte[] bytes = writeTestRecordsToFile(records);

// when: pre-populated bytes array is consumed by DataFileStream (in tests more convenient than DataFileReader
// because SeekableByteArrayInput is not available for older Avro versions)
ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
DataFileStream<D> dataFileStream = new DataFileStream<>(inputStream, datumReader);

// then: records read from file are the same as records sent to file
int idx = 0;
for (IndexedRecord recordReadFromFile : dataFileStream) {
Assert.assertEquals(recordReadFromFile.toString(), records.get(idx++).toString());
}
}

/**
* @return bytes array representing file content
*/
private static byte[] writeTestRecordsToFile(List<SimpleTestRecord> records) throws IOException {
Schema schema = SimpleTestRecord.SCHEMA$;
FastSpecificDatumWriter<SimpleTestRecord> datumWriter = new FastSpecificDatumWriter<>(schema);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

try (DataFileWriter<SimpleTestRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
dataFileWriter.create(schema, outputStream);

for (SimpleTestRecord record : records) {
dataFileWriter.append(record);
}

dataFileWriter.flush();
}

return outputStream.toByteArray();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import com.linkedin.avro.fastserde.FastDeserializer;
import com.linkedin.avro.fastserde.FastGenericDeserializerGenerator;
import com.linkedin.avro.fastserde.FastGenericSerializerGenerator;
import com.linkedin.avro.fastserde.FastSerdeCache;
import com.linkedin.avro.fastserde.FastSerializer;
import com.linkedin.avro.fastserde.FastSpecificDeserializerGenerator;
import com.linkedin.avro.fastserde.FastSpecificSerializerGenerator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import com.linkedin.avro.fastserde.customized.DatumReaderCustomization;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;

import static com.linkedin.avro.fastserde.customized.DatumReaderCustomization.*;
Expand All @@ -19,5 +19,15 @@ default T deserialize(T reuse, Decoder d) throws IOException {
return deserialize(reuse, d, DEFAULT_DATUM_READER_CUSTOMIZATION);
}

/**
* Set the writer's schema.
* @see org.apache.avro.io.DatumReader#setSchema(Schema)
*/
default void setSchema(Schema writerSchema) {
// Implement this method only in vanilla-avro-based classes (e.g. fallback scenario).
// Normally for generated deserializers it doesn't make sense.
throw new UnsupportedOperationException("Can't change schema for already generated class.");
}

T deserialize(T reuse, Decoder d, DatumReaderCustomization customization) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public void setSchema(Schema schema) {
if (readerSchema == null) {
readerSchema = writerSchema;
}

coldDeserializer.setSchema(schema);
}

@Override
Expand All @@ -99,7 +101,7 @@ public T read(T reuse, Decoder in) throws IOException {
fastDeserializer = getFastDeserializerFromCache(cache, writerSchema, readerSchema, modelData, customization);
if (fastDeserializer.hasDynamicClassGenerationDone()) {
if (fastDeserializer.isBackedByGeneratedClass()) {
/**
/*
* Runtime class generation is done successfully, so cache it.
*/
cachedFastDeserializer.compareAndSet(null, fastDeserializer);
Expand All @@ -108,7 +110,7 @@ public T read(T reuse, Decoder in) throws IOException {
+ readerSchema + "], writer schema: [" + writerSchema + "]");
}
} else {
/**
/*
* Runtime class generation fails, so this class will cache a newly generated cold deserializer, which will
* honer {@link FastSerdeCache#isFailFast()}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ public void write(T data, Encoder out) throws IOException {

if (fastSerializer.hasDynamicClassGenerationDone()) {
if (fastSerializer.isBackedByGeneratedClass()) {
/**
/*
* Runtime class generation is done successfully, so cache it.
*/
cachedFastSerializer = fastSerializer;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("FastSerializer has been generated and cached for writer schema: [" + writerSchema + "]");
}
} else {
/**
/*
* Runtime class generation fails, so this class will cache a newly generated cold deserializer, which will
* honer {@link FastSerdeCache#isFailFast()}.
*/
Expand All @@ -95,7 +95,7 @@ public void write(T data, Encoder out) throws IOException {
}
fastSerializer = cachedFastSerializer;
} else {
/**
/*
* Don't use the cached serializer since it may not support the passed customization.
*/
fastSerializer = coldSerializer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public static boolean isSupportedForFastSerializer(Schema.Type schemaType) {
Schema.Type.ARRAY);
}

public static boolean isFastDeserializer(FastDeserializer deserializer) {
public static boolean isFastDeserializer(FastDeserializer<?> deserializer) {
return deserializer.isBackedByGeneratedClass();
}

Expand Down Expand Up @@ -476,7 +476,7 @@ private FastDeserializer<?> buildSpecificDeserializer(Schema writerSchema, Schem
LOGGER.error("Deserializer class instantiation exception", e);
}

return new FastSerdeUtils.FastDeserializerWithAvroSpecificImpl(writerSchema, readerSchema, modelData, customization, failFast, true);
return new FastSerdeUtils.FastDeserializerWithAvroSpecificImpl<>(writerSchema, readerSchema, modelData, customization, failFast, true);
}

/**
Expand Down Expand Up @@ -536,7 +536,7 @@ private FastDeserializer<?> buildGenericDeserializer(Schema writerSchema, Schema
LOGGER.error("Deserializer class instantiation exception:", e);
}

return new FastSerdeUtils.FastDeserializerWithAvroGenericImpl(writerSchema, readerSchema, modelData, customization, failFast, true);
return new FastSerdeUtils.FastDeserializerWithAvroGenericImpl<>(writerSchema, readerSchema, modelData, customization, failFast, true);
}

public FastSerializer<?> buildFastSpecificSerializer(Schema schema, SpecificData modelData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,10 @@
import org.apache.avro.generic.CustomizedSpecificDatumWriter;
import com.linkedin.avro.fastserde.customized.DatumReaderCustomization;
import com.linkedin.avro.fastserde.customized.DatumWriterCustomization;
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.ColdGenericDatumReader;
import org.apache.avro.generic.ColdSpecificDatumReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.specific.SpecificData;
Expand Down Expand Up @@ -50,12 +46,17 @@ public FastDeserializerWithAvroSpecificImpl(Schema writerSchema, Schema readerSc
SpecificData modelData, DatumReaderCustomization customization, boolean failFast, boolean runtimeClassGenerationDone) {
this.customization = customization == null ? DatumReaderCustomization.DEFAULT_DATUM_READER_CUSTOMIZATION : customization;
this.customizedDatumReader = Utils.isAvro14() ?
new CustomizedSpecificDatumReaderForAvro14(writerSchema, readerSchema, this.customization) :
new CustomizedSpecificDatumReaderForAvro14<>(writerSchema, readerSchema, this.customization) :
new CustomizedSpecificDatumReader<>(writerSchema, readerSchema, modelData, this.customization);
this.failFast = failFast;
this.runtimeClassGenerationDone = runtimeClassGenerationDone;
}

@Override
public void setSchema(Schema writerSchema) {
this.customizedDatumReader.setSchema(writerSchema);
}

@Override
public V deserialize(V reuse, Decoder d, DatumReaderCustomization customization) throws IOException {
if (failFast) {
Expand Down Expand Up @@ -100,13 +101,18 @@ public FastDeserializerWithAvroGenericImpl(Schema writerSchema, Schema readerSch
GenericData modelData, DatumReaderCustomization customization, boolean failFast, boolean runtimeClassGenerationDone) {
this.customization = customization == null ? DatumReaderCustomization.DEFAULT_DATUM_READER_CUSTOMIZATION : customization;
this.customizedDatumReader = Utils.isAvro14() ?
new CustomizedGenericDatumReaderForAvro14(writerSchema, readerSchema, this.customization) :
new CustomizedGenericDatumReaderForAvro14<>(writerSchema, readerSchema, this.customization) :
new CustomizedGenericDatumReader<>(writerSchema, readerSchema, modelData, this.customization);

this.failFast = failFast;
this.runtimeClassGenerationDone = runtimeClassGenerationDone;
}

@Override
public void setSchema(Schema writerSchema) {
customizedDatumReader.setSchema(writerSchema);
}

@Override
public V deserialize(V reuse, Decoder d, DatumReaderCustomization customization) throws IOException {
if (failFast) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ private <T> T instantiate(Class<T> clazz, Schema of) {
//TODO - look for both old and new SchemaConstructable ctrs 1st
try {
Constructor<T> noArgCtr = clazz.getDeclaredConstructor(NO_ARGS);
return noArgCtr.newInstance(NO_ARGS);
return noArgCtr.newInstance();
} catch (Exception e) {
throw new IllegalStateException("while trying to instantiate a(n) " + clazz.getName(), e);
}
Expand Down
Loading