[SQL] Support complex identifiers in DataCatalog
akedin committed Aug 15, 2019
1 parent a44f7d8 commit 989c928
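
"Complex identifiers" here means compound table references whose parts are not simple SQL identifiers, for example hyphenated GCP project IDs or reserved words such as "table", and which therefore need backtick escaping. As a purely illustrative sketch of what this enables (modeled on the DataCatalogBigQueryIT test added below; the project, dataset, and table names are placeholders):

// Illustrative only; DataCatalogBigQueryIT below is the full, runnable version.
PCollection<Row> rows =
    pipeline.apply(
        SqlTransform.query(
                "SELECT id, name FROM bigquery.`table`.`my-project`.`my_dataset`.`my_table`")
            .withDefaultTableProvider(
                "datacatalog", DataCatalogTableProvider.create(pipeline.getOptions())));

Resolving such a reference requires the table provider to see the whole compound name rather than one part at a time, which is what the changes below introduce.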
Showing 10 changed files with 538 additions and 144 deletions.
1 change: 1 addition & 0 deletions build.gradle
@@ -158,6 +158,7 @@ task javaPostCommit() {
task sqlPostCommit() {
dependsOn ":sdks:java:extensions:sql:postCommit"
dependsOn ":sdks:java:extensions:sql:jdbc:postCommit"
dependsOn ":sdks:java:extensions:sql:datacatalog:postCommit"
}

task javaPostCommitPortabilityApi () {
36 changes: 34 additions & 2 deletions sdks/java/extensions/sql/datacatalog/build.gradle
@@ -1,5 +1,3 @@
import groovy.json.JsonOutput

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -18,6 +16,8 @@ import groovy.json.JsonOutput
* limitations under the License.
*/

import groovy.json.JsonOutput

plugins { id 'org.apache.beam.module' }

applyJavaNature()
@@ -59,3 +59,35 @@ task runDataCatalogExample(type: JavaExec) {
"--tempLocation=${gcsTempRoot}",
]
}


task integrationTest(type: Test) {
group = "Verification"
def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
def gcsTempRoot = project.findProperty('gcsTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests/'

// Always re-run this task: disable Gradle's up-to-date check so the ITs are not skipped as cached.
outputs.upToDateWhen { false }

def pipelineOptions = [
"--project=${gcpProject}",
"--tempLocation=${gcsTempRoot}",
"--blockOnRun=false"]

systemProperty "beamTestPipelineOptions", JsonOutput.toJson(pipelineOptions)

include '**/*IT.class'
maxParallelForks 4
classpath = project(":sdks:java:extensions:sql:datacatalog")
.sourceSets
.test
.runtimeClasspath
testClassesDirs = files(project(":sdks:java:extensions:sql:datacatalog").sourceSets.test.output.classesDirs)
useJUnit {}
}

task postCommit {
group = "Verification"
description = "Various integration tests"
dependsOn integrationTest
}
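
With these tasks wired into sqlPostCommit, the Data Catalog integration tests can also be run on their own. Assuming the property names defined above (both fall back to the apache-beam-testing defaults when omitted), an invocation along these lines should work:

./gradlew :sdks:java:extensions:sql:datacatalog:postCommit -PgcpProject=my-project -PgcsTempRoot=gs://my-bucket/tmp/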
sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java
@@ -27,16 +27,19 @@
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.impl.TableName;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.FullNameTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BigQueryTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubJsonTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;

/** Uses DataCatalog to get the source type and schema for a table. */
public class DataCatalogTableProvider implements TableProvider {
public class DataCatalogTableProvider extends FullNameTableProvider {

private Map<String, TableProvider> delegateProviders;
private Map<String, Table> tableCache;
@@ -92,8 +95,23 @@ public Map<String, Table> getTables() {
}

@Override
public @Nullable Table getTable(String tableName) {
return loadTable(tableName);
public @Nullable Table getTable(String tableNamePart) {
throw new UnsupportedOperationException(
"Loading a table by partial name '" + tableNamePart + "' is unsupported");
}

@Override
public @Nullable Table getTableByFullName(TableName fullTableName) {

ImmutableList<String> allNameParts =
ImmutableList.<String>builder()
.addAll(fullTableName.getPath())
.add(fullTableName.getTableName())
.build();

String fullEscapedTableName = ZetaSqlIdUtils.escapeAndJoin(allNameParts);

return loadTable(fullEscapedTableName);
}

private @Nullable Table loadTable(String tableName) {
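
To make the new resolution path concrete: the planner hands the provider the fully qualified name via FullNameTableProvider, and getTableByFullName joins the path with the table name, escapes the parts with ZetaSqlIdUtils (added below), and performs the Data Catalog lookup with the escaped string. A hypothetical sketch, assuming a factory of the shape TableName.create(path, name) (the factory is not shown in this diff; getPath() and getTableName() are):

// Hypothetical usage; only escapeAndJoin's behavior is taken from this commit.
TableName name =
    TableName.create(
        ImmutableList.of("bigquery", "table", "my-project", "my_dataset"), "my_table");
String escaped =
    ZetaSqlIdUtils.escapeAndJoin(
        ImmutableList.<String>builder()
            .addAll(name.getPath())
            .add(name.getTableName())
            .build());
// escaped == "bigquery.table.`my-project`.my_dataset.my_table"
// Only "my-project" is backticked; the other parts already match [A-Za-z_][A-Za-z_0-9]*.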
79 changes: 79 additions & 0 deletions sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/ZetaSqlIdUtils.java
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;

import static java.util.stream.Collectors.joining;

import java.util.List;
import java.util.regex.Pattern;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;

/** Utils to work with ZetaSQL-compatible IDs. */
class ZetaSqlIdUtils {

/**
* Some special characters we explicitly handle.
*
* <p>Everything else is left as-is here; common whitespace characters (newline, tab, etc.) are
* replaced separately via the WHITESPACES map.
*/
private static final Pattern SPECIAL_CHARS_ESCAPE =
Pattern.compile(
"(?<SpecialChar>["
+ "\\\\" // slash
+ "`" // backtick
+ "'" // single quote
+ "\"" // double quote
+ "?" // question mark
+ "])");

private static final ImmutableMap<String, String> WHITESPACES =
ImmutableMap.of(
"\n", "\\\\n",
"\t", "\\\\t",
"\r", "\\\\r",
"\f", "\\\\f");

private static final Pattern SIMPLE_ID = Pattern.compile("[A-Za-z_][A-Za-z_0-9]*");

/**
* Joins parts into a single compound ZetaSQL identifier.
*
* <p>Escapes backslashes, backticks, single and double quotes, and question marks, and replaces
* common whitespace characters with their escape sequences; other special characters are not
* handled for now.
*/
static String escapeAndJoin(List<String> parts) {
return parts.stream()
.map(ZetaSqlIdUtils::escapeSpecialChars)
.map(ZetaSqlIdUtils::replaceWhitespaces)
.map(ZetaSqlIdUtils::backtickIfNeeded)
.collect(joining("."));
}

private static String escapeSpecialChars(String str) {
return SPECIAL_CHARS_ESCAPE.matcher(str).replaceAll("\\\\${SpecialChar}");
}

private static String replaceWhitespaces(String s) {
return WHITESPACES.keySet().stream()
.reduce(s, (str, whitespace) -> str.replaceAll(whitespace, WHITESPACES.get(whitespace)));
}

private static String backtickIfNeeded(String s) {
return SIMPLE_ID.matcher(s).matches() ? s : ("`" + s + "`");
}
}
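
One non-obvious detail in escapeSpecialChars is the replacement string: the Java literal "\\\\${SpecialChar}" denotes the regex replacement \\${SpecialChar}, that is, a literal backslash followed by the text captured by the SpecialChar group. A reduced, standalone illustration (the two-character pattern is only for the demo, not the class's actual pattern):

import java.util.regex.Pattern;

public class NamedGroupReplacementDemo {
  public static void main(String[] args) {
    Pattern p = Pattern.compile("(?<SpecialChar>[`'])");
    // Each matched character is re-emitted preceded by a single backslash.
    // Prints: it\'s \`x\`
    System.out.println(p.matcher("it's `x`").replaceAll("\\\\${SpecialChar}"));
  }
}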
98 changes: 98 additions & 0 deletions sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogBigQueryIT.java
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;

import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64;
import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.bigquery.TestBigQuery;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Integration tests for DataCatalog+BigQuery. */
@RunWith(JUnit4.class)
public class DataCatalogBigQueryIT {

private static final Schema ID_NAME_SCHEMA =
Schema.builder().addNullableField("id", INT64).addNullableField("name", STRING).build();

@Rule public transient TestPipeline writeToBQPipeline = TestPipeline.create();
@Rule public transient TestPipeline readPipeline = TestPipeline.create();
@Rule public transient TestBigQuery bigQuery = TestBigQuery.create(ID_NAME_SCHEMA);

@Test
public void testReadWrite() throws Exception {
createBQTableWith(
new TableRow().set("id", 1).set("name", "name1"),
new TableRow().set("id", 2).set("name", "name2"),
new TableRow().set("id", 3).set("name", "name3"));

TableReference bqTable = bigQuery.tableReference();
String tableId =
String.format(
"bigquery.`table`.`%s`.`%s`.`%s`",
bqTable.getProjectId(), bqTable.getDatasetId(), bqTable.getTableId());

PCollection<Row> result =
readPipeline.apply(
"query",
SqlTransform.query("SELECT id, name FROM " + tableId)
.withDefaultTableProvider(
"datacatalog", DataCatalogTableProvider.create(readPipeline.getOptions())));

PAssert.that(result).containsInAnyOrder(row(1, "name1"), row(2, "name2"), row(3, "name3"));
readPipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}

private Row row(long id, String name) {
return Row.withSchema(ID_NAME_SCHEMA).addValues(id, name).build();
}

private void createBQTableWith(TableRow r1, TableRow r2, TableRow r3) {
writeToBQPipeline
.apply(Create.of(r1, r2, r3).withCoder(TableRowJsonCoder.of()))
.apply(
BigQueryIO.writeTableRows()
.to(bigQuery.tableSpec())
.withSchema(
new TableSchema()
.setFields(
ImmutableList.of(
new TableFieldSchema().setName("id").setType("INTEGER"),
new TableFieldSchema().setName("name").setType("STRING"))))
.withoutValidation());
writeToBQPipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
}
64 changes: 64 additions & 0 deletions sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/ZetaSqlIdUtilsTest.java
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;

import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.List;
import org.junit.Test;

/** Unit tests for {@link ZetaSqlIdUtils}. */
public class ZetaSqlIdUtilsTest {

@Test
public void testHandlesSimpleIds() {
List<String> id = Arrays.asList("aaa", "BbB", "zAzzz00");
assertEquals("aaa.BbB.zAzzz00", ZetaSqlIdUtils.escapeAndJoin(id));
}

@Test
public void testHandlesMixedIds() {
List<String> id = Arrays.asList("aaa", "Bb---B", "zAzzz00");
assertEquals("aaa.`Bb---B`.zAzzz00", ZetaSqlIdUtils.escapeAndJoin(id));
}

@Test
public void testHandlesSpecialChars() {
List<String> id = Arrays.asList("a\\a", "b`b", "c'c", "d\"d", "e?e");
assertEquals("`a\\\\a`.`b\\`b`.`c\\'c`.`d\\\"d`.`e\\?e`", ZetaSqlIdUtils.escapeAndJoin(id));
}

@Test
public void testHandlesSpecialCharsInOnePart() {
List<String> id = Arrays.asList("a\\ab`bc'cd\"de?e");
assertEquals("`a\\\\ab\\`bc\\'cd\\\"de\\?e`", ZetaSqlIdUtils.escapeAndJoin(id));
}

@Test
public void testHandlesWhiteSpaces() {
List<String> id = Arrays.asList("a\na", "b\tb", "c\rc", "d\fd");
assertEquals("`a\\na`.`b\\tb`.`c\\rc`.`d\\fd`", ZetaSqlIdUtils.escapeAndJoin(id));
}

@Test
public void testHandlesWhiteSpacesInOnePart() {
List<String> id = Arrays.asList("a\nab\tbc\rcd\fd");
assertEquals("`a\\nab\\tbc\\rcd\\fd`", ZetaSqlIdUtils.escapeAndJoin(id));
}
}
TableResolutionUtils.java
@@ -34,8 +34,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** TableResolutionUtils. */
public class TableResolutionUtils {
/** Utils to wire up the custom table resolution into Calcite's planner. */
class TableResolutionUtils {

private static final Logger LOG = LoggerFactory.getLogger(TableResolutionUtils.class);
