diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByTimeStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByTimeStrategy.java
index 2fbda38a32c77..c94c3970a5d47 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByTimeStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByTimeStrategy.java
@@ -56,7 +56,8 @@ public KeepByTimeStrategy(HoodieTable hoodieTable) {
 
   @Override
   public List<String> getExpiredPartitionPaths() {
-    if (!lastCompletedInstant.isPresent() || ttlInMilis <= 0) {
+    if (!lastCompletedInstant.isPresent() || ttlInMilis <= 0
+        || !hoodieTable.getMetaClient().getTableConfig().getPartitionFields().isPresent()) {
       return Collections.emptyList();
     }
     List<String> expiredPartitions = getExpiredPartitionsForTimeStrategy(getPartitionPathsForTTL());
@@ -70,7 +71,7 @@ public List<String> getExpiredPartitionPaths() {
   protected List<String> getExpiredPartitionsForTimeStrategy(List<String> partitionsForTTLManagement) {
     HoodieTimer timer = HoodieTimer.start();
     Map<String, Option<String>> lastCommitTimeForPartitions = getLastCommitTimeForPartitions(partitionsForTTLManagement);
-    LOG.info("Collect last commit for partitions cost {} ms", timer.endTimer());
+    LOG.info("Collect last commit time for partitions cost {} ms", timer.endTimer());
     return lastCommitTimeForPartitions.entrySet()
         .stream()
         .filter(entry -> entry.getValue().isPresent())
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/TimelineBasedKeepByTimeStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/TimelineBasedKeepByTimeStrategy.java
index 345f2b17cbdc0..4e1cd87436150 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/TimelineBasedKeepByTimeStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/TimelineBasedKeepByTimeStrategy.java
@@ -85,7 +85,7 @@ private void scanTimeline() {
         updateLastCommitTime(instant.getTimestamp(), instant.getAction(), metadataOpt.get());
       }
     });
-    // Scan the whole active timeline to get the last cached last modified time map
+    // Scan the archive timeline
     HoodieArchivedTimeline.loadInstants(hoodieTable.getMetaClient(),
        null,
        HoodieArchivedTimeline.LoadMode.METADATA,
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 9bb6fb6db8dbf..e12aad789d780 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -91,6 +91,7 @@ object HoodieProcedures {
       ,(ShowTablePropertiesProcedure.NAME, ShowTablePropertiesProcedure.builder)
       ,(HelpProcedure.NAME, HelpProcedure.builder)
       ,(ArchiveCommitsProcedure.NAME, ArchiveCommitsProcedure.builder)
+      ,(RunTTLProcedure.NAME, RunTTLProcedure.builder)
     )
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunTTLProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunTTLProcedure.scala
new file mode 100644
index 0000000000000..f87ae5b33a3c6
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunTTLProcedure.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.HoodieCLIUtils
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.config.HoodieTTLConfig
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.Supplier
+
+import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
+
+class RunTTLProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "ttl_policy", DataTypes.StringType),
+    ProcedureParameter.optional(2, "retain_days", DataTypes.IntegerType),
+    ProcedureParameter.optional(3, "options", DataTypes.StringType)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("deleted_partitions", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  override def build: Procedure = new RunTTLProcedure
+
+  /**
+   * Returns the input parameters of this procedure.
+   */
+  override def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  /**
+   * Returns the type of rows produced by this procedure.
+   */
+  override def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    var confs: Map[String, String] = Map.empty
+    if (getArgValueOrDefault(args, PARAMETERS(1)).isDefined) {
+      confs += HoodieTTLConfig.PARTITION_TTL_STRATEGY_TYPE.key() -> getArgValueOrDefault(args, PARAMETERS(1)).get.toString
+    }
+    if (getArgValueOrDefault(args, PARAMETERS(2)).isDefined) {
+      confs += HoodieTTLConfig.DAYS_RETAIN.key() -> getArgValueOrDefault(args, PARAMETERS(2)).get.toString
+    }
+    if (getArgValueOrDefault(args, PARAMETERS(3)).isDefined) {
+      confs ++= HoodieCLIUtils.extractOptions(getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[String])
+    }
+
+    val basePath = getBasePath(tableName, Option.empty)
+
+    var client: SparkRDDWriteClient[_] = null
+    try {
+      client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, confs,
+        tableName.asInstanceOf[Option[String]])
+      val ttlInstantTime = client.createNewInstantTime()
+      val hoodieTTLMeta = client.managePartitionTTL(ttlInstantTime)
+      if (hoodieTTLMeta == null) {
+        Seq.empty
+      } else {
+        hoodieTTLMeta.getPartitionToReplaceFileIds.keySet().map { p =>
+          Row(p)
+        }.toSeq
+      }
+    } finally {
+      if (client != null) {
+        client.close()
+      }
+    }
+  }
+}
+
+object RunTTLProcedure {
+  val NAME = "run_ttl"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new RunTTLProcedure
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestTTLProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestTTLProcedure.scala
new file mode 100644
index 0000000000000..002375ac462ec
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestTTLProcedure.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hudi.SparkDatasetMixin
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator.{TRIP_EXAMPLE_SCHEMA, getCommitTimeAtUTC}
+import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtils}
+import org.apache.hudi.config.HoodieWriteConfig
+
+import org.apache.spark.api.java.JavaSparkContext
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
+class TestTTLProcedure extends HoodieSparkProcedureTestBase with SparkDatasetMixin {
+
+  test("Test Call run_ttl Procedure by Table") {
+    withSQLConf("hoodie.partition.ttl.automatic" -> "false") {
+      withTempDir { tmp => {
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        initTable(basePath)
+
+        val writeConfig = getConfigBuilder(basePath, tableName, true).build()
+        val client = getHoodieWriteClient(writeConfig)
+        val dataGen = new HoodieTestDataGenerator(0xDEED)
+        val partitionPaths = dataGen.getPartitionPaths()
+        val partitionPath0 = partitionPaths(0)
+        val instant0 = getCommitTimeAtUTC(0)
+
+        writeRecordsForPartition(client, dataGen, partitionPath0, instant0)
+
+        val instant1 = getCommitTimeAtUTC(1000)
+        val partitionPath1 = partitionPaths(1)
+        writeRecordsForPartition(client, dataGen, partitionPath1, instant1)
+
+        val currentInstant = client.createNewInstantTime()
+        val partitionPath2 = partitionPaths(2)
+        writeRecordsForPartition(client, dataGen, partitionPath2, currentInstant)
+        spark.sql(
+          s"""
+             | create table $tableName using hudi
+             | location '$basePath'
+             | tblproperties (
+             |   primaryKey = '_row_key',
+             |   preCombineField = '_row_key',
+             |   type = 'cow'
+             | )
+             |""".stripMargin)
+
+        checkAnswer(s"call run_ttl(table => '$tableName', retain_days => 1)")(
+          Seq(partitionPath0),
+          Seq(partitionPath1)
+        )
+      }
+      }
+    }
+  }
+
+  private def writeRecordsForPartition(client: SparkRDDWriteClient[Nothing],
+                                       dataGen: HoodieTestDataGenerator,
+                                       partition: String, instantTime: String): Unit = {
+    val records: java.util.List[HoodieRecord[Nothing]] =
+      dataGen.generateInsertsForPartition(instantTime, 10, partition)
+        .asInstanceOf[java.util.List[HoodieRecord[Nothing]]]
+    // Use this JavaRDD to call the insert method
+    client.startCommitWithTime(instantTime, HoodieTimeline.COMMIT_ACTION)
+    client.insert(spark.sparkContext.parallelize(records.asScala).toJavaRDD(), instantTime)
+  }
+
+  private def getHoodieWriteClient(cfg: HoodieWriteConfig): SparkRDDWriteClient[Nothing] = {
+    val writeClient = new SparkRDDWriteClient[Nothing](
+      new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)), cfg
+    )
+    writeClient
+  }
+
+  private def initTable(basePath: String): Unit = {
+    val props = new Properties()
+    props.put("hoodie.datasource.write.partitionpath.field", "partition_path")
+    props.put("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.SimpleKeyGenerator")
+    props.put(HoodieTableConfig.PARTITION_FIELDS.key(), "partition_path")
+    props.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key")
+    HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, props);
+  }
+
+  protected def getConfigBuilder(basePath: String, tableName: String, autoCommit: Boolean): HoodieWriteConfig.Builder =
+    HoodieWriteConfig
+      .newBuilder
+      .withPath(basePath)
+      .withSchema(TRIP_EXAMPLE_SCHEMA)
+      .withAutoCommit(autoCommit)
+      .withPreCombineField("_row_key")
+      .forTable(tableName)
+
+}
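
For reference, a minimal usage sketch of the new procedure from a Spark shell with the Hudi SQL extensions enabled. The table name `hudi_trips` and the 7-day retention are illustrative; `table`, `ttl_policy`, `retain_days`, and `options` are the parameters defined in RunTTLProcedure above, and the result column is `deleted_partitions`.

    // Expire partitions whose last commit is older than 7 days on a hypothetical table `hudi_trips`.
    spark.sql("call run_ttl(table => 'hudi_trips', retain_days => 7)").show(false)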