Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingest updates #222

Merged
merged 6 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ingest/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ nextstrain build .

This will produce two files (within the `ingest` directory):

- `data/metadata.tsv`
- `data/sequences.fasta`
- `results/metadata.tsv`
- `results/sequences.fasta`

Run the complete ingest pipeline and upload results to AWS S3 with

Expand Down
25 changes: 6 additions & 19 deletions ingest/Snakefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,24 @@ send_slack_notifications = config.get("send_slack_notifications", False)

def _get_all_targets(wildcards):
# Default targets are the metadata TSV and sequences FASTA files
all_targets = ["data/sequences.fasta", "data/metadata.tsv"]
all_targets = ["results/sequences.fasta", "results/metadata.tsv"]

# Add additional targets based on upload config
upload_config = config.get("upload", {})

for target, params in upload_config.items():
files_to_upload = params.get("files_to_upload", [])
remote_file_names = params.get("remote_file_names", [])
files_to_upload = params.get("files_to_upload", {})

if len(files_to_upload) != len(remote_file_names):
if not params.get("dst"):
print(
f"Skipping file upload for {target!r} because the number of",
"files to upload does not match the number of remote file names.",
)
elif len(remote_file_names) != len(set(remote_file_names)):
print(
f"Skipping file upload for {target!r} because there are duplicate remote file names."
)
elif not params.get("dst"):
print(
f"Skipping file upload for {target!r} because the destintion was not defined."
f"Skipping file upload for {target!r} because the destination was not defined."
)
else:
all_targets.extend(
expand(
[
f"data/upload/{target}/{{file_to_upload}}-to-{{remote_file_name}}.done"
],
[f"data/upload/{target}/{{remote_file_name}}.done"],
zip,
file_to_upload=files_to_upload,
remote_file_name=remote_file_names,
remote_file_name=files_to_upload.keys(),
)
)

Expand Down
31 changes: 11 additions & 20 deletions ingest/config/optional.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,17 @@ upload:
s3:
# AWS S3 Bucket with prefix
dst: 's3://nextstrain-data/files/workflows/mpox'
# Files to upload to S3 that are in the `data` directory
files_to_upload: [
'genbank.ndjson',
'sequences.ndjson',
'metadata.tsv',
'sequences.fasta',
'alignment.fasta',
'insertions.csv',
'translations.zip'
]
# Remote file names for the files to upload, must be in the same order as local files above
remote_file_names: [
'genbank.ndjson.xz',
'all_sequences.ndjson.xz',
'metadata.tsv.gz',
'sequences.fasta.xz',
'alignment.fasta.xz',
'insertions.csv.gz',
'translations.zip'
]
# Mapping of files to upload, with key as remote file name and the value
# the local file path relative to the ingest directory.
files_to_upload:
genbank.ndjson.xz: data/genbank.ndjson
all_sequences.ndjson.xz: data/sequences.ndjson
metadata.tsv.gz: results/metadata.tsv
sequences.fasta.xz: results/sequences.fasta
alignment.fasta.xz: data/alignment.fasta
insertions.csv.gz: data/insertions.csv
translations.zip: data/translations.zip

cloudfront_domain: 'data.nextstrain.org'

# Toggle for Slack notifications
Expand Down
6 changes: 3 additions & 3 deletions ingest/workflow/snakemake_rules/nextclade.smk
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ rule nextclade_dataset_hMPXV:

rule align:
input:
sequences="data/sequences.fasta",
sequences="results/sequences.fasta",
dataset="hmpxv.zip",
output:
alignment="data/alignment.fasta",
Expand All @@ -41,7 +41,7 @@ rule align:

rule nextclade:
input:
sequences="data/sequences.fasta",
sequences="results/sequences.fasta",
dataset="mpxv.zip",
output:
"data/nextclade.tsv",
Expand All @@ -58,7 +58,7 @@ rule join_metadata_clades:
metadata="data/metadata_raw.tsv",
nextclade_field_map=config["nextclade"]["field_map"],
output:
metadata="data/metadata.tsv",
metadata="results/metadata.tsv",
params:
id_field=config["transform"]["id_field"],
nextclade_id_field=config["nextclade"]["id_field"],
Expand Down
2 changes: 1 addition & 1 deletion ingest/workflow/snakemake_rules/slack_notifications.smk
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ rule notify_on_genbank_record_change:

rule notify_on_metadata_diff:
input:
metadata="data/metadata.tsv",
metadata="results/metadata.tsv",
output:
touch("data/notify/metadata-diff.done"),
params:
Expand Down
6 changes: 3 additions & 3 deletions ingest/workflow/snakemake_rules/transform.smk
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ formats and expects input file

This will produce output files as

metadata = "data/metadata.tsv"
sequences = "data/sequences.fasta"
metadata = "data/metadata_raw.tsv"
sequences = "results/sequences.fasta"

Parameters are expected to be defined in `config.transform`.
"""
Expand Down Expand Up @@ -43,7 +43,7 @@ rule transform:
annotations=config["transform"]["annotations"],
output:
metadata="data/metadata_raw.tsv",
sequences="data/sequences.fasta",
sequences="results/sequences.fasta",
log:
"logs/transform.txt",
params:
Expand Down
4 changes: 2 additions & 2 deletions ingest/workflow/snakemake_rules/trigger_rebuild.smk
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ rule trigger_build:
Triggering monkeypox builds via repository action type `rebuild`.
"""
input:
metadata_upload="data/upload/s3/metadata.tsv-to-metadata.tsv.gz.done",
fasta_upload="data/upload/s3/sequences.fasta-to-sequences.fasta.xz.done",
metadata_upload="data/upload/s3/metadata.tsv.gz.done",
fasta_upload="data/upload/s3/sequences.fasta.xz.done",
output:
touch("data/trigger/rebuild.done"),
shell:
Expand Down
14 changes: 7 additions & 7 deletions ingest/workflow/snakemake_rules/upload.smk
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ This part of the workflow handles uploading files to a specified destination.
Uses predefined wildcard `file_to_upload` to determine the input and predefined
wildcard `remote_file_name` as the remote file name in the specified destination.

Produces output files as `data/upload/{upload_target_name}/{file_to_upload}-to-{remote_file_name}.done`.
Produces output files as `data/upload/{upload_target_name}/{remote_file_name}.done`.

Currently only supports uploads to AWS S3, but additional upload rules can
be easily added as long as they follow the output pattern described above.
Expand All @@ -26,18 +26,18 @@ def _get_upload_inputs(wildcards):
the rules in `slack_notifications.smk`, so it only includes flag files if
`send_notifications` is True.
"""
file_to_upload = wildcards.file_to_upload

inputs = {
"file_to_upload": f"data/{file_to_upload}",
"file_to_upload": config["upload"]["s3"]["files_to_upload"][
wildcards.remote_file_name
],
}

if send_notifications:
flag_file = []

if file_to_upload == "genbank.ndjson":
if file_to_upload == "data/genbank.ndjson":
flag_file = "data/notify/genbank-record-change.done"
elif file_to_upload == "metadata.tsv":
elif file_to_upload == "results/metadata.tsv":
flag_file = "data/notify/metadata-diff.done"

inputs["notify_flag_file"] = flag_file
Expand All @@ -49,7 +49,7 @@ rule upload_to_s3:
input:
unpack(_get_upload_inputs),
output:
"data/upload/s3/{file_to_upload}-to-{remote_file_name}.done",
"data/upload/s3/{remote_file_name}.done",
params:
quiet="" if send_notifications else "--quiet",
s3_dst=config["upload"].get("s3", {}).get("dst", ""),
Expand Down