Iss485 - backend database #501

Merged 14 commits on Jan 23, 2024
26 changes: 21 additions & 5 deletions INSTALL.md
@@ -1,17 +1,18 @@
## Dependencies

* [Python](https://www.python.org/) 3.6 or higher, and the following modules:
* [Python](https://www.python.org/) 3.7 or higher, and the following modules:
* [BioPython](https://biopython.org/) version 1.7+
* [mpi4py](https://pypi.org/project/mpi4py/)
* [SciPy](https://www.scipy.org/) version 1.5+
* [rpy2](https://rpy2.github.io/doc/latest/html/index.html) version 3.5.13+
* [psycopg2](https://pypi.org/project/psycopg2/) version 2.9.6+
* [minimap2](https://github.com/lh3/minimap2) version 2.1+
* [FastTree2](http://www.microbesonline.org/fasttree/) version 2.1.10+, compiled for [double precision](http://www.microbesonline.org/fasttree/#BranchLen)
* [TreeTime](https://github.com/neherlab/treetime) version 0.7.5+
* [RapidNJ](https://birc.au.dk/software/rapidnj/)
* [git](https://git-scm.com/)
* [Node.js](https://nodejs.org/en/download/) version 18.0.0+
* [npm](https://docs.npmjs.com/about-npm-versions)
* [rpy2](https://rpy2.github.io/doc/latest/html/index.html) version 3.5.13+
* [R](https://cran.r-project.org/) (tested on 4.2.1) and the following packages:
* [tidyquant](https://cran.r-project.org/web/packages/tidyquant/index.html)
* [matrixStats](https://cran.rstudio.com/web/packages/matrixStats/index.html)
@@ -27,6 +28,8 @@ If running locally (without dedicated GISAID feed):

## Installation

### Setup Back-End

* Navigate to the directory in your filesystem under which you want to install the `covizu` directory

* Clone the repository:
@@ -39,6 +42,18 @@ git clone https://github.com/PoonLab/covizu
sudo python3 setup.py install # omit `sudo` if you don't have superuser privileges
```

* Install PostgreSQL - [Database for Extracted Features](https://github.com/PoonLab/covizu/issues/485)
* [Download](https://www.postgresql.org/download/) and install PostgreSQL based on your operating system or pull the latest [docker image](https://hub.docker.com/_/postgres)
* The following environment variables must be defined:
* `POSTGRES_USER` - User name set up during installation of PostgreSQL
* `POSTGRES_PASSWORD` - Password for the given user name
* Optional environment variables:
* `POSTGRES_DB` - The desired database name to use for the analysis. The default database name is `gisaid_db`
* `POSTGRES_HOST` - The host name where PostgreSQL is running. The default host is `localhost`
* `POSTGRES_PORT` - The port used by PostgreSQL. The default port number is `5432`
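
A quick way to confirm these are set is to resolve them the same way `batch.py` does; a minimal Python sketch mirroring the argparse defaults in the diff below:

```
import os

# required -- no defaults; the pipeline cannot connect without them
user = os.environ.get("POSTGRES_USER")
password = os.environ.get("POSTGRES_PASSWORD")

# optional -- defaults as documented above
dbname = os.environ.get("POSTGRES_DB", "gisaid_db")
host = os.environ.get("POSTGRES_HOST", "localhost")
port = os.environ.get("POSTGRES_PORT", "5432")
```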

### Setup Front-End

If you're running the server (front-end) for the first time, obtain the following data files from our main server at
https://filogeneti.ca/covizu/data/:
* `timetree.nwk`
@@ -90,12 +105,13 @@ Finally, run the following commands:

Once you launch the local webserver with `npm start`, allow up to a minute for the server to initialize and then navigate your browser to `localhost:8001`.

### Run Front-End Tests

To run the end-to-end tests with [Cypress](http://cypress.io), start the test server
```
npm test
```
and in a new terminal window run
```
npx cypress run
npx cypress run --config-file "config/cypress.config.js"
```
80 changes: 75 additions & 5 deletions batch.py
@@ -10,7 +10,10 @@
from covizu.utils.batch_utils import *
from covizu.utils.seq_utils import SC2Locator
from tempfile import NamedTemporaryFile

import psycopg2
import psycopg2.extras
from psycopg2 import sql
from psycopg2.errors import DuplicateDatabase

def parse_args():
parser = argparse.ArgumentParser(description="CoVizu analysis pipeline automation")
@@ -95,20 +98,53 @@ def parse_args():
parser.add_argument("--boot-cutoff", type=float, default=0.5,
help="Bootstrap cutoff for consensus tree (default 0.5). "
"Only used if --cons is specified.")

parser.add_argument('--dbname', type=str, default=os.environ.get("POSTGRES_DB", "gisaid_db"),
help="PostgreSQL database name")
parser.add_argument('--dbhost', type=str, default=os.environ.get("POSTGRES_HOST", "localhost"),
help="PostgreSQL database host address")
parser.add_argument('--dbport', type=str, default=os.environ.get("POSTGRES_PORT", "5432"),
help="PostgreSQL connection port number")
parser.add_argument('--dbuser', type=str, default=os.environ.get("POSTGRES_USER", None),
help="PostgreSQL user name")
parser.add_argument('--dbpswd', type=str, default=os.environ.get("POSTGRES_PASSWORD", None),
help="PostgreSQL password")


parser.add_argument("--dry-run", action="store_true",
help="Do not upload output files to webserver.")

return parser.parse_args()


def process_feed(args, callback=None):
def open_connection(connection_parameters):
""" open connection to database, initialize tables if they don't exist
:out:
:cursor: interactive sql object containing tables
"""
conn = psycopg2.connect(**connection_parameters)
cur = conn.cursor(cursor_factory = psycopg2.extras.RealDictCursor)

# create tables if they don't exist
seqs_table = '''CREATE TABLE IF NOT EXISTS SEQUENCES (accession VARCHAR(255)
PRIMARY KEY, qname VARCHAR(255), lineage VARCHAR(255),
date VARCHAR(255), location VARCHAR(255),
diffs VARCHAR, missing VARCHAR)'''
cur.execute(seqs_table)

cur.execute('''CREATE INDEX IF NOT EXISTS qname_index ON SEQUENCES (qname)''')

conn.commit()
return cur, conn
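
Because the cursor is constructed with `RealDictCursor`, rows come back as dictionaries keyed by column name rather than positional tuples. A minimal usage sketch (the `qname` value is hypothetical):

```
cur, conn = open_connection(connection_parameters)
cur.execute("SELECT * FROM SEQUENCES WHERE qname = %s", ("hCoV-19/Example/1/2024",))
row = cur.fetchone()
if row:
    print(row["qname"], row["lineage"])  # dict-style access by column name
```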


def process_feed(args, cur, callback=None):
""" Process feed data """
if callback:
callback("Processing GISAID feed data")
loader = gisaid_utils.load_gisaid(args.infile, minlen=args.minlen, mindate=args.mindate)
batcher = gisaid_utils.batch_fasta(loader, size=args.batchsize)
aligned = gisaid_utils.extract_features(batcher, ref_file=args.ref, binpath=args.mmbin,
batcher = gisaid_utils.batch_fasta(loader, cur, size=args.batchsize)
aligned = gisaid_utils.extract_features(batcher, ref_file=args.ref, cur=cur, binpath=args.mmbin,
nthread=args.mmthreads, minlen=args.minlen)
filtered = gisaid_utils.filter_problematic(aligned, vcf_file=args.vcf, cutoff=args.poisson_cutoff,
callback=callback)
@@ -119,6 +155,37 @@
args = parse_args()
cb = Callback()

# Check if database exists
connection_parameters = {
"host": args.dbhost,
"port": args.dbport,
"user": args.dbuser,
"password": args.dbpswd,
}

connection = None
try:
connection = psycopg2.connect(**connection_parameters)
connection.autocommit = True

cursor = connection.cursor()
cursor.execute(sql.SQL('CREATE DATABASE {}').format(sql.Identifier(args.dbname)))
cb.callback("Database {} created successfully.".format(args.dbname))


except DuplicateDatabase:
cb.callback("Database {} already exists.".format(args.dbname))
except psycopg2.Error as e:
cb.callback("Error initiating connection to database: {}".format(e))
sys.exit()
finally:
if connection is not None:
cursor.close()
connection.close()

connection_parameters['dbname'] = args.dbname
cur, conn = open_connection(connection_parameters)

# check that user has loaded openmpi module
try:
subprocess.check_call(['mpirun', '-np', '2', 'ls'], stdout=subprocess.DEVNULL)
@@ -147,7 +214,10 @@ def process_feed(args, callback=None):
args.infile = gisaid_utils.download_feed(args.url, args.user, args.password)

# filter data, align genomes, extract features, sort by lineage
by_lineage = process_feed(args, cb.callback)
by_lineage = process_feed(args, cur, cb.callback)

# commit the database transactions performed during process_feed
conn.commit()

# separate XBB and other recombinant lineages
aliases = parse_alias(args.alias)
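
Pieced together, the database flow added to `batch.py` runs roughly as follows. This is a condensed sketch of the diff above, not verbatim source; `open_connection` is the function defined earlier in this file:

```
import os
import psycopg2
from psycopg2 import sql
from psycopg2.errors import DuplicateDatabase

params = {
    "host": os.environ.get("POSTGRES_HOST", "localhost"),
    "port": os.environ.get("POSTGRES_PORT", "5432"),
    "user": os.environ.get("POSTGRES_USER"),
    "password": os.environ.get("POSTGRES_PASSWORD"),
}
dbname = os.environ.get("POSTGRES_DB", "gisaid_db")

# CREATE DATABASE cannot run inside a transaction block, hence autocommit
conn = psycopg2.connect(**params)
conn.autocommit = True
try:
    with conn.cursor() as cursor:
        cursor.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(dbname)))
except DuplicateDatabase:
    pass  # database already exists; reuse it
finally:
    conn.close()

# reconnect to the named database and hand its cursor to the pipeline
params["dbname"] = dbname
cur, conn = open_connection(params)
```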
2 changes: 1 addition & 1 deletion covizu/minimap2.py
@@ -6,7 +6,7 @@
import json

import covizu
import covizu.utils.gisaid_utils
from covizu.utils import gisaid_utils


def apply_cigar(seq, rpos, cigar):
8 changes: 6 additions & 2 deletions covizu/treetime.py
@@ -157,8 +157,12 @@ def parse_nexus(nexus_file, fasta, callback=None):

# normalize residuals and append to tip labels
rvals = residuals.values()
rmean = statistics.mean(rvals)
rstdev = statistics.stdev(rvals)
try:
    rmean = statistics.mean(rvals)
    rstdev = statistics.stdev(rvals)
except statistics.StatisticsError:
    # no residuals to normalize: all provided records were already stored
    if callback:
        callback("Provided records are already stored.")

for tip, resid in residuals.items():
residuals[tip] = (resid-rmean) / rstdev

4 changes: 2 additions & 2 deletions covizu/utils/batch_utils.py
@@ -84,7 +84,7 @@ def build_timetree(by_lineage, args, callback=None):
clock=args.clock, verbosity=0)

# writes output to treetime.nwk at `nexus_file` path
return covizu.treetime.parse_nexus(nexus_file, fasta)
return covizu.treetime.parse_nexus(nexus_file, fasta, callback)


def beadplot_serial(lineage, features, args, callback=None): # pragma: no cover
@@ -393,7 +393,7 @@ def make_beadplots(by_lineage, args, callback=None, t0=None, txtfile='minor_line
for lineage in recoded:
# import trees
lineage_name = lineage.replace('/', '_') # issue #297
outfile = open('data/{}.nwk'.format(lineage_name))
outfile = open('{}/{}.nwk'.format(args.outdir, lineage_name))
trees = Phylo.parse(outfile, 'newick') # note this returns a generator

label_dict = recoded[lineage]['labels']
38 changes: 33 additions & 5 deletions covizu/utils/gisaid_utils.py
@@ -102,7 +102,7 @@ def load_gisaid(path, minlen=29000, mindate='2019-12-01', callback=None,
" {nonhuman} non-human genomes".format(**rejects))


def batch_fasta(gen, size=100):
def batch_fasta(gen, cur, size=100):
"""
Concatenate sequence records in stream into FASTA-formatted text in batches of
<size> records.
@@ -112,10 +112,22 @@ def batch_fasta(gen, size=100):
"""
stdin = ''
batch = []

for i, record in enumerate(gen, 1):
qname = record['covv_virus_name']
sequence = record.pop('sequence')
stdin += '>{}\n{}\n'.format(qname, sequence)

cur.execute("SELECT * FROM SEQUENCES WHERE qname = '%s'"%qname)
result = cur.fetchone()
if result:
# restore cached features from the database;
# JSON stores tuples as lists, so convert back to tuples
record.update({
'diffs': list(map(tuple, json.loads(result["diffs"]))),
'missing': list(map(tuple, json.loads(result["missing"])))
})
else:
stdin += '>{}\n{}\n'.format(qname, sequence)
batch.append(record)
if i > 0 and i % size == 0:
yield stdin, batch
@@ -126,7 +138,7 @@ def batch_fasta(gen, cur, size=100):
yield stdin, batch


def extract_features(batcher, ref_file, binpath='minimap2', nthread=3, minlen=29000):
def extract_features(batcher, ref_file, cur, binpath='minimap2', nthread=3, minlen=29000):
"""
Stream output from JSON.xz file via load_gisaid() into minimap2
via subprocess.
@@ -143,13 +155,28 @@ def extract_features(batcher, ref_file, binpath='minimap2', nthread=3, minlen=29
reflen = len(convert_fasta(handle)[0][1])

for fasta, batch in batcher:
new_records = {}
for record in batch:
if 'diffs' in record:
yield record
else:
new_records[record['covv_virus_name']] = record

# If fasta is empty, no need to run minimap2
if len(fasta) == 0:
continue

mm2 = minimap2.minimap2(fasta, ref_file, stream=True, path=binpath, nthread=nthread,
minlen=minlen)
result = list(minimap2.encode_diffs(mm2, reflen=reflen))
for row, record in zip(result, batch):
    qname, diffs, missing = row
for qname, diffs, missing in result:
    # reconcile minimap2 output with GISAID record
    record = new_records[qname]
record.update({'diffs': diffs, 'missing': missing})

# inserting diffs and missing as json strings
cur.execute("INSERT INTO SEQUENCES VALUES(%s, %s, %s, %s, %s, %s, %s)",
[json.dumps(v) if k in ['diffs', 'missing'] else v for k, v in record.items()])
yield record


@@ -238,6 +265,7 @@ def sort_by_lineage(records, callback=None, interval=10000):
:return: dict, lists of records keyed by lineage
"""
result = {}

for i, record in enumerate(records):
if callback and i % interval == 0:
callback('aligned {} records'.format(i))
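
The caching contract between `extract_features` (which INSERTs new rows) and `batch_fasta` (which SELECTs them back) relies on JSON round-tripping of the feature lists. A minimal sketch with made-up feature tuples:

```
import json

# writer side: tuples serialize as JSON arrays
diffs = [("~", 240, "T"), ("-", 11288, 9)]
stored = json.dumps(diffs)  # '[["~", 240, "T"], ["-", 11288, 9]]'

# reader side: JSON arrays deserialize as lists, so convert back to tuples
restored = list(map(tuple, json.loads(stored)))
assert restored == diffs
```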