Add manifest file for MSQ export #15953
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java
To check whether the created export file is in the symlink format, I generated manifest files using Apache Spark with Delta Lake. The generated file has a similar format for both local disk and S3. The only difference is that, since Delta Lake uses s3a while writing, the absolute paths in the manifest file use s3a as well. Local:
S3:
@@ -99,6 +99,17 @@ For more information, see [Read external data with EXTERN](concepts.md#read-exte
This variation of EXTERN requires one argument, the details of the destination as specified below.
This variation additionally requires an `AS` clause to specify the format of the exported rows.

While exporting data, some metadata files will also be created at the destination in addition to the data. These files will be created in a directory `_symlink_format_manifest`.
- `_symlink_format_manifest/manifest`: Lists the files which were created as part of the export. The file is in the symlink manifest format, and consists of a list of absolute paths to the files created.
What is the symlink manifest format? I wasn't able to find a definitive answer while searching "symlink manifest format", therefore some clarification would be helpful.
Also, is it for Druid's internal use, or can other systems and operators make use of the manifest file created?
The format itself does not seem to be well documented. It's not for Druid's use; other data stores can read the format. For example, Delta Lake can generate it (see delta.compatibility.symlinkFormatManifest.enabled), and Athena, Trino, etc. support reading it.
Given this, I think that we can skip documenting the format itself, but mention that it follows the symlink manifest format, wdyt?
docs/multi-stage-query/reference.md
Outdated
...
s3://export-bucket/export/query-6564a32f-2194-423a-912e-eead470a37c4-worker0-partition24.csv
```
- `_symlink_format_manifest/druid_export_meta`: Used to store additional information about the export metadata, such as the version of the manifest file format.
Is this version for internal use, or does it have relevance outside of Druid as well? Also, can you please add the format of this metadata file?
Removed this part since it is not intended to be user facing.
* <br>
* Currently, this only contains the manifest file version.
*/
private void createDruidMetadataFile(StorageConnector storageConnector) throws IOException
Seems like we are writing the results in an ad-hoc format. I think it makes sense to use one of the standard formats like JSON, YAML, etc if this is a user-facing file. Else, we should remove it from the documentation as well, since it is an implementation detail.
Removed this documentation
log.info("Writing manifest file at [%s]", exportStorageProvider.getBasePath());

if (storageConnector.pathExists(MANIFEST_FILE) || storageConnector.pathExists(META_FILE)) {
  throw DruidException.defensive("Found existing manifest file already present at path.");
Why is it a defensive check? A user can create a manifest file manually, and the job will fail. Then it isn't a defensive check. We should use something relevant to either the users or the operator here. I think it makes sense that we don't expect to encounter it, given that the files would be namespaced with the task id; however, it still shouldn't be a defensive check.
Changed
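For illustration, here is a minimal sketch of what a user-facing (rather than defensive) existence check could look like. It is not the code in this PR: the class `ManifestWriterSketch`, the use of `java.nio.file` on a local path instead of a `StorageConnector`, and the choice of `IllegalStateException` are all assumptions made to keep the example self-contained.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical sketch, not Druid code: writes a manifest to a local directory
// and refuses to overwrite an existing one with an actionable message.
final class ManifestWriterSketch
{
  static Path writeManifest(Path exportDestination, List<String> exportedFiles)
  {
    final Path manifestDir = exportDestination.resolve("_symlink_format_manifest");
    final Path manifest = manifestDir.resolve("manifest");

    // A user could have placed a file here, so report it as a user-facing error.
    if (Files.exists(manifest)) {
      throw new IllegalStateException(
          "Found existing manifest file at location[" + manifest + "]. "
          + "Remove it or choose an empty export destination."
      );
    }

    try {
      Files.createDirectories(manifestDir);
      // One absolute path per line.
      return Files.write(manifest, exportedFiles, StandardCharsets.UTF_8);
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

The message names the location and tells the reader what to do about it, which is the main difference from a defensive error that signals a bug in the code.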
public void writeMetadata(List<String> exportedFiles) throws IOException
{
  final StorageConnector storageConnector = exportStorageProvider.get();
  log.info("Writing manifest file at [%s]", exportStorageProvider.getBasePath());
nit: The sentence should make sense when reading without the interpolation
log.info("Writing manifest file at [%s]", exportStorageProvider.getBasePath());
log.info("Writing manifest file at location[%s]", exportStorageProvider.getBasePath());
Changed
}

createManifestFile(storageConnector, exportedFiles);
createDruidMetadataFile(storageConnector);
What happens if the previous call succeeds and this one fails? Would we end up in a partial state where the manifest is created but the metadata isn't?
Yes, it would create the manifest file but not the metadata one, and the query should fail. I think this should be fine, since the metadata file itself is only for Druid to track the version that previously created the export.
@@ -88,7 +93,7 @@ public ExportStorageProvider getExportStorageProvider()
}

@Override
public ProcessorsAndChannels<Object, Long> makeProcessors(
public ProcessorsAndChannels<Object, Object> makeProcessors(
Why not
public ProcessorsAndChannels<Object, Object> makeProcessors(
public ProcessorsAndChannels<Object, List<String>> makeProcessors(
Changing the type here would result in the arguments in the call to mergeResults also having List, which would cause a cast exception to be thrown instead of a nicer one, which we would like to avoid, right?
.withAccumulation(new ArrayList<String>(), (acc, file) -> {
  ((ArrayList<String>) acc).add((String) file);
  return acc;
}),
Seems confusing, do we need this?
We need the withAccumulation call so that the types match, but I have changed it to be a noop instead, more in line with other places.
Line 132 in 9c7d7fc
.withAccumulation(new HashSet<>(), (acc, segment) -> acc),
.../src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java
@Override
public Object mergeAccumulatedResult(Object accumulated, Object otherAccumulated)
{
  // Maintain upgrade compatibility, if a worker does not return a list, ignore it.
Can we end up in a state where some files are present and some are not? What is to say that there are files absent? I think if the manifest file is for cosmetic purposes only, we can get away with partial results, but it should be explicitly called out in the documentation.
Since it's in a widely used format, the expectation seems to be that the users would be able to ingest the results in other systems, like:
Druid -> Export destination -> External system
This will lead to incorrect results, without the users knowing. I think we should fail the job here, and if not, then this seems like the usage of the manifest file is limited by its correctness and should be called out explicitly. We can also have a context parameter (which I am not a big fan of), that can switch between error-throwing and ignoring behaviour, for those who want correctness.
Changed to throw an exception
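As a rough sketch of the fail-fast behaviour discussed in this thread (not the actual change in the PR), the merge step could reject any accumulated result that is not a list instead of silently dropping it. The class name `ExportedFilesMerger` and the use of `IllegalStateException` are assumptions for illustration; the real code would presumably raise a Druid-specific exception.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the factory's merge step; not the actual Druid code.
final class ExportedFilesMerger
{
  @SuppressWarnings("unchecked")
  static Object mergeAccumulatedResult(Object accumulated, Object otherAccumulated)
  {
    // A worker on an older version might not return a list of exported files.
    // Failing here avoids writing a manifest that silently omits files.
    if (!(accumulated instanceof List) || !(otherAccumulated instanceof List)) {
      throw new IllegalStateException(
          "Expected a list of exported files from every worker, cannot write a complete manifest."
      );
    }

    final List<String> merged = new ArrayList<>((List<String>) accumulated);
    merged.addAll((List<String>) otherAccumulated);
    return merged;
  }
}
```

With this shape, a consumer in the Druid -> Export destination -> External system flow either gets a manifest listing every file or a failed job, never a partial manifest.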
.../src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java
Object resultObjectForStage = queryKernel.getResultObjectForStage(finalStageId);
if (!(resultObjectForStage instanceof List)) {
  // This might occur if all workers are running on an older version. We are not able to write a manifest file in this case.
  log.warn("Was unable to create manifest file due to ");
@adarshsanjeev looks like a log message is incomplete here. What should this say?
Missed this message, opened a PR to correct it: #16363
This PR adds the capability for MSQ export to create a manifest file at the destination.
Motivation
Currently, export creates the files at the provided destination. The addition of the manifest file will provide a list of files created as part of the export. This will allow easier consumption of the data exported from Druid, especially for automated data pipelines. There is still a safety check that requires the destination to be empty, but the manifest would be especially helpful if that condition is relaxed in the future. Druid currently does not support reading from a manifest file.
Structure
The manifest file created is in the symlink manifest format. The file is created at the path <export destination>/_symlink_format_manifest/manifest. Normally, this would be <export destination>/_symlink_format_manifest/<partition path>/manifest, but since Druid does not support partitioning, the manifest is always created in the _symlink_format_manifest folder itself. Each line of the file contains an absolute path to a file created by the export. The path is prefixed by file: if the destination is on a local disk.
Additionally, a file _symlink_format_manifest/druid_export_meta is created. The file contains additional information about the export. Currently, this only contains the manifest file version, to track which version of the manifest file was created by the export.
Example
Local storage:
S3 export file:
File created at s3://export-bucket/export/_symlink_format_manifest/manifest
druid_export_meta:
Export is still an experimental feature, and the structure of the file could be changed in the future.
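To make the structure described above concrete, here is a minimal sketch of how a downstream pipeline might read the manifest from a local export destination. The class `SymlinkManifestReader` and its methods are hypothetical and not part of Druid. The sketch assumes only what is stated above: the manifest lives at _symlink_format_manifest/manifest and holds one absolute path per line.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Druid: reads a symlink-format manifest from local disk.
final class SymlinkManifestReader
{
  // Directory and file name as described in this PR.
  static final String MANIFEST_DIR = "_symlink_format_manifest";
  static final String MANIFEST_FILE = "manifest";

  // Returns one entry per non-blank line of the manifest contents.
  static List<String> parse(String manifestContents)
  {
    final List<String> files = new ArrayList<>();
    for (String line : manifestContents.split("\\R")) {
      final String trimmed = line.trim();
      if (!trimmed.isEmpty()) {
        files.add(trimmed);
      }
    }
    return files;
  }

  // Reads <export destination>/_symlink_format_manifest/manifest and parses it.
  static List<String> read(Path exportDestination) throws IOException
  {
    final Path manifest = exportDestination.resolve(MANIFEST_DIR).resolve(MANIFEST_FILE);
    return parse(new String(Files.readAllBytes(manifest), StandardCharsets.UTF_8));
  }
}
```

For a local export, each returned entry would start with file:, and for S3 it would be an s3:// path. Resolving those URIs to actual readers is left to the consuming system.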
Upgrade issues
Release notes
This PR has: