Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
* Rework connections

* Better cleaning of databases
  • Loading branch information
mmcauliffe committed Feb 9, 2023
1 parent 0e7ac69 commit a55a686
Show file tree
Hide file tree
Showing 35 changed files with 163 additions and 316 deletions.
7 changes: 7 additions & 0 deletions docs/source/changelog/changelog_2.1.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@
2.1 Changelog
*************

2.1.3
=====

- Fixed a bug with intervals after the end of the sound file having negative duration (they are now not parsed)
- Fixed an issue where utterances were not properly assigned to the correct channels
- Modified the logic for connections to attempt to solve error with too many clients

2.1.2
=====

Expand Down
88 changes: 39 additions & 49 deletions montreal_forced_aligner/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,15 @@

import sqlalchemy
import yaml
from sqlalchemy.orm import Session

from montreal_forced_aligner.config import GLOBAL_CONFIG
from montreal_forced_aligner.db import CorpusWorkflow, MfaSqlBase
from montreal_forced_aligner.exceptions import KaldiProcessingError, MultiprocessingError
from montreal_forced_aligner.helper import comma_join, load_configuration, mfa_open

if TYPE_CHECKING:

from montreal_forced_aligner.data import MfaArguments, WorkflowType
from montreal_forced_aligner.db import CorpusWorkflow, MfaSqlBase

__all__ = [
"MfaModel",
Expand Down Expand Up @@ -71,7 +70,13 @@ def __init__(self, args: MfaArguments):

def run(self) -> typing.Generator:
"""Run the function, calls subclassed object's ``_run`` with error handling"""
self.db_engine = sqlalchemy.create_engine(self.db_string)
self.db_engine = sqlalchemy.create_engine(
self.db_string,
poolclass=sqlalchemy.NullPool,
isolation_level="AUTOCOMMIT",
logging_name=f"{type(self).__name__}_engine",
pool_reset_on_return=None,
).execution_options(logging_token=f"{type(self).__name__}_engine")
try:
yield from self._run()
except Exception:
Expand Down Expand Up @@ -207,11 +212,15 @@ def __init__(

self._db_engine = None
self._db_path = None
self._session = None
self.database_initialized = False

def initialize_database(self) -> None:
"""
Initialize the database with database schema
"""
if self.database_initialized:
return
retcode = subprocess.call(
[
"createdb",
Expand All @@ -227,41 +236,21 @@ def initialize_database(self) -> None:
conn.execute(sqlalchemy.text("CREATE EXTENSION IF NOT EXISTS pg_trgm"))
conn.execute(sqlalchemy.text("CREATE EXTENSION IF NOT EXISTS pg_stat_statements"))
conn.commit()
self.database_initialized = True
if exist_check:
return
from montreal_forced_aligner.db import MfaSqlBase
if GLOBAL_CONFIG.current_profile.clean:
self.clean_working_directory()
with self.db_engine.connect() as conn:
for tbl in reversed(MfaSqlBase.metadata.sorted_tables):
conn.execute(tbl.delete())
conn.commit()
else:
return

os.makedirs(self.output_directory, exist_ok=True)

MfaSqlBase.metadata.create_all(self.db_engine)

def remove_database(self):
if getattr(self, "_session", None) is not None:
try:
self._session.commit()
except Exception:
self._session.rollback()
finally:
self._session.close()
self._session = None
if getattr(self, "_db_engine", None) is not None:
self._db_engine.dispose()
self._db_engine = None
time.sleep(1)
try:
subprocess.call(
[
"dropdb",
f"--port={GLOBAL_CONFIG.current_profile.database_port}",
"-f",
self.identifier,
],
stderr=None if GLOBAL_CONFIG.current_profile.verbose else subprocess.DEVNULL,
stdout=None if GLOBAL_CONFIG.current_profile.verbose else subprocess.DEVNULL,
)
except Exception:
pass

@property
def db_engine(self) -> sqlalchemy.engine.Engine:
"""Database engine"""
Expand Down Expand Up @@ -329,7 +318,7 @@ def db_string(self):
"""Connection string for the database"""
return f"postgresql+psycopg2://localhost:{GLOBAL_CONFIG.current_profile.database_port}/{self.identifier}"

def construct_engine(self, read_only=False, **kwargs) -> sqlalchemy.engine.Engine:
def construct_engine(self, **kwargs) -> sqlalchemy.engine.Engine:
"""
Construct a database engine
Expand All @@ -345,9 +334,17 @@ def construct_engine(self, read_only=False, **kwargs) -> sqlalchemy.engine.Engin
:class:`~sqlalchemy.engine.Engine`
SqlAlchemy engine
"""
return sqlalchemy.create_engine(self.db_string, future=True, **kwargs)
e = sqlalchemy.create_engine(
self.db_string,
poolclass=sqlalchemy.NullPool,
logging_name="main_process_engine",
**kwargs,
).execution_options(logging_token="main_process_engine")

return e

def session(self, **kwargs) -> Session:
@property
def session(self, **kwargs) -> sqlalchemy.orm.sessionmaker:
"""
Construct database session
Expand All @@ -358,11 +355,12 @@ def session(self, **kwargs) -> Session:
Returns
-------
:class:`~sqlalchemy.orm.session.Session`
:class:`~sqlalchemy.orm.sessionmaker`
SqlAlchemy session
"""
autoflush = kwargs.pop("autoflush", False)
return sqlalchemy.orm.Session(self.db_engine, autoflush=autoflush, **kwargs)
if self._session is None:
self._session = sqlalchemy.orm.sessionmaker(bind=self.db_engine, **kwargs)
return self._session


class MfaWorker(metaclass=abc.ABCMeta):
Expand Down Expand Up @@ -517,10 +515,6 @@ def __del__(self):
def setup(self) -> None:
"""Setup for worker"""
self.check_previous_run()
if GLOBAL_CONFIG.clean:
self.clean_working_directory()
if hasattr(self, "remove_database"):
self.remove_database()
if hasattr(self, "initialize_database"):
self.initialize_database()

Expand Down Expand Up @@ -622,15 +616,11 @@ def cleanup(self) -> None:
"""
try:
if getattr(self, "_session", None) is not None:
try:
self._session.commit()
except Exception:
self._session.rollback()
finally:
self._session.close()
del self._session
self._session = None

if getattr(self, "_db_engine", None) is not None:
self._db_engine.dispose()
del self._db_engine
self._db_engine = None
if self.dirty:
logger.error("There was an error in the run, please see the log.")
Expand Down
1 change: 0 additions & 1 deletion montreal_forced_aligner/alignment/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,6 @@ def export_textgrids(
)
exported_file_count = Counter()
export_procs = []
self.db_engine.dispose()
for j in range(len(self.jobs)):
export_proc = ExportTextGridProcessWorker(
self.db_string,
Expand Down
8 changes: 7 additions & 1 deletion montreal_forced_aligner/alignment/multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2381,7 +2381,13 @@ def __init__(

def run(self) -> None:
"""Run the exporter function"""
db_engine = sqlalchemy.create_engine(self.db_string)
db_engine = sqlalchemy.create_engine(
self.db_string,
poolclass=sqlalchemy.NullPool,
pool_reset_on_return=None,
logging_name=f"{type(self).__name__}_engine",
isolation_level="AUTOCOMMIT",
).execution_options(logging_token=f"{type(self).__name__}_engine")
with mfa_open(self.log_path, "w") as log_file, Session(db_engine) as session:
workflow: CorpusWorkflow = (
session.query(CorpusWorkflow)
Expand Down
3 changes: 0 additions & 3 deletions montreal_forced_aligner/command_line/adapt.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,6 @@ def adapt_model_cli(context, **kwargs) -> None:
acoustic_model_path=acoustic_model_path,
**AdaptingAligner.parse_parameters(config_path, context.params, context.args),
)
if kwargs.get("clean", False):
adapter.clean_working_directory()
adapter.remove_database()

try:
adapter.adapt()
Expand Down
3 changes: 0 additions & 3 deletions montreal_forced_aligner/command_line/align.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,6 @@ def align_corpus_cli(context, **kwargs) -> None:
acoustic_model_path=acoustic_model_path,
**PretrainedAligner.parse_parameters(config_path, context.params, context.args),
)
if kwargs.get("clean", False):
aligner.clean_working_directory()
aligner.remove_database()
try:
aligner.align()
if aligner.use_phone_model:
Expand Down
3 changes: 0 additions & 3 deletions montreal_forced_aligner/command_line/create_segments.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ def create_segments_cli(context, **kwargs) -> None:
corpus_directory=corpus_directory,
**Segmenter.parse_parameters(config_path, context.params, context.args),
)
if kwargs.get("clean", False):
segmenter.clean_working_directory()
segmenter.remove_database()
try:
segmenter.segment()
segmenter.export_files(output_directory, output_format)
Expand Down
3 changes: 0 additions & 3 deletions montreal_forced_aligner/command_line/diarize_speakers.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,6 @@ def diarize_speakers_cli(context, **kwargs) -> None:
ivector_extractor_path=ivector_extractor_path,
**SpeakerDiarizer.parse_parameters(config_path, context.params, context.args),
)
if kwargs.get("clean", False):
classifier.clean_working_directory()
classifier.remove_database()
try:
if classify:
classifier.classify_speakers()
Expand Down
3 changes: 0 additions & 3 deletions montreal_forced_aligner/command_line/g2p.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ def g2p_cli(context, **kwargs) -> None:
g2p_model_path=g2p_model_path,
**PyniniWordListGenerator.parse_parameters(config_path, context.params, context.args),
)
if kwargs.get("clean", False):
g2p.clean_working_directory()
g2p.remove_database()

try:
g2p.setup()
Expand Down
24 changes: 19 additions & 5 deletions montreal_forced_aligner/command_line/mfa.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ def history_save_handler(self) -> None:
name="mfa",
help="Montreal Forced Aligner is a command line utility for aligning speech and text.",
)
def mfa_cli() -> None:
@click.pass_context
def mfa_cli(ctx: click.Context) -> None:
"""
Main function for the MFA command line interface
"""
Expand All @@ -108,16 +109,28 @@ def mfa_cli() -> None:
warnings.simplefilter("ignore")
configure_logger("mfa")
check_third_party()

hooks = ExitHooks()
hooks.hook()
atexit.register(hooks.history_save_handler)
if ctx.invoked_subcommand != "anchor":
hooks = ExitHooks()
hooks.hook()
atexit.register(hooks.history_save_handler)
from colorama import init

init()
mp.freeze_support()


@click.command(
name="version",
short_help="Show version of MFA",
)
def version_cli():
try:
from montreal_forced_aligner._version import version
except ImportError:
version = None
print(version)


mfa_cli.add_command(adapt_model_cli)
mfa_cli.add_command(align_corpus_cli)
mfa_cli.add_command(anchor_cli)
Expand All @@ -136,6 +149,7 @@ def mfa_cli() -> None:
mfa_cli.add_command(transcribe_corpus_cli)
mfa_cli.add_command(validate_corpus_cli)
mfa_cli.add_command(validate_dictionary_cli)
mfa_cli.add_command(version_cli)

if __name__ == "__main__":
mfa_cli()
3 changes: 0 additions & 3 deletions montreal_forced_aligner/command_line/train_acoustic_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,6 @@ def train_acoustic_model_cli(context, **kwargs) -> None:
dictionary_path=dictionary_path,
**TrainableAligner.parse_parameters(config_path, context.params, context.args),
)
if kwargs.get("clean", False):
trainer.clean_working_directory()
trainer.remove_database()
try:
trainer.train()
if output_model_path is not None:
Expand Down
3 changes: 0 additions & 3 deletions montreal_forced_aligner/command_line/train_dictionary.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,6 @@ def train_dictionary_cli(context, **kwargs) -> None:
dictionary_path=dictionary_path,
**DictionaryTrainer.parse_parameters(config_path, context.params, context.args),
)
if kwargs.get("clean", False):
trainer.clean_working_directory()
trainer.remove_database()

try:
trainer.align()
Expand Down
3 changes: 0 additions & 3 deletions montreal_forced_aligner/command_line/train_g2p.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,6 @@ def train_g2p_cli(context, **kwargs) -> None:
dictionary_path=dictionary_path,
**PyniniTrainer.parse_parameters(config_path, context.params, context.args),
)
if kwargs.get("clean", False):
trainer.clean_working_directory()
trainer.remove_database()

try:
trainer.setup()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ def train_ivector_cli(context, **kwargs) -> None:
corpus_directory=corpus_directory,
**TrainableIvectorExtractor.parse_parameters(config_path, context.params, context.args),
)
if kwargs.get("clean", False):
trainer.clean_working_directory()
trainer.remove_database()

try:

Expand Down
3 changes: 0 additions & 3 deletions montreal_forced_aligner/command_line/train_lm.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,6 @@ def train_lm_cli(context, **kwargs) -> None:
arpa_path=source_path,
**MfaLmArpaTrainer.parse_parameters(config_path, context.params, context.args),
)
if kwargs.get("clean", False):
trainer.clean_working_directory()
trainer.remove_database()

try:
trainer.setup()
Expand Down
3 changes: 0 additions & 3 deletions montreal_forced_aligner/command_line/transcribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,6 @@ def transcribe_corpus_cli(context, **kwargs) -> None:
language_model_path=language_model_path,
**Transcriber.parse_parameters(config_path, context.params, context.args),
)
if kwargs.get("clean", False):
transcriber.clean_working_directory()
transcriber.remove_database()
try:
transcriber.setup()
transcriber.transcribe()
Expand Down
Loading

0 comments on commit a55a686

Please sign in to comment.