[FLINK-23010][hive] HivePartitionFetcherContextBase shouldn't list folders to discover new partitions

This closes apache#16182
lirui-apache committed Jun 25, 2021
1 parent e58f7dd commit 53034ea
Showing 3 changed files with 221 additions and 29 deletions.
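
Before the diff, a minimal sketch of the behavioral change, using hypothetical names that are not the actual Flink API: create-time ordering previously discovered partitions by recursively listing the table's folder on the filesystem; after this commit it lists partition names from the Hive metastore and resolves only the partitions it has not seen before.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    /** Sketch of metastore-driven discovery; MetastoreClient is a stand-in, not Flink's API. */
    public class CreateTimeDiscoverySketch {

        /** Only the two calls the new code path relies on. */
        interface MetastoreClient {
            List<String> listPartitionNames(String db, String table);

            Map<String, Long> getCreateTimesByNames(String db, String table, List<String> names);
        }

        // cache of already-discovered partitions and their create times, kept across polls
        private final Map<String, Long> seen = new HashMap<>();

        /** One poll: resolve only unseen partitions, then return everything known so far. */
        List<String> poll(MetastoreClient client, String db, String table) {
            List<String> newNames =
                    client.listPartitionNames(db, table).stream()
                            .filter(name -> !seen.containsKey(name))
                            .collect(Collectors.toList());
            seen.putAll(client.getCreateTimesByNames(db, table, newNames));
            return new ArrayList<>(seen.keySet()); // the real code orders these by create time
        }
    }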
HivePartitionFetcherContextBase.java
@@ -23,32 +23,36 @@
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.JobConfWrapper;
import org.apache.flink.connectors.hive.util.HiveConfUtils;
- import org.apache.flink.connectors.hive.util.HivePartitionUtils;
import org.apache.flink.table.catalog.ObjectPath;
+ import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.filesystem.PartitionTimeExtractor;
import org.apache.flink.table.types.DataType;
+ import org.apache.flink.table.utils.PartitionPathUtils;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;

+ import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
+ import java.util.HashMap;
import java.util.List;
+ import java.util.Map;
import java.util.Properties;
+ import java.util.stream.Collectors;

import static org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toMills;
import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS;
import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND;
import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_ORDER;
- import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionValues;

/** Base class for table partition fetcher context. */
public abstract class HivePartitionFetcherContextBase<P> implements HivePartitionContext<P> {
@@ -64,13 +68,14 @@ public abstract class HivePartitionFetcherContextBase<P> implements HivePartitionContext<P> {
protected final String defaultPartitionName;
protected final ConsumeOrder consumeOrder;

-     protected transient IMetaStoreClient metaStoreClient;
+     protected transient HiveMetastoreClientWrapper metaStoreClient;
      protected transient StorageDescriptor tableSd;
      protected transient Properties tableProps;
      protected transient Path tableLocation;
-     protected transient FileSystem fs;
      private transient PartitionTimeExtractor extractor;
      private transient Table table;
+     // caches the create time of each seen partition, keyed by its partition values
+     private transient Map<List<String>, Long> partValuesToCreateTime;

public HivePartitionFetcherContextBase(
ObjectPath tablePath,
@@ -95,7 +100,8 @@ public HivePartitionFetcherContextBase(

@Override
public void open() throws Exception {
-         metaStoreClient = hiveShim.getHiveMetastoreClient(HiveConfUtils.create(confWrapper.conf()));
+         metaStoreClient =
+                 new HiveMetastoreClientWrapper(HiveConfUtils.create(confWrapper.conf()), hiveShim);
table = metaStoreClient.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
tableSd = table.getSd();
tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, table);
@@ -111,7 +117,7 @@ public void open() throws Exception {
extractorClass,
extractorPattern);
tableLocation = new Path(table.getSd().getLocation());
-         fs = tableLocation.getFileSystem(confWrapper.conf());
+         partValuesToCreateTime = new HashMap<>();
}

@Override
@@ -129,17 +135,29 @@ public List<ComparablePartitionValue> getComparablePartitionValueList() throws Exception {
}
break;
case CREATE_TIME_ORDER:
-                 FileStatus[] statuses =
-                         HivePartitionUtils.getFileStatusRecurse(
-                                 tableLocation, partitionKeys.size(), fs);
-                 for (FileStatus status : statuses) {
-                     List<String> partValues =
-                             extractPartitionValues(
-                                     new org.apache.flink.core.fs.Path(status.getPath().toString()));
-                     Long creatTime =
-                             TimestampData.fromTimestamp(new Timestamp(status.getModificationTime()))
-                                     .getMillisecond();
-                     partitionValueList.add(getComparablePartitionByTime(partValues, creatTime));
-                 }
+                 partitionNames =
+                         metaStoreClient.listPartitionNames(
+                                 tablePath.getDatabaseName(),
+                                 tablePath.getObjectName(),
+                                 Short.MAX_VALUE);
+                 List<String> newNames =
+                         partitionNames.stream()
+                                 .filter(
+                                         n ->
+                                                 !partValuesToCreateTime.containsKey(
+                                                         extractPartitionValues(n)))
+                                 .collect(Collectors.toList());
+                 List<Partition> newPartitions =
+                         metaStoreClient.getPartitionsByNames(
+                                 tablePath.getDatabaseName(), tablePath.getObjectName(), newNames);
+                 for (Partition partition : newPartitions) {
+                     partValuesToCreateTime.put(
+                             partition.getValues(), getPartitionCreateTime(partition));
+                 }
+                 for (List<String> partValues : partValuesToCreateTime.keySet()) {
+                     partitionValueList.add(
+                             getComparablePartitionByTime(
+                                     partValues, partValuesToCreateTime.get(partValues)));
+                 }
break;
case PARTITION_TIME_ORDER:
@@ -149,9 +167,7 @@ public List<ComparablePartitionValue> getComparablePartitionValueList() throws Exception {
tablePath.getObjectName(),
Short.MAX_VALUE);
for (String partitionName : partitionNames) {
-                     List<String> partValues =
-                             extractPartitionValues(
-                                     new org.apache.flink.core.fs.Path(partitionName));
+                     List<String> partValues = extractPartitionValues(partitionName);
Long partitionTime = toMills(extractor.extract(partitionKeys, partValues));
partitionValueList.add(getComparablePartitionByTime(partValues, partitionTime));
}
@@ -163,6 +179,19 @@ public List<ComparablePartitionValue> getComparablePartitionValueList() throws Exception {
return partitionValueList;
}

+     private long getPartitionCreateTime(Partition partition) throws IOException {
+         Path partLocation = new Path(partition.getSd().getLocation());
+         FileSystem fs = partLocation.getFileSystem(confWrapper.conf());
+         FileStatus fileStatus = fs.getFileStatus(partLocation);
+         return TimestampData.fromTimestamp(new Timestamp(fileStatus.getModificationTime()))
+                 .getMillisecond();
+     }
+
+     private static List<String> extractPartitionValues(String partitionName) {
+         return PartitionPathUtils.extractPartitionValues(
+                 new org.apache.flink.core.fs.Path(partitionName));
+     }
+
private ComparablePartitionValue<List<String>, Long> getComparablePartitionByTime(
List<String> partValues, Long time) {
return new ComparablePartitionValue<List<String>, Long>() {
@@ -188,7 +217,7 @@ private ComparablePartitionValue<List<String>, String> getComparablePartitionByName(

@Override
public List<String> getPartitionValue() {
-                 return extractPartitionValues(new org.apache.flink.core.fs.Path(partitionName));
+                 return extractPartitionValues(partitionName);
}

@Override
@@ -202,6 +231,9 @@ public String getComparator() {

@Override
public void close() throws Exception {
+         if (partValuesToCreateTime != null) {
+             partValuesToCreateTime.clear();
+         }
if (this.metaStoreClient != null) {
this.metaStoreClient.close();
}
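A note on the design: partValuesToCreateTime persists for the lifetime of the fetcher context, so each fetch issues getPartitionsByNames only for partitions that appeared since the previous poll, and the map is cleared in close(). A partition's create time is still approximated by the modification time of its location on the filesystem, as before, but only partitions registered in the metastore are considered, which is what lets the fetcher ignore misplaced or half-loaded paths.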
HiveMetastoreClientWrapper.java
@@ -22,7 +22,6 @@
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.util.Preconditions;
- import org.apache.flink.util.StringUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -51,8 +50,6 @@
import java.util.Optional;
import java.util.Set;

- import static org.apache.flink.util.Preconditions.checkArgument;

/**
* Wrapper class for Hive Metastore Client, which embeds a HiveShim layer to handle different Hive
* versions. Methods provided mostly conforms to IMetaStoreClient interfaces except those that
@@ -68,11 +65,12 @@ public class HiveMetastoreClientWrapper implements AutoCloseable {
private final HiveShim hiveShim;

public HiveMetastoreClientWrapper(HiveConf hiveConf, String hiveVersion) {
+         this(hiveConf, HiveShimLoader.loadHiveShim(hiveVersion));
+     }
+
+     public HiveMetastoreClientWrapper(HiveConf hiveConf, HiveShim hiveShim) {
          this.hiveConf = Preconditions.checkNotNull(hiveConf, "HiveConf cannot be null");
-         checkArgument(
-                 !StringUtils.isNullOrWhitespaceOnly(hiveVersion),
-                 "hiveVersion cannot be null or empty");
-         hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
+         this.hiveShim = hiveShim;
// use synchronized client in case we're talking to a remote HMS
client =
HiveCatalog.isEmbeddedMetastore(hiveConf)
@@ -139,6 +137,11 @@ public Partition getPartition(String databaseName, String tableName, List<String> list)
return client.getPartition(databaseName, tableName, list);
}

+     public List<Partition> getPartitionsByNames(
+             String databaseName, String tableName, List<String> partitionNames) throws TException {
+         return client.getPartitionsByNames(databaseName, tableName, partitionNames);
+     }

public List<String> listPartitionNames(
String databaseName, String tableName, short maxPartitions)
throws MetaException, TException {
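For reference, a hedged usage sketch of the wrapper surface touched by this commit; the database, table, and version strings below are illustrative, while every call shown appears in the diff above:

    import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
    import org.apache.flink.table.catalog.hive.client.HiveShim;
    import org.apache.flink.table.catalog.hive.client.HiveShimLoader;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.api.Partition;

    import java.util.List;

    class WrapperUsageSketch {
        static void run() throws Exception {
            HiveConf hiveConf = new HiveConf(); // assumes a reachable metastore
            HiveShim shim = HiveShimLoader.loadHiveShim("2.3.4"); // illustrative version string

            // the new constructor takes a pre-loaded HiveShim, as open() now does;
            // the old (HiveConf, String) constructor still works and delegates to it
            HiveMetastoreClientWrapper client = new HiveMetastoreClientWrapper(hiveConf, shim);
            List<String> names = client.listPartitionNames("default", "test", Short.MAX_VALUE);
            // resolve the full Partition objects in a single metastore call
            List<Partition> partitions = client.getPartitionsByNames("default", "test", names);
            client.close();
        }
    }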
HivePartitionFetcherTest.java (new file)
@@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connectors.hive.read;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.JobConfWrapper;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.types.DataType;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.mapred.JobConf;
import org.junit.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_ORDER;
import static org.junit.Assert.assertEquals;

/** Tests for Hive partition fetcher implementations. */
public class HivePartitionFetcherTest {

@Test
public void testIgnoreNonExistPartition() throws Exception {
// It's possible that a partition path exists on the filesystem but the partition hasn't
// been added to HMS, e.g. the partition is still being loaded or the path is simply
// misplaced. Make sure the fetcher ignores such paths.
HiveCatalog hiveCatalog = HiveTestUtils.createHiveCatalog();
hiveCatalog.open();

// create test table
String[] fieldNames = new String[] {"i", "date"};
DataType[] fieldTypes = new DataType[] {DataTypes.INT(), DataTypes.STRING()};
TableSchema schema = TableSchema.builder().fields(fieldNames, fieldTypes).build();
List<String> partitionKeys = Collections.singletonList("date");
Map<String, String> options = new HashMap<>();
options.put("connector", "hive");
CatalogTable catalogTable = new CatalogTableImpl(schema, partitionKeys, options, null);
ObjectPath tablePath = new ObjectPath("default", "test");
hiveCatalog.createTable(tablePath, catalogTable, false);

// add a valid partition path
Table hiveTable = hiveCatalog.getHiveTable(tablePath);
Path path = new Path(hiveTable.getSd().getLocation(), "date=2021-06-18");
FileSystem fs = path.getFileSystem(hiveCatalog.getHiveConf());
fs.mkdirs(path);

// test partition-time order
Configuration flinkConf = new Configuration();
flinkConf.set(STREAMING_SOURCE_PARTITION_ORDER, "partition-time");
HiveShim hiveShim = HiveShimLoader.loadHiveShim(hiveCatalog.getHiveVersion());
JobConfWrapper jobConfWrapper = new JobConfWrapper(new JobConf(hiveCatalog.getHiveConf()));
String defaultPartName = "__HIVE_DEFAULT_PARTITION__";
MyHivePartitionFetcherContext fetcherContext =
new MyHivePartitionFetcherContext(
tablePath,
hiveShim,
jobConfWrapper,
partitionKeys,
fieldTypes,
fieldNames,
flinkConf,
defaultPartName);
fetcherContext.open();
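        // the path created above exists on the filesystem but is not registered in HMS,
        // so the fetcher should return nothing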
assertEquals(0, fetcherContext.getComparablePartitionValueList().size());

// test create-time order
flinkConf.set(STREAMING_SOURCE_PARTITION_ORDER, "create-time");
fetcherContext =
new MyHivePartitionFetcherContext(
tablePath,
hiveShim,
jobConfWrapper,
partitionKeys,
fieldTypes,
fieldNames,
flinkConf,
defaultPartName);
fetcherContext.open();
assertEquals(0, fetcherContext.getComparablePartitionValueList().size());

// test partition-name order
flinkConf.set(STREAMING_SOURCE_PARTITION_ORDER, "partition-name");
fetcherContext =
new MyHivePartitionFetcherContext(
tablePath,
hiveShim,
jobConfWrapper,
partitionKeys,
fieldTypes,
fieldNames,
flinkConf,
defaultPartName);
fetcherContext.open();
assertEquals(0, fetcherContext.getComparablePartitionValueList().size());
}

private static class MyHivePartitionFetcherContext
extends HivePartitionFetcherContextBase<Partition> {

private static final long serialVersionUID = 1L;

public MyHivePartitionFetcherContext(
ObjectPath tablePath,
HiveShim hiveShim,
JobConfWrapper confWrapper,
List<String> partitionKeys,
DataType[] fieldTypes,
String[] fieldNames,
Configuration configuration,
String defaultPartitionName) {
super(
tablePath,
hiveShim,
confWrapper,
partitionKeys,
fieldTypes,
fieldNames,
configuration,
defaultPartitionName);
}

@Override
public Optional<Partition> getPartition(List<String> partValues) throws Exception {
return Optional.empty();
}
}
}
