112 make fastadir thread safe #118

Merged 3 commits on Nov 13, 2023
91 changes: 91 additions & 0 deletions misc/threading-tests/README.md
@@ -0,0 +1,91 @@
# Threading Tests

This directory contains seqrepo tests for file descriptor exhaustion, especially in a threading context.
The goal is to make it easy to test combinations of thread counts and fd cache sizes.


See https://github.com/biocommons/biocommons.seqrepo/issues/112



## Examples

### single thread, without fd caching

```
snafu$ ./threading-test -s /usr/local/share/seqrepo/2021-01-29/ -m 1000 -n 1
2023-09-13 15:25:56 snafu biocommons.seqrepo.fastadir.fastadir[2274974] INFO File descriptor caching disabled
2023-09-13 15:25:57 snafu root[2274974] INFO Queued 1000 accessions
2023-09-13 15:25:57 snafu root[2274974] INFO Starting run with 1 threads
2023-09-13 15:26:01 snafu root[2274974] INFO <Worker(Thread-1, started 139822207334080)>: Done; processed 1000 accessions
2023-09-13 15:26:01 snafu root[2274974] INFO Fetched 1000 sequences in 4.281685499 s with 1 threads; 234 seq/sec
```

### single thread, with fd caching

```
snafu$ ./threading-test -s /usr/local/share/seqrepo/2021-01-29/ -m 1000 -n 1 -f 100
2023-09-13 15:26:07 snafu biocommons.seqrepo.fastadir.fastadir[2275006] WARNING File descriptor caching enabled (size=100)
2023-09-13 15:26:08 snafu root[2275006] INFO Queued 1000 accessions
2023-09-13 15:26:08 snafu root[2275006] INFO Starting run with 1 threads
2023-09-13 15:26:08 snafu root[2275006] INFO <Worker(Thread-1, started 140250961671872)>: Done; processed 1000 accessions
2023-09-13 15:26:08 snafu root[2275006] INFO Fetched 1000 sequences in 0.41264548700000003 s with 1 threads; 2423 seq/sec
CacheInfo(hits=934, misses=66, maxsize=100, currsize=66)
```
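The `CacheInfo` line printed above comes straight from `functools.lru_cache`, which backs the fd cache. A self-contained sketch (dummy `open_reader`, no seqrepo involved) reproduces those numbers from 1000 lookups spread over 66 distinct paths:

```python
import functools

@functools.lru_cache(maxsize=100)
def open_reader(path):
    # dummy stand-in for opening a FabgzReader; returns a sentinel per path
    return object()

# simulate 1000 fetches spread over 66 distinct fasta files
for i in range(1000):
    open_reader(f"file{i % 66}.fa.bgz")

print(open_reader.cache_info())
# CacheInfo(hits=934, misses=66, maxsize=100, currsize=66)
```

Since `currsize` never reaches `maxsize` here, no cached reader is ever evicted (and hence never closed mid-run).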

### five threads, without caching

```
snafu$ ./threading-test -s /usr/local/share/seqrepo/2021-01-29/ -m 1000 -n 5
2023-09-13 15:26:16 snafu biocommons.seqrepo.fastadir.fastadir[2275039] INFO File descriptor caching disabled
2023-09-13 15:26:17 snafu root[2275039] INFO Queued 1000 accessions
2023-09-13 15:26:17 snafu root[2275039] INFO Starting run with 5 threads
2023-09-13 15:26:19 snafu root[2275039] INFO <Worker(Thread-5, started 139965979674304)>: Done; processed 197 accessions
2023-09-13 15:26:19 snafu root[2275039] INFO <Worker(Thread-3, started 139965996459712)>: Done; processed 200 accessions
2023-09-13 15:26:19 snafu root[2275039] INFO <Worker(Thread-4, started 139965988067008)>: Done; processed 210 accessions
2023-09-13 15:26:19 snafu root[2275039] INFO <Worker(Thread-2, started 139966004852416)>: Done; processed 198 accessions
2023-09-13 15:26:19 snafu root[2275039] INFO <Worker(Thread-1, started 139966088738496)>: Done; processed 195 accessions
2023-09-13 15:26:19 snafu root[2275039] INFO Fetched 1000 sequences in 5.946146807 s with 5 threads; 168 seq/sec
```

### five threads, with caching :-(

```
snafu$ ./threading-test -s /usr/local/share/seqrepo/2021-01-29/ -m 1000 -n 5 -f 10
2023-09-13 15:26:32 snafu biocommons.seqrepo.fastadir.fastadir[2275104] WARNING File descriptor caching enabled (size=10)
2023-09-13 15:26:33 snafu root[2275104] INFO Queued 1000 accessions
2023-09-13 15:26:33 snafu root[2275104] INFO Starting run with 5 threads
[E::bgzf_uncompress] Inflate operation failed: invalid distance too far back
[E::fai_retrieve] Failed to retrieve block. (Seeking in a compressed, .gzi unindexed, file?)
Exception in thread Thread-5:
Traceback (most recent call last):
File "/usr/lib/python3.11/threading.py", line 1038, in _bootstrap_inner
self.run()
```


### 1 thread, cache_size > # available fds

Same as the successful run above, but limiting the process to 50 open file descriptors causes failure:

```
snafu$ (ulimit -n 50; ./threading-test -s /usr/local/share/seqrepo/2021-01-29/ -m 1000 -n 1 -f 100)
2023-09-13 15:31:21 snafu biocommons.seqrepo.fastadir.fastadir[2275776] WARNING File descriptor caching enabled (size=100)
2023-09-13 15:31:21 snafu root[2275776] INFO Queued 1000 accessions
2023-09-13 15:31:21 snafu root[2275776] INFO Starting run with 1 threads
[E::fai_load3_core] Failed to open FASTA index /usr/local/share/seqrepo/2021-01-29/sequences/2020/0412/1420/1586701238.5306098.fa.bgz.gzi: Too many open files
Exception in thread Thread-1:
Traceback (most recent call last):
```
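The same soft limit can be lowered from inside Python via the standard `resource` module (Unix only); this is equivalent to the `ulimit -n 50` subshell above and is handy for reproducing the failure in a unit test:

```python
import resource

soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"soft={soft} hard={hard}")

# lower the soft limit for this process only, like `ulimit -n 50`;
# the hard limit caps what we may set, hence the min()
new_soft = min(50, hard)
resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft, hard))
```

An unprivileged process may lower its soft limit (and raise it back up to the hard limit) at any time, so the limit can be restored after the test.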


## Other useful commands

```
# refreshing (every 2 s) list of open files under the seqrepo instance directory
watch lsof +D '/usr/local/share/seqrepo/'

# arbitrarily limit available file descriptors while running the test
(ulimit -n 200; ./threading-test -s /usr/local/share/seqrepo/2021-01-29/ -a archive/accessions.gz -f 128)
```
83 changes: 83 additions & 0 deletions misc/threading-tests/threading-test
@@ -0,0 +1,83 @@
#!/usr/bin/env python3
"""Test seqrepo for file descriptor exhaustion, especially in a threading context.

https://github.com/biocommons/biocommons.seqrepo/issues/112

The idea: collect a batch of RefSeq NM accessions, then fetch the start of each sequence from a pool of worker threads.

"""

import argparse
import logging
import queue
import pathlib
import random
import threading
import time

from biocommons.seqrepo import SeqRepo

_logger = logging.getLogger()


class Worker(threading.Thread):
    def __init__(self, q: queue.Queue, sr: SeqRepo):
        self.q = q
        self.sr = sr
        self.n = 0
        super().__init__()

    def run(self):
        try:
            while True:
                ac = self.q.get(False)
                self.sr.fetch(ac, 0, 5)
                self.q.task_done()
                self.n += 1
        except queue.Empty:
            _logger.info(f"{self}: Done; processed {self.n} accessions")
            return


def parse_args():
    ap = argparse.ArgumentParser(description=__doc__)
    ap.add_argument("-n", "--n-threads", type=int, default=1)
    ap.add_argument("-s", "--seqrepo-path", type=pathlib.Path, required=True)
    ap.add_argument("-m", "--max-accessions", type=int)
    ap.add_argument("-f", "--fd-cache-size", type=int, default=0)
    opts = ap.parse_args()
    return opts


if __name__ == "__main__":
    import coloredlogs

    coloredlogs.install(level="INFO")

    opts = parse_args()

    sr = SeqRepo(opts.seqrepo_path, fd_cache_size=opts.fd_cache_size)

    acs = set(a["alias"] for a in sr.aliases.find_aliases(namespace="RefSeq", alias="NM_%"))
    acs = random.sample(sorted(acs), opts.max_accessions or len(acs))
    q = queue.Queue()
    for ac in acs:
        q.put(ac)
    qs = q.qsize()
    _logger.info(f"Queued {qs} accessions")

    _logger.info(f"Starting run with {opts.n_threads} threads")
    t0 = time.process_time()  # process_time sums CPU time across threads, not wall-clock time
    for _ in range(opts.n_threads):
        Worker(q=q, sr=sr).start()
    q.join()
    t1 = time.process_time()
    td = t1 - t0
    rate = float(qs) / td
    _logger.info(f"Fetched {qs} sequences in {td} s with {opts.n_threads} threads; {rate:.0f} seq/sec")

    if hasattr(sr.sequences._open_for_reading, "cache_info"):
        print(sr.sequences._open_for_reading.cache_info())

13 changes: 13 additions & 0 deletions src/biocommons/seqrepo/fastadir/fabgz.py
@@ -15,6 +15,7 @@
import shutil
import stat
import subprocess
import threading

import six
from pysam import FastaFile
@@ -70,12 +71,24 @@ def _find_bgzip():


class FabgzReader(object):
    """Reader for bgzip-compressed, indexed fasta files.

    Also implements the context manager protocol: entering acquires a
    per-instance lock and returns the reader, so threads that share one
    FabgzReader serialize their accesses.
    """

    def __init__(self, filename):
        self.lock = threading.Lock()
        self._fh = FastaFile(filename)

    def __del__(self):
        self._fh.close()

    def __enter__(self):
        self.lock.acquire()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.lock.release()

    def fetch(self, seq_id, start=None, end=None):
        return self._fh.fetch(seq_id.encode("ascii"), start, end)
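A FabgzReader can now be used as a context manager: entering takes its lock, so one thread cannot interleave its seek with another thread's read on the same handle. A pysam-free stand-in (hypothetical `LockingReader`) shows the calling pattern:

```python
import threading

class LockingReader:
    """Stand-in for FabgzReader's context-manager behaviour (no pysam needed)."""

    def __init__(self, filename):
        self.lock = threading.Lock()
        self.filename = filename

    def __enter__(self):
        self.lock.acquire()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.lock.release()

    def fetch(self, seq_id, start=None, end=None):
        # pysam's FastaFile seeks then reads; the lock keeps those two
        # steps from interleaving across threads
        return f"{seq_id}[{start}:{end}]"

reader = LockingReader("demo.fa.bgz")
results = []

def worker(i):
    with reader:  # serialize access to the shared reader
        results.append(reader.fetch(f"seq{i}", 0, 5))

threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Note that the lock lives on the reader itself, so callers that each open their own reader pay no contention; only a shared handle is serialized.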

45 changes: 13 additions & 32 deletions src/biocommons/seqrepo/fastadir/fastadir.py
@@ -29,24 +29,6 @@
expected_schema_version = 1


class LockableFabgzReader(contextlib.AbstractContextManager):
    """
    Class that implements ContextManager and wraps a FabgzReader.
    The FabgzReader is returned when acquired in a contextmanager with statement.
    """

    def __init__(self, path):
        self.lock = threading.Lock()
        self.fabgz_reader = FabgzReader(path)

    def __enter__(self):
        self.lock.acquire()
        return self.fabgz_reader

    def __exit__(self, exc_type, exc_value, traceback):
        self.lock.release()


class FastaDir(BaseReader, BaseWriter):
    """This class provides a simple key-value interface to a
    directory of compressed fasta files.
@@ -70,7 +52,7 @@ class FastaDir(BaseReader, BaseWriter):

    """

    def __init__(self, root_dir, writeable=False, check_same_thread=True):
    def __init__(self, root_dir, writeable=False, check_same_thread=True, fd_cache_size=0):
        """Creates a new sequence repository if necessary, and then opens it"""

        self._root_dir = root_dir
@@ -99,6 +81,18 @@ def __init__(self, root_dir, writeable=False, check_same_thread=True):
                schema_version, expected_schema_version
            )
        )

        if fd_cache_size == 0:
            _logger.info("File descriptor caching disabled")

            def _open_for_reading(path):
                _logger.debug("Opening for reading uncached: %s", path)
                return FabgzReader(path)

        else:
            _logger.warning(f"File descriptor caching enabled (size={fd_cache_size})")

            @functools.lru_cache(maxsize=fd_cache_size)
            def _open_for_reading(path):
                return FabgzReader(path)

        self._open_for_reading = _open_for_reading
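The branch above is easy to exercise in isolation. This sketch uses a hypothetical `make_opener` helper standing in for the `__init__` logic; note that only the cached variant grows a `cache_info()` method, which is exactly what the threading-test script probes with `hasattr` before printing:

```python
import functools

def make_opener(fd_cache_size, open_fn):
    # mirrors the FastaDir.__init__ branch: a plain closure when caching
    # is disabled, an lru_cache-wrapped closure otherwise
    if fd_cache_size == 0:
        def _open_for_reading(path):
            return open_fn(path)
    else:
        @functools.lru_cache(maxsize=fd_cache_size)
        def _open_for_reading(path):
            return open_fn(path)
    return _open_for_reading

opened = []
opener = make_opener(8, lambda p: (opened.append(p), p)[1])
opener("a.fa.bgz")
opener("a.fa.bgz")  # second call is served from the LRU cache
print(opened)       # ['a.fa.bgz'] -- the underlying open ran once
```

The trade-off is visible in the README examples: caching gives a roughly tenfold speedup single-threaded, but before the per-reader lock it made shared handles unsafe across threads.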

    def __del__(self):
        self._db.close()
@@ -238,19 +232,6 @@ def _upgrade_db(self):
        migrations_to_apply = backend.to_apply(migrations)
        backend.apply_migrations(migrations_to_apply)

    @functools.lru_cache()
    def _open_for_reading(self, path):
        """
        Opens a FabgzReader to path, wraps in a LockableFabgzReader for use in context managers.
        Places it in an LRU cache so file is only opened once per FastaDir object. Caller must
        lock the LockableFabgzReader or otherwise handle concurrent access if sharing between
        in-process concurrent execution threads, such as asyncio (e.g. WSGI/ASGI web servers)
        """
        _logger.debug("Opening for reading: %s", path)
        if not os.path.exists(path):
            _logger.error("_open_for_reading path does not exist: %s", path)
        return LockableFabgzReader(path)

    def _dump_aliases(self):
        import prettytable

2 changes: 2 additions & 0 deletions src/biocommons/seqrepo/seqrepo.py
@@ -100,6 +100,7 @@ def __init__(
        translate_ncbi_namespace=None,
        check_same_thread=False,
        use_sequenceproxy=True,
        fd_cache_size=0,
    ):
        self._root_dir = root_dir
        self._upcase = upcase
@@ -122,6 +123,7 @@ def __init__(
            self._seq_path,
            writeable=self._writeable,
            check_same_thread=self._check_same_thread,
            fd_cache_size=fd_cache_size,
        )
        self.aliases = SeqAliasDB(
            self._db_path,