diff --git a/sdks/java/extensions/sql/datacatalog/build.gradle b/sdks/java/extensions/sql/datacatalog/build.gradle
index 4e95f46ac222b..20a91cec7981a 100644
--- a/sdks/java/extensions/sql/datacatalog/build.gradle
+++ b/sdks/java/extensions/sql/datacatalog/build.gradle
@@ -59,7 +59,6 @@ task runDataCatalogExample(type: JavaExec) {
]
}
-
task integrationTest(type: Test) {
group = "Verification"
def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/GcsUtils.java b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/GcsUtils.java
new file mode 100644
index 0000000000000..ae74a1ac74980
--- /dev/null
+++ b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/GcsUtils.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;
+
+import com.alibaba.fastjson.JSONObject;
+import com.google.cloud.datacatalog.Entry;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.UnknownFieldSet;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+
+/**
+ * Utils to handle GCS entries from Cloud Data Catalog.
+ *
+ * <p>At the moment everything here is a brittle hack because there doesn't seem to be an actual
+ * proto available for this yet in the client library, so we have to do something like parsing this
+ * manually, or generating our own protos/clients.
+ */
+class GcsUtils {
+
+ /**
+ * 'gcs_fileset_spec' field id, as defined in .proto.
+ *
+ * <p>Until we get the updated proto or a generated client this is the most straightforward way of
+ * extracting data from it.
+ */
+ private static final int GCS_FILESET_SPEC = 6;
+
+ /** Check if the entry represents a GCS fileset in Data Catalog. */
+ static boolean isGcs(Entry entry) {
+ // 'gcs_fileset_spec' is not in the generated client yet
+ // so we have to manually parse it from unknown fields
+ return entry.getUnknownFields().hasField(GCS_FILESET_SPEC);
+ }
+
+ /** Creates a Beam SQL table description from a GCS fileset entry. */
+ static Table.Builder tableBuilder(Entry entry) {
+ UnknownFieldSet.Field gcsFilesetSpec = entry.getUnknownFields().getField(GCS_FILESET_SPEC);
+ List filesetFields = gcsFilesetSpec.getLengthDelimitedList();
+
+ // We support exactly one 'file_patterns' field and nothing else at the moment
+ if (filesetFields.size() != 1) {
+ throw new UnsupportedOperationException(
+ "Unable to parse GCS entry '" + entry.getName() + "'");
+ }
+ return readStringField(filesetFields.get(0));
+ }
+
+ private static Table.Builder readStringField(ByteString filesetField) {
+ try {
+
+ // TODO: Fix as soon as updated Data Catalog proto becomes available,
+ // replace with actual field access on a generated class.
+
+ CodedInputStream codedInputStream = filesetField.newCodedInput();
+ codedInputStream.readRawVarint64(); // consume string length prefix
+
+ String filePattern = codedInputStream.readStringRequireUtf8();
+
+ if (!filePattern.startsWith("gs://")) {
+ throw new UnsupportedOperationException(
+ "Unsupported file pattern. "
+ + "Only file patterns with 'gs://' schema are supported at the moment.");
+ }
+
+ return Table.builder()
+ .type("text")
+ .location(filePattern)
+ .properties(new JSONObject())
+ .comment("");
+
+ } catch (IOException e) {
+ throw new UnsupportedOperationException(
+ "Unable to parse a GCS fileset Entry from Data Catalog");
+ }
+ }
+}
diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/TableUtils.java b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/TableUtils.java
index cd16615cd3eb7..6c0b62ec89397 100644
--- a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/TableUtils.java
+++ b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/TableUtils.java
@@ -45,15 +45,24 @@ static Table toBeamTable(String tableName, Entry entry) {
+ "' in Data Catalog: "
+ entry.toString());
}
+ Schema schema = SchemaUtils.fromDataCatalog(entry.getSchema());
String service = URI.create(entry.getLinkedResource()).getAuthority().toLowerCase();
- if (!TABLE_FACTORIES.containsKey(service)) {
- throw new UnsupportedOperationException(
- "Unsupported SQL source kind: " + entry.getLinkedResource());
+ Table.Builder table = null;
+ if (TABLE_FACTORIES.containsKey(service)) {
+ table = TABLE_FACTORIES.get(service).tableBuilder(entry);
}
- Schema schema = SchemaUtils.fromDataCatalog(entry.getSchema());
- return TABLE_FACTORIES.get(service).tableBuilder(entry).schema(schema).name(tableName).build();
+ if (GcsUtils.isGcs(entry)) {
+ table = GcsUtils.tableBuilder(entry);
+ }
+
+ if (table != null) {
+ return table.schema(schema).name(tableName).build();
+ }
+
+ throw new UnsupportedOperationException(
+ "Unsupported SQL source kind: " + entry.getLinkedResource());
}
}
diff --git a/sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogGCSIT.java b/sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogGCSIT.java
new file mode 100644
index 0000000000000..8a9c464f2f4a5
--- /dev/null
+++ b/sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogGCSIT.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;
+
+import static org.apache.beam.sdk.schemas.Schema.FieldType.INT32;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING;
+
+import java.io.Serializable;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration tests for DataCatalog+GCS. */
+@RunWith(JUnit4.class)
+public class DataCatalogGCSIT implements Serializable {
+
+ private static final Schema ID_NAME_TYPE_SCHEMA =
+ Schema.builder()
+ .addNullableField("id", INT32)
+ .addNullableField("name", STRING)
+ .addNullableField("type", STRING)
+ .build();
+
+ @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+ @Test
+ public void testReadFromGCS() throws Exception {
+ String gcsEntryId =
+ "`datacatalog`" // this is part of the resource name in DataCatalog, so it has to be
+ + ".`entry`" // different from the table provider name ("dc" in this test)
+ + ".`apache-beam-testing`"
+ + ".`us-central1`"
+ + ".`samples`"
+ + ".`integ_test_small_csv_test_1`";
+
+ PCollection result =
+ pipeline.apply(
+ "query",
+ SqlTransform.query("SELECT id, name, type FROM " + gcsEntryId)
+ .withDefaultTableProvider(
+ "dc", DataCatalogTableProvider.create(pipeline.getOptions())));
+
+ pipeline.getOptions().as(DirectOptions.class).setBlockOnRun(true);
+ PAssert.that(result)
+ .containsInAnyOrder(
+ row(1, "customer1", "test"),
+ row(2, "customer2", "test"),
+ row(3, "customer1", "test"),
+ row(4, "customer2", "test"));
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+ }
+
+ private Row row(int id, String name, String type) {
+ return Row.withSchema(ID_NAME_TYPE_SCHEMA).addValues(id, name, type).build();
+ }
+}