core:Refactor the code of HadoopTableOptions #10623

Open
wants to merge 78 commits into
base: main

@BsoBird BsoBird commented Jul 3, 2024

Refactor the code of HadoopTableOptions

1.Current problems

1.1.Enabling write.metadata.delete-after-commit.enabled may result in dirty commits.

When this option is enabled, the catalog cleans up old metadata files after each commit.
If a client then re-commits an older version, the commit succeeds because the old metadata file has already been deleted.
For example, suppose the metadata folder currently holds three versions: v7, v8, and v9. Committing v2 succeeds because the file for v2 no longer exists, and the folder then contains four files: v2, v7, v8, and v9.
However, the user will not be able to see this commit.
From the user's point of view the result is very strange: the commit succeeds,
but its content disappears.
In addition, dirty commits generated this way are never cleaned up.
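
To make the scenario concrete, here is a minimal in-memory sketch (not Iceberg code). The class DirtyCommitDemo and its methods are hypothetical. commitExistenceOnly models a commit guarded only by a file-existence check, while commitIfNext additionally assumes that a new version must be exactly one past the latest version.

```java
import java.util.TreeSet;

// Hypothetical in-memory model of a metadata folder; not Iceberg code.
class DirtyCommitDemo {
  // Versions whose metadata file currently exists in the folder.
  private final TreeSet<Integer> versions = new TreeSet<>();

  DirtyCommitDemo(int... existing) {
    for (int v : existing) {
      versions.add(v);
    }
  }

  // Existence-only check: succeeds whenever the target file is absent.
  boolean commitExistenceOnly(int version) {
    return versions.add(version);
  }

  // Stricter check (an assumption of this sketch): the target must be latest + 1.
  boolean commitIfNext(int version) {
    if (version != latest() + 1) {
      return false;
    }
    return versions.add(version);
  }

  // Highest version present, or 0 for an empty folder.
  int latest() {
    return versions.isEmpty() ? 0 : versions.last();
  }
}
```

With v7, v8, v9 present, commitExistenceOnly(2) is accepted while latest() still reports 9, which is exactly the "successful but invisible" commit described above. commitIfNext(2) rejects the same request.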

1.2.Problems with lockManager's locking policy.

1.2.1.Incorrect timing of locking and releasing

The current code ignores the fact that the writeVersionHint step may also need to run under the lock.
In the current implementation, users rely on the versionHintFile to speed up version lookups.
If the information in the versionHintFile is wrong, the consequences are serious.
Example: if more than one client executes the commit method concurrently,
the versionHintFile may end up pointing at a very old version (versionHintFile's version < currentVersion - write.metadata.previous-versions-max).
At that point, the user can no longer call the refresh method to get the latest version.
From then on, every commit by that user fails.
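
The failure mode can be illustrated with a simplified model of a "start from the hint, then walk forward" lookup. This is a sketch under that assumption, not the actual refresh code; StaleHintDemo and resolveLatest are hypothetical names.

```java
import java.util.OptionalInt;
import java.util.Set;

// Hypothetical model of a hint-based version lookup; not Iceberg code.
class StaleHintDemo {
  // Starts at the hinted version and walks forward while the next file exists.
  // Returns empty when the hinted version itself has already been deleted.
  static OptionalInt resolveLatest(Set<Integer> existingVersions, int hintedVersion) {
    if (!existingVersions.contains(hintedVersion)) {
      return OptionalInt.empty();
    }
    int version = hintedVersion;
    while (existingVersions.contains(version + 1)) {
      version++;
    }
    return OptionalInt.of(version);
  }
}
```

If the folder holds v7, v8, v9 and the hint says 3, the lookup has nothing to start from. A hint of 7 or 8 still reaches 9, so only a hint older than the retained window is fatal.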

1.2.2.Incorrect behaviour of locking and releasing

The current code locks only per committed version: if multiple clients commit version 3 at the same time, the lock takes effect.
However, when multiple clients commit different versions at the same time, they do not exclude each other.
For object storage filesystems, such locking can leave incorrect information in the versionHintFile.
For normal filesystems, such locking is meaningless, since the OS already provides the same guarantee when fs.rename is called, and it can still corrupt the versionHintFile content.
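
The difference between the two lock scopes comes down to the lock key. The sketch below is illustrative only: LockScopeDemo, perVersionKey, and tableKey are hypothetical, and the file naming is an assumption made for the example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical in-memory lock registry; not the Iceberg LockManager.
class LockScopeDemo {
  private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

  // Returns the single lock instance associated with a key.
  ReentrantLock lockFor(String key) {
    return locks.computeIfAbsent(key, k -> new ReentrantLock());
  }

  // Per-version scope: the key is the target metadata file (naming is assumed).
  static String perVersionKey(String metadataRoot, int version) {
    return metadataRoot + "/v" + version + ".metadata.json";
  }

  // Table-wide scope: every commit to the table shares the metadata root as key.
  static String tableKey(String metadataRoot) {
    return metadataRoot;
  }
}
```

With per-version keys, a client committing v3 and a client committing v4 obtain different locks and can interleave their version-hint writes. With the table-wide key they contend for the same lock.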

1.3.Incorrect behaviour of exception handling

The current implementation of the commit method is not atomic. Suppose the client calls renameMetadata successfully and then hits an unexpected runtime exception,
which triggers Spark to clean up the data files from this commit.
Spark does not clean up the metadata files generated by this commit, which ultimately leads to the problem described here.

In addition to the three major problems above, hadoopTableOptions has a large number of minor problems that leave it almost "full of holes".

So, given the large number of problems with hadoopTableOptions, we need to completely refactor it.

2.List of features to be supported

2.1.If a user uses the global locking service, on any type of file system, we need to support the following features:

  • Supports concurrency control
  • No dirty commits
  • No versionHintFile with incorrect content
  • Guarantees the atomicity of commit operations.

2.2.If the user is on a normal file system (one whose rename operation does not overwrite the target file) and is not using the global locking service, we need to support the following features:

  • Supports concurrency control
  • Even if the contents of the versionHintFile are incorrect, the client can still commit the correct metadata.
  • Supports cleaning up dirty commits
  • Guarantees the atomicity of commit operations.

Ultimately, users can manage metadata and catalogs reliably while using only file systems.

3.Glossary:

  • Dirty-commit: commitVersion < currentVersion - write.metadata.previous-versions-max && commit-success=True && fs.exists(version-metadata.json)=True
  • Normal file system: a file system whose rename operation does not overwrite an existing target file. Examples: mv -n a.txt b.txt, HdfsFileSystem.rename(src,dst), WindowsFs.rename(src,dst)
  • Atomicity of commit: After the new version of the metadata has been renamed successfully, any exceptions will be ignored. Any exceptions thrown before the metadata has been renamed will be treated as a commit failure. If the rename of the new version of the metadata fails, we will throw a CommitStateUnknown exception and stop everything. The outcome of VersionHintFile operations is disregarded.
  • Global locking service: All clients operating on the Iceberg table must acquire the same lock, and must acquire it successfully before a commit can proceed. Examples: distributed locking based on zk/redis/database, or constraining all operations to the same in-memory lock.
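
The dirty-commit definition above translates directly into a predicate. This is a minimal sketch; the class and parameter names are hypothetical, and previousVersionsMax stands for the value of write.metadata.previous-versions-max.

```java
// Hypothetical helper that restates the glossary definition; not Iceberg code.
class DirtyCommitCheck {
  // True when a successful commit wrote a version older than the retained window
  // and its metadata file is still present.
  static boolean isDirtyCommit(
      int commitVersion,
      int currentVersion,
      int previousVersionsMax,
      boolean commitSucceeded,
      boolean metadataFileExists) {
    return commitVersion < currentVersion - previousVersionsMax
        && commitSucceeded
        && metadataFileExists;
  }
}
```

For example, with currentVersion 9 and previousVersionsMax 2, a successful commit of v2 whose file exists is dirty (2 < 7), while a commit of v8 is not.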

4.Flow chart:

graph TB
    START[commit-start] -->TRY-LOCK{try-get-lock}
    TRY-LOCK --> |fail| COMMIT-FAIL[commit-fail]
    TRY-LOCK --> |success| CHECK-EXISTS{check-metadata-file-exists}
    CHECK-EXISTS -->|already-exists| COMMIT-FAIL
    CHECK-EXISTS -->|not-exists| GLOBAL-LOCK{useGlobalLockingService?}
    GLOBAL-LOCK -->|yes| FIND-LATEST-VERSION(findLatestVersion)
    GLOBAL-LOCK -->|no| FIND-LATEST-VERSION-NO-HINT(findLatestVersionWithOutVersionHint)
    FIND-LATEST-VERSION-NO-HINT --> CHECK-COMMIT-VERSION{checkCurrentVersionIsLatest}
    FIND-LATEST-VERSION --> CHECK-COMMIT-VERSION
    CHECK-COMMIT-VERSION --> |no| COMMIT-FAIL(commit-fail)
    CHECK-COMMIT-VERSION --> |yes| DO-COMMIT{do-commit}
    FAST-FAIL --> COMMIT-STATE-UNKNOWN(commit-state-unknown)
    DO-COMMIT --> |not-success| COMMIT-FAIL(commit-fail)
    DO-COMMIT --> |server-side-exception| TRY-CHECK-COMMIT(try-check)
    DO-COMMIT --> |commit-success| NEED-CLEAN-DIRTY{useGlobalLockingService?}
    DO-COMMIT --> |client-side-exception| COMMIT-STATE-UNKNOWN
    TRY-CHECK-COMMIT --> |check-success-and-commit-fail| COMMIT-FAIL
    TRY-CHECK-COMMIT --> |check-success-and-commit-success| COMMIT-SUCCESS
    TRY-CHECK-COMMIT --> |exception| COMMIT-STATE-UNKNOWN
    NEED-CLEAN-DIRTY --> |no| CHECK-DIRTY-COMMIT{this-commit-is-a-dirty-commit?}
    CHECK-DIRTY-COMMIT --> |no| CLEAN-DIRTY(clean-old-dirty-commit)
    CHECK-DIRTY-COMMIT --> |yes| FAST-FAIL(fast-fail)
    CLEAN-DIRTY  --> COMMIT-SUCCESS
    NEED-CLEAN-DIRTY --> |yes| COMMIT-SUCCESS
    COMMIT-SUCCESS --> UNLOCK(UNLOCK)
    COMMIT-STATE-UNKNOWN --> UNLOCK
    COMMIT-FAIL --> UNLOCK
    UNLOCK --> END(END)
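
The decisions in the flow chart can also be read as a pure function from the observed conditions to one of the three terminal states. The sketch below is one possible reading of the chart, not the PR's implementation; every name in it is hypothetical, and the unlock step, which always runs, is left out.

```java
// Hypothetical restatement of the flow chart as a decision function.
class CommitFlowSketch {
  enum Outcome { COMMIT_SUCCESS, COMMIT_FAIL, COMMIT_STATE_UNKNOWN }

  enum RenameResult { SUCCESS, NOT_SUCCESS, SERVER_SIDE_EXCEPTION, CLIENT_SIDE_EXCEPTION }

  enum CheckResult { COMMITTED, NOT_COMMITTED, EXCEPTION }

  static Outcome decide(
      boolean lockAcquired,
      boolean targetExists,
      boolean nextVersionIsLatest,
      RenameResult rename,
      CheckResult recheck,
      boolean useGlobalLockingService,
      boolean isDirtyCommit) {
    // try-get-lock, check-metadata-file-exists, checkCurrentVersionIsLatest
    if (!lockAcquired || targetExists || !nextVersionIsLatest) {
      return Outcome.COMMIT_FAIL;
    }
    // do-commit
    switch (rename) {
      case NOT_SUCCESS:
        return Outcome.COMMIT_FAIL;
      case CLIENT_SIDE_EXCEPTION:
        return Outcome.COMMIT_STATE_UNKNOWN;
      case SERVER_SIDE_EXCEPTION:
        // try-check: the re-check result is only consulted on this branch
        if (recheck == CheckResult.COMMITTED) {
          return Outcome.COMMIT_SUCCESS;
        }
        if (recheck == CheckResult.NOT_COMMITTED) {
          return Outcome.COMMIT_FAIL;
        }
        return Outcome.COMMIT_STATE_UNKNOWN;
      default:
        break; // SUCCESS falls through to the dirty-commit handling
    }
    if (useGlobalLockingService) {
      return Outcome.COMMIT_SUCCESS;
    }
    // Without a global lock: fast-fail on a dirty commit, otherwise clean old ones.
    return isDirtyCommit ? Outcome.COMMIT_STATE_UNKNOWN : Outcome.COMMIT_SUCCESS;
  }
}
```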

BsoBird and others added 30 commits January 23, 2024 11:21
2. Detailed design of the logic related to the exception thrown .
3. Removed the default return value of some methods.
@github-actions github-actions bot added the core label Jul 3, 2024

BsoBird commented Jul 4, 2024

@nastra @Fokko @RussellSpitzer @rdblue @pvary Hello. If someone could help me review this PR, that would be great. Thanks.

// global locking service. But we should support add some other conditions in future.
boolean supportGlobalLocking = useObjectStore;
try {
tryLock(tempMetadataFile, metadataRoot());

The current code locks only per committed version: if multiple clients commit version 3 at the same time, the lock takes effect.
However, when multiple clients commit different versions at the same time, they do not exclude each other.
For object storage filesystems, such locking can leave incorrect information in the versionHintFile.
For normal filesystems, such locking is meaningless, since the OS already provides the same guarantee when fs.rename is called, and it can still corrupt the versionHintFile content.

So, we need to lock the metadataRoot.

try {
tryLock(tempMetadataFile, metadataRoot());
versionCommitSuccess =
commitNewVersion(

Executing the commitNewVersion method can produce only four results:
1.return true (rename succeeded)
2.return false (rename failed)
3.throw CommitFailedException
4.throw CommitStateUnknownException

throw e;
} catch (Exception e) {
this.shouldRefresh = versionCommitSuccess;
if (!versionCommitSuccess) {

If the commitNewVersion method succeeds, we will ignore any exceptions caught after that.
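
A minimal sketch of that rule, assuming the rename result is tracked in a flag as in the hunk above. PostRenameHandling, Step, and the use of IllegalStateException in place of the real commit exceptions are all hypothetical, chosen to keep the example self-contained.

```java
import java.util.function.BooleanSupplier;

// Hypothetical illustration of "ignore exceptions after a successful rename".
class PostRenameHandling {
  // A follow-up action that may fail, e.g. writing the version hint.
  interface Step {
    void run() throws Exception;
  }

  // Returns whether the new version was committed.
  static boolean commit(BooleanSupplier rename, Step afterRename) {
    boolean versionCommitSuccess = false;
    try {
      versionCommitSuccess = rename.getAsBoolean();
      if (versionCommitSuccess) {
        afterRename.run();
      }
    } catch (Exception e) {
      if (!versionCommitSuccess) {
        // Failure before the rename completed: report a failed commit.
        throw new IllegalStateException("commit failed before rename completed", e);
      }
      // The rename already succeeded, so the later failure is ignored.
    }
    return versionCommitSuccess;
  }
}
```

The design choice here is that the rename is the single commit point: anything that fails afterwards cannot turn a committed version back into a failed one.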

FileSystem fs,
Path finalMetadataFile)
throws IOException {
if (!supportGlobalLocking) {

For cases where distributed locks are not used, we need to run some extra validation and clean up dirty commits.

if ((currentMaxVersion - nextVersion) > previousVersionsMax && fs.exists(finalMetadataFile)) {
tryDelete(finalMetadataFile);
throw new CommitStateUnknownException(
new RejectedExecutionException(

@BsoBird BsoBird Jul 9, 2024


Under some boundary conditions this check can misfire, and we may misidentify a normal commit as a dirty commit. However, we throw CommitStateUnknownException so that it does not affect the normal commits of other clients.

A false positive can occur when the deleteRemovedMetadataFiles method was not executed during previous commits because of some exception.

I think even if that happens, it's at least better than a dirty commit.

if (!fs.exists(metadataRoot())) {
// Either the table has just been created, or it has been corrupted, but either way, we have
// to start at version 0.
LOG.warn("Metadata for table not found in directory [{}]", metadataRoot());
return 0;

@BsoBird BsoBird Jul 9, 2024


old-code:

      } catch (IOException io) {
        LOG.warn("Error trying to recover version-hint.txt data for {}", versionHintFile, e);
        return 0;
      }

This is a fatal error: we should not use 0 as the current latest version after an IO exception.
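
As an illustration of the alternative, the sketch below lists a metadata directory and propagates listing failures instead of silently returning 0. It uses java.nio.file rather than the Hadoop FileSystem API, and the class name and file-name pattern are assumptions made for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

// Hypothetical version scan; not the PR's findVersionWithOutVersionHint.
class VersionScan {
  // Assumed naming scheme for versioned metadata files.
  private static final Pattern VERSION_FILE = Pattern.compile("v(\\d+)\\.metadata\\.json");

  // Returns the highest version found, or 0 if the directory is missing or empty.
  static int findLatestVersion(Path metadataRoot) {
    if (!Files.exists(metadataRoot)) {
      return 0; // table not created yet
    }
    try (Stream<Path> files = Files.list(metadataRoot)) {
      return files
          .map(p -> VERSION_FILE.matcher(p.getFileName().toString()))
          .filter(Matcher::matches)
          .mapToInt(m -> Integer.parseInt(m.group(1)))
          .max()
          .orElse(0);
    } catch (IOException e) {
      // An IO failure says nothing about the latest version, so surface it.
      throw new UncheckedIOException("Cannot list " + metadataRoot, e);
    }
  }
}
```

Only the "directory does not exist" case maps to version 0. Any other failure reaches the caller, who can then treat the commit as failed rather than proceed from a wrong version.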

if (fs.exists(dst)) {
throw new CommitFailedException("Version %d already exists: %s", nextVersion, dst);
}
int maxVersion = supportGlobalLocking ? findVersion() : findVersionWithOutVersionHint(fs);

Under normal circumstances, if we use the global locking service, the versionHintFile is not deleted at this point, so we can avoid calling the findVersionWithOutVersionHint method to find the latest version.

When we don't use the global locking service, we ignore the contents of the VersionHintFile, since it is always unreliable.

int maxVersion = supportGlobalLocking ? findVersion() : findVersionWithOutVersionHint(fs);
if (!nextVersionIsLatest(nextVersion, maxVersion)) {
if (!supportGlobalLocking) {
io().deleteFile(versionHintFile().toString());

For cases where the global locking service is not used, we always try to remove the wrong versionHintFile before throwing an exception.

TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT);
// todo:Currently, if the user is using an object store, we assume that he must be using the
// global locking service. But we should support add some other conditions in future.
boolean supportGlobalLocking = useObjectStore;

This flag actually distinguishes the behaviour for block storage from the behaviour for object storage. In the original design, we should have been more careful in determining the type of the current filesystem, and used different execution strategies for different filesystems. But in practice, if we use the global locking service, the logic for committing a new version is the same regardless of the filesystem we use. So in the end, we just use this flag to differentiate.
