From 24cce0da7fe8e1ecde2998b7ab72111cc90251b4 Mon Sep 17 00:00:00 2001 From: Jover Lee Date: Thu, 2 Nov 2023 17:20:14 -0700 Subject: [PATCH 1/6] ingest/upload: Refactor `files_to_upload` config to mapping Simplify the `files_to_upload` config as a single mapping where the key is the remote file name and the value is the local file instead of maintaining a two lists of files. This ensures that we know exactly which local file is uploaded to the remote file without worrying about order or duplicates. --- ingest/Snakefile | 19 +++--------- ingest/config/optional.yaml | 31 +++++++------------ .../snakemake_rules/trigger_rebuild.smk | 4 +-- ingest/workflow/snakemake_rules/upload.smk | 6 ++-- 4 files changed, 20 insertions(+), 40 deletions(-) diff --git a/ingest/Snakefile b/ingest/Snakefile index 4e829ea4..63872f48 100644 --- a/ingest/Snakefile +++ b/ingest/Snakefile @@ -20,19 +20,9 @@ def _get_all_targets(wildcards): upload_config = config.get("upload", {}) for target, params in upload_config.items(): - files_to_upload = params.get("files_to_upload", []) - remote_file_names = params.get("remote_file_names", []) + files_to_upload = params.get("files_to_upload", {}) - if len(files_to_upload) != len(remote_file_names): - print( - f"Skipping file upload for {target!r} because the number of", - "files to upload does not match the number of remote file names.", - ) - elif len(remote_file_names) != len(set(remote_file_names)): - print( - f"Skipping file upload for {target!r} because there are duplicate remote file names." - ) - elif not params.get("dst"): + if not params.get("dst"): print( f"Skipping file upload for {target!r} because the destintion was not defined." ) @@ -40,11 +30,10 @@ def _get_all_targets(wildcards): all_targets.extend( expand( [ - f"data/upload/{target}/{{file_to_upload}}-to-{{remote_file_name}}.done" + f"data/upload/{target}/{{remote_file_name}}.done" ], zip, - file_to_upload=files_to_upload, - remote_file_name=remote_file_names, + remote_file_name=files_to_upload.keys() ) ) diff --git a/ingest/config/optional.yaml b/ingest/config/optional.yaml index 8640764a..fdf91c89 100644 --- a/ingest/config/optional.yaml +++ b/ingest/config/optional.yaml @@ -5,26 +5,17 @@ upload: s3: # AWS S3 Bucket with prefix dst: 's3://nextstrain-data/files/workflows/mpox' - # Files to upload to S3 that are in the `data` directory - files_to_upload: [ - 'genbank.ndjson', - 'sequences.ndjson', - 'metadata.tsv', - 'sequences.fasta', - 'alignment.fasta', - 'insertions.csv', - 'translations.zip' - ] - # Remote file names for the files to upload, must be in the same order as local files above - remote_file_names: [ - 'genbank.ndjson.xz', - 'all_sequences.ndjson.xz', - 'metadata.tsv.gz', - 'sequences.fasta.xz', - 'alignment.fasta.xz', - 'insertions.csv.gz', - 'translations.zip' - ] + # Mapping of files to upload, with key as remote file name and the value + # the local file path relative to the ingest/data directory. + files_to_upload: + genbank.ndjson.xz: genbank.ndjson + all_sequences.ndjson.xz: sequences.ndjson + metadata.tsv.gz: metadata.tsv + sequences.fasta.xz: sequences.fasta + alignment.fasta.xz: alignment.fasta + insertions.csv.gz: insertions.csv + translations.zip: translations.zip + cloudfront_domain: 'data.nextstrain.org' # Toggle for Slack notifications diff --git a/ingest/workflow/snakemake_rules/trigger_rebuild.smk b/ingest/workflow/snakemake_rules/trigger_rebuild.smk index 1a7a52d0..2e797eeb 100644 --- a/ingest/workflow/snakemake_rules/trigger_rebuild.smk +++ b/ingest/workflow/snakemake_rules/trigger_rebuild.smk @@ -12,8 +12,8 @@ rule trigger_build: Triggering monekypox builds via repository action type `rebuild`. """ input: - metadata_upload="data/upload/s3/metadata.tsv-to-metadata.tsv.gz.done", - fasta_upload="data/upload/s3/sequences.fasta-to-sequences.fasta.xz.done", + metadata_upload="data/upload/s3/metadata.tsv.gz.done", + fasta_upload="data/upload/s3/sequences.fasta.xz.done", output: touch("data/trigger/rebuild.done"), shell: diff --git a/ingest/workflow/snakemake_rules/upload.smk b/ingest/workflow/snakemake_rules/upload.smk index fd4d3305..8279c3a7 100644 --- a/ingest/workflow/snakemake_rules/upload.smk +++ b/ingest/workflow/snakemake_rules/upload.smk @@ -4,7 +4,7 @@ This part of the workflow handles uploading files to a specified destination. Uses predefined wildcard `file_to_upload` determine input and predefined wildcard `remote_file_name` as the remote file name in the specified destination. -Produces output files as `data/upload/{upload_target_name}/{file_to_upload}-to-{remote_file_name}.done`. +Produces output files as `data/upload/{upload_target_name}/{remote_file_name}.done`. Currently only supports uploads to AWS S3, but additional upload rules can be easily added as long as they follow the output pattern described above. @@ -26,7 +26,7 @@ def _get_upload_inputs(wildcards): the rules in `slack_notifications.smk`, so it only includes flag files if `send_notifications` is True. """ - file_to_upload = wildcards.file_to_upload + file_to_upload = config["upload"]["s3"]["files_to_upload"][wildcards.remote_file_name] inputs = { "file_to_upload": f"data/{file_to_upload}", @@ -49,7 +49,7 @@ rule upload_to_s3: input: unpack(_get_upload_inputs), output: - "data/upload/s3/{file_to_upload}-to-{remote_file_name}.done", + "data/upload/s3/{remote_file_name}.done", params: quiet="" if send_notifications else "--quiet", s3_dst=config["upload"].get("s3", {}).get("dst", ""), From 075ce786306a6705ae3002b3991b9fa1d2305127 Mon Sep 17 00:00:00 2001 From: Jover Lee Date: Thu, 2 Nov 2023 17:27:12 -0700 Subject: [PATCH 2/6] ingest/upload: Provide local path relative to ingest directory Change the expectation that the local file paths for `file_to_upload` must be relative to the ingest directory instead of the ingest/data directory. This is done in preparation for moving the final outputs fo the ingest workflow to an ingest/results directory. --- ingest/config/optional.yaml | 16 ++++++++-------- ingest/workflow/snakemake_rules/upload.smk | 8 +++----- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/ingest/config/optional.yaml b/ingest/config/optional.yaml index fdf91c89..23726966 100644 --- a/ingest/config/optional.yaml +++ b/ingest/config/optional.yaml @@ -6,15 +6,15 @@ upload: # AWS S3 Bucket with prefix dst: 's3://nextstrain-data/files/workflows/mpox' # Mapping of files to upload, with key as remote file name and the value - # the local file path relative to the ingest/data directory. + # the local file path relative to the ingest directory. files_to_upload: - genbank.ndjson.xz: genbank.ndjson - all_sequences.ndjson.xz: sequences.ndjson - metadata.tsv.gz: metadata.tsv - sequences.fasta.xz: sequences.fasta - alignment.fasta.xz: alignment.fasta - insertions.csv.gz: insertions.csv - translations.zip: translations.zip + genbank.ndjson.xz: data/genbank.ndjson + all_sequences.ndjson.xz: data/sequences.ndjson + metadata.tsv.gz: data/metadata.tsv + sequences.fasta.xz: data/sequences.fasta + alignment.fasta.xz: data/alignment.fasta + insertions.csv.gz: data/insertions.csv + translations.zip: data/translations.zip cloudfront_domain: 'data.nextstrain.org' diff --git a/ingest/workflow/snakemake_rules/upload.smk b/ingest/workflow/snakemake_rules/upload.smk index 8279c3a7..7941351c 100644 --- a/ingest/workflow/snakemake_rules/upload.smk +++ b/ingest/workflow/snakemake_rules/upload.smk @@ -26,18 +26,16 @@ def _get_upload_inputs(wildcards): the rules in `slack_notifications.smk`, so it only includes flag files if `send_notifications` is True. """ - file_to_upload = config["upload"]["s3"]["files_to_upload"][wildcards.remote_file_name] - inputs = { - "file_to_upload": f"data/{file_to_upload}", + "file_to_upload": config["upload"]["s3"]["files_to_upload"][wildcards.remote_file_name], } if send_notifications: flag_file = [] - if file_to_upload == "genbank.ndjson": + if file_to_upload == "data/genbank.ndjson": flag_file = "data/notify/genbank-record-change.done" - elif file_to_upload == "metadata.tsv": + elif file_to_upload == "data/metadata.tsv": flag_file = "data/notify/metadata-diff.done" inputs["notify_flag_file"] = flag_file From d73f60aca4f3a6d2e34bb348d2049854fe3f6251 Mon Sep 17 00:00:00 2001 From: Jover Lee Date: Thu, 2 Nov 2023 17:32:02 -0700 Subject: [PATCH 3/6] ingest/transform: fix output file docstring --- ingest/workflow/snakemake_rules/transform.smk | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingest/workflow/snakemake_rules/transform.smk b/ingest/workflow/snakemake_rules/transform.smk index a39fb735..0c3cf6b1 100644 --- a/ingest/workflow/snakemake_rules/transform.smk +++ b/ingest/workflow/snakemake_rules/transform.smk @@ -6,7 +6,7 @@ formats and expects input file This will produce output files as - metadata = "data/metadata.tsv" + metadata = "data/metadata_raw.tsv" sequences = "data/sequences.fasta" Parameters are expected to be defined in `config.transform`. From 1cccf1a640f9e217a5bcdd31718c26c14896aaa2 Mon Sep 17 00:00:00 2001 From: Jover Lee Date: Thu, 2 Nov 2023 17:38:58 -0700 Subject: [PATCH 4/6] ingest: Output final files to results directory Instead of mixing the final results with the intermediate files produced during the workflow run, output the final files to the result directory. --- ingest/README.md | 4 ++-- ingest/Snakefile | 2 +- ingest/config/optional.yaml | 4 ++-- ingest/workflow/snakemake_rules/nextclade.smk | 6 +++--- ingest/workflow/snakemake_rules/slack_notifications.smk | 2 +- ingest/workflow/snakemake_rules/transform.smk | 4 ++-- ingest/workflow/snakemake_rules/upload.smk | 2 +- 7 files changed, 12 insertions(+), 12 deletions(-) diff --git a/ingest/README.md b/ingest/README.md index 52302b82..b7eb8152 100644 --- a/ingest/README.md +++ b/ingest/README.md @@ -25,8 +25,8 @@ nextstrain build . This will produce two files (within the `ingest` directory): -- `data/metadata.tsv` -- `data/sequences.fasta` +- `results/metadata.tsv` +- `results/sequences.fasta` Run the complete ingest pipeline and upload results to AWS S3 with diff --git a/ingest/Snakefile b/ingest/Snakefile index 63872f48..0566d575 100644 --- a/ingest/Snakefile +++ b/ingest/Snakefile @@ -14,7 +14,7 @@ send_slack_notifications = config.get("send_slack_notifications", False) def _get_all_targets(wildcards): # Default targets are the metadata TSV and sequences FASTA files - all_targets = ["data/sequences.fasta", "data/metadata.tsv"] + all_targets = ["results/sequences.fasta", "results/metadata.tsv"] # Add additional targets based on upload config upload_config = config.get("upload", {}) diff --git a/ingest/config/optional.yaml b/ingest/config/optional.yaml index 23726966..d445e075 100644 --- a/ingest/config/optional.yaml +++ b/ingest/config/optional.yaml @@ -10,8 +10,8 @@ upload: files_to_upload: genbank.ndjson.xz: data/genbank.ndjson all_sequences.ndjson.xz: data/sequences.ndjson - metadata.tsv.gz: data/metadata.tsv - sequences.fasta.xz: data/sequences.fasta + metadata.tsv.gz: results/metadata.tsv + sequences.fasta.xz: results/sequences.fasta alignment.fasta.xz: data/alignment.fasta insertions.csv.gz: data/insertions.csv translations.zip: data/translations.zip diff --git a/ingest/workflow/snakemake_rules/nextclade.smk b/ingest/workflow/snakemake_rules/nextclade.smk index 385ad6e4..f10a3f9e 100644 --- a/ingest/workflow/snakemake_rules/nextclade.smk +++ b/ingest/workflow/snakemake_rules/nextclade.smk @@ -19,7 +19,7 @@ rule nextclade_dataset_hMPXV: rule align: input: - sequences="data/sequences.fasta", + sequences="results/sequences.fasta", dataset="hmpxv.zip", output: alignment="data/alignment.fasta", @@ -41,7 +41,7 @@ rule align: rule nextclade: input: - sequences="data/sequences.fasta", + sequences="results/sequences.fasta", dataset="mpxv.zip", output: "data/nextclade.tsv", @@ -58,7 +58,7 @@ rule join_metadata_clades: metadata="data/metadata_raw.tsv", nextclade_field_map=config["nextclade"]["field_map"], output: - metadata="data/metadata.tsv", + metadata="results/metadata.tsv", params: id_field=config["transform"]["id_field"], nextclade_id_field=config["nextclade"]["id_field"], diff --git a/ingest/workflow/snakemake_rules/slack_notifications.smk b/ingest/workflow/snakemake_rules/slack_notifications.smk index 7dea0e7c..9eb04639 100644 --- a/ingest/workflow/snakemake_rules/slack_notifications.smk +++ b/ingest/workflow/snakemake_rules/slack_notifications.smk @@ -36,7 +36,7 @@ rule notify_on_genbank_record_change: rule notify_on_metadata_diff: input: - metadata="data/metadata.tsv", + metadata="results/metadata.tsv", output: touch("data/notify/metadata-diff.done"), params: diff --git a/ingest/workflow/snakemake_rules/transform.smk b/ingest/workflow/snakemake_rules/transform.smk index 0c3cf6b1..fe7d7c16 100644 --- a/ingest/workflow/snakemake_rules/transform.smk +++ b/ingest/workflow/snakemake_rules/transform.smk @@ -7,7 +7,7 @@ formats and expects input file This will produce output files as metadata = "data/metadata_raw.tsv" - sequences = "data/sequences.fasta" + sequences = "results/sequences.fasta" Parameters are expected to be defined in `config.transform`. """ @@ -43,7 +43,7 @@ rule transform: annotations=config["transform"]["annotations"], output: metadata="data/metadata_raw.tsv", - sequences="data/sequences.fasta", + sequences="results/sequences.fasta", log: "logs/transform.txt", params: diff --git a/ingest/workflow/snakemake_rules/upload.smk b/ingest/workflow/snakemake_rules/upload.smk index 7941351c..f18aebe9 100644 --- a/ingest/workflow/snakemake_rules/upload.smk +++ b/ingest/workflow/snakemake_rules/upload.smk @@ -35,7 +35,7 @@ def _get_upload_inputs(wildcards): if file_to_upload == "data/genbank.ndjson": flag_file = "data/notify/genbank-record-change.done" - elif file_to_upload == "data/metadata.tsv": + elif file_to_upload == "results/metadata.tsv": flag_file = "data/notify/metadata-diff.done" inputs["notify_flag_file"] = flag_file From 0974020e936803e98ebcc20a22a49431a6fe2dd4 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 3 Nov 2023 00:51:10 +0000 Subject: [PATCH 5/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- ingest/Snakefile | 6 ++---- ingest/workflow/snakemake_rules/upload.smk | 4 +++- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ingest/Snakefile b/ingest/Snakefile index 0566d575..6350149b 100644 --- a/ingest/Snakefile +++ b/ingest/Snakefile @@ -29,11 +29,9 @@ def _get_all_targets(wildcards): else: all_targets.extend( expand( - [ - f"data/upload/{target}/{{remote_file_name}}.done" - ], + [f"data/upload/{target}/{{remote_file_name}}.done"], zip, - remote_file_name=files_to_upload.keys() + remote_file_name=files_to_upload.keys(), ) ) diff --git a/ingest/workflow/snakemake_rules/upload.smk b/ingest/workflow/snakemake_rules/upload.smk index f18aebe9..60c5c9b7 100644 --- a/ingest/workflow/snakemake_rules/upload.smk +++ b/ingest/workflow/snakemake_rules/upload.smk @@ -27,7 +27,9 @@ def _get_upload_inputs(wildcards): `send_notifications` is True. """ inputs = { - "file_to_upload": config["upload"]["s3"]["files_to_upload"][wildcards.remote_file_name], + "file_to_upload": config["upload"]["s3"]["files_to_upload"][ + wildcards.remote_file_name + ], } if send_notifications: From 302273ce1bc959fd4fa4fe938229fadca20cf768 Mon Sep 17 00:00:00 2001 From: Jover Lee Date: Mon, 6 Nov 2023 15:12:06 -0800 Subject: [PATCH 6/6] Fix spelling error --- ingest/Snakefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingest/Snakefile b/ingest/Snakefile index 6350149b..0ed057b4 100644 --- a/ingest/Snakefile +++ b/ingest/Snakefile @@ -24,7 +24,7 @@ def _get_all_targets(wildcards): if not params.get("dst"): print( - f"Skipping file upload for {target!r} because the destintion was not defined." + f"Skipping file upload for {target!r} because the destination was not defined." ) else: all_targets.extend(