Skip to content
/ beam Public
forked from apache/beam

Commit

Permalink
[SQL] Add support for GCS entries in DataCatalog
Browse files Browse the repository at this point in the history
  • Loading branch information
akedin authored and kennknowles committed Oct 4, 2019
1 parent 088a502 commit be5df76
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 6 deletions.
1 change: 0 additions & 1 deletion sdks/java/extensions/sql/datacatalog/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ task runDataCatalogExample(type: JavaExec) {
]
}


task integrationTest(type: Test) {
group = "Verification"
def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;

import com.alibaba.fastjson.JSONObject;
import com.google.cloud.datacatalog.Entry;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.UnknownFieldSet;
import java.io.IOException;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.meta.Table;

/**
 * Utils to handle GCS entries from Cloud Data Catalog.
 *
 * <p>At the moment everything here is a brittle hack because there doesn't seem to be an actual
 * proto available for this yet in the client library, so we have to do something like parsing this
 * manually, or generating our own protos/clients.
 */
class GcsUtils {

  /**
   * 'gcs_fileset_spec' field id, as defined in .proto.
   *
   * <p>Until we get the updated proto or a generated client this is the most straightforward way of
   * extracting data from it.
   */
  private static final int GCS_FILESET_SPEC = 6;

  /** Static helpers only; not meant to be instantiated. */
  private GcsUtils() {}

  /**
   * Check if the entry represents a GCS fileset in Data Catalog.
   *
   * @param entry Data Catalog entry to inspect
   * @return {@code true} if the entry's unknown fields contain field {@link #GCS_FILESET_SPEC}
   */
  static boolean isGcs(Entry entry) {
    // 'gcs_fileset_spec' is not in the generated client yet
    // so we have to manually parse it from unknown fields
    return entry.getUnknownFields().hasField(GCS_FILESET_SPEC);
  }

  /**
   * Creates a Beam SQL table description from a GCS fileset entry.
   *
   * @param entry Data Catalog entry carrying the 'gcs_fileset_spec' unknown field
   * @return a partially populated table builder (type, location, properties, comment); schema and
   *     name are left for the caller to set
   * @throws UnsupportedOperationException if the entry does not carry exactly one length-delimited
   *     value for the field, if that value cannot be parsed, or if the file pattern does not
   *     start with {@code gs://}
   */
  static Table.Builder tableBuilder(Entry entry) {
    UnknownFieldSet.Field gcsFilesetSpec = entry.getUnknownFields().getField(GCS_FILESET_SPEC);
    List<ByteString> filesetFields = gcsFilesetSpec.getLengthDelimitedList();

    // We support exactly one 'file_patterns' field and nothing else at the moment
    if (filesetFields.size() != 1) {
      throw new UnsupportedOperationException(
          "Unable to parse GCS entry '" + entry.getName() + "'");
    }
    return readStringField(filesetFields.get(0));
  }

  /**
   * Manually decodes a single file pattern string from the raw bytes of the fileset field and
   * wraps it into a "text" table builder.
   *
   * @param filesetField raw length-delimited payload of the 'gcs_fileset_spec' field
   * @return table builder pointing at the decoded {@code gs://} file pattern
   * @throws UnsupportedOperationException if the bytes cannot be decoded (the original {@link
   *     IOException} is attached as the cause) or the pattern is not a {@code gs://} location
   */
  private static Table.Builder readStringField(ByteString filesetField) {
    try {

      // TODO: Fix as soon as updated Data Catalog proto becomes available,
      // replace with actual field access on a generated class.

      CodedInputStream codedInputStream = filesetField.newCodedInput();
      // Skip the leading varint that precedes the string payload.
      // NOTE(review): readStringRequireUtf8() reads its own length prefix, so this varint is
      // presumably the field tag of 'file_patterns' rather than a length -- confirm against the
      // Data Catalog proto.
      codedInputStream.readRawVarint64();

      String filePattern = codedInputStream.readStringRequireUtf8();

      if (!filePattern.startsWith("gs://")) {
        throw new UnsupportedOperationException(
            "Unsupported file pattern. "
                + "Only file patterns with 'gs://' schema are supported at the moment.");
      }

      return Table.builder()
          .type("text")
          .location(filePattern)
          .properties(new JSONObject())
          .comment("");

    } catch (IOException e) {
      // Keep the underlying parse failure as the cause so it is not lost when debugging.
      throw new UnsupportedOperationException(
          "Unable to parse a GCS fileset Entry from Data Catalog", e);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,24 @@ static Table toBeamTable(String tableName, Entry entry) {
+ "' in Data Catalog: "
+ entry.toString());
}
Schema schema = SchemaUtils.fromDataCatalog(entry.getSchema());

String service = URI.create(entry.getLinkedResource()).getAuthority().toLowerCase();

if (!TABLE_FACTORIES.containsKey(service)) {
throw new UnsupportedOperationException(
"Unsupported SQL source kind: " + entry.getLinkedResource());
Table.Builder table = null;
if (TABLE_FACTORIES.containsKey(service)) {
table = TABLE_FACTORIES.get(service).tableBuilder(entry);
}

Schema schema = SchemaUtils.fromDataCatalog(entry.getSchema());
return TABLE_FACTORIES.get(service).tableBuilder(entry).schema(schema).name(tableName).build();
if (GcsUtils.isGcs(entry)) {
table = GcsUtils.tableBuilder(entry);
}

if (table != null) {
return table.schema(schema).name(tableName).build();
}

throw new UnsupportedOperationException(
"Unsupported SQL source kind: " + entry.getLinkedResource());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;

import static org.apache.beam.sdk.schemas.Schema.FieldType.INT32;
import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING;

import java.io.Serializable;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Integration tests for DataCatalog+GCS. */
@RunWith(JUnit4.class)
public class DataCatalogGCSIT implements Serializable {

private static final Schema ID_NAME_TYPE_SCHEMA =
Schema.builder()
.addNullableField("id", INT32)
.addNullableField("name", STRING)
.addNullableField("type", STRING)
.build();

@Rule public transient TestPipeline pipeline = TestPipeline.create();

@Test
public void testReadFromGCS() throws Exception {
String gcsEntryId =
"`datacatalog`" // this is part of the resource name in DataCatalog, so it has to be
+ ".`entry`" // different from the table provider name ("dc" in this test)
+ ".`apache-beam-testing`"
+ ".`us-central1`"
+ ".`samples`"
+ ".`integ_test_small_csv_test_1`";

PCollection<Row> result =
pipeline.apply(
"query",
SqlTransform.query("SELECT id, name, type FROM " + gcsEntryId)
.withDefaultTableProvider(
"dc", DataCatalogTableProvider.create(pipeline.getOptions())));

pipeline.getOptions().as(DirectOptions.class).setBlockOnRun(true);
PAssert.that(result)
.containsInAnyOrder(
row(1, "customer1", "test"),
row(2, "customer2", "test"),
row(3, "customer1", "test"),
row(4, "customer2", "test"));
pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}

private Row row(int id, String name, String type) {
return Row.withSchema(ID_NAME_TYPE_SCHEMA).addValues(id, name, type).build();
}
}

0 comments on commit be5df76

Please sign in to comment.