Commit d85292e

MLeap Flavor/Deployment mlflow#5: Container support for MLeap (mlflow#395)

* Add SparkJava plugin

* Add MLeap flavor and modify SparkML module to use it

* Fixes

* Add mleap

* Import model in test

* Import mleap

* Add missing assert

* Reorder spark session creation params in test

* Docs fix

* revert pom xml change

* remove java

* Add standalone  to mleap

* Import fix

* Add docs and

* Add warning about py incompatibility

* Address comments

* Code spacing fix

* Revert log test changes

* Whitespace fixes

* Exception import fixes

* Fix lint issues

* Fix method call

* callfix

* Remove unused imports

* Py4j log level fix

* reorder tests

* testfix

* test fixes

* Spacing fix

* lint fixes

* Add mleap schema test

* Fix test

* Whitespace fix

* Test fix

* Bump version to 0.5.3dev0

* Update container + build function to support MLeap

* Fix lint errors

* Install maven

* Test support infra

* Fixws

* Add tests

* Path fixes

* line fix

* Lint fixes

* Revert scoring server

* Update test asserts, remove unused file

* Specify server port

* Sigquit >> sigterm

* accept any version of mlflow jar

* Address some comments, fix bug in scoring server argument parsing (whoops :/), add support for specifying deployment flavor

* Address comments

* Update ScoringServer.java

* Update ScoringServer.java

* Update __init__.py

* varfix

* Revert tempdir changes

* Mark test as large

* Use sagemaker env var instead of file upload

* Parameter fixes

* CLI fixes

* CLI Fixes

* CLI support, tests

* Make linter happy

* Fix serving command

* Name change for get_serving_flavor

* Address comments

* Make linter happy

* Address comments

* Add  cli command and refactor flavor selection/validation

* Test fixes

* Add print statements

* Varfix

* Run local in shell to get docker env var successfully

* Lint fix

* Iteration fix

* Deployment env var fixes for local serving
dbczumar committed Sep 4, 2018
1 parent 9af99d0 commit d85292e
Showing 8 changed files with 361 additions and 140 deletions.
mlflow/java/scoring/src/main/java/org/mlflow/sagemaker/ScoringServer.java

@@ -253,7 +253,7 @@ public static void main(String[] args) throws IOException, PredictorLoadingException
     String modelPath = args[0];
     Optional<Integer> portNum = Optional.empty();
     if (args.length > 1) {
-      portNum = Optional.of(Integer.parseInt(args[2]));
+      portNum = Optional.of(Integer.parseInt(args[1]));
     }
     ScoringServer server = new ScoringServer(modelPath);
     try {
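
This one-line change is the argument-parsing fix called out in the commit list ("whoops :/"): the optional port was read from args[2], which is out of bounds when exactly two arguments are passed and reads the wrong argument otherwise. A rough Python mirror of the corrected logic, for illustration only (not part of the commit):

import sys

# Mirrors ScoringServer.main: sys.argv[1] is the model path (Java args[0]);
# the optional port is the *second* program argument (Java args[1] -- the fix).
model_path = sys.argv[1]
port = None
if len(sys.argv) > 2:
    port = int(sys.argv[2])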
164 changes: 128 additions & 36 deletions mlflow/sagemaker/__init__.py

Large diffs are not rendered by default.
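
Since this diff is collapsed, here is a minimal sketch of what the flavor selection/validation added to mlflow/sagemaker/__init__.py plausibly looks like, judging from the SUPPORTED_DEPLOYMENT_FLAVORS constant and the flavor arguments used in cli.py below. The helper names _get_preferred_deployment_flavor and _validate_deployment_flavor are assumptions, not confirmed by the rendered diff:

from mlflow import mleap, pyfunc

# Hypothetical reconstruction of the collapsed module-level constant.
SUPPORTED_DEPLOYMENT_FLAVORS = [mleap.FLAVOR_NAME, pyfunc.FLAVOR_NAME]


def _get_preferred_deployment_flavor(model_config):
    # Pick a supported flavor that the model actually provides (helper name assumed).
    if mleap.FLAVOR_NAME in model_config.flavors:
        return mleap.FLAVOR_NAME
    elif pyfunc.FLAVOR_NAME in model_config.flavors:
        return pyfunc.FLAVOR_NAME
    else:
        raise Exception("The model must contain one of the supported flavors:"
                        " {flavors}".format(flavors=SUPPORTED_DEPLOYMENT_FLAVORS))


def _validate_deployment_flavor(model_config, flavor):
    # Check that a user-specified flavor is supported and present (helper name assumed).
    if flavor not in SUPPORTED_DEPLOYMENT_FLAVORS:
        raise Exception("The specified flavor `{flavor}` is not supported for"
                        " deployment".format(flavor=flavor))
    elif flavor not in model_config.flavors:
        raise Exception("The specified model does not contain the `{flavor}`"
                        " flavor".format(flavor=flavor))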

26 changes: 21 additions & 5 deletions mlflow/sagemaker/cli.py
@@ -43,8 +43,13 @@ def commands():
               " configuration will be used when creating the new SageMaker model associated"
               " with this application. For more information, see"
               " https://docs.aws.amazon.com/sagemaker/latest/dg/API_VpcConfig.html")
+@click.option("--flavor", "-f", default=None,
+              help=("The name of the flavor to use for deployment. Must be one of the following:"
+                    " {supported_flavors}. If unspecified, a flavor will be automatically selected"
+                    " from the model's available flavors.".format(
+                        supported_flavors=mlflow.sagemaker.SUPPORTED_DEPLOYMENT_FLAVORS)))
 def deploy(app_name, model_path, execution_role_arn, bucket, run_id, image_url, region_name, mode,
-           archive, instance_type, instance_count, vpc_config):
+           archive, instance_type, instance_count, vpc_config, flavor):
     """
     Deploy model on Sagemaker as a REST API endpoint. Current active AWS account needs to have
     correct permissions setup.
@@ -57,7 +62,13 @@ def deploy(app_name, model_path, execution_role_arn, bucket, run_id, image_url,
         execution_role_arn=execution_role_arn, bucket=bucket, run_id=run_id,
         image_url=image_url, region_name=region_name, mode=mode,
         archive=archive, instance_type=instance_type,
-        instance_count=instance_count, vpc_config=vpc_config)
+        instance_count=instance_count, vpc_config=vpc_config, flavor=flavor)
+
+
+@commands.command("list-flavors")
+def list_flavors():
+    print("Supported model flavors for SageMaker deployment are: {supported_flavors}".format(
+        supported_flavors=mlflow.sagemaker.SUPPORTED_DEPLOYMENT_FLAVORS))
 
 
 @commands.command("delete")
@@ -80,12 +91,17 @@ def delete(app_name, region_name, archive):
 @cli_args.RUN_ID
 @click.option("--port", "-p", default=5000, help="Server port. [default: 5000]")
 @click.option("--image", "-i", default=IMAGE, help="Docker image name")
-def run_local(model_path, run_id, port, image):
+@click.option("--flavor", "-f", default=None,
+              help=("The name of the flavor to use for local serving. Must be one of the following:"
+                    " {supported_flavors}. If unspecified, a flavor will be automatically selected"
+                    " from the model's available flavors.".format(
+                        supported_flavors=mlflow.sagemaker.SUPPORTED_DEPLOYMENT_FLAVORS)))
+def run_local(model_path, run_id, port, image, flavor):
     """
     Serve model locally running in a Sagemaker-compatible Docker container.
     """
     mlflow.sagemaker.run_local(
-        model_path=model_path, run_id=run_id, port=port, image=image)
+        model_path=model_path, run_id=run_id, port=port, image=image, flavor=flavor)
 
 
 @commands.command("build-and-push-container")
@@ -102,7 +118,7 @@ def build_and_push_container(build, push, container, mlflow_home):
     The image is pushed to ECR under current active AWS account and to current active AWS region.
     """
     if not (build or push):
-        print("skipping both build nad push, have nothing to do!")
+        print("skipping both build and push, have nothing to do!")
     if build:
         mlflow.sagemaker.build_image(container,
                                      mlflow_home=os.path.abspath(mlflow_home) if mlflow_home
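
With the --flavor option wired through, local serving can pin a deployment flavor explicitly instead of relying on automatic selection. A minimal usage sketch via the Python API that the CLI delegates to, assuming mleap.FLAVOR_NAME is the string "mleap" and that the model at the (hypothetical) path was saved with the MLeap flavor:

import mlflow.sagemaker

# Serve locally in the SageMaker-compatible container, forcing the MLeap flavor.
# Passing flavor=None instead would auto-select from the model's flavors.
mlflow.sagemaker.run_local(
    model_path="/path/to/model",  # hypothetical model directory
    run_id=None,
    port=5000,
    image="mlflow-pyfunc",        # assumed default image name
    flavor="mleap")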
103 changes: 80 additions & 23 deletions mlflow/sagemaker/container/__init__.py
@@ -12,16 +12,29 @@
 import signal
 from subprocess import check_call, Popen
 import sys
+import yaml
 
 from pkg_resources import resource_filename
 
 import mlflow
+import mlflow.version
 
-from mlflow import pyfunc
+from mlflow import pyfunc, mleap
 from mlflow.models import Model
 from mlflow.utils.logging_utils import eprint
 from mlflow.version import VERSION as MLFLOW_VERSION
 
+MODEL_PATH = "/opt/ml/model"
+
+DEPLOYMENT_CONFIG_KEY_FLAVOR_NAME = "deployment_flavor_name"
+
+DEFAULT_SAGEMAKER_SERVER_PORT = 8080
+
+SUPPORTED_FLAVORS = [
+    pyfunc.FLAVOR_NAME,
+    mleap.FLAVOR_NAME
+]
 
 
 def _init(cmd):
     """
@@ -47,7 +60,7 @@ def _server_dependencies_cmds():
     """
     # TODO: Should we reinstall MLflow? What if there is MLflow in the user's conda environment?
     return ["conda install -c anaconda gunicorn", "conda install -c anaconda gevent",
-            "pip install /opt/mlflow/." if os.path.isdir("/opt/mlflow")
+            "pip install /opt/mlflow/." if _container_includes_mlflow_source()
            else "pip install mlflow=={}".format(MLFLOW_VERSION)]
@@ -57,10 +70,33 @@ def _serve():
     """
     Read the MLmodel config, initialize the Conda environment if needed and start python server.
     """
-    m = Model.load("/opt/ml/model/MLmodel")
-    if pyfunc.FLAVOR_NAME not in m.flavors:
-        raise Exception("Only supports pyfunc models and this is not one.")
-    conf = m.flavors[pyfunc.FLAVOR_NAME]
+    model_config_path = os.path.join(MODEL_PATH, "MLmodel")
+    m = Model.load(model_config_path)
+
+    if DEPLOYMENT_CONFIG_KEY_FLAVOR_NAME in os.environ:
+        serving_flavor = os.environ[DEPLOYMENT_CONFIG_KEY_FLAVOR_NAME]
+    else:
+        # Older versions of mlflow may not specify a deployment configuration
+        serving_flavor = pyfunc.FLAVOR_NAME
+
+    if serving_flavor == mleap.FLAVOR_NAME:
+        # TODO(dbczumar): Host the scoring Java package on Maven Central so that we no
+        # longer require the container source for this flavor.
+        if _container_includes_mlflow_source():
+            _serve_mleap()
+        else:
+            raise Exception("The container does not support the specified deployment flavor:"
+                            " `{mleap_flavor}`. Please build the container with the `mlflow_home`"
+                            " parameter specified to enable this feature.".format(
+                                mleap_flavor=mleap.FLAVOR_NAME))
+    elif pyfunc.FLAVOR_NAME in m.flavors:
+        _serve_pyfunc(m)
+    else:
+        raise Exception("This container only supports models with the MLeap or PyFunc flavors.")
+
+
+def _serve_pyfunc(model):
+    conf = model.flavors[pyfunc.FLAVOR_NAME]
     bash_cmds = []
     if pyfunc.ENV in conf:
         print("activating custom environment")
@@ -71,7 +107,7 @@ def _serve():
            os.makedirs(env_path_dst_dir)
        # TODO: should we test that the environment does not include any of the server dependencies?
        # Those are gonna be reinstalled. should probably test this on the client side
-        shutil.copyfile(os.path.join("/opt/ml/model/", env), env_path_dst)
+        shutil.copyfile(os.path.join(MODEL_PATH, env), env_path_dst)
        os.system("conda env create -n custom_env -f {}".format(env_path_dst))
        bash_cmds += ["source /miniconda/bin/activate custom_env"] + _server_dependencies_cmds()
     nginx_conf = resource_filename(mlflow.sagemaker.__name__, "container/scoring_server/nginx.conf")
@@ -87,35 +123,56 @@ def _serve():
            "mlflow.sagemaker.container.scoring_server.wsgi:app").format(nworkers=cpu_count)
     bash_cmds.append(cmd)
     gunicorn = Popen(["/bin/bash", "-c", "; ".join(bash_cmds)])
-    signal.signal(signal.SIGTERM, lambda a, b: _sigterm_handler(nginx.pid, gunicorn.pid))
+    signal.signal(signal.SIGTERM, lambda a, b: _sigterm_handler(pids=[nginx.pid, gunicorn.pid]))
     # If either subprocess exits, so do we.
-    pids = set([nginx.pid, gunicorn.pid])
-    while True:
-        pid, _ = os.wait()
-        if pid in pids:
-            break
-    _sigterm_handler(nginx.pid, gunicorn.pid)
+    awaited_pids = _await_subprocess_exit_any(procs=[nginx, gunicorn])
+    _sigterm_handler(awaited_pids)
+
+
+def _serve_mleap():
+    serve_cmd = ["java", "-cp", "/opt/mlflow/mlflow/java/scoring/target/mlflow-scoring-*"
+                 "-with-dependencies.jar".format(
+                     mlflow_version=mlflow.version.VERSION),
+                 "org.mlflow.sagemaker.ScoringServer",
+                 MODEL_PATH, str(DEFAULT_SAGEMAKER_SERVER_PORT)]
+    # Invoke `Popen` with a single string command in the shell to support wildcard usage
+    # with the mlflow jar version.
+    serve_cmd = " ".join(serve_cmd)
+    mleap = Popen(serve_cmd, shell=True)
+    signal.signal(signal.SIGTERM, lambda a, b: _sigterm_handler(pids=[mleap.pid]))
+    awaited_pids = _await_subprocess_exit_any(procs=[mleap])
+    _sigterm_handler(awaited_pids)
+
+
+def _container_includes_mlflow_source():
+    return os.path.isdir("/opt/mlflow")
 
 
 def _train():
     raise Exception("Train is not implemented.")
 
 
-def _sigterm_handler(nginx_pid, gunicorn_pid):
+def _await_subprocess_exit_any(procs):
+    pids = [proc.pid for proc in procs]
+    while True:
+        pid, _ = os.wait()
+        if pid in pids:
+            break
+    return pids
+
+
+def _sigterm_handler(pids):
     """
     Cleanup when terminating.
     Attempt to kill all launched processes and exit.
     """
     print("Got sigterm signal, exiting.")
-    try:
-        os.kill(nginx_pid, signal.SIGQUIT)
-    except OSError:
-        pass
-    try:
-        os.kill(gunicorn_pid, signal.SIGTERM)
-    except OSError:
-        pass
+    for pid in pids:
+        try:
+            os.kill(pid, signal.SIGTERM)
+        except OSError:
+            pass
 
     sys.exit(0)
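
The supervision refactor above generalizes the serving entry points from the fixed nginx/gunicorn pair to an arbitrary list of children, which is what lets _serve_mleap reuse the same wait-and-terminate logic for a single Java process. The pattern in isolation, as a runnable Unix-only sketch with toy subprocesses (not part of the commit):

import os
import signal
import sys
from subprocess import Popen


def await_any_exit(procs):
    # Block until any of the given children exits (mirrors _await_subprocess_exit_any).
    pids = [proc.pid for proc in procs]
    while True:
        pid, _ = os.wait()
        if pid in pids:
            return pids


def shut_down(pids):
    # Best-effort SIGTERM to every child, then exit (mirrors _sigterm_handler).
    for pid in pids:
        try:
            os.kill(pid, signal.SIGTERM)
        except OSError:
            pass
    sys.exit(0)


if __name__ == "__main__":
    procs = [Popen(["sleep", "5"]), Popen(["sleep", "60"])]
    signal.signal(signal.SIGTERM, lambda signum, frame: shut_down([p.pid for p in procs]))
    # The first child to exit tears the whole group down, as in _serve_pyfunc.
    shut_down(await_any_exit(procs))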
73 changes: 0 additions & 73 deletions mlflow/sagemaker/container/scoring_server/serve.py

This file was deleted.

11 changes: 9 additions & 2 deletions tests/helper_functions.py
@@ -6,6 +6,8 @@
 from subprocess import Popen, PIPE, STDOUT
 import time
 
+import pandas as pd
+
 
 def random_int(lo=1, hi=1e10):
     return random.randint(lo, hi)
@@ -20,6 +22,10 @@ def random_file(ext):
 
 
 def score_model_in_sagemaker_docker_container(model_path, data):
+    """
+    :param data: The data to send to the docker container for testing. This is either a
+                 Pandas dataframe or a JSON-formatted string.
+    """
     env = dict(os.environ)
     env.update(LC_ALL="en_US.UTF-8", LANG="en_US.UTF-8")
     proc = Popen(['mlflow', 'sagemaker', 'run-local', '-m', model_path], stdout=PIPE, stderr=STDOUT,
@@ -42,8 +48,9 @@ def score_model_in_sagemaker_docker_container(model_path, data):
         print("server up, ping status", ping_status)
         if ping_status.status_code != 200:
             raise Exception("ping failed, server is not happy")
-        x = data.to_dict(orient='records')
-        y = requests.post(url='http://localhost:5000/invocations', json=x)
+        if type(data) == pd.DataFrame:
+            data = data.to_dict(orient='records')
+        y = requests.post(url='http://localhost:5000/invocations', json=data)
         import json
         return json.loads(y.content)
     finally:
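
The helper now accepts either a pandas DataFrame (converted to records-oriented JSON before POSTing) or a pre-serialized JSON string, which the MLeap scoring path needs. A usage sketch with a hypothetical model path, run from the repository root:

import pandas as pd

from tests.helper_functions import score_model_in_sagemaker_docker_container

# DataFrame input is converted via to_dict(orient='records'); a JSON-formatted
# string (e.g., an MLeap-compatible payload) would be sent through unchanged.
df = pd.DataFrame({"feature": [1.0, 2.0]})
preds = score_model_in_sagemaker_docker_container(
    model_path="/path/to/model",  # hypothetical path
    data=df)
print(preds)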