Skip to content

Commit

Permalink
[BEAM-14020] Adding SchemaTransform, SchemaTransformProvider, TypedSc…
Browse files Browse the repository at this point in the history
…hemaTransformProvider, and PCollectionRowTuple (apache#16958)

* Adding SchemaTransform, SchemaTransformProvider, TypedSchemaTransformProvider and PCollectionRowTuple.

Doc: https://s.apache.org/beam-schema-transform

* Update sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java

* Fixing the test.

Co-authored-by: Brian Hulette <hulettbh@gmail.com>
  • Loading branch information
laraschmidt and TheNeuralBit committed Mar 17, 2022
1 parent 280099a commit 5f7bb55
Show file tree
Hide file tree
Showing 6 changed files with 775 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.schemas.transforms;

import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollectionRowTuple;

/**
* An abstraction to create schema capable and aware transforms. The interface is intended to be
* used in conjunction with the interface {@link SchemaTransformProvider}.
*
* <p>The interfaces can be implemented to make transforms available in other SDKs in addition to
* Beam SQL.
*
* <p><b>Internal only:</b> This interface is actively being worked on and it will likely change as
* we provide implementations for more standard Beam transforms. We provide no backwards
* compatibility guarantees and it should not be implemented outside of the Beam repository.
*/
@Internal
@Experimental(Kind.SCHEMAS)
public interface SchemaTransform {
PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.schemas.transforms;

import java.util.List;
import java.util.Optional;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

/**
* Provider to create {@link SchemaTransform} instances for use in Beam SQL and other SDKs.
*
* <p><b>Internal only:</b> This interface is actively being worked on and it will likely change as
* we provide implementations for more standard Beam transforms. We provide no backwards
* compatibility guarantees and it should not be implemented outside of the Beam repository.
*/
@Internal
@Experimental(Kind.SCHEMAS)
public interface SchemaTransformProvider {
/** Returns an id that uniquely represents this transform. */
String identifier();

/**
* Returns the expected schema of the configuration object. Note this is distinct from the schema
* of the transform itself.
*/
Schema configurationSchema();

/**
* Produce a SchemaTransform some transform-specific configuration object. Can throw a {@link
* InvalidConfigurationException} or a {@link InvalidSchemaException}.
*/
SchemaTransform from(Row configuration);

/** Returns the input collection names of this transform. */
List<String> inputCollectionNames();

/** Returns the output collection names of this transform. */
List<String> outputCollectionNames();

/**
* List the dependencies needed for this transform. Jars from classpath are used by default when
* Optional.empty() is returned.
*/
default Optional<List<String>> dependencies(Row configuration, PipelineOptions options) {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.schemas.transforms;

import java.util.List;
import java.util.Optional;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.values.Row;

/**
* Like {@link SchemaTransformProvider} except uses a configuration object instead of Schema and
* Row.
*
* <p>ConfigT should be available in the SchemaRegistry.
*
* <p><b>Internal only:</b> This interface is actively being worked on and it will likely change as
* we provide implementations for more standard Beam transforms. We provide no backwards
* compatibility guarantees and it should not be implemented outside of the Beam repository.
*/
@Internal
@Experimental(Kind.SCHEMAS)
public abstract class TypedSchemaTransformProvider<ConfigT> implements SchemaTransformProvider {

abstract Class<ConfigT> configurationClass();

/**
* Produce a SchemaTransform from ConfigT. Can throw a {@link InvalidConfigurationException} or a
* {@link InvalidSchemaException}.
*/
abstract SchemaTransform from(ConfigT configuration);

/**
* List the dependencies needed for this transform. Jars from classpath are used by default when
* Optional.empty() is returned.
*/
Optional<List<String>> dependencies(ConfigT configuration, PipelineOptions options) {
return Optional.empty();
}

@Override
public final Schema configurationSchema() {
try {
return SchemaRegistry.createDefault().getSchema(configurationClass());
} catch (NoSuchSchemaException e) {
throw new RuntimeException(
"Unable to find schema for "
+ identifier()
+ " SchemaTransformProvider's configuration.");
}
}

@Override
public final SchemaTransform from(Row configuration) {
return from(configFromRow(configuration));
}

@Override
public final Optional<List<String>> dependencies(Row configuration, PipelineOptions options) {
return dependencies(configFromRow(configuration), options);
}

private ConfigT configFromRow(Row configuration) {
try {
return SchemaRegistry.createDefault()
.getFromRowFunction(configurationClass())
.apply(configuration);
} catch (NoSuchSchemaException e) {
throw new RuntimeException(
"Unable to find schema for " + identifier() + "SchemaTransformProvider's config");
}
}
}
Loading

0 comments on commit 5f7bb55

Please sign in to comment.