diff --git a/INSTALL.md b/INSTALL.md index 5051e30..46605af 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -1,9 +1,11 @@ ## Dependencies -* [Python](https://www.python.org/) 3.6 or higher, and the following modules: +* [Python](https://www.python.org/) 3.7 or higher, and the following modules: * [BioPython](https://biopython.org/) version 1.7+ * [mpi4py](https://pypi.org/project/mpi4py/) * [SciPy](https://www.scipy.org/) version 1.5+ + * [rpy2](https://rpy2.github.io/doc/latest/html/index.html) version 3.5.13+ + * [psycopg2](https://pypi.org/project/psycopg2/) version 2.9.6+ * [minimap2](https://github.com/lh3/minimap2) version 2.1+ * [FastTree2](http://www.microbesonline.org/fasttree/) version 2.1.10+, compiled for [double precision](http://www.microbesonline.org/fasttree/#BranchLen) * [TreeTime](https://github.com/neherlab/treetime) version 0.7.5+ @@ -11,7 +13,6 @@ * [git](https://git-scm.com/) * [Node.js](https://nodejs.org/en/download/) version 18.0.0+ * [npm](https://docs.npmjs.com/about-npm-versions) -* [rpy2](https://rpy2.github.io/doc/latest/html/index.html) version 3.5.13+ * [R](https://cran.r-project.org/) (tested on 4.2.1) and the following packages: * [tidyquant](https://cran.r-project.org/web/packages/tidyquant/index.html) * [matrixStats](https://cran.rstudio.com/web/packages/matrixStats/index.html) @@ -27,6 +28,8 @@ If running locally (without dedicated GISAID feed): ## Installation +### Setup Back-End + * Navigate to the directory in your filesystem under which you want to install the `covizu` directory * Clone the repository: @@ -39,6 +42,18 @@ git clone https://github.com/PoonLab/covizu sudo python3 setup.py install # omit `sudo` if you don't have superuser privileges ``` +* Install PostgreSQL - [Database for Extracted Features](https://github.com/PoonLab/covizu/issues/485) + * [Download](https://www.postgresql.org/download/) and install PostgreSQL based on your operating system or pull the latest [docker image](https://hub.docker.com/_/postgres) + * 
The following environment variables must be defined: + * `POSTGRES_USER` - User name set up during installation of PostgreSQL + * `POSTGRES_PASSWORD` - Password for the given user name + * Optional environment variables: + * `POSTGRES_DB` - The desired database name to use for the analysis. The default database name is `gisaid_db` + * `POSTGRES_HOST` - The host name where PostgreSQL is running on. The default host is `localhost` + * `POSTGRES_PORT` - The port used by PostgreSQL. The default port number is `5432` + +### Setup Front-End + If you're running the server (front-end) for the first time, obtain the following data files from our main server at https://filogeneti.ca/covizu/data/: * `timetree.nwk` @@ -90,12 +105,13 @@ Finally, run the following commands: Once you launch the local webserver with `npm start`, allow up to a minute for the server to initialize and then navigate your browser to `localhost:8001`. +### Run Front-End Unit Tests + To run the end-to-end tests with [Cypress](http://cypress.io) start the test server ``` npm test ``` and in a new terminal terminal window run ``` -npx cypress run -``` - +npx cypress run --config-file "config/cypress.config.js" +``` \ No newline at end of file diff --git a/batch.py b/batch.py index 933edb6..3687599 100644 --- a/batch.py +++ b/batch.py @@ -10,7 +10,10 @@ from covizu.utils.batch_utils import * from covizu.utils.seq_utils import SC2Locator from tempfile import NamedTemporaryFile - +import psycopg2 +import psycopg2.extras +from psycopg2 import sql +from psycopg2.errors import DuplicateDatabase def parse_args(): parser = argparse.ArgumentParser(description="CoVizu analysis pipeline automation") @@ -95,6 +98,18 @@ def parse_args(): parser.add_argument("--boot-cutoff", type=float, default=0.5, help="Bootstrap cutoff for consensus tree (default 0.5). 
" "Only used if --cons is specified.") + + parser.add_argument('--dbname', type=str, default=os.environ.get("POSTGRES_DB", "gisaid_db"), + help="Postgresql database name") + parser.add_argument('--dbhost', type=str, default=os.environ.get("POSTGRES_HOST", "localhost"), + help="Postgresql database host address") + parser.add_argument('--dbport', type=str, default=os.environ.get("POSTGRES_PORT", "5432"), + help="Connection to port number") + parser.add_argument('--dbuser', type=str, default=os.environ.get("POSTGRES_USER", None), + help="Postgresl user") + parser.add_argument('--dbpswd', type=str, default=os.environ.get("POSTGRES_PASSWORD", None), + help="Postgresl password") + parser.add_argument("--dry-run", action="store_true", help="Do not upload output files to webserver.") @@ -102,13 +117,34 @@ def parse_args(): return parser.parse_args() -def process_feed(args, callback=None): +def open_connection(connection_parameters): + """ open connection to database, initialize tables if they don't exist + :out: + :cursor: interactive sql object containing tables + """ + conn = psycopg2.connect(**connection_parameters) + cur = conn.cursor(cursor_factory = psycopg2.extras.RealDictCursor) + + # create tables if they don't exist + seqs_table = '''CREATE TABLE IF NOT EXISTS SEQUENCES (accession VARCHAR(255) + PRIMARY KEY, qname VARCHAR(255), lineage VARCHAR(255), + date VARCHAR(255), location VARCHAR(255), + diffs VARCHAR, missing VARCHAR)''' + cur.execute(seqs_table) + + cur.execute('''CREATE INDEX IF NOT EXISTS qname_index ON SEQUENCES (qname)''') + + conn.commit() + return cur, conn + + +def process_feed(args, cur, callback=None): """ Process feed data """ if callback: callback("Processing GISAID feed data") loader = gisaid_utils.load_gisaid(args.infile, minlen=args.minlen, mindate=args.mindate) - batcher = gisaid_utils.batch_fasta(loader, size=args.batchsize) - aligned = gisaid_utils.extract_features(batcher, ref_file=args.ref, binpath=args.mmbin, + batcher = 
gisaid_utils.batch_fasta(loader, cur, size=args.batchsize) + aligned = gisaid_utils.extract_features(batcher, ref_file=args.ref, cur=cur, binpath=args.mmbin, nthread=args.mmthreads, minlen=args.minlen) filtered = gisaid_utils.filter_problematic(aligned, vcf_file=args.vcf, cutoff=args.poisson_cutoff, callback=callback) @@ -119,6 +155,37 @@ def process_feed(args, callback=None): args = parse_args() cb = Callback() + # Check if database exists + connection_parameters = { + "host": args.dbhost, + "port": args.dbport, + "user": args.dbuser, + "password": args.dbpswd, + } + + connection = None + try: + connection = psycopg2.connect(**connection_parameters) + connection.autocommit = True + + cursor = connection.cursor() + cursor.execute(sql.SQL('CREATE DATABASE {}').format(sql.Identifier(args.dbname))) + cb.callback("Database {} created successfully.".format(args.dbname)) + + + except DuplicateDatabase: + cb.callback("Database {} already exists.".format(args.dbname)) + except psycopg2.Error as e: + cb.callback("Error initiating connection to database: {}".format(e)) + sys.exit() + finally: + if connection is not None: + cursor.close() + connection.close() + + connection_parameters['dbname'] = args.dbname + cur, conn = open_connection(connection_parameters) + # check that user has loaded openmpi module try: subprocess.check_call(['mpirun', '-np', '2', 'ls'], stdout=subprocess.DEVNULL) @@ -147,7 +214,10 @@ def process_feed(args, callback=None): args.infile = gisaid_utils.download_feed(args.url, args.user, args.password) # filter data, align genomes, extract features, sort by lineage - by_lineage = process_feed(args, cb.callback) + by_lineage = process_feed(args, cur, cb.callback) + + # calling commit immediately after db transactions + conn.commit() # separate XBB and other recombinant lineages aliases = parse_alias(args.alias) diff --git a/covizu/minimap2.py b/covizu/minimap2.py index 2488b4a..9219eba 100644 --- a/covizu/minimap2.py +++ b/covizu/minimap2.py @@ -6,7 +6,7 @@ 
import json import covizu -import covizu.utils.gisaid_utils +from covizu.utils import gisaid_utils def apply_cigar(seq, rpos, cigar): diff --git a/covizu/treetime.py b/covizu/treetime.py index 9a83521..9fccbdb 100644 --- a/covizu/treetime.py +++ b/covizu/treetime.py @@ -157,8 +157,12 @@ def parse_nexus(nexus_file, fasta, callback=None): # normalize residuals and append to tip labels rvals = residuals.values() - rmean = statistics.mean(rvals) - rstdev = statistics.stdev(rvals) + try: + rmean = statistics.mean(rvals) + rstdev = statistics.stdev(rvals) + except statistics.StatisticsError: + if callback: callback("Provided records are already stored.") + rmean, rstdev = 0.0, 1.0 # too few residuals to normalize; avoid NameError below for tip, resid in residuals.items(): residuals[tip] = (resid-rmean) / rstdev diff --git a/covizu/utils/batch_utils.py b/covizu/utils/batch_utils.py index e0c4222..4713000 100644 --- a/covizu/utils/batch_utils.py +++ b/covizu/utils/batch_utils.py @@ -84,7 +84,7 @@ def build_timetree(by_lineage, args, callback=None): clock=args.clock, verbosity=0) # writes output to treetime.nwk at `nexus_file` path - return covizu.treetime.parse_nexus(nexus_file, fasta) + return covizu.treetime.parse_nexus(nexus_file, fasta, callback) def beadplot_serial(lineage, features, args, callback=None): # pragma: no cover @@ -393,7 +393,7 @@ def make_beadplots(by_lineage, args, callback=None, t0=None, txtfile='minor_line for lineage in recoded: # import trees lineage_name = lineage.replace('/', '_') # issue #297 - outfile = open('data/{}.nwk'.format(lineage_name)) + outfile = open('{}/{}.nwk'.format(args.outdir, lineage_name)) trees = Phylo.parse(outfile, 'newick') # note this returns a generator label_dict = recoded[lineage]['labels'] diff --git a/covizu/utils/gisaid_utils.py b/covizu/utils/gisaid_utils.py index 8f9ceb5..bb45213 100644 --- a/covizu/utils/gisaid_utils.py +++ b/covizu/utils/gisaid_utils.py @@ -102,7 +102,7 @@ def load_gisaid(path, minlen=29000, mindate='2019-12-01', callback=None, " {nonhuman} non-human genomes".format(**rejects)) -def 
batch_fasta(gen, size=100): +def batch_fasta(gen, cur, size=100): """ Concatenate sequence records in stream into FASTA-formatted text in batches of records. @@ -112,10 +112,22 @@ def batch_fasta(gen, size=100): """ stdin = '' batch = [] + for i, record in enumerate(gen, 1): qname = record['covv_virus_name'] sequence = record.pop('sequence') - stdin += '>{}\n{}\n'.format(qname, sequence) + + cur.execute("SELECT * FROM SEQUENCES WHERE qname = %s", (qname,)) + result = cur.fetchone() + if result: + # reading old records from database + # and handling list to tuple conversion + record.update({ + 'diffs': list(map(tuple, json.loads(result["diffs"]))), + 'missing': list(map(tuple, json.loads(result["missing"]))) + }) + else: + stdin += '>{}\n{}\n'.format(qname, sequence) batch.append(record) if i > 0 and i % size == 0: yield stdin, batch @@ -126,7 +138,7 @@ def batch_fasta(gen, size=100): yield stdin, batch -def extract_features(batcher, ref_file, binpath='minimap2', nthread=3, minlen=29000): +def extract_features(batcher, ref_file, cur, binpath='minimap2', nthread=3, minlen=29000): """ Stream output from JSON.xz file via load_gisaid() into minimap2 via subprocess. 
@@ -143,13 +155,28 @@ def extract_features(batcher, ref_file, binpath='minimap2', nthread=3, minlen=29 reflen = len(convert_fasta(handle)[0][1]) for fasta, batch in batcher: + new_records = {} + for record in batch: + if 'diffs' in record: + yield record + else: + new_records[record['covv_virus_name']] = record + + # If fasta is empty, no need to run minimap2 + if len(fasta) == 0: + continue + mm2 = minimap2.minimap2(fasta, ref_file, stream=True, path=binpath, nthread=nthread, minlen=minlen) result = list(minimap2.encode_diffs(mm2, reflen=reflen)) - for row, record in zip(result, batch): + for qname, diffs, missing in result: # reconcile minimap2 output with GISAID record - qname, diffs, missing = row + record = new_records[qname] record.update({'diffs': diffs, 'missing': missing}) + + # inserting diffs and missing as json strings + cur.execute("INSERT INTO SEQUENCES VALUES(%s, %s, %s, %s, %s, %s, %s)", + [json.dumps(v) if k in ['diffs', 'missing'] else v for k, v in record.items()]) yield record @@ -238,6 +265,7 @@ def sort_by_lineage(records, callback=None, interval=10000): :return: dict, lists of records keyed by lineage """ result = {} + for i, record in enumerate(records): if callback and i % interval == 0: callback('aligned {} records'.format(i))