MLeap Flavor/Deployment mlflow#2: Python support for the MLeap flavor (mlflow#324)

* Add SparkJava plugin

* Add MLeap flavor and modify SparkML module to use it

* Fixes

* Add mleap

* Import model in test

* Import mleap

* Add missing assert

* Reorder spark session creation params in test

* Docs fix

* revert pom xml change

* remove java

* Add standalone to mleap

* Import fix

* Add docs and

* Add warning about py incompatibility

* Address comments

* Code spacing fix

* Revert log test changes

* Whitespace fixes

* Exception import fixes

* Fix lint issues

* Fix method call

* callfix

* Remove unused imports

* Py4j log level fix

* reorder tests

* testfix

* test fixes

* Spacing fix

* lint fixes

* Add mleap schema test

* Fix test

* Whitespace fix

* Test fix

* Fix exception path

* Update test_exception.py

* Disable warning

* Disable unused variable warning
dbczumar committed Aug 28, 2018
1 parent 150a345 commit d1e8025
Showing 15 changed files with 379 additions and 102 deletions.
5 changes: 5 additions & 0 deletions mlflow/exceptions.py
@@ -4,3 +4,8 @@ class MlflowException(Exception):

class IllegalArtifactPathError(MlflowException):
    """The artifact_path parameter was invalid."""
+
+
+class ExecutionException(MlflowException):
+    """Exception thrown when executing a project fails."""
+    pass
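Downstream code that previously imported `ExecutionException` from `mlflow.utils.exception` now imports it from `mlflow.exceptions`. A minimal sketch of catching it after this change; the project path is hypothetical:

import mlflow.projects
from mlflow.exceptions import ExecutionException  # new location as of this commit

try:
    # Any failure while executing the project surfaces as an ExecutionException.
    mlflow.projects.run(uri="path/to/my_project", entry_point="main")
except ExecutionException as e:
    print("Project execution failed: {}".format(e))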
120 changes: 120 additions & 0 deletions mlflow/mleap.py
@@ -0,0 +1,120 @@
"""
MLflow integration of the MLeap serialization tool for PySpark MLlib pipelines
This module provides utilities for saving models using the MLeap
using the MLeap library's persistence mechanism.
A companion module for loading MLFlow models with the MLeap flavor format is available in the
`mlflow/java` package.
For more information about MLeap, see https://github.com/combust/mleap.
"""

from __future__ import absolute_import

import os
import sys
import traceback
import json
from six import reraise

import mlflow
from mlflow.models import Model

FLAVOR_NAME = "mleap"


def log_model(spark_model, sample_input, artifact_path):
    """
    Log a Spark MLlib model in MLeap format as an MLflow artifact
    for the current run. The logged model will have the MLeap flavor.

    NOTE: The MLeap model flavor cannot be loaded in Python. It must be loaded using the
    Java module within the `mlflow/java` package.

    :param spark_model: Spark PipelineModel to be saved. This model must be MLeap-compatible and
                        cannot contain any custom transformers.
    :param sample_input: A sample PySpark DataFrame input that the model can evaluate. This is
                         required by MLeap for data schema inference.
    :param artifact_path: Run-relative artifact path to which the model is logged.
    """
    return Model.log(artifact_path=artifact_path, flavor=mlflow.mleap,
                     spark_model=spark_model, sample_input=sample_input)


def save_model(spark_model, sample_input, path, mlflow_model=Model()):
    """
    Save a Spark MLlib PipelineModel in MLeap format at the given local path.
    The saved model will have the MLeap flavor.

    NOTE: The MLeap model flavor cannot be loaded in Python. It must be loaded using the
    Java module within the `mlflow/java` package.

    :param spark_model: Spark PipelineModel to be saved. This model must be MLeap-compatible and
                        cannot contain any custom transformers.
    :param sample_input: A sample PySpark DataFrame input that the model can evaluate. This is
                         required by MLeap for data schema inference.
    :param path: Local path where the model is to be saved.
    :param mlflow_model: MLflow model config to which this flavor is being added.
    """
    add_to_model(mlflow_model, path, spark_model, sample_input)
    mlflow_model.save(os.path.join(path, "MLmodel"))


def add_to_model(mlflow_model, path, spark_model, sample_input):
    """
    Add the MLeap flavor to a pre-existing MLflow model.

    :param mlflow_model: MLflow model config to which this flavor is being added.
    :param path: Path of the MLflow model to which this flavor is being added.
    :param spark_model: Spark PipelineModel to be saved. This model must be MLeap-compatible and
                        cannot contain any custom transformers.
    :param sample_input: A sample PySpark DataFrame input that the model can evaluate. This is
                         required by MLeap for data schema inference.
    """
    from pyspark.ml.pipeline import PipelineModel
    from pyspark.sql import DataFrame
    import mleap.version
    from mleap.pyspark.spark_support import SimpleSparkSerializer  # pylint: disable=unused-variable
    from py4j.protocol import Py4JError

    if not isinstance(spark_model, PipelineModel):
        raise Exception("Not a PipelineModel."
                        " MLeap can currently only save PipelineModels.")
    if sample_input is None:
        raise Exception("A sample input must be specified in order to add the MLeap flavor.")
    if not isinstance(sample_input, DataFrame):
        raise Exception("The sample input must be a PySpark dataframe of type `{df_type}`".format(
            df_type=DataFrame.__module__))

    mleap_path_full = os.path.join(path, "mleap")
    mleap_datapath_sub = os.path.join("mleap", "model")
    mleap_datapath_full = os.path.join(path, mleap_datapath_sub)
    if os.path.exists(mleap_path_full):
        raise Exception("MLeap model data path already exists at: {path}".format(
            path=mleap_path_full))
    os.makedirs(mleap_path_full)

    dataset = spark_model.transform(sample_input)
    model_path = "file:{mp}".format(mp=mleap_datapath_full)
    try:
        spark_model.serializeToBundle(path=model_path,
                                      dataset=dataset)
    except Py4JError as e:
        tb = sys.exc_info()[2]
        error_str = ("MLeap encountered an error while serializing the model. Please ensure that"
                     " the model is compatible with MLeap"
                     " (i.e., does not contain any custom transformers). Error text: {err}".format(
                         err=str(e)))
        traceback.print_exc()
        reraise(Exception, error_str, tb)

    input_schema = json.loads(sample_input.schema.json())
    mleap_schemapath_sub = os.path.join("mleap", "schema.json")
    mleap_schemapath_full = os.path.join(path, mleap_schemapath_sub)
    with open(mleap_schemapath_full, "w") as out:
        json.dump(input_schema, out, indent=4)

    mlflow_model.add_flavor(FLAVOR_NAME,
                            mleap_version=mleap.version.__version__,
                            model_data=mleap_datapath_sub,
                            input_schema=mleap_schemapath_sub)
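Taken together, the new module is used roughly as follows. A minimal sketch, assuming an active SparkSession named `spark` and an MLeap-compatible pipeline; the training data and column names are hypothetical:

import mlflow
import mlflow.mleap
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Hypothetical training data: (id, text, label) rows.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0)], ["id", "text", "label"])

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashing_tf = HashingTF(inputCol="words", outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
model = Pipeline(stages=[tokenizer, hashing_tf, lr]).fit(training)

with mlflow.start_run():
    # sample_input drives MLeap's schema inference; the resulting artifact can
    # only be loaded back through the Java `mlflow/java` package.
    mlflow.mleap.log_model(spark_model=model, sample_input=training,
                           artifact_path="mleap-model")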
2 changes: 1 addition & 1 deletion mlflow/projects/__init__.py
@@ -12,7 +12,7 @@

from mlflow.projects.submitted_run import LocalSubmittedRun
from mlflow.projects import _project_spec
-from mlflow.utils.exception import ExecutionException
+from mlflow.exceptions import ExecutionException
from mlflow.entities import RunStatus, SourceType, Param
import mlflow.tracking as tracking
from mlflow.tracking.fluent import _get_experiment_id, _get_git_commit
2 changes: 1 addition & 1 deletion mlflow/projects/_project_spec.py
@@ -7,7 +7,7 @@
from six.moves import shlex_quote

from mlflow import data
-from mlflow.utils.exception import ExecutionException
+from mlflow.exceptions import ExecutionException


MLPROJECT_FILE_NAME = "MLproject"
2 changes: 1 addition & 1 deletion mlflow/projects/databricks.py
@@ -12,7 +12,7 @@
from mlflow.projects import _fetch_project
from mlflow.projects.submitted_run import SubmittedRun
from mlflow.utils import rest_utils, file_utils
-from mlflow.utils.exception import ExecutionException
+from mlflow.exceptions import ExecutionException
from mlflow.utils.logging_utils import eprint
from mlflow import tracking
from mlflow.version import VERSION
13 changes: 1 addition & 12 deletions mlflow/pyfunc/__init__.py
@@ -83,7 +83,7 @@
from mlflow import tracking
from mlflow.models import Model
from mlflow.utils import PYTHON_VERSION, get_major_minor_py_version
-from mlflow.utils.file_utils import TempDir
+from mlflow.utils.file_utils import TempDir, _copy_file_or_tree
from mlflow.utils.logging_utils import eprint

FLAVOR_NAME = "python_function"
@@ -229,17 +229,6 @@ def predict(*args):
    return pandas_udf(predict, result_type)


-def _copy_file_or_tree(src, dst, dst_dir):
-    name = os.path.join(dst_dir, os.path.basename(os.path.abspath(src)))
-    if dst_dir:
-        os.mkdir(os.path.join(dst, dst_dir))
-    if os.path.isfile(src):
-        shutil.copy(src=src, dst=os.path.join(dst, name))
-    else:
-        shutil.copytree(src=src, dst=os.path.join(dst, name))
-    return name
-
-
def save_model(dst_path, loader_module, data_path=None, code_path=(), conda_env=None,
model=Model()):
"""
71 changes: 46 additions & 25 deletions mlflow/spark.py
@@ -1,10 +1,20 @@
"""
MLflow integration for Spark MLlib models.
Spark MLlib models are saved and loaded using native Spark MLlib persistence.
The models can be exported as pyfunc for out-of Spark deployment or it can be loaded back as Spark
Transformer in order to score it in Spark. The pyfunc flavor instantiates SparkContext internally
and reads the input data as Spark DataFrame prior to scoring.
This module enables the exporting of Spark MLlib models with the following flavors (formats):
1. Spark MLlib (native) format - Allows models to be loaded as Spark Transformers for scoring
in a Spark session. Models with this flavor can be loaded
back as PySpark PipelineModel objects in Python. This
is the main flavor and is always produced.
2. PyFunc - Supports deployment outside of Spark by instantiating a SparkContext and reading
input data as a Spark DataFrame prior to scoring. Also supports deployment in Spark
as a Spark UDF. Models with this flavor can be loaded back as Python functions
for performing inference. This flavor is always produced.
3. MLeap - Enables high-performance deployment outside of Spark by leveraging MLeap's
custom dataframe and pipeline representations. For more informatin about MLeap,
see https://github.com/combust/mleap. Models with this flavor *cannot* be loaded
back as Python objects. Rather, they must be deserialized in Java using the
`mlflow/java` package. This flavor is only produced if MLeap-compatible arguments
are specified.
"""

from __future__ import absolute_import
@@ -15,10 +25,9 @@
import pyspark
from pyspark import SparkContext
from pyspark.ml.pipeline import PipelineModel
-from pyspark.ml.base import Transformer

import mlflow
-from mlflow import pyfunc
+from mlflow import pyfunc, mleap
from mlflow.models import Model
from mlflow.utils.logging_utils import eprint

@@ -28,9 +37,11 @@
DFS_TMP = "/tmp/mlflow"


-def log_model(spark_model, artifact_path, conda_env=None, jars=None, dfs_tmpdir=None):
+def log_model(spark_model, artifact_path, conda_env=None, jars=None, dfs_tmpdir=None,
+              sample_input=None):
    """
-    Log a Spark MLlib model as an MLflow artifact for the current run.
+    Log a Spark MLlib model as an MLflow artifact for the current run. This will use the
+    MLlib persistence format, and the logged model will have the Spark flavor.

    :param spark_model: PipelineModel to be saved.
    :param artifact_path: Run relative artifact path.
@@ -43,7 +54,10 @@ def log_model(spark_model, artifact_path, conda_env=None, jars=None, dfs_tmpdir=
                       destination and then copied into the model's artifact directory. This is
                       necessary as Spark ML models read / write from / to DFS if running on a
                       cluster. All temporary files created on the DFS will be removed if this
                       operation completes successfully. Defaults to /tmp/mlflow.
+    :param sample_input: A sample input that will be used to add the MLeap flavor to the model.
+                         This must be a PySpark DataFrame that the model can evaluate. If
+                         `sample_input` is `None`, the MLeap flavor will not be added.
>>> from pyspark.ml import Pipeline
>>> from pyspark.ml.classification import LogisticRegression
@@ -62,7 +76,8 @@ def log_model(spark_model, artifact_path, conda_env=None, jars=None, dfs_tmpdir=
"""
return Model.log(artifact_path=artifact_path, flavor=mlflow.spark, spark_model=spark_model,
jars=jars, conda_env=conda_env, dfs_tmpdir=dfs_tmpdir)
jars=jars, conda_env=conda_env, dfs_tmpdir=dfs_tmpdir,
sample_input=sample_input)


def _tmp_path(dfs_tmp):
@@ -118,11 +133,13 @@ def delete(cls, path):


def save_model(spark_model, path, mlflow_model=Model(), conda_env=None, jars=None,
-               dfs_tmpdir=None):
+               dfs_tmpdir=None, sample_input=None):
    """
-    Save Spark MLlib PipelineModel at given local path.
+    Save a Spark MLlib PipelineModel at the given local path.

-    Uses Spark MLlib persistence mechanism.
+    By default, this function saves models using the Spark MLlib persistence mechanism.
+    Additionally, if a sample input is specified via the `sample_input` parameter, the model
+    will also be serialized in MLeap format and the MLeap flavor will be added.

    :param spark_model: Spark PipelineModel to be saved. Can save only PipelineModels.
    :param path: Local path where the model is to be saved.
@@ -135,7 +152,9 @@ def save_model(spark_model, path, mlflow_model=Model(), conda_env=None, jars=Non
                      as Spark ML models read / write from / to DFS if running on a cluster. All
                      temporary files created on the DFS will be removed if this operation
                      completes successfully. Defaults to /tmp/mlflow.
+    :param sample_input: A sample input that will be used to add the MLeap flavor to the model.
+                         This must be a PySpark DataFrame that the model can evaluate. If
+                         `sample_input` is `None`, the MLeap flavor will not be added.
>>> from mlflow import spark
>>> from pyspark.ml.pipeline import PipelineModel
@@ -147,26 +166,28 @@ def save_model(spark_model, path, mlflow_model=Model(), conda_env=None, jars=Non
    dfs_tmpdir = dfs_tmpdir if dfs_tmpdir is not None else DFS_TMP
    if jars:
        raise Exception("jar dependencies are not implemented")
-    if not isinstance(spark_model, Transformer):
-        raise Exception("Unexpected type {}. SparkML model works only with Transformers".format(
-            str(type(spark_model))))
+
+    if sample_input is not None:
+        mleap.add_to_model(mlflow_model, path, spark_model, sample_input)
+
    if not isinstance(spark_model, PipelineModel):
-        raise Exception("Not a PipelineModel. SparkML can save only PipelineModels.")
+        raise Exception("Not a PipelineModel. SparkML can only save PipelineModels.")

    # Spark ML stores the model on DFS if running on a cluster
    # Save it to a DFS temp dir first and copy it to local path
    tmp_path = _tmp_path(dfs_tmpdir)
    spark_model.save(tmp_path)
-    model_path = os.path.abspath(os.path.join(path, "model"))
-    _HadoopFileSystem.copy_to_local_file(tmp_path, model_path, removeSrc=True)
+    sparkml_data_path_sub = "sparkml"
+    sparkml_data_path = os.path.abspath(os.path.join(path, sparkml_data_path_sub))
+    _HadoopFileSystem.copy_to_local_file(tmp_path, sparkml_data_path, removeSrc=True)
    pyspark_version = pyspark.version.__version__
    model_conda_env = None
    if conda_env:
        model_conda_env = os.path.basename(os.path.abspath(conda_env))
        shutil.copyfile(conda_env, os.path.join(path, model_conda_env))
-    if jars:
-        raise Exception("JAR dependencies are not yet implemented")
-    mlflow_model.add_flavor(FLAVOR_NAME, pyspark_version=pyspark_version, model_data="model")
-    pyfunc.add_to_model(mlflow_model, loader_module="mlflow.spark", data="model",
+    mlflow_model.add_flavor(FLAVOR_NAME, pyspark_version=pyspark_version,
+                            model_data=sparkml_data_path_sub)
+    pyfunc.add_to_model(mlflow_model, loader_module="mlflow.spark", data=sparkml_data_path_sub,
                        env=model_conda_env)
    mlflow_model.save(os.path.join(path, "MLmodel"))

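For reference, a sketch of how the updated `save_model` behaves with and without `sample_input`; it reuses the hypothetical `model` and `training` objects from the sketch above, and the output paths are arbitrary:

import mlflow.spark

# Spark MLlib (native) and pyfunc flavors only; model data lands under "sparkml/".
mlflow.spark.save_model(model, path="/tmp/spark_only_model")

# Passing a sample input additionally serializes the pipeline with MLeap, so the
# saved MLmodel configuration lists the mleap flavor alongside the other two.
mlflow.spark.save_model(model, path="/tmp/spark_and_mleap_model",
                        sample_input=training)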
7 changes: 0 additions & 7 deletions mlflow/utils/exception.py

This file was deleted.

11 changes: 11 additions & 0 deletions mlflow/utils/file_utils.py
@@ -299,3 +299,14 @@ def ignore(_, names):
    shutil.copytree(src_path, os.path.join(dst_path, mlflow_dir),
                    ignore=_docker_ignore(src_path))
    return mlflow_dir
+
+
+def _copy_file_or_tree(src, dst, dst_dir):
+    name = os.path.join(dst_dir, os.path.basename(os.path.abspath(src)))
+    if dst_dir:
+        os.mkdir(os.path.join(dst, dst_dir))
+    if os.path.isfile(src):
+        shutil.copy(src=src, dst=os.path.join(dst, name))
+    else:
+        shutil.copytree(src=src, dst=os.path.join(dst, name))
+    return name
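A small illustration of the relocated helper's contract, using hypothetical temporary paths:

import os
import tempfile

from mlflow.utils.file_utils import _copy_file_or_tree

src_dir = tempfile.mkdtemp()
dst = tempfile.mkdtemp()
src = os.path.join(src_dir, "env.yaml")
with open(src, "w") as f:
    f.write("name: example\n")

# The helper creates dst/code/, copies the file into it, and returns the
# destination path relative to dst.
name = _copy_file_or_tree(src=src, dst=dst, dst_dir="code")
assert name == os.path.join("code", "env.yaml")
assert os.path.isfile(os.path.join(dst, name))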
1 change: 1 addition & 0 deletions setup.py
@@ -43,6 +43,7 @@ def package_files(directory):
'boto3',
'querystring_parser',
'simplejson',
+'mleap>=0.8.1',
],
entry_points='''
[console_scripts]
2 changes: 1 addition & 1 deletion tests/projects/test_entry_point.py
@@ -5,7 +5,7 @@
from six.moves import shlex_quote


-from mlflow.utils.exception import ExecutionException
+from mlflow.exceptions import ExecutionException
from mlflow.utils.file_utils import TempDir
from tests.projects.utils import load_project, TEST_PROJECT_DIR

2 changes: 1 addition & 1 deletion tests/projects/test_project_spec.py
@@ -2,7 +2,7 @@

import pytest

-from mlflow.utils.exception import ExecutionException
+from mlflow.exceptions import ExecutionException
from mlflow.projects import _project_spec
from tests.projects.utils import load_project

2 changes: 1 addition & 1 deletion tests/projects/test_projects.py
@@ -9,7 +9,7 @@

import mlflow
from mlflow.entities import RunStatus
-from mlflow.utils.exception import ExecutionException
+from mlflow.exceptions import ExecutionException
from mlflow.store.file_store import FileStore
from mlflow.utils import env
