core:Refactor the code of HadoopTableOptions #10623
base: main
…ile & HADOOP CATALOG) (apache#9327)
2. Detailed design of the logic related to the exceptions thrown. 3. Removed the default return value of some methods.
@nastra @Fokko @RussellSpitzer @rdblue @pvary Hello. If someone could help me review this PR, that would be great. Thanks.
// global locking service. But we should support add some other conditions in future.
boolean supportGlobalLocking = useObjectStore;
try {
  tryLock(tempMetadataFile, metadataRoot());
The current code only locks commits that target the same version: for example, when multiple clients commit version 3 at the same time, the lock takes effect.
However, when multiple clients commit different versions at the same time, they do not exclude each other.
For object storage filesystems, this locking scheme can leave incorrect information in the versionHintFile.
For normal filesystems, this locking is redundant, since the OS already provides the same guarantee when fs.rename is called, and it can still leave the versionHintFile content corrupted.
So we need to lock on the metadataRoot.
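The difference between the two lock scopes can be sketched with a small in-memory model. This is only an illustration under stated assumptions: `LockScopeSketch`, its methods, the client ids and the example paths are hypothetical and are not the lockManager API used in this PR.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for a lock service; not the PR's lockManager.
class LockScopeSketch {
  // Maps a lock key (a path) to the id of the client that holds it.
  private final ConcurrentHashMap<String, String> owners = new ConcurrentHashMap<>();

  // Returns true only if no client currently holds the lock for this key.
  boolean tryLock(String key, String clientId) {
    return owners.putIfAbsent(key, clientId) == null;
  }

  // Releases the lock only if it is held by the given client.
  boolean unlock(String key, String clientId) {
    return owners.remove(key, clientId);
  }
}
```

With one key per version file, a client committing v3 and a client committing v4 both acquire their lock, so their later writes to the versionHintFile can interleave. With the metadata root as the single key, the second client is refused until the first one releases.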
try {
  tryLock(tempMetadataFile, metadataRoot());
  versionCommitSuccess =
      commitNewVersion(
Executing the commitNewVersion method can only produce four results:
1. Return true (rename succeeded).
2. Return false (rename failed).
3. Throw CommitFailedException.
4. Throw CommitStateUnknownException.
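The four results listed above can be made explicit in a caller, as in the following sketch. The nested exception classes are local stand-ins so that the example compiles on its own, and `Outcome`, `VersionCommitter` and `classify` are hypothetical names, not part of the PR.

```java
class CommitOutcomeSketch {
  // Local stand-ins for the real exception types, defined here only so the sketch is self-contained.
  static class CommitFailedException extends RuntimeException {
    CommitFailedException(String message) {
      super(message);
    }
  }

  static class CommitStateUnknownException extends RuntimeException {
    CommitStateUnknownException(Throwable cause) {
      super(cause);
    }
  }

  enum Outcome { COMMITTED, RENAME_FAILED, COMMIT_FAILED, STATE_UNKNOWN }

  // Hypothetical interface standing in for the commitNewVersion call.
  interface VersionCommitter {
    boolean commitNewVersion();
  }

  // Maps each of the four possible results to a distinct outcome.
  static Outcome classify(VersionCommitter committer) {
    try {
      return committer.commitNewVersion() ? Outcome.COMMITTED : Outcome.RENAME_FAILED;
    } catch (CommitFailedException e) {
      return Outcome.COMMIT_FAILED;
    } catch (CommitStateUnknownException e) {
      return Outcome.STATE_UNKNOWN;
    }
  }
}
```

Any other exception type is deliberately not caught here, which mirrors the claim that only these four results are expected.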
  throw e;
} catch (Exception e) {
  this.shouldRefresh = versionCommitSuccess;
  if (!versionCommitSuccess) {
If the commitNewVersion method succeeds, we will ignore any exceptions caught after that.
    FileSystem fs,
    Path finalMetadataFile)
    throws IOException {
  if (!supportGlobalLocking) {
For cases where distributed locks are not used, we need to run some validation checks and clean up any dirty data.
if ((currentMaxVersion - nextVersion) > previousVersionsMax && fs.exists(finalMetadataFile)) {
  tryDelete(finalMetadataFile);
  throw new CommitStateUnknownException(
      new RejectedExecutionException(
Under some boundary conditions, this check may misfire and flag a normal commit as a dirty commit. However, we throw CommitStateUnknownException so that it does not affect the normal commits of other clients.
A false positive can occur when the deleteRemovedMetadataFiles method was not executed during previous commits because of some exception.
I think even if that happens, it's at least better than a dirty commit.
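As a rough illustration, the condition in the hunk above can be isolated as a pure predicate. The class and method names are hypothetical; only the comparison itself is taken from the diff.

```java
class DirtyCommitCheckSketch {
  // True when the version just written lags the newest version by more than the
  // retained history window and its metadata file is present on the filesystem.
  static boolean isDirtyCommit(
      int nextVersion,
      int currentMaxVersion,
      int previousVersionsMax,
      boolean finalMetadataFileExists) {
    return (currentMaxVersion - nextVersion) > previousVersionsMax && finalMetadataFileExists;
  }
}
```

For example, with a newest version of 9 and a window of 2, a file written as version 2 is flagged, while one written as version 7 is not, because the difference of 2 does not exceed the window.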
if (!fs.exists(metadataRoot())) {
  // Either the table has just been created, or it has been corrupted, but either way, we have
  // to start at version 0.
  LOG.warn("Metadata for table not found in directory [{}]", metadataRoot());
  return 0;
Old code:
} catch (IOException e) {
  LOG.warn("Error trying to recover version-hint.txt data for {}", versionHintFile, e);
  return 0;
}
This is a fatal error; we should not use 0 as the current latest version after an IO exception.
if (fs.exists(dst)) {
  throw new CommitFailedException("Version %d already exists: %s", nextVersion, dst);
}
int maxVersion = supportGlobalLocking ? findVersion() : findVersionWithOutVersionHint(fs);
Under normal circumstances, if we use the global locking service, then we are not deleting the versionHintFile at this point. We can therefore avoid calling the findVersionWithOutVersionHint method to find the latest version.
In the case where we don't use the global locking service, we will ignore the contents of VersionHintFile since it is always unreliable.
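A lookup that ignores the hint file has to derive the latest version from the metadata file names themselves. The sketch below shows one way to do that. It assumes the files are named v<N>.metadata.json, optionally with a codec segment such as v<N>.gz.metadata.json; `VersionScanSketch` and `maxVersion` are hypothetical names and this is not the body of findVersionWithOutVersionHint.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class VersionScanSketch {
  // Assumed naming scheme: v<N>.metadata.json or v<N>.<codec>.metadata.json.
  private static final Pattern VERSION_PATTERN =
      Pattern.compile("v(\\d+)(\\.[^.]+)?\\.metadata\\.json");

  // Returns the highest version found among the file names, or 0 if none match.
  static int maxVersion(List<String> fileNames) {
    int max = 0;
    for (String name : fileNames) {
      Matcher matcher = VERSION_PATTERN.matcher(name);
      if (matcher.matches()) {
        max = Math.max(max, Integer.parseInt(matcher.group(1)));
      }
    }
    return max;
  }
}
```

Files that do not follow the pattern, such as the hint file itself, are skipped rather than treated as errors.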
int maxVersion = supportGlobalLocking ? findVersion() : findVersionWithOutVersionHint(fs);
if (!nextVersionIsLatest(nextVersion, maxVersion)) {
  if (!supportGlobalLocking) {
    io().deleteFile(versionHintFile().toString());
For cases where the global locking service is not used, we always try to remove the wrong versionHintFile before throwing an exception.
    TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT);
// todo:Currently, if the user is using an object store, we assume that he must be using the
// global locking service. But we should support add some other conditions in future.
boolean supportGlobalLocking = useObjectStore;
This flag actually serves to distinguish the behaviour of block storage from that of object storage. Ideally, we would determine the type of the current filesystem more carefully and use a different execution strategy for each filesystem. But in practice, if we use the global locking service, the logic for committing a new version is the same regardless of the filesystem we use. So in the end, we just use this flag to differentiate.
Refactor the code of HadoopTableOptions
1. Current problems
1.1. Enabling write.metadata.delete-after-commit.enabled may result in dirty commits.
When this option is turned on, the catalog cleans up the commit history.
If a client re-commits an older version at this point, the commit succeeds because the old metadata file has been deleted.
Assume the metadata folder currently holds three versions: v7, v8 and v9. Committing v2 will succeed, because the file v2 does not exist at the moment. After the commit completes, the metadata folder holds four files: v2, v7, v8 and v9.
However, the user will not be able to see this commit.
From the user's point of view, the result is very strange: the commit succeeds, but its content disappears.
In addition, dirty commits generated in this case are never cleaned up.
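The v2/v7/v8/v9 scenario can be reproduced with a toy model in which a commit is guarded only by "does the target file already exist?". This is a simplified sketch: `StaleCommitSketch` and its methods are hypothetical, and a sorted set of integers stands in for the metadata folder.

```java
import java.util.TreeSet;

class StaleCommitSketch {
  // Versions whose metadata files currently exist in the metadata folder.
  final TreeSet<Integer> metadataVersions = new TreeSet<>();

  StaleCommitSketch(int... existingVersions) {
    for (int version : existingVersions) {
      metadataVersions.add(version);
    }
  }

  // Models a commit that is rejected only when the target version file already exists.
  boolean commitIfAbsent(int version) {
    return metadataVersions.add(version);
  }

  // Readers resolve the table to the highest version present.
  int currentVersion() {
    return metadataVersions.last();
  }
}
```

Starting from versions 7, 8 and 9, `commitIfAbsent(2)` returns true and the folder then holds 2, 7, 8 and 9, while `currentVersion()` still returns 9, so the committed content is never visible to readers.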
1.2. Problems with the lockManager's locking policy
1.2.1. Incorrect timing of locking and releasing
The current code ignores the fact that the writeVersionHint process may also need to be locked.
In the current implementation, users rely on the versionHintFile to speed up version lookups.
If the information in the versionHintFile is incorrect, it causes serious trouble.
Example: if more than one client executes the commit method concurrently, the versionHintFile may end up recording a very old version (versionHintFile's version < currentVersion - write.metadata.previous-versions-max).
At this point, the user cannot call the refresh method to get the latest version, so every subsequent commit by that user fails.
1.2.2. Incorrect behaviour of locking and releasing
The current code only locks commits that target the same version: for example, when multiple clients commit version 3 at the same time, the lock takes effect.
However, when multiple clients commit different versions at the same time, they do not exclude each other.
For object storage filesystems, this locking scheme can leave incorrect information in the versionHintFile.
For normal filesystems, this locking is redundant, since the OS already provides the same guarantee when fs.rename is called, and it can still leave the versionHintFile content corrupted.
1.3. Incorrect exception handling
The current implementation of the commit method is not atomic. Suppose the client calls renameMetadata successfully and then encounters an unexpected runtime exception, which triggers Spark to clean up the data files from this commit.
Spark does not clean up the metadata files generated by this commit, which ultimately leads to the problem described here.
In addition to the three major problems mentioned above, there are a large number of minor problems with hadoopTableOptions that make it look almost "full of holes".
So, given the large number of problems with hadoopTableOptions, we need to completely refactor hadoopTableOptions.
2. List of features to be supported
2.1. If a user uses both the global locking service and any type of file system, we need to support the following features:
2.2. In the case where the user is using a normal file system (which supports rename operations that do not overwrite the target file) and is not using the global locking service, the following features need to be supported:
Ultimately, users can achieve reliable management of metadata & catalogs while using only file systems.
3. Glossary:
commitVersion < currentVersion - write.metadata.previous-versions-max && commit-success=True && fs.exists(version-metadata.json)=True
mv -n a.txt b.txt, HdfsFileSystem.rename(src,dst), WindowsFs.rename(src,dst)
CommitStateUnknown exception and stop everything. The operation of VersionHintFile will be disregarded.
Distributed locking based on zk/redis/database, constrains all operations to the same in-memory lock.
4. Flow chart: