
Commit

support RunTTLProcedure
stream2000 committed Jan 15, 2024
1 parent eb82362 commit 9ef8e0c
Showing 5 changed files with 221 additions and 3 deletions.
@@ -56,7 +56,8 @@ public KeepByTimeStrategy(HoodieTable hoodieTable) {

@Override
public List<String> getExpiredPartitionPaths() {
- if (!lastCompletedInstant.isPresent() || ttlInMilis <= 0) {
+ if (!lastCompletedInstant.isPresent() || ttlInMilis <= 0
+     || !hoodieTable.getMetaClient().getTableConfig().getPartitionFields().isPresent()) {
return Collections.emptyList();
}
List<String> expiredPartitions = getExpiredPartitionsForTimeStrategy(getPartitionPathsForTTL());
@@ -70,7 +71,7 @@ public List<String> getExpiredPartitionPaths() {
protected List<String> getExpiredPartitionsForTimeStrategy(List<String> partitionsForTTLManagement) {
HoodieTimer timer = HoodieTimer.start();
Map<String, Option<String>> lastCommitTimeForPartitions = getLastCommitTimeForPartitions(partitionsForTTLManagement);
LOG.info("Collect last commit for partitions cost {} ms", timer.endTimer());
LOG.info("Collect last commit time for partitions cost {} ms", timer.endTimer());
return lastCommitTimeForPartitions.entrySet()
.stream()
.filter(entry -> entry.getValue().isPresent())
@@ -85,7 +85,7 @@ private void scanTimeline() {
updateLastCommitTime(instant.getTimestamp(), instant.getAction(), metadataOpt.get());
}
});
- // Scan the whole active timeline to get the last cached last modified time map
+ // Scan the archive timeline
HoodieArchivedTimeline.loadInstants(hoodieTable.getMetaClient(),
null,
HoodieArchivedTimeline.LoadMode.METADATA,
@@ -91,6 +91,7 @@ object HoodieProcedures {
,(ShowTablePropertiesProcedure.NAME, ShowTablePropertiesProcedure.builder)
,(HelpProcedure.NAME, HelpProcedure.builder)
,(ArchiveCommitsProcedure.NAME, ArchiveCommitsProcedure.builder)
+ ,(RunTTLProcedure.NAME, RunTTLProcedure.builder)
)
}
}
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hudi.command.procedures

import org.apache.hudi.HoodieCLIUtils
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.config.HoodieTTLConfig
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}

import java.util.function.Supplier
import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`

class RunTTLProcedure extends BaseProcedure with ProcedureBuilder with Logging {

private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "ttl_policy", DataTypes.StringType),
ProcedureParameter.optional(2, "retain_days", DataTypes.IntegerType),
ProcedureParameter.optional(3, "options", DataTypes.StringType)
)

private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("deleted_partitions", DataTypes.StringType, nullable = true, Metadata.empty)
))

override def build: Procedure = new RunTTLProcedure

/**
* Returns the input parameters of this procedure.
*/
override def parameters: Array[ProcedureParameter] = PARAMETERS

/**
* Returns the type of rows produced by this procedure.
*/
override def outputType: StructType = OUTPUT_TYPE

override def call(args: ProcedureArgs): Seq[Row] = {
super.checkArgs(PARAMETERS, args)

val tableName = getArgValueOrDefault(args, PARAMETERS(0))
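// Map the optional arguments onto Hudi TTL write configs: ttl_policy -> strategy type,
// retain_days -> days to retain, options -> any extra key/value configs.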
var confs: Map[String, String] = Map.empty
if (getArgValueOrDefault(args, PARAMETERS(1)).isDefined) {
confs += HoodieTTLConfig.PARTITION_TTL_STRATEGY_TYPE.key() -> getArgValueOrDefault(args, PARAMETERS(1)).get.toString
}
if (getArgValueOrDefault(args, PARAMETERS(2)).isDefined) {
confs += HoodieTTLConfig.DAYS_RETAIN.key() -> getArgValueOrDefault(args, PARAMETERS(2)).get.toString
}
if (getArgValueOrDefault(args, PARAMETERS(3)).isDefined) {
confs ++= HoodieCLIUtils.extractOptions(getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[String])
}

val basePath = getBasePath(tableName, Option.empty)

var client: SparkRDDWriteClient[_] = null
try {
client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, confs,
tableName.asInstanceOf[Option[String]])
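// Run one partition-TTL pass under a fresh instant; partitions whose file groups
// get replaced are reported back as rows in the deleted_partitions column.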
val ttlInstantTime = client.createNewInstantTime()
val hoodieTTLMeta = client.managePartitionTTL(ttlInstantTime)
if (hoodieTTLMeta == null) {
Seq.empty
} else {
hoodieTTLMeta.getPartitionToReplaceFileIds.keySet().map { p =>
Row(p)
}.toSeq
}
} finally {
if (client != null) {
client.close()
}
}
}
}

object RunTTLProcedure {
val NAME = "run_ttl"

def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
override def get() = new RunTTLProcedure
}
}
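
Once registered in HoodieProcedures above, run_ttl can be invoked from Spark SQL. Below is a minimal usage sketch, not part of the commit: it assumes a SparkSession with the Hudi session extension enabled and an existing partitioned Hudi table; the table name hudi_trips is a placeholder, and retain_days maps to HoodieTTLConfig.DAYS_RETAIN exactly as call() does above.

import org.apache.spark.sql.SparkSession

object RunTtlExample {
  def main(args: Array[String]): Unit = {
    // Assumes the Hudi Spark bundle is on the classpath.
    val spark = SparkSession.builder()
      .appName("run_ttl example")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .getOrCreate()

    // Expire partitions whose last commit is older than 7 days; each deleted
    // partition path comes back as one row in the deleted_partitions column.
    spark.sql("call run_ttl(table => 'hudi_trips', retain_days => 7)").show(false)

    spark.stop()
  }
}

The optional ttl_policy and options arguments can be used in the same call to override the TTL strategy type and pass additional write configs.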
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.hudi.procedure

import org.apache.hudi.SparkDatasetMixin
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.testutils.HoodieTestDataGenerator.{TRIP_EXAMPLE_SCHEMA, getCommitTimeAtUTC}
import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtils}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.api.java.JavaSparkContext

import java.util.Properties
import scala.collection.JavaConverters._

class TestTTLProcedure extends HoodieSparkProcedureTestBase with SparkDatasetMixin {

test("Test Call run_ttl Procedure by Table") {
withSQLConf("hoodie.partition.ttl.automatic" -> "false") {
withTempDir { tmp => {
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"
initTable(basePath)

val writeConfig = getConfigBuilder(basePath, tableName, true).build()
val client = getHoodieWriteClient(writeConfig)
val dataGen = new HoodieTestDataGenerator(0xDEED)
val partitionPaths = dataGen.getPartitionPaths()
val partitionPath0 = partitionPaths(0)
val instant0 = getCommitTimeAtUTC(0)

writeRecordsForPartition(client, dataGen, partitionPath0, instant0)

val instant1 = getCommitTimeAtUTC(1000)
val partitionPath1 = partitionPaths(1)
writeRecordsForPartition(client, dataGen, partitionPath1, instant1)

val currentInstant = client.createNewInstantTime()
val partitionPath2 = partitionPaths(2)
writeRecordsForPartition(client, dataGen, partitionPath2, currentInstant)
spark.sql(
s"""
| create table $tableName using hudi
| location '$basePath'
| tblproperties (
| primaryKey = '_row_key',
| preCombineField = '_row_key',
| type = 'cow'
| )
|""".stripMargin)

checkAnswer(s"call run_ttl(table => '$tableName', retain_days => 1)")(
Seq(partitionPath0),
Seq(partitionPath1)
)
}
}
}
}

private def writeRecordsForPartition(client: SparkRDDWriteClient[Nothing],
dataGen: HoodieTestDataGenerator,
partition: String, instantTime: String): Unit = {
val records: java.util.List[HoodieRecord[Nothing]] =
dataGen.generateInsertsForPartition(instantTime, 10, partition)
.asInstanceOf[java.util.List[HoodieRecord[Nothing]]]
// Start a commit for the given instant time, then insert the generated records as a JavaRDD
client.startCommitWithTime(instantTime, HoodieTimeline.COMMIT_ACTION)
client.insert(spark.sparkContext.parallelize(records.asScala).toJavaRDD(), instantTime)
}

private def getHoodieWriteClient(cfg: HoodieWriteConfig): SparkRDDWriteClient[Nothing] = {
val writeClient = new SparkRDDWriteClient[Nothing](
new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)), cfg
)
writeClient
}

private def initTable(basePath: String): Unit = {
val props = new Properties()
props.put("hoodie.datasource.write.partitionpath.field", "partition_path")
props.put("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.SimpleKeyGenerator")
props.put(HoodieTableConfig.PARTITION_FIELDS.key(), "partition_path")
props.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key")
HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, props);
}

protected def getConfigBuilder(basePath: String, tableName: String, autoCommit: Boolean): HoodieWriteConfig.Builder =
HoodieWriteConfig
.newBuilder
.withPath(basePath)
.withSchema(TRIP_EXAMPLE_SCHEMA)
.withAutoCommit(autoCommit)
.withPreCombineField("_row_key")
.forTable(tableName)

}
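
The test pins hoodie.partition.ttl.automatic to false so that partitions only expire when run_ttl is called explicitly. The sketch below is not part of the commit and rests on an assumption: that setting the same key to true lets TTL run as part of a regular datasource write. The DataFrame, field names, and base path are placeholders, and the retain-days key is referenced through its HoodieTTLConfig constant rather than a guessed literal.

import org.apache.hudi.config.HoodieTTLConfig
import org.apache.spark.sql.{DataFrame, SaveMode}

// Hedged sketch: enable partition TTL as part of a normal datasource write
// instead of calling run_ttl by hand.
def writeWithTtl(df: DataFrame, basePath: String): Unit = {
  df.write.format("hudi")
    .option("hoodie.table.name", "hudi_trips")
    .option("hoodie.datasource.write.recordkey.field", "_row_key")
    .option("hoodie.datasource.write.partitionpath.field", "partition_path")
    .option("hoodie.partition.ttl.automatic", "true")   // key taken from the test above; "true" assumed to enable inline TTL
    .option(HoodieTTLConfig.DAYS_RETAIN.key(), "7")     // same config constant used by RunTTLProcedure
    .mode(SaveMode.Append)
    .save(basePath)
}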
