[HUDI-5823][RFC-65] RFC for Partition TTL Management #8062

Merged: 9 commits, Dec 8, 2023

[HUDI-5823] RFC for Partition TTL Management
StreamingFlames authored and stream2000 committed Nov 30, 2023
commit 2d1ce8271a9afc8d1d833b0b675f17e91e434ca9

110 changes: 110 additions & 0 deletions rfc/rfc-65/rfc-65.md
@@ -0,0 +1,110 @@
## Proposers
- @stream2000
- @hujincalrin
- @huberylee
- @YuweiXiao
## Approvers
## Status
JIRA: [HUDI-5823](https://issues.apache.org/jira/browse/HUDI-5823)
## Abstract
In some classic Hudi use cases, users partition Hudi data by time and are only interested in data from a recent period. The outdated data is useless and costly, so we need a TTL (Time-To-Live) management mechanism to prevent the dataset from growing infinitely.
This proposal introduces Partition TTL Management policies to Hudi. Users can configure the policies directly through table config or through call commands. With proper configs set, Hudi can find out which partitions are outdated and delete them.
## Background
A TTL management mechanism is an important feature for databases. Hudi already provides a `delete_partition` interface to delete outdated partitions. However, users still need to detect which partitions are outdated and call `delete_partition` manually, which means that users have to define and implement some kind of TTL policy and maintain proper statistics to find expired partitions by themselves. As the scale of installations grows, it becomes more important to implement a user-friendly TTL management mechanism for Hudi.
## Implementation
There are three components needed to implement Partition TTL Management:

- TTL policy definition & storage
- Partition statistics for TTL management
- Applying policies
### TTL Policy Definition
We have three main considerations when designing TTL policy:

1. Users hope to manage partition TTL not only by expiration time but also by sub-partition count and sub-partition size, so we need to support the following three TTL policy types.
1. **KEEP_BY_TIME**. Partitions will expire N days after their last modified time.
**Reviewer (Contributor):** What is last modified time? Is it referring to new inserts, or to updates as well?

**Author (stream2000):** Yes, inserts/updates will both be treated as modifications to the partition. We track them by looking at the commit/deltacommit write stats.

**Reviewer (Contributor):** How do we fetch this info when a commit is archived?

**Author (stream2000):** In the latest version of the RFC, we use the max instant time of the committed file slices in the partition as the partition's last modified time, for simplicity. Otherwise, we would need some extra mechanism to get the last modified time. In our internal version, we maintain an extra JSON file and update it incrementally as new instants are committed, to get the real modified time for the partition. We could also use the metadata table to track the last modified time. What do you think about this?

**Reviewer (Contributor):** When retiring old/unused/not-accessed partitions, another approach we take internally is:
(a) Stash the partitions to be cleaned up in a `.stashedForDeletion` folder (at the `.hoodie` level).
(b) Partitions stashed for deletion wait in the folder for a week (or the time dictated by the policy) before actually getting deleted. In cases where we realize that something has been accidentally deleted (a bad policy configuration, TTL exclusion not configured, etc.), we can always move it back from the stash to quickly recover from the TTL event.
(c) We can configure policies for the `.stashedForDeletion/` sub-folders to manage the appropriate tiering level (whether to move them to a warm/cold tier, etc.).
(d) In addition to the deletePartitions() API, which would stash the folder (instead of deleting it) based on the configs, we would need a restore API to move the sub-folders/files back to their original location.
(e) Metadata left by the delete operation has to be synced with the MDT to keep the file-listing metadata in sync with the file system. (In cases where replication to a different region is supported, this also warrants applying the changes to the replicated copies of the data.)

**Author (stream2000):** Maybe we can add the stash/restore mechanism to replace the commit/clean process of Hudi instead of dealing with it in TTL management? TTL management should only decide which partitions are outdated and call delete_partition to delete them. If we want to retain the deleted data, we can add an extra mechanism in the delete_partition method.

2. **KEEP_BY_COUNT**. Keep N sub-partitions for a high-level partition. When the sub-partition count exceeds N, delete the sub-partitions with smaller partition values until the sub-partition count meets the policy configuration.
**Reviewer (Contributor):** Can you help us understand the use case here? I am trying to get an understanding of the sub-partitions. In Hudi we have only one partitioning, but it could be multi-leveled, so I am trying to see if we can keep it high level.

**Reviewer (Contributor):** I feel both (2) and (3) are very much catered towards multi-field partitioning, like ProductId/datestr-based partitioning. Can we lay out high-level strategies for single-level partitioning as well, in addition to multi-field partitioning?

**Reviewer (Contributor):** Is it possible to simplify the strategies so that they work for both single- and multi-field partitioning? For example: TTL any partition whose last modified time (the last time data was added/updated) is more than 1 month old. That works both for single-field partitioning (datestr) and for multi-field partitioning (productId/datestr). Open to ideas.

**Reviewer (Contributor):** We should also call out that the sub-partitioning might only work for day-based or time-based sub-partitioning, right? For example, if the partitioning is datestr/productId, how do we know, out of 1000 productIds under a given day, which 100 are older or newer (assuming all 1000 were created in the same commit)?

3. **KEEP_BY_SIZE**. Similar to KEEP_BY_COUNT, but ensures that the sum of the data size of all sub-partitions does not exceed the policy configuration.
2. Users need to set different policies for different partitions. For example, with a Hudi table partitioned by two fields (user_id, ts): for partition(user_id='1') we set a policy to keep 100 GB of data across all sub-partitions, and for partition(user_id='2') we set a policy that all sub-partitions expire 10 days after their last modified time.
**Reviewer (Contributor):** We should be able to add regex and achieve this. For example, `Map<PartitionRegex or static partition -> TTL policy>`; this map can have multiple entries as well.

3. It's possible that there are a lot of high-level partitions in the user's table and they don't want to set TTL policies for every high-level partition. So we need to provide a default policy mechanism, so that users can set a default policy for all high-level partitions and add explicit policies for some of them if needed. Explicit policies override the default policy (a resolution sketch follows the policy definition below).

So here we have the TTL policy definition:
```java
public class HoodiePartitionTTLPolicy {
  public enum TTLPolicy {
    KEEP_BY_TIME, KEEP_BY_SIZE, KEEP_BY_COUNT
  }

  // Partition spec for which the policy takes effect
  private String partitionSpec;

  private TTLPolicy policy;

  private long policyValue;
}
```

**Member (@SteNicholas, May 19, 2023):** @stream2000, could we introduce a HoodieTTLPolicy interface? Then HoodiePartitionTTLPolicy would implement HoodieTTLPolicy, and a HoodieRecordTTLPolicy could also implement this interface in the future.

**Author (stream2000):** @SteNicholas Nice suggestion! I will take it into consideration when implementing the policy and make sure we can integrate more types of TTL policy in the future.

**Member:** Could the partitionSpec support multi-level partitions?

**Author (stream2000):** Yes, it supports regex expressions for partitions as well as static partition values.

**Member (@SteNicholas, May 31, 2023):** @stream2000, for example, there is a Hudi table partitioned by date and hour. Meanwhile, the user wants to configure a TTL of one year. How could the user configure this TTL with the current policy definition? Set the policyValue to a year and the partitionSpec to `*/*`?
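To make consideration (3) above concrete, here is a minimal sketch of how a policy could be resolved for a given partition path, with explicit policies taking precedence over the default. `TTLPolicyResolver` is hypothetical and assumes standard getters on `HoodiePartitionTTLPolicy`; the exact matching semantics of `partitionSpec` (static value vs. regex/glob) are still an open point in the discussion:

```java
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;

public class TTLPolicyResolver {
  private final List<HoodiePartitionTTLPolicy> explicitPolicies;
  private final HoodiePartitionTTLPolicy defaultPolicy; // may be null if no default policy is configured

  public TTLPolicyResolver(List<HoodiePartitionTTLPolicy> explicitPolicies,
                           HoodiePartitionTTLPolicy defaultPolicy) {
    this.explicitPolicies = explicitPolicies;
    this.defaultPolicy = defaultPolicy;
  }

  /** Explicit policies override the default; a spec is treated as a static path or a regex here. */
  public Optional<HoodiePartitionTTLPolicy> resolve(String partitionPath) {
    for (HoodiePartitionTTLPolicy policy : explicitPolicies) {
      if (policy.getPartitionSpec().equals(partitionPath)
          || Pattern.matches(policy.getPartitionSpec(), partitionPath)) {
        return Optional.of(policy);
      }
    }
    // No explicit policy matched: fall back to the default policy, if any.
    return Optional.ofNullable(defaultPolicy);
  }
}
```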

### User Interface for TTL policy
Users can configure partition TTL management policies through the SparkSQL Call Command or directly through table config. Assuming the user has a Hudi table partitioned by two fields (user_id, ts), they can configure partition TTL policies as follows.

```sql
-- Set default policy for all user_id, which keeps the data for 30 days.
call add_ttl_policy(table => 'test', partitionSpec => 'user_id=*/', policy => 'KEEP_BY_TIME', policyValue => '30');

-- For partition user_id=1/, keep 10 sub-partitions.
call add_ttl_policy(table => 'test', partitionSpec => 'user_id=1/', policy => 'KEEP_BY_COUNT', policyValue => '10');

-- For partition user_id=2/, keep 100 GB of data in total.
call add_ttl_policy(table => 'test', partitionSpec => 'user_id=2/', policy => 'KEEP_BY_SIZE', policyValue => '107374182400');

-- For partition user_id=3/, keep the data for 7 days.
call add_ttl_policy(table => 'test', partitionSpec => 'user_id=3/', policy => 'KEEP_BY_TIME', policyValue => '7');

-- Show all the TTL policies including default and explicit policies
call show_ttl_policies(table => 'test');
user_id=*/ KEEP_BY_TIME 30
user_id=1/ KEEP_BY_COUNT 10
user_id=2/ KEEP_BY_SIZE 107374182400
user_id=3/ KEEP_BY_TIME 7
```

### Storage for TTL policy
The partition TTL policies will be stored in `hoodie.properties` since they are part of the table metadata. The policy configs in `hoodie.properties` are defined as follows. Explicit policies are defined using a JSON array, while the default policy is defined using separate configs.

```properties
# Default TTL policy definition
hoodie.partition.ttl.management.default.policy=KEEP_BY_TIME
hoodie.partition.ttl.management.default.fields=user_id
hoodie.partition.ttl.management.default.policy.value=30

# Explicit TTL policy definition
hoodie.partition.ttl.policies=[{"partitionSpec"\:"user_id\=2/","policy"\:"KEEP_BY_SIZE","policyValue"\:107374182400},{"partitionSpec"\:"user_id\=1/","policy"\:"KEEP_BY_COUNT","policyValue"\:10},{"partitionSpec"\:"user_id\=3/","policy"\:"KEEP_BY_TIME","policyValue"\:7}]
stream2000 marked this conversation as resolved.
Show resolved Hide resolved
```
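For illustration only (not part of the proposal), the explicit policy list could be read back from the loaded `hoodie.properties` with a Jackson-style mapper, assuming `HoodiePartitionTTLPolicy` is made deserializable (default constructor plus setters or visible fields); `TTLPolicyLoader` is a hypothetical helper:

```java
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Properties;

public class TTLPolicyLoader {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  /** Reads the explicit policy list from the loaded hoodie.properties content. */
  public static List<HoodiePartitionTTLPolicy> loadExplicitPolicies(Properties props) throws Exception {
    // java.util.Properties already unescapes the stored "\:" and "\=" sequences on load.
    String json = props.getProperty("hoodie.partition.ttl.policies", "[]");
    return MAPPER.readValue(json, new TypeReference<List<HoodiePartitionTTLPolicy>>() {});
    // The default policy keys (hoodie.partition.ttl.management.default.*) would be read separately.
  }
}
```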

### Partition Statistics
#### Partition Statistics Entity
We need to maintain a partition stat for every partition in the Hudi table. The stat will contain three fields:

- RelativePath. Relative path to the base path, or we can call it PartitionPath.
- LastModifiedTime. Last instant time that modified the partition. We need this information to support the `KEEP_BY_TIME` policy.
- PartitionTotalFileSize. The sum of all valid data file sizes in the partition, to support the `KEEP_BY_SIZE` policy. Currently we count only the latest file slices in the partition.
**Reviewer (Member):** Can you define "valid data file"? Is it just the latest version of each file id, i.e. the latest file slice?

**Author (stream2000):** Yes, we calculate only the latest file slice. If we want to calculate all the file slices instead of just the latest one, we can add a config to control the behavior or add another stat field. We chose to calculate only the latest file slice because we think it reflects the real data size of the file group.
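A minimal sketch of the statistics entity described above, using the three field names from the list; the class name `HoodiePartitionStat` and its accessors are hypothetical, not part of the proposal:

```java
public class HoodiePartitionStat {
  // Relative path to the table base path, i.e. the partition path
  private String relativePath;
  // Instant time of the last commit/deltacommit that modified this partition
  private String lastModifiedTime;
  // Sum of the data file sizes of the latest file slices in this partition, in bytes
  private long partitionTotalFileSize;

  public HoodiePartitionStat(String relativePath) {
    this.relativePath = relativePath;
  }

  public String getRelativePath() { return relativePath; }
  public String getLastModifiedTime() { return lastModifiedTime; }
  public long getPartitionTotalFileSize() { return partitionTotalFileSize; }
  public void setLastModifiedTime(String lastModifiedTime) { this.lastModifiedTime = lastModifiedTime; }
  public void setPartitionTotalFileSize(long partitionTotalFileSize) { this.partitionTotalFileSize = partitionTotalFileSize; }
}
```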

#### Gathering Partition Statistics
The simplest way to get partition statistics is to read information from the metadata table. However, the writes and compactions of the metadata table would severely affect the throughput and latency of data ingestion. So we design asynchronous partition statistics gathering as follows.

- The first time we gather partition statistics, we list all partitions in the Hudi table and calculate `PartitionTotalFileSize` and `LastModifiedTime` for each partition. Then we store all the partition stats in persistent storage along with the instant time at which we computed them. For example, we could directly store all partition stats in a JSON file whose file name contains the instant time.
- After initializing the partition statistics, we only need to list the affected partitions (found by reading timeline metadata) and store the new partition statistics back to storage with a new instant time.
- Note that deleted partitions will be removed from the partition statistics too. If a partition was deleted before and has no new data, we remove it from the partition statistics so it won't be counted as an expired partition again.
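A rough sketch of the incremental refresh described above. `PartitionStatsGatherer` and `CommitInfo` are hypothetical illustrations, assuming the caller has already read the relevant commit metadata from the timeline:

```java
import java.util.List;
import java.util.Map;

public class PartitionStatsGatherer {

  /** Minimal view of one commit read from timeline metadata (hypothetical, for illustration). */
  public static class CommitInfo {
    String instantTime;
    List<String> touchedPartitions;    // partitions written by this commit/deltacommit
    List<String> deletedPartitions;    // partitions removed by a replacecommit (e.g. delete_partition)
    Map<String, Long> latestSliceSize; // partition -> total size of its latest file slices, in bytes
  }

  /**
   * Incrementally refreshes partition statistics: only partitions touched by commits completed
   * after the last stats instant are re-computed, and deleted partitions are dropped so they
   * are not reported as expired again.
   */
  public Map<String, HoodiePartitionStat> refresh(Map<String, HoodiePartitionStat> previousStats,
                                                  List<CommitInfo> commitsAfterLastStatsInstant) {
    for (CommitInfo commit : commitsAfterLastStatsInstant) {
      for (String partition : commit.touchedPartitions) {
        HoodiePartitionStat stat =
            previousStats.computeIfAbsent(partition, HoodiePartitionStat::new);
        stat.setLastModifiedTime(commit.instantTime);
        stat.setPartitionTotalFileSize(commit.latestSliceSize.getOrDefault(partition, 0L));
      }
      commit.deletedPartitions.forEach(previousStats::remove);
    }
    // The caller persists the updated stats together with the instant time used for this refresh.
    return previousStats;
  }
}
```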
### Applying Policies
Once we have defined the TTL policies and have proper partition statistics, it's easy to apply the policies and delete expired partitions.

1. Gather partition statistics.
2. Apply each TTL policy. For explicit policies, find the sub-partitions that match the `PartitionSpec` defined in the policy and check whether any of them are expired according to the policy type and value. For the default policy, find partitions that do not match any explicit policy and check whether they are expired (a sketch of this check follows the list).
3. For all expired partitions, call delete_partition to delete them, which generates a replacecommit and marks all files in those partitions as replaced. For pending clustering and compaction plans that affect a partition targeted for deletion, [HUDI-5553](https://issues.apache.org/jira/browse/HUDI-5553) introduces a pre-check that will throw an exception; further improvement is discussed in [HUDI-5663](https://issues.apache.org/jira/browse/HUDI-5663).
4. The clean service will then delete all replaced file groups.
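As referenced in step 2, here is a rough sketch of the expiry check for the three policy types, reusing the hypothetical `HoodiePartitionStat` entity above and assuming standard getters on `HoodiePartitionTTLPolicy`; this is illustrative, not the proposed implementation:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class PartitionTTLExecutor {

  /** Returns partitions that are expired according to the resolved policy and current statistics. */
  public List<String> findExpiredPartitions(List<HoodiePartitionStat> stats,
                                            HoodiePartitionTTLPolicy policy,
                                            long nowEpochDay) {
    List<String> expired = new ArrayList<>();
    switch (policy.getPolicy()) {
      case KEEP_BY_TIME:
        // Expire partitions whose last modification is older than N days (policyValue).
        for (HoodiePartitionStat stat : stats) {
          if (daysSince(stat.getLastModifiedTime(), nowEpochDay) > policy.getPolicyValue()) {
            expired.add(stat.getRelativePath());
          }
        }
        break;
      case KEEP_BY_COUNT:
        // Keep only the N sub-partitions with the largest partition values.
        stats.sort(Comparator.comparing(HoodiePartitionStat::getRelativePath));
        for (int i = 0; i < stats.size() - policy.getPolicyValue(); i++) {
          expired.add(stats.get(i).getRelativePath());
        }
        break;
      case KEEP_BY_SIZE:
        // Drop the smallest partition values until the total size fits under policyValue bytes.
        stats.sort(Comparator.comparing(HoodiePartitionStat::getRelativePath));
        long total = stats.stream().mapToLong(HoodiePartitionStat::getPartitionTotalFileSize).sum();
        for (HoodiePartitionStat stat : stats) {
          if (total <= policy.getPolicyValue()) {
            break;
          }
          expired.add(stat.getRelativePath());
          total -= stat.getPartitionTotalFileSize();
        }
        break;
    }
    // The caller then passes the expired partitions to delete_partition, which issues a replacecommit.
    return expired;
  }

  // Simplification for illustration: Hudi instant times start with yyyyMMdd.
  private long daysSince(String instantTime, long nowEpochDay) {
    java.time.LocalDate date = java.time.LocalDate.parse(instantTime.substring(0, 8),
        java.time.format.DateTimeFormatter.BASIC_ISO_DATE);
    return nowEpochDay - date.toEpochDay();
  }
}
```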
**Reviewer (Member):** It would be great if you could also discuss how the read path will work. How do we maintain a consistent filesystem view for readers, given that the delete partition operation can take time?

**Author (stream2000):** delete_partition is already implemented in the current Hudi master branch. When we call delete_partition to delete a list of partitions, the executor lists all files of the partitions to delete and stores them in the replacecommit metadata. After the replacecommit is committed, all filesystem views that have seen the replacecommit will exclude the files that were replaced by it.

## Rollout/Adoption Plan
This will be updated once we decide on one of the approaches proposed above.
## Test Plan
This will be updated once we decide on one of the approaches proposed above.