
Add manifest file for MSQ export #15953

Merged · 14 commits · Apr 15, 2024
Conversation

adarshsanjeev
Contributor

@adarshsanjeev adarshsanjeev commented Feb 23, 2024

This PR adds the capability for MSQ export to create a manifest file at the destination.

Motivation

Currently, export creates files at the provided destination. The manifest file will additionally provide a list of the files created as part of the export. This allows easier consumption of the data exported from Druid, especially by automated data pipelines. There is still a safety check that requires the destination to be empty, but the manifest would be especially helpful if that condition is relaxed in the future. Druid itself does not currently support reading from a manifest file.

Structure

The manifest file created is in the symlink manifest format. The file is created at the
path <export destination>/_symlink_format_manifest/manifest. Normally, this would be <export destination>/_symlink_format_manifest/<partition path>/manifest, but since Druid does not support partitioning, the manifest is always created in the _symlink_format_manifest folder itself. Each line of the file contains an absolute path to a file created by the export.
The path is prefixed by file: if the destination is on a local disk.

Additionally, a file _symlink_format_manifest/druid_export_meta is created. The file contains additional information about the export. Currently, this only contains the manifest file version, to track which version of the manifest file was created by the export.
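The layout above can be sketched as follows. This is a hypothetical illustration, not the PR's actual code: the class and method names are invented, and the real implementation writes through a `StorageConnector` rather than `java.nio.file`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical sketch: write the two metadata files described above
// for a list of exported file URIs.
public class ManifestSketch
{
  private static final String MANIFEST_DIR = "_symlink_format_manifest";

  static void writeManifestFiles(Path destination, List<String> exportedFiles) throws IOException
  {
    Path dir = destination.resolve(MANIFEST_DIR);
    Files.createDirectories(dir);
    // manifest: one absolute path per line, e.g. "file:/Users/.../partition0.csv"
    Files.write(dir.resolve("manifest"), exportedFiles);
    // druid_export_meta: currently only the manifest format version
    Files.write(dir.resolve("druid_export_meta"), List.of("version: 1"));
  }

  public static void main(String[] args) throws IOException
  {
    Path dest = Files.createTempDirectory("export");
    writeManifestFiles(dest, List.of("file:" + dest.resolve("query-worker0-partition0.csv")));
    System.out.println(Files.readAllLines(dest.resolve(MANIFEST_DIR).resolve("manifest")).size());
  }
}
```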

Example

Local storage:

└[~/export]> cat _symlink_format_manifest/manifest
file:/Users/adarshsanjeev/export/query-293c1f4c-d5ed-4b04-9690-d7d2d9db4995-worker2-partition23.csv
file:/Users/adarshsanjeev/export/query-293c1f4c-d5ed-4b04-9690-d7d2d9db4995-worker1-partition13.csv
file:/Users/adarshsanjeev/export/query-293c1f4c-d5ed-4b04-9690-d7d2d9db4995-worker0-partition24.csv
...
file:/Users/adarshsanjeev/export/query-293c1f4c-d5ed-4b04-9690-d7d2d9db4995-worker1-partition1.csv

S3 export file:

File created at s3://export-bucket/export/_symlink_format_manifest/manifest

s3://export-bucket/export/query-6564a32f-2194-423a-912e-eead470a37c4-worker2-partition2.csv
s3://export-bucket/export/query-6564a32f-2194-423a-912e-eead470a37c4-worker1-partition1.csv
s3://export-bucket/export/query-6564a32f-2194-423a-912e-eead470a37c4-worker0-partition0.csv
...
s3://export-bucket/export/query-6564a32f-2194-423a-912e-eead470a37c4-worker0-partition24.csv

druid_export_meta:

version: 1

Export is still an experimental feature, and the structure of the file may change in the future.


Upgrade issues

  • During a rolling update, older versions of workers would not return a list of exported files, and an older controller would not create a manifest file. Therefore, export queries run during this time might have incomplete manifests.

Release notes

  • Export queries will also create a manifest file at the destination, which lists the files created by the query.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@github-actions github-actions bot added Area - Batch Ingestion Area - Querying Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels Feb 23, 2024
@adarshsanjeev
Contributor Author

To verify that the created export file follows the symlink format, I generated manifest files using Apache Spark with Delta Lake. The generated file has a similar format for both local disk and S3; the only difference is that since Delta Lake uses s3a when writing, the paths in the manifest file are s3a absolute paths.

Local:

NBuser@c46fe1cca55f:/tmp/delta-table$ cat _symlink_format_manifest/manifest
file:/tmp/delta-table/part-00003-463556da-8423-41f9-a25a-0b68a51e0fff-c000.snappy.parquet
file:/tmp/delta-table/part-00005-f1682aee-f29b-41ad-8ac5-9b49d8de1394-c000.snappy.parquet
file:/tmp/delta-table/part-00007-486d1cda-5871-43f8-86cc-3a44e4adede7-c000.snappy.parquet
file:/tmp/delta-table/part-00001-50edffa4-06c3-4d70-bc8b-f67da8ef4195-c000.snappy.parquet
file:/tmp/delta-table/part-00009-dfd23cfe-e6c2-4001-ae42-8e964ec8f197-c000.snappy.parquet

S3:

s3a://export-bucket/delta_test_table2/part-00000-2c8c8389-e5a6-47f9-8394-6730c474357f-c000.snappy.parquet
s3a://export-bucket/delta_test_table2/part-00002-f897b11d-692a-427d-a1ca-9b15ef218d83-c000.snappy.parquet
...
s3a://export-bucket/delta_test_table2/part-00004-a4fc5555-6fa3-46da-a4f7-ccb6b3e8b8eb-c000.snappy.parquet

@@ -99,6 +99,17 @@ For more information, see [Read external data with EXTERN](concepts.md#read-exte
This variation of EXTERN requires one argument, the details of the destination as specified below.
This variation additionally requires an `AS` clause to specify the format of the exported rows.

While exporting data, some metadata files will also be created at the destination in addition to the data. These files will be created in a directory `_symlink_format_manifest`.
- `_symlink_format_manifest/manifest`: Lists the files which were created as part of the export. The file is in the symlink manifest format, and consists of a list of absolute paths to the files created.
Contributor

What is the symlink manifest format? I wasn't able to find a definitive answer while searching "symlink manifest format", therefore some clarification would be helpful.

Also, is it for Druid's internal use, or can other systems and operators make use of the manifest file created?

Contributor Author

The format itself does not seem to be well documented. It's not for Druid's own use; other data stores can read the format (for example, Delta Lake can generate it via delta.compatibility.symlinkFormatManifest.enabled), and Athena, Trino, etc. support reading it.

Given this, I think that we can skip documenting the format itself, but mention that it follows the symlink manifest format, wdyt?

...
s3://export-bucket/export/query-6564a32f-2194-423a-912e-eead470a37c4-worker0-partition24.csv
```
- `_symlink_format_manifest/druid_export_meta`: Used to store additional information about the export metadata, such as the version of the manifest file format.
Contributor

Is this version for internal use, or does it have relevance outside of Druid as well? Also, can you please add the format of this metadata file?

Contributor Author

Removed this part since it is not intended to be user facing.

* <br>
* Currently, this only contains the manifest file version.
*/
private void createDruidMetadataFile(StorageConnector storageConnector) throws IOException
Contributor

Seems like we are writing the results in an ad-hoc format. I think it makes sense to use one of the standard formats like JSON, YAML, etc if this is a user-facing file. Else, we should remove it from the documentation as well, since it is an implementation detail.

Contributor Author

Removed this documentation

log.info("Writing manifest file at [%s]", exportStorageProvider.getBasePath());

if (storageConnector.pathExists(MANIFEST_FILE) || storageConnector.pathExists(META_FILE)) {
throw DruidException.defensive("Found existing manifest file already present at path.");
Contributor

Why is it a defensive check? A user can create a manifest file manually, and the job will fail; then it isn't a defensive check. We should use something relevant to either the users or the operator here. It makes sense that we don't expect to encounter this, given that the files would be namespaced with the task ID, but it still shouldn't be a defensive check.

Contributor Author

Changed

public void writeMetadata(List<String> exportedFiles) throws IOException
{
final StorageConnector storageConnector = exportStorageProvider.get();
log.info("Writing manifest file at [%s]", exportStorageProvider.getBasePath());
Contributor

nit: The sentence should make sense when reading without the interpolation

Suggested change
log.info("Writing manifest file at [%s]", exportStorageProvider.getBasePath());
log.info("Writing manifest file at location[%s]", exportStorageProvider.getBasePath());

Contributor Author

Changed

}

createManifestFile(storageConnector, exportedFiles);
createDruidMetadataFile(storageConnector);
Contributor

What happens if the previous call succeeds and this one fails? Would we end up in a partial state where the manifest is created but the metadata isn't?

Contributor Author

Yes, it would create the manifest file but not the metadata one, and the query should fail. I think this is fine, since the metadata file exists only for Druid to track which version created the export.

@@ -88,7 +93,7 @@ public ExportStorageProvider getExportStorageProvider()
}

@Override
public ProcessorsAndChannels<Object, Long> makeProcessors(
public ProcessorsAndChannels<Object, Object> makeProcessors(
Contributor

Why not

Suggested change
public ProcessorsAndChannels<Object, Object> makeProcessors(
public ProcessorsAndChannels<Object, List<String>> makeProcessors(

Contributor Author

Changing the type here would mean the arguments in the call to mergeResults would also have List, which would cause a cast exception to be thrown instead of a nicer error, which we would like to avoid, right?

Comment on lines 117 to 120
.withAccumulation(new ArrayList<String>(), (acc, file) -> {
((ArrayList<String>) acc).add((String) file);
return acc;
}),
Contributor

Seems confusing, do we need this?

Contributor Author

We need the withAccumulation call so that the types match, but I have changed it to be a no-op instead, more in line with other places.
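For illustration, a no-op accumulation in this spirit might look like the following. This is a purely hypothetical sketch with types simplified from the quoted snippet; it only demonstrates an accumulator that satisfies the functional signature while deliberately ignoring its input.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical sketch: the accumulator type-checks but ignores per-row
// input, since the exported-file list is collected elsewhere.
public class NoopAccumulationSketch
{
  public static void main(String[] args)
  {
    BiFunction<Object, Object, Object> noop = (acc, row) -> acc;
    Object acc = new ArrayList<String>();
    acc = noop.apply(acc, "query-worker0-partition0.csv");
    // Nothing was accumulated, so the list is still empty
    System.out.println(((List<?>) acc).size());
  }
}
```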

@Override
public Object mergeAccumulatedResult(Object accumulated, Object otherAccumulated)
{
// Maintain upgrade compatibility, if a worker does not return a list, ignore it.
Contributor

Can we end up in a state where some files are present and some are not? How would a reader know that files are absent? If the manifest file is for cosmetic purposes only, we can get away with partial results, but that should be explicitly called out in the documentation.

Since it's in a widely used format, the expectation seems to be that users would be able to ingest the results in other systems, like:
Druid -> Export destination -> External system

A partial manifest will lead to incorrect results without the users knowing. I think we should fail the job here; if not, then the usefulness of the manifest file is limited by its correctness, and that should be called out explicitly. We could also add a context parameter (which I am not a big fan of) to switch between error-throwing and ignoring behaviour, for those who want correctness.

Contributor Author

Changed to throw an exception
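The stricter merge behaviour the thread settles on could be sketched as follows. This is hypothetical code, not the PR's actual implementation; the idea is simply to fail the query rather than silently write a partial manifest when an older-version worker does not return a file list.

```java
import java.util.ArrayList;
import java.util.List;

public class MergeSketch
{
  @SuppressWarnings("unchecked")
  static Object mergeAccumulatedResult(Object accumulated, Object otherAccumulated)
  {
    if (!(accumulated instanceof List) || !(otherAccumulated instanceof List)) {
      // A partial manifest would be silently incorrect, so fail instead of ignoring it
      throw new IllegalStateException("Worker did not return a list of exported files");
    }
    List<String> merged = new ArrayList<>((List<String>) accumulated);
    merged.addAll((List<String>) otherAccumulated);
    return merged;
  }

  public static void main(String[] args)
  {
    System.out.println(((List<?>) mergeAccumulatedResult(List.of("a.csv"), List.of("b.csv"))).size());
    try {
      mergeAccumulatedResult(List.of("a.csv"), "not-a-list");
    } catch (IllegalStateException e) {
      System.out.println("failed");
    }
  }
}
```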

@cryptoe cryptoe merged commit 3df00ae into apache:master Apr 15, 2024
85 checks passed
Object resultObjectForStage = queryKernel.getResultObjectForStage(finalStageId);
if (!(resultObjectForStage instanceof List)) {
// This might occur if all workers are running on an older version. We are not able to write a manifest file in this case.
log.warn("Was unable to create manifest file due to ");
Contributor

@adarshsanjeev looks like a log message is incomplete here- what should this say?

Contributor Author

Missed this message, added a PR to correct it #16363

@adarshsanjeev adarshsanjeev added this to the 30.0.0 milestone May 6, 2024
Labels
Area - Batch Ingestion Area - Documentation Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 Area - Querying Design Review