Update how we handle mlflow dependency when dealing with models. (mlflow#1308)

* MLflow is now a default dependency for all models. Do not pip install mlflow automatically in predict and serve by default. Predict and serve call into the CLI inside the environment.

* Minor fix.

* Instead of using the CLI, use internal functions mirroring the CLI. Updated tests.

* Addressed linter complaints.

* Addressed linter complaints.

* Addressed linter complaints.

* Updated comment.

* Nit.

* Fix.

* Fix.

* Lint.

* Fixes.

* Moved models/test_cli back to small tests to run on Windows.

* Addressed comments.

* Fixes.

* Fixed Windows and Python 2 tests; added an extra test to verify we fail politely in case there are no flavors to deploy.

* Addressed review comments.
tomasatdatabricks committed May 23, 2019
1 parent 156ac19 commit 8e820da
Showing 11 changed files with 188 additions and 128 deletions.
27 changes: 16 additions & 11 deletions mlflow/models/cli.py
@@ -27,14 +27,16 @@ def commands():
@click.option("--port", "-p", default=5000, help="Server port. [default: 5000]")
@click.option("--host", "-h", default="127.0.0.1", help="Server host. [default: 127.0.0.1]")
@cli_args.NO_CONDA
def serve(model_uri, port, host, no_conda=False):
@cli_args.INSTALL_MLFLOW
def serve(model_uri, port, host, no_conda=False, install_mlflow=False):
"""
Serve a model saved with MLflow by launching a webserver on the specified host and port. For
information about the input data formats accepted by the webserver, see the following
documentation: https://www.mlflow.org/docs/latest/models.html#model-deployment.
"""
return _get_flavor_backend(model_uri, no_conda=no_conda).serve(model_uri=model_uri, port=port,
host=host)
return _get_flavor_backend(model_uri, no_conda=no_conda,
install_mlflow=install_mlflow).serve(model_uri=model_uri, port=port,
host=host)


@commands.command("predict")
@@ -54,27 +56,30 @@ def serve(model_uri, port, host, no_conda=False):
"https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_json"
".html")
@cli_args.NO_CONDA
def predict(model_uri, input_path, output_path, content_type, json_format, no_conda):
@cli_args.INSTALL_MLFLOW
def predict(model_uri, input_path, output_path, content_type, json_format, no_conda,
install_mlflow):
"""
Generate predictions in json format using a saved MLflow model. For information about the input
data formats accepted by this function, see the following documentation:
https://www.mlflow.org/docs/latest/models.html#model-deployment.
"""
if content_type == "json" and json_format not in ("split", "records"):
raise Exception("Unsupported json format '{}'.".format(json_format))
return _get_flavor_backend(model_uri, no_conda=no_conda).predict(model_uri=model_uri,
input_path=input_path,
output_path=output_path,
content_type=content_type,
json_format=json_format)
return _get_flavor_backend(model_uri, no_conda=no_conda,
install_mlflow=install_mlflow).predict(model_uri=model_uri,
input_path=input_path,
output_path=output_path,
content_type=content_type,
json_format=json_format)


def _get_flavor_backend(model_uri, no_conda):
def _get_flavor_backend(model_uri, **kwargs):
with TempDir() as tmp:
local_path = _download_artifact_from_uri(posixpath.join(model_uri, "MLmodel"),
output_path=tmp.path())
model = Model.load(local_path)
flavor_name, flavor_backend = get_flavor_backend(model, no_conda=no_conda)
flavor_name, flavor_backend = get_flavor_backend(model, **kwargs)

_logger.info("Selected backend for flavor '%s'", flavor_name)
if flavor_backend is None:
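As a quick illustration, the updated commands can be exercised with click's test runner. A minimal sketch follows; the model URI and file names are illustrative, and the option names assume the flags defined in cli_args:

from click.testing import CliRunner
from mlflow.models import cli

# Illustrative invocation of the updated predict command.
result = CliRunner().invoke(cli.predict, [
    "--model-uri", "runs:/abc123/model",  # illustrative model URI
    "--input-path", "input.json",
    "--content-type", "json",
    "--json-format", "split",
    "--install-mlflow",  # new: pip install mlflow into the model's environment
])
assert result.exit_code == 0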
2 changes: 1 addition & 1 deletion mlflow/models/flavor_backend_registry.py
@@ -22,4 +22,4 @@ def get_flavor_backend(model, **kwargs):
backend = _flavor_backends[flavor_name](flavor_config, **kwargs)
if backend.can_score_model():
return flavor_name, backend
return None
return None, None
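Returning a pair even when no backend can score the model keeps the tuple unpacking in cli.py from raising. A minimal sketch of the calling pattern:

# With the old bare `return None`, this unpacking would raise a TypeError
# instead of reaching the polite "no suitable backend" error path below it.
flavor_name, flavor_backend = get_flavor_backend(model, no_conda=True)
if flavor_backend is None:
    raise Exception("No suitable flavor backend was found for the model.")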
14 changes: 11 additions & 3 deletions mlflow/projects/__init__.py
@@ -404,8 +404,10 @@ def _fetch_git_repo(uri, version, dst_dir):
repo.heads.master.checkout()


def _get_conda_env_name(conda_env_path):
def _get_conda_env_name(conda_env_path, env_id=None):
conda_env_contents = open(conda_env_path).read() if conda_env_path else ""
if env_id:
conda_env_contents += env_id
return "mlflow-%s" % hashlib.sha1(conda_env_contents.encode("utf-8")).hexdigest()


@@ -425,10 +427,16 @@ def _get_conda_bin_executable(executable_name):
return executable_name


def _get_or_create_conda_env(conda_env_path):
def _get_or_create_conda_env(conda_env_path, env_id=None):
"""
Given a path to a conda yaml file, creates a conda environment containing the specified
dependencies if such an environment doesn't already exist. Returns the name of the conda environment.
:param conda_env_path: Path to a conda yaml file.
:param env_id: Optional string that is added to the contents of the yaml file before
calculating the hash. It can be used to distinguish environments that have the
same conda dependencies but are supposed to be different based on the context.
For example, when serving the model we may install additional dependencies to the
environment after the environment has been activated.
"""
conda_path = _get_conda_bin_executable("conda")
try:
@@ -442,7 +450,7 @@ def _get_or_create_conda_env(conda_env_path):
"executable".format(conda_path, MLFLOW_CONDA_HOME))
(_, stdout, _) = process.exec_cmd([conda_path, "env", "list", "--json"])
env_names = [os.path.basename(env) for env in json.loads(stdout)['envs']]
project_env_name = _get_conda_env_name(conda_env_path)
project_env_name = _get_conda_env_name(conda_env_path, env_id)
if project_env_name not in env_names:
_logger.info('=== Creating conda environment %s ===', project_env_name)
if conda_env_path:
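The practical effect of env_id is that the same yaml file now hashes to different environment names depending on the salt. A minimal sketch, with an illustrative file name and version string:

from mlflow.projects import _get_conda_env_name

# Unsalted: the name is derived purely from the yaml contents.
_get_conda_env_name("conda.yaml")                  # -> "mlflow-<sha1 of yaml>"

# Salted with the mlflow version: a distinct environment that can carry the
# pip-installed mlflow without clobbering the unsalted one.
_get_conda_env_name("conda.yaml", env_id="1.0.0")  # -> "mlflow-<different sha1>"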
103 changes: 69 additions & 34 deletions mlflow/pyfunc/backend.py
@@ -1,72 +1,83 @@
import logging
import os

import subprocess

from mlflow.models import FlavorBackend
from mlflow.pyfunc import ENV

from mlflow.pyfunc import scoring_server
from mlflow.models import FlavorBackend

from mlflow.utils.file_utils import TempDir
from mlflow.projects import _get_or_create_conda_env, _get_conda_bin_executable
from mlflow.tracking.artifact_utils import _download_artifact_from_uri
from mlflow.projects import _get_conda_bin_executable
from mlflow.utils.file_utils import TempDir
from mlflow.utils.file_utils import path_to_local_file_uri
from mlflow.version import VERSION

from six.moves import shlex_quote
_logger = logging.getLogger(__name__)


class PyFuncBackend(FlavorBackend):
"""
Flavor backend implementation for the generic python models.
"""

def __init__(self, config, no_conda=False, **kwargs):
def __init__(self, config, no_conda=False, install_mlflow=False, **kwargs):
super(PyFuncBackend, self).__init__(config=config, **kwargs)
self._no_conda = no_conda
self._install_mlflow = install_mlflow

def predict(self, model_uri, input_path, output_path, content_type, json_format):
def predict(self, model_uri, input_path, output_path, content_type, json_format):
"""
Generate predictions using a generic python model saved with MLflow.
Return the prediction results as JSON.
"""
with TempDir() as tmp:
local_path = _download_artifact_from_uri(model_uri, output_path=tmp.path())
# NB: Absolute windows paths do not work with mlflow apis, use file uri to ensure
# platform compatibility.
local_uri = path_to_local_file_uri(local_path)
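# For illustration (path is an example): on Windows an absolute path such as
# "C:\Users\me\model" would have its drive letter parsed as a URI scheme by
# downstream mlflow APIs; path_to_local_file_uri yields roughly
# "file:///C:/Users/me/model", which parses the same way on every platform.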
if not self._no_conda and ENV in self._config:
conda_env_path = os.path.join(local_path, self._config[ENV])
# NOTE: We're calling the pyfunc scoring server belonging to the current
# conda environment. The model environment may contain a different version of
# mlflow than the one in the currently active environment; this is the
# intended behavior. We need to keep the scoring server consistent with the
# outside mlflow, while the model being loaded may depend on a different
# mlflow version. The hope is that the scoring server is self-contained enough
# not to have external mlflow dependencies that would be incompatible between
# mlflow versions.
if input_path is None:
input_path = "__stdin__"
if output_path is None:
output_path = "__stdout__"
command = "python {0} predict {1} {2} {3} {4} {5}".format(scoring_server.__file__,
shlex_quote(local_path),
shlex_quote(input_path),
shlex_quote(output_path),
content_type,
json_format)
return scoring_server._execute_in_conda_env(conda_env_path, command)
command = ('python -c "from mlflow.pyfunc.scoring_server import _predict; _predict('
'model_uri={model_uri}, '
'input_path={input_path}, '
'output_path={output_path}, '
'content_type={content_type}, '
'json_format={json_format})"'
).format(
model_uri=repr(local_uri),
input_path=repr(input_path),
output_path=repr(output_path),
content_type=repr(content_type),
json_format=repr(json_format))
return _execute_in_conda_env(conda_env_path, command, self._install_mlflow)
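# For illustration: repr() renders each argument as a valid Python literal
# inside the -c string, so with example values the command above becomes
#
#   python -c "from mlflow.pyfunc.scoring_server import _predict; _predict(
#       model_uri='file:///tmp/model', input_path='input.json',
#       output_path=None, content_type='json', json_format='split')"
#
# (wrapped here for readability). repr(None) renders as the literal None, so
# unset paths fall through to the stdin/stdout defaults in _predict.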
else:
scoring_server._predict(local_path, input_path, output_path, content_type,
scoring_server._predict(local_uri, input_path, output_path, content_type,
json_format)

def serve(self, model_uri, port, host):
"""
Serve pyfunc model locally.
"""
with TempDir() as tmp:
local_path = shlex_quote(_download_artifact_from_uri(model_uri, output_path=tmp.path()))
local_path = _download_artifact_from_uri(model_uri, output_path=tmp.path())
# NB: Absolute windows paths do not work with mlflow apis, use file uri to ensure
# platform compatibility.
local_uri = path_to_local_file_uri(local_path)
if not self._no_conda and ENV in self._config:
conda_env_path = os.path.join(local_path, self._config[ENV])
command = "python {0} serve {1} {2} {3}".format(scoring_server.__file__,
shlex_quote(local_path),
port, host)
return scoring_server._execute_in_conda_env(conda_env_path, command)

command = ('python -c "from mlflow.pyfunc.scoring_server import _serve; _serve('
'model_uri={model_uri}, '
'port={port}, '
'host={host})"'
).format(
model_uri=repr(local_uri),
port=repr(port),
host=repr(host))

return _execute_in_conda_env(conda_env_path, command,
self._install_mlflow)
else:
scoring_server._serve(local_path, port, host)
scoring_server._serve(local_uri, port, host)

def can_score_model(self):
if self._no_conda:
@@ -80,3 +91,27 @@ def can_score_model(self):
except FileNotFoundError:
# Can not find conda
return False


def _execute_in_conda_env(conda_env_path, command, install_mlflow):
env_id = os.environ.get("MLFLOW_HOME", VERSION) if install_mlflow else None
conda_env_name = _get_or_create_conda_env(conda_env_path, env_id=env_id)
activate_path = _get_conda_bin_executable("activate")
activate_conda_env = ["source {0} {1}".format(activate_path, conda_env_name)]

if install_mlflow:
if "MLFLOW_HOME" in os.environ: # dev version
install_mlflow = "pip install -e {} 1>&2".format(os.environ["MLFLOW_HOME"])
else:
install_mlflow = "pip install mlflow=={} 1>&2".format(VERSION)

activate_conda_env += [install_mlflow]

command = " && ".join(activate_conda_env + [command])
_logger.info("=== Running command '%s'", command)
child = subprocess.Popen(["bash", "-c", command], close_fds=True)
rc = child.wait()
if rc != 0:
raise Exception("Command '{0}' returned non-zero return code. Return code = {1}".format(
command, rc
))
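For reference, the pieces above compose into a single bash invocation. A sketch of the final string when install_mlflow is set and MLFLOW_HOME is unset; the conda path, environment name, and version are illustrative:

activate_conda_env = ["source /opt/conda/bin/activate mlflow-3f2a9c"]
activate_conda_env += ["pip install mlflow==1.0.0 1>&2"]
command = " && ".join(activate_conda_env + ['python -c "..."'])
# -> source /opt/conda/bin/activate mlflow-3f2a9c && pip install mlflow==1.0.0 1>&2 && python -c "..."

Redirecting pip's output to stderr keeps stdout clean for the case where predictions are written to stdout.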
54 changes: 6 additions & 48 deletions mlflow/pyfunc/scoring_server.py
@@ -19,7 +19,6 @@
import numpy as np
import pandas as pd
from six import reraise
import subprocess
import sys
import traceback

@@ -36,7 +35,6 @@
from mlflow.pyfunc import load_pyfunc as load_model
from mlflow.protos.databricks_pb2 import MALFORMED_REQUEST, BAD_REQUEST
from mlflow.server.handlers import catch_mlflow_exception
from mlflow.projects import _get_or_create_conda_env, _get_conda_bin_executable

try:
from StringIO import StringIO
@@ -181,24 +179,9 @@ def transformation():  # pylint: disable=unused-variable
return app


def _safe_local_path(local_path):
# NB: Since mlflow 1.0, mlflow.pyfunc.load_model expects uri instead of local path. Local paths
# work, however absolute windows path don't because the drive is parsed as scheme. Since we do
# not control the version of mlflow (the scoring server version matches the version of mlflow
# invoking the scoring command, the rest of mlflow comes from the model environment) we check
# the mlflow version at run time and convert local path to file uri to ensure platform
# independence.
from mlflow.version import VERSION
is_recent_version = VERSION.endswith("dev0") or int(VERSION.split(".")[0]) >= 1
if is_recent_version:
from mlflow.tracking.utils import path_to_local_file_uri
return path_to_local_file_uri(local_path)
return local_path


def _predict(local_path, input_path, output_path, content_type, json_format):
pyfunc_model = load_model(_safe_local_path(local_path))
if input_path is None or input_path == "__stdin__":
def _predict(model_uri, input_path, output_path, content_type, json_format):
pyfunc_model = load_model(model_uri)
if input_path is None:
input_path = sys.stdin

if content_type == "json":
@@ -208,33 +191,18 @@ def _predict(local_path, input_path, output_path, content_type, json_format):
else:
raise Exception("Unknown content type '{}'".format(content_type))

if output_path is None or output_path == "__stdout__":
if output_path is None:
predictions_to_json(pyfunc_model.predict(df), sys.stdout)
else:
with open(output_path, "w") as fout:
predictions_to_json(pyfunc_model.predict(df), fout)


def _serve(local_path, port, host):
pyfunc_model = load_model(_safe_local_path(local_path))
def _serve(model_uri, port, host):
pyfunc_model = load_model(model_uri)
init(pyfunc_model).run(port=port, host=host)


def _execute_in_conda_env(conda_env_path, command):
conda_env_name = _get_or_create_conda_env(conda_env_path)
activate_path = _get_conda_bin_executable("activate")
command = " && ".join(
["source {} {}".format(activate_path, conda_env_name), "pip install mlflow 1>&2", command]
)
_logger.info("=== Running command '%s'", command)
child = subprocess.Popen(["bash", "-c", command], close_fds=True)
rc = child.wait()
if rc != 0:
raise Exception("Command '{0}' returned non zero return code. Return code = {1}".format(
command, rc
))


class NumpyEncoder(JSONEncoder):
""" Special json encoder for numpy types.
Note that some numpy types don't have native python equivalents,
@@ -265,13 +233,3 @@ def _get_jsonable_obj(data, pandas_orient="records"):
return pd.DataFrame(data).to_dict(orient=pandas_orient)
else: # by default just return whatever this is and hope for the best
return data


if __name__ == '__main__':
if sys.argv[1] == "predict":
_predict(local_path=sys.argv[2], input_path=sys.argv[3], output_path=sys.argv[4],
content_type=sys.argv[5], json_format=sys.argv[6])
elif sys.argv[1] == "serve":
_serve(local_path=sys.argv[2], port=sys.argv[3], host=sys.argv[4])
else:
raise Exception("Unknown command '{}'".format(sys.argv[1]))
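With the __main__ entry point gone, the helpers are reached by import rather than by positional script arguments. A hedged usage sketch, with illustrative model URI and file names:

from mlflow.pyfunc.scoring_server import _predict

_predict(model_uri="file:///tmp/model",
         input_path="input.json",  # None now means "read from stdin"
         output_path=None,         # None now means "write to stdout"
         content_type="json",
         json_format="split")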
17 changes: 8 additions & 9 deletions mlflow/sklearn.py
@@ -14,7 +14,6 @@
import os
import pickle
import yaml
import copy

import mlflow
from mlflow import pyfunc
@@ -37,18 +36,21 @@
]


def get_default_conda_env():
def get_default_conda_env(include_cloudpickle=False):
"""
:return: The default Conda environment for MLflow Models produced by calls to
:func:`save_model()` and :func:`log_model()`.
"""
import sklearn

pip_deps = None
if include_cloudpickle:
import cloudpickle
pip_deps = ["cloudpickle=={}".format(cloudpickle.__version__)]
return _mlflow_conda_env(
additional_conda_deps=[
"scikit-learn={}".format(sklearn.__version__),
],
additional_pip_deps=None,
additional_pip_deps=pip_deps,
additional_conda_channels=None
)

@@ -124,11 +126,8 @@ def save_model(sk_model, path, conda_env=None, mlflow_model=Model(),

conda_env_subpath = "conda.yaml"
if conda_env is None:
conda_env = copy.deepcopy(get_default_conda_env())
if serialization_format == SERIALIZATION_FORMAT_CLOUDPICKLE:
import cloudpickle
conda_env["dependencies"].append(
{"pip": ["cloudpickle=={}".format(cloudpickle.__version__)]})
conda_env = get_default_conda_env(
include_cloudpickle=serialization_format == SERIALIZATION_FORMAT_CLOUDPICKLE)
elif not isinstance(conda_env, dict):
with open(conda_env, "r") as f:
conda_env = yaml.safe_load(f)
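A hedged sketch of what the reworked helper returns; the library versions are illustrative:

import mlflow.sklearn

env = mlflow.sklearn.get_default_conda_env(include_cloudpickle=True)
# env is a dict of roughly this shape:
# {"name": "mlflow-env",
#  "channels": ["defaults"],
#  "dependencies": ["python=3.7.3",
#                   "scikit-learn=0.20.3",
#                   {"pip": ["mlflow", "cloudpickle==1.1.1"]}]}

Folding the cloudpickle pin into get_default_conda_env removes the deepcopy-and-append dance in save_model and keeps the default environment definition in one place.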
