augur curate I/O: validate records have same fields #1518

Merged (3 commits), Jul 3, 2024
CHANGES.md (5 changes: 4 additions & 1 deletion)
@@ -5,6 +5,7 @@
### Major changes

* curate format-dates: Raises an error if provided date field does not exist in records. [#1509][] (@joverlee521)
* All curate subcommands: Verifies all input records have the same fields and raises an error if a record does not have matching fields. [#1518][] (@joverlee521)

### Features

@@ -14,7 +15,7 @@
* Added a new sub-command `augur curate abbreviate-authors` to abbreviate lists of authors to "<first author> et al." Previously, this was available as the `transform-authors` script within the nextstrain/ingest repo. [#1483][] (@genehack)
* Added a new sub-command `augur curate parse-genbank-location` to parse the `geo_loc_name` field from GenBank records. Previously, this was available as the `translate-genbank-location` script within the nextstrain/ingest repo. [#1485][] (@genehack)
* curate format-dates: Added defaults to `--expected-date-formats` so that ISO 8601 dates (`%Y-%m-%d`) and its various masked forms (e.g. `%Y-XX-XX`) are automatically parsed by the command. [#1501][] (@joverlee521)
* Added a new sub-command `augur curate translate-strain-name` to filter strain names based on matching a regular expression. Previously, this was available as the `translate-strain-names` script within the nextstrain/ingest repo. [#1514][] (@genehack)

### Bug Fixes

@@ -29,6 +30,8 @@
[#1495]: https://github.com/nextstrain/augur/pull/1495
[#1501]: https://github.com/nextstrain/augur/pull/1501
[#1509]: https://github.com/nextstrain/augur/pull/1509
[#1514]: https://github.com/nextstrain/augur/pull/1514
[#1518]: https://github.com/nextstrain/augur/pull/1518

## 24.4.0 (15 May 2024)

augur/curate/__init__.py (53 changes: 47 additions & 6 deletions)
@@ -5,6 +5,7 @@
import sys
from collections import deque
from textwrap import dedent
from typing import Iterable, Set

from augur.argparse_ import ExtendOverwriteDefault, add_command_subparsers
from augur.errors import AugurError
@@ -120,6 +121,40 @@ def register_parser(parent_subparsers):
    return parser


def validate_records(records: Iterable[dict], is_input: bool) -> Iterable[dict]:
    """
    Validate that the provided *records* all have the same fields.
    Uses the keys of the first record to check against all other records.

    Parameters
    ----------
    records: iterable of dict

    is_input: bool
        Whether the provided records come directly from user provided input
    """
    error_message = "Records do not have the same fields! "
    if is_input:
        error_message += "Please check your input data has the same fields."
    else:
        # Hopefully users should not run into this error as it means we are
        # not uniformly adding/removing fields from records
        error_message += dedent("""\
            Something unexpected happened during the augur curate command.
            To report this, please open a new issue including the original command:
            <https://github.com/nextstrain/augur/issues/new/choose>
            """)

    first_record_keys: Set[str] = set()
    for idx, record in enumerate(records):
        if idx == 0:
            first_record_keys.update(record.keys())
        else:
            if set(record.keys()) != first_record_keys:
                raise AugurError(error_message)
        yield record


def run(args):
    # Print help if no subcommands are used
    if not getattr(args, SUBCOMMAND_ATTRIBUTE, None):
@@ -177,25 +212,31 @@ def run(args):
        input files can be provided via the command line options `--metadata` and `--fasta`.
        See the command's help message for more details."""))

    # Validate records have the same input fields
    validated_input_records = validate_records(records, True)

    # Run subcommand to get modified records
    modified_records = getattr(args, SUBCOMMAND_ATTRIBUTE).run(args, validated_input_records)

    # Validate modified records have the same output fields
    validated_output_records = validate_records(modified_records, False)

    # Output modified records
    # First output FASTA, since the write fasta function yields the records again
    # and removes the sequences from the records
    if args.output_fasta:
        validated_output_records = write_records_to_fasta(
            validated_output_records,
            args.output_fasta,
            args.output_id_field,
            args.output_seq_field)

    if args.output_metadata:
        write_records_to_tsv(validated_output_records, args.output_metadata)

    if not (args.output_fasta or args.output_metadata):
        dump_ndjson(validated_output_records)
    else:
        # Exhaust generator to ensure we run through all records
        # when only a FASTA output is requested but not a metadata output
        deque(validated_output_records, maxlen=0)
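For illustration, here is a minimal standalone sketch of how the new `validate_records` generator behaves (the record contents are made up for this example). Because validation is lazy, the error only surfaces once the offending record is actually consumed:

```python
# Hypothetical usage of the validate_records generator added in this PR.
from augur.curate import validate_records
from augur.errors import AugurError

records = [
    {"strain": "A", "date": "2024-01-01"},
    {"strain": "B", "date": "2024-01-02"},
    {"strain": "C"},  # missing the "date" field
]

try:
    for record in validate_records(records, is_input=True):
        print(record)  # the first two records stream through unchanged
except AugurError as error:
    print(f"ERROR: {error}")  # raised only when the bad record is reached
```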
tests/functional/curate/cram/validate-records.t (37 changes: 37 additions & 0 deletions)
@@ -0,0 +1,37 @@
Setup

$ source "$TESTDIR"/_setup.sh

Testing records are validated appropriately by augur curate.
Create NDJSON file for testing validation catches records with different fields.

$ cat >records.ndjson <<~~
> {"string": "string_1"}
> {"string": "string_2"}
> {"string": "string_3"}
> {"string": "string_4", "number": 123}
> ~~

This will always pass thru the records that pass validation but should raise an
error when it encounters the record with mismatched fields.

$ cat records.ndjson | ${AUGUR} curate passthru
ERROR: Records do not have the same fields! Please check your input data has the same fields.
{"string": "string_1"}
{"string": "string_2"}
{"string": "string_3"}
[2]

Passing the records through multiple augur curate commands should raise the
same error when it encounters the record with mismatched fields.
@jameshadfield (Member) commented on Jul 3, 2024:

[not a request for changes]

Do you have any good resources for understanding how pipes work in situations like this? I.e. the augur curate X is printing lines one-by-one, so does an individual NDJSON line flow through all curate commands before the next one starts flowing through? That's what this output makes it seem like. But when Python flushes the print buffer must come into the equation, right? And unix pipes presumably have some concept of backpressure / buffering?

@genehack (Contributor) replied:


From my understanding, the shell starts all the commands at the same time, with file descriptors arranged so that the STDOUT of the first command is the STDIN of the second, and so on down the line.

There is a buffer for the pipe, managed by the kernel, but if it's full, it blocks on the write side (and if it's empty, it blocks on the read side). This SO answer may be helpful.
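As a rough illustration of that wiring (a sketch, not augur code; `seq` and `head` are stand-ins for the curate commands), this is approximately what the shell does for `cmd1 | cmd2`:

```python
# Sketch: both processes start immediately; the kernel's pipe buffer sits
# between them and provides the blocking/backpressure described above.
import subprocess

p1 = subprocess.Popen(["seq", "1", "100000"], stdout=subprocess.PIPE)
p2 = subprocess.Popen(["head", "-n", "3"], stdin=p1.stdout, stdout=subprocess.PIPE)
p1.stdout.close()  # close our copy so p1 receives SIGPIPE if p2 exits early
out, _ = p2.communicate()
print(out.decode())  # "1", "2", "3" -- p2 stopped reading, so p1 was cut off
```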

@joverlee521 (Contributor, PR author) replied:


My understanding is as @genehack described ☝️ (I've only skimmed the pipe man page and have a limited understanding here)

so does an individual NDJSON line flow through all curate commands before the next one starts flowing through?

This should depend on the buffer size, where multiple records can flow through a single command to fill up the buffer before being passed to the next command.

In the case where the first command runs into an error, it should close its "write end" so the subsequent commands will receive some end-of-file signal and terminate after writing their outputs as well. (Or exit immediately if set -eo pipefail.)
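A small sketch of that end-of-file behavior on a POSIX system, using a raw pipe directly rather than a real pipeline:

```python
# Sketch: once the write end of a pipe is closed, the reader drains whatever
# is left in the buffer and then sees EOF rather than an error.
import os

read_fd, write_fd = os.pipe()
os.write(write_fd, b'{"string": "string_1"}\n')
os.close(write_fd)  # the writer erroring out / exiting closes its end

with os.fdopen(read_fd) as reader:
    print(reader.read())  # returns the buffered data; subsequent reads hit EOF
```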

@jameshadfield (Member) replied on Jul 4, 2024:


Thanks both! Reading those resources, this is my understanding of what's happening (using c1 for the first curate command, etc.):

  1. c1 writes the first record to the appropriate fd, and there's no buffering on the python side (since we are using print() with the default new line ending).
  2. c2 reads this more-or-less immediately and in parallel with c1 continuing to run. It writes output to its fd, c3 reads it and so on.
  3. This happens for the first three (valid) records - they make it through all three curate commands and are written to stdout.
  4. At some point c1 reads the invalid record, prints to stderr, and exits code 2. This is, AFAICT, seen by c2 no differently to an "end of file" and so c2 exits (code 0) once it's consumed all the data in its input buffer.
  5. The pipefail causes the entire pipeline to have exit code 2 because c1 had exit code 2, but this is done after the pipeline has finished. I.e. it doesn't actually change any behaviour of the pipeline -- after c1 has exited, c2 will continue to run while it has data to read on its input buffer and so on.
  6. The order of steps (3) and (4) seems like it's a race-condition, but the fact that the pipeline's output has the error message (stderr) before the records (stdout) indicates that (4) comes before the first record has made it through the entire pipeline. Stdout seems to be line-buffered so I don't think that's important here.

A project member replied:


James asked me to check his understanding, and generally yeah, it's about right AFAICT.

One small inaccuracy is that sys.stdout is block-buffered, not line-buffered, when not connected to a TTY. (And it can be unbuffered entirely if desired.)

If you wanted to empirically test this understanding, you could strace the pipeline (e.g. the bash process and its children) and inspect the order of read/write/exit operations for each process in the pipeline. (strace is a Linux tool; there are equivalents for macOS, but I'm not as adept with them.)
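Short of strace, the Python-side buffering is also easy to inspect or override from within the process (a sketch; `reconfigure` requires Python 3.7+):

```python
# Sketch: checking and changing how this interpreter buffers stdout.
import sys

print(sys.stdout.line_buffering, file=sys.stderr)  # False when stdout is a pipe
sys.stdout.reconfigure(line_buffering=True)        # opt back into line buffering
# Alternatively, `python -u` or PYTHONUNBUFFERED=1 avoids buffered stdout entirely.
```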

A member added:


Additional things to consider, if you want to dig into the order of steps 3 and 4 further, are how exactly stdout and stderr are interleaved by cram to match against the test file, and the buffering mode of Python's sys.stderr (which is version dependent).
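One way to check that stderr detail for a given interpreter (as an assumption to verify against the changelog: CPython 3.9 made sys.stderr line-buffered even when it is not attached to a TTY):

```python
# Sketch: inspect how this Python buffers stderr.
import sys

print(sys.stderr.line_buffering)  # expected True on 3.9+ even when redirected
```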


$ set -o pipefail
$ cat records.ndjson \
> | ${AUGUR} curate passthru \
> | ${AUGUR} curate passthru \
> | ${AUGUR} curate passthru
ERROR: Records do not have the same fields! Please check your input data has the same fields.
{"string": "string_1"}
{"string": "string_2"}
{"string": "string_3"}
[2]