Skip to content

Commit

Permalink
Updated scoring model schema (#1513)
Browse files Browse the repository at this point in the history
* [ScoringModel] added archived, default and created attributes

* [MetadataEditor] showing new ML models

* [webapp] mocked websocket for tests

* [AccountDialogHandler] waiting route loading to check dialog open

* [UploadPage] added extra logging to test on staging

* [UploadPage] fixed first login check query

* [UploadPage] hiding archived scoring models from option list

* [EditDataset] reprocessing dataset on scoring model selection change

* [EngineTests] updated ds config template

* [ScoringModelReadME] updated according to PR 1496

* [ScoringModel] removed is_default column

* [PythonClient] added scoring_model option to create and update dataset

* [PythonClient] reformatted sm_annotation_utils with black

* [GenerateDsConfig] changed scoring model schema to use scoring model id

* Updated scoring model README

* [PythonClient] changed db_name to model_name on scoring model functions

* [Metadata] showing model version according to new fields

* [Navigation] recovering last navigation params

* [UploadPage] analysis version as required field
  • Loading branch information
lmacielvieira committed Apr 16, 2024
1 parent 563168b commit 7f165ed
Show file tree
Hide file tree
Showing 49 changed files with 580 additions and 220 deletions.
6 changes: 5 additions & 1 deletion docker/sm-engine/install-scoring-model.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
#!/usr/bin/env bash
# Install the built-in scoring models into a dev SM engine deployment.
# Each import registers a (name, version, type) row; catboost models also
# upload the .cbm weights file to the given S3/MinIO bucket.

# Abort if the engine checkout is not where we expect it, instead of
# running pip/python in the wrong directory.
cd /opt/dev/metaspace/metaspace/engine || exit 1

pip install -qr requirements.txt
pip install -e .

# NOTE: the old single-argument invocation
#   import_scoring_model v3_default "..." sm-engine-dev
# predates the (name, version, type) CLI and was removed — it no longer
# matches the script's argument parser.
python -m scripts.import_scoring_model "MSM" "v1" "original"
python -m scripts.import_scoring_model "Animal" "v2.2023-12-14" "catboost" --model="../scoring-models/models_default/v2.2023-12-14_(METASPACE-ML_Animal).cbm" --bucket="sm-engine-dev"
python -m scripts.import_scoring_model "Plant" "v2.2023-12-14" "catboost" --model="../scoring-models/models_default/v2.2023-12-14_(METASPACE-ML_Plant).cbm" --bucket="sm-engine-dev"
9 changes: 5 additions & 4 deletions metaspace/engine/scripts/db_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -336,15 +336,16 @@ CREATE TABLE "public"."perf_profile_entry" (
CREATE TABLE "public"."scoring_model" (
    "id" SERIAL NOT NULL,
    "name" text NOT NULL,
    "version" text NOT NULL,
    "created_dt" TIMESTAMP NOT NULL,
    -- archived models are hidden from the web app's scoring-model option list
    "is_archived" boolean NOT NULL DEFAULT false,
    "type" text NOT NULL,
    "params" json NOT NULL,
    -- a model is identified by (name, version): multiple versions may share a name,
    -- so there must be no unique index on "name" alone
    CONSTRAINT "scoring_model_uindex" UNIQUE ("name", "version"),
    CONSTRAINT "PK_f4aafae7cbb3f34533cb9f932a6" PRIMARY KEY ("id")
);

CREATE TABLE "graphql"."dataset" (
"id" text NOT NULL,
"user_id" uuid NOT NULL,
Expand Down
60 changes: 41 additions & 19 deletions metaspace/engine/scripts/import_scoring_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,47 @@
logger = logging.getLogger('engine')


def import_model(
    name: str,
    model: str,
    bucket: str,
    public: bool,
    overwrite: bool,
    version: str,
    model_type: str,
):
    """Upload a scoring model (catboost only) and register it in the database.

    Args:
        name: scoring model name.
        model: path to a .cbm CatBoost model file (catboost models only).
        bucket: S3/MinIO bucket to upload the model to (catboost models only).
        public: whether the uploaded object should be publicly readable.
        overwrite: allow replacing an existing (name, version) entry.
        version: scoring model version string.
        model_type: 'catboost' or 'original'.
    """
    params = None
    if model_type == 'catboost':
        if not overwrite:
            existing = None
            try:
                # load_scoring_model raises when no such model exists
                existing = load_scoring_model(name, version)
            except Exception:
                pass
            # NOTE: the assert must live OUTSIDE the try block — previously
            # `assert False` was raised inside it, and the AssertionError was
            # swallowed by `except Exception`, so the duplicate check never fired.
            assert existing is None, f'Scoring model with name {name} already exists'

        prefix = f'scoring_models/{name}'
        logger.info('Uploading model')
        params = upload_catboost_scoring_model(
            model=model, bucket=bucket, prefix=prefix, is_public=public
        )

    logger.info('Inserting model into DB')
    save_scoring_model_to_db(name=name, type_=model_type, version=version, params=params)
    logger.info('Done')


def main():
parser = argparse.ArgumentParser(description='Upload and import a .cbm CatBoost scoring model')
parser.add_argument('name', type=str, help='Name')
parser.add_argument('model', type=str, help='Path to a CBM model file.')
parser.add_argument('bucket', type=str, help='S3 or MinIO bucket to upload to')

parser.add_argument('version', type=str, help='Version')
parser.add_argument('model_type', type=str, help='Model type: catboost or original')
parser.add_argument('--model', type=str, default=None, help='Model type: catboost or original')
parser.add_argument(
'--bucket',
type=str,
default=None,
help='S3 or MinIO bucket to upload to. Optional for original models.',
)
parser.add_argument(
'--overwrite', action='store_true', help='Overwrite scoring model if it already exists'
)
Expand All @@ -49,13 +66,18 @@ def main():
args = parser.parse_args()

with GlobalInit(args.config_path):
assert Path(args.model).exists(), f'File "{args.model}" not found'
import_catboost_model(
if args.model_type == 'catboost':
assert Path(args.model).exists(), f'File "{args.model}" not found'
assert args.bucket is not None, f'Bucket "{args.bucket}" not found'

import_model(
name=args.name,
model=args.model,
bucket=args.bucket,
public=args.public,
overwrite=args.overwrite,
version=args.version,
model_type=args.model_type,
)


Expand Down
132 changes: 119 additions & 13 deletions metaspace/engine/sm/engine/annotation/scoring_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import pandas as pd
from catboost import CatBoost

from sm.engine.errors import SMError
from sm.engine.storage import get_s3_client
from sm.engine.util import split_s3_path

Expand Down Expand Up @@ -80,6 +81,23 @@ def remove_uninteresting_features(target_df: pd.DataFrame, decoy_df: pd.DataFram


class ScoringModel:
"""Represents a scoring model to use as annotation base."""

# pylint: disable=redefined-builtin
def __init__(
self,
id: int = None,
name: str = None,
version: str = None,
type: str = None,
is_archived: bool = None,
):
self.id = id
self.name = name
self.version = version
self.type = type
self.is_archived = is_archived

def score(
self, target_df: pd.DataFrame, decoy_df: pd.DataFrame, decoy_ratio: float
) -> Tuple[pd.DataFrame, pd.DataFrame]:
Expand All @@ -88,9 +106,48 @@ def score(
would help explain the score."""
raise NotImplementedError()

def to_dict(self):
return {
'id': self.id,
'name': self.name,
'version': self.version,
'is_archived': self.is_archived,
}


def find_by_name_version(name: str, version: str) -> ScoringModel:
    """Find a scoring model by its (name, version) unique key.

    Raises:
        SMError: if no scoring model matches.
    """
    # Import DB locally so that Lithops doesn't try to pickle it & fail due to psycopg2
    # pylint: disable=import-outside-toplevel # circular import
    from sm.engine.db import DB

    data = DB().select_one_with_fields(
        # include is_archived so the returned model (and its to_dict()) carries it,
        # instead of always reporting is_archived=None
        'SELECT id, name, version, type, is_archived FROM scoring_model '
        'WHERE name = %s AND version = %s',
        params=(name, version),
    )
    if not data:
        # include the version — (name, version) is the lookup key
        raise SMError(f'ScoringModel not found: {name} {version}')
    return ScoringModel(**data)


def find_by_id(id_: int) -> ScoringModel:
    """Find a scoring model by its primary key id.

    Raises:
        SMError: if no scoring model matches.
    """
    # Import DB locally so that Lithops doesn't try to pickle it & fail due to psycopg2
    # pylint: disable=import-outside-toplevel # circular import
    from sm.engine.db import DB

    data = DB().select_one_with_fields(
        # include is_archived so the returned model (and its to_dict()) carries it,
        # instead of always reporting is_archived=None
        'SELECT id, name, version, type, is_archived FROM scoring_model WHERE id = %s',
        params=(id_,),
    )
    if not data:
        raise SMError(f'ScoringModel not found: {id_}')
    return ScoringModel(**data)


class CatBoostScoringModel(ScoringModel):
def __init__(self, model_name: str, model: CatBoost, params: Dict):
def __init__(
self, model_name: str, model: CatBoost, params: Dict, id_: int, name: str, version: str
):
super().__init__(id_, name, version)
self.model_name = model_name
self.model = model
self.features = params['features']
Expand Down Expand Up @@ -136,17 +193,23 @@ def score(
return target_df, decoy_df


def load_scoring_model(name: Optional[str], version: Optional[str] = None) -> ScoringModel:
    """Load the (name, version) scoring model, fetching CatBoost weights from S3/MinIO.

    Falls back to the original MSM score when either name or version is missing.

    Raises:
        AssertionError: if no (name, version) row exists.
        ValueError: for an unknown scoring model type.
    """
    # Import DB locally so that Lithops doesn't try to pickle it & fail due to psycopg2
    # pylint: disable=import-outside-toplevel # circular import
    from sm.engine.db import DB

    if name is None or version is None:
        return MsmScoringModel()

    row = DB().select_one(
        "SELECT type, params, id FROM scoring_model WHERE name = %s and version=%s",
        (name, version),
    )
    assert row, f'Scoring model {name} {version} not found'
    type_, params, id_ = row

    if type_ == 'catboost':
        # Download the .cbm weights to a temp file, then load them into CatBoost.
        bucket, key = split_s3_path(params['s3_path'])
        with TemporaryDirectory() as tmpdir:
            model_file = Path(tmpdir) / 'model.cbm'
            with model_file.open('wb') as f:
                f.write(get_s3_client().get_object(Bucket=bucket, Key=key)['Body'].read())
            model = CatBoost()
            model.load_model(str(model_file), 'cbm')

        return CatBoostScoringModel(name, model, params, id_, name, version)
    elif type_ == 'original':
        return MsmScoringModel()
    else:
        raise ValueError(f'Unsupported scoring model type: {type_}')


def load_scoring_model_by_id(id_: Optional[int] = None) -> ScoringModel:
    """Load a scoring model by DB id; a None id falls back to the original MSM score."""
    # Import DB locally so that Lithops doesn't try to pickle it & fail due to psycopg2
    # pylint: disable=import-outside-toplevel # circular import
    from sm.engine.db import DB

    if id_ is None:
        return MsmScoringModel()

    row = DB().select_one(
        "SELECT name, version, type, params, id FROM scoring_model WHERE id = %s",
        (id_,),
    )
    assert row, f'Scoring model {id_} not found'
    name, version, type_, params, id_ = row

    # Guard clauses for the non-catboost types, then the catboost path below.
    if type_ == 'original':
        return MsmScoringModel()
    if type_ != 'catboost':
        raise ValueError(f'Unsupported scoring model type: {type_}')

    # Download the .cbm weights to a temp file, then load them into CatBoost.
    bucket, key = split_s3_path(params['s3_path'])
    with TemporaryDirectory() as tmpdir:
        model_path = Path(tmpdir) / 'model.cbm'
        model_path.write_bytes(get_s3_client().get_object(Bucket=bucket, Key=key)['Body'].read())
        model = CatBoost()
        model.load_model(str(model_path), 'cbm')

    return CatBoostScoringModel(name, model, params, id_, name, version)

Expand Down Expand Up @@ -235,7 +331,7 @@ def upload_catboost_scoring_model(
}


def save_scoring_model_to_db(name, type_, version, params, created_dt=None):
    """Adds/updates the scoring_model in the local database.

    Args:
        name: scoring model name.
        type_: model type, e.g. 'catboost' or 'original'.
        version: model version string; (name, version) is the unique key.
        params: model params dict or pre-serialized JSON string.
        created_dt: creation timestamp; defaults to the current UTC time.
    """
    # Import DB locally so that Lithops doesn't try to pickle it & fail due to psycopg2
    # pylint: disable=import-outside-toplevel # circular import
    from sm.engine.db import DB

    if not isinstance(params, str):
        params = json.dumps(params)

    if not created_dt:
        created_dt = datetime.utcnow()

    db = DB()
    existing = db.select_one(
        'SELECT * FROM scoring_model WHERE name = %s and version = %s',
        (name, version),
    )
    if existing:
        logger.info(f'Updating existing scoring model {name}')
        # The row is addressed by its full unique key (name, version).
        # Matching on name alone would clobber every version of the model,
        # and there is no need to SET version when it is part of the key.
        db.alter(
            'UPDATE scoring_model SET type = %s, params = %s '
            'WHERE name = %s AND version = %s',
            (type_, params, name, version),
        )
    else:
        logger.info(f'Inserting new scoring model {name}')
        db.alter(
            'INSERT INTO scoring_model(name, type, version, params, created_dt) '
            ' VALUES (%s, %s, %s, %s, %s)',
            (name, type_, version, params, created_dt),
        )
4 changes: 2 additions & 2 deletions metaspace/engine/sm/engine/annotation_lithops/run_fdr.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pandas as pd
from lithops.storage import Storage

from sm.engine.annotation.scoring_model import load_scoring_model
from sm.engine.annotation.scoring_model import load_scoring_model_by_id
from sm.engine.annotation_lithops.build_moldb import DbFDRData
from sm.engine.annotation_lithops.executor import Executor
from sm.engine.annotation_lithops.io import load_cobj, CObj
Expand Down Expand Up @@ -42,7 +42,7 @@ def _run_fdr_for_db(db_data_cobject: CObj[DbFDRData], *, storage: Storage):
return db_data['id'], results_df

logger.info('Estimating FDRs...')
scoring_model = load_scoring_model(ds_config['fdr'].get('scoring_model'))
scoring_model = load_scoring_model_by_id(ds_config['fdr'].get('scoring_model_id'))

args = [(db_data_cobj,) for db_data_cobj in db_data_cobjs]
results = executor.map(_run_fdr_for_db, args, runtime_memory=2048)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from sm.engine.annotation.formula_centroids import CentroidsGenerator
from sm.engine.annotation.imzml_reader import FSImzMLReader
from sm.engine.annotation.isocalc_wrapper import IsocalcWrapper
from sm.engine.annotation.scoring_model import ScoringModel, load_scoring_model
from sm.engine.annotation.scoring_model import ScoringModel, load_scoring_model_by_id
from sm.engine.annotation_spark.formula_imager import create_process_segment
from sm.engine.annotation_spark.segmenter import (
calculate_centroids_segments_n,
Expand Down Expand Up @@ -300,7 +300,7 @@ def search(self) -> Iterable[Tuple[pd.DataFrame, pyspark.RDD, FdrDiagnosticBundl
"""
logger.info('Running molecule search')

scoring_model = load_scoring_model(self._ds_config['fdr'].get('scoring_model'))
scoring_model = load_scoring_model_by_id(self._ds_config['fdr'].get('scoring_model_id'))
self._perf.record_entry('loaded scoring model')

ds_segments = self.define_segments_and_segment_ds(ds_segm_size_mb=20)
Expand Down
Loading

0 comments on commit 7f165ed

Please sign in to comment.