diff --git a/jenkins/Jenkinsfile.databricksnightly b/jenkins/Jenkinsfile.databricksnightly
index a5810a38bfa..fc30299a18d 100644
--- a/jenkins/Jenkinsfile.databricksnightly
+++ b/jenkins/Jenkinsfile.databricksnightly
@@ -36,6 +36,8 @@ pipeline {
     options {
         ansiColor('xterm')
+        // timeout doesn't seem to work with environment variables so make sure to update below
+        // IDLE_TIMEOUT config as well
         timeout(time: 180, unit: 'MINUTES')
         buildDiscarder(logRotator(numToKeepStr: '10'))
     }
@@ -44,24 +46,28 @@ pipeline {
         choice(name: 'DEPLOY_TO', choices: ['Urm', 'Local'],
                 description: 'Where to deploy artifacts to')
         string(name: 'DATABRICKS_VERSION',
-                defaultValue: '0.3.0-SNAPSHOT', description: 'Version to set')
+                defaultValue: '0.3.0-SNAPSHOT', description: 'Version to use for databricks jar produced')
         string(name: 'CUDF_VERSION',
                 defaultValue: '0.16-SNAPSHOT', description: 'Cudf version to use')
         string(name: 'CUDA_VERSION',
                 defaultValue: 'cuda10-1', description: 'cuda version to use')
-        string(name: 'CLUSTER_ID',
-                defaultValue: '0909-141326-pawl52', description: 'databricks cluster id')
+        string(name: 'RUNTIME',
+                defaultValue: '7.0.x-gpu-ml-scala2.12', description: 'databricks runtime')
+        string(name: 'PUBLIC_KEY',
+                defaultValue: '\"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDB+ValakyoKn7w+iBRoAi1KlLVH4yVmRXhLCZs1qUECBAhbck2o8Lgjp5wJ+epdT3+EAP2+t/zlK1mU9tTylidapR4fZuIk9ApoQSLOUEXcUtHkPpZulIoGAq78HoyiEs1sKovc6ULvpymjdnQ3ogCZlTlP9uqmL2E4kbtrNCNL0SVj/w10AqzrJ5lqQgO5dIdDRMHW2sv88JI1VLlfiSsofa9RdI7hDRuCnfZ0+dv2URJGGzGt2BkdEmk9t5F1BMpuXvZ8HzOYdACzw0U+THBOk9d4CACUYMyO1XqlXwoYweNKRnigXDCRaTWGFBzTkugRuW/BZBccTR1ON430uRB svcngcc@nvidia.com\"', description: 'public key')
         string(name: 'REF', defaultValue: 'branch-0.3', description: 'Commit to build')
     }
     environment {
+        IDLE_TIMEOUT = 180
         JENKINS_ROOT  = 'jenkins'
         MVN_URM_MIRROR='-s jenkins/settings.xml -P mirror-apache-to-urm'
         LIBCUDF_KERNEL_CACHE_PATH='/tmp'
         URM_CREDS = credentials("svcngcc_artifactory")
         DATABRICKS_TOKEN = credentials("SPARK_DATABRICKS_TOKEN")
         SCALA_VERSION = '2.12'
-        SPARK_VERSION = '3.0.0-databricks'
+        // the spark version used when we install databricks jars into .m2 directory
+        SPARK_VERSION_TO_INSTALL_JARS = '3.0.0-databricks'
         CI_RAPIDS_JAR = 'rapids-4-spark_2.12-0.1-SNAPSHOT-ci.jar'
         CI_CUDF_JAR = 'cudf-0.14-cuda10-1.jar'
         URM_URL = "${urmUrl}"
@@ -78,7 +84,11 @@ pipeline {
                     sshagent(credentials : ['svcngcc_pubpriv']) {
                         sh "rm -rf spark-rapids-ci.tgz"
                         sh "tar -zcvf spark-rapids-ci.tgz *"
-                        sh "python3.6 ./jenkins/databricks/run-tests.py -c $CLUSTER_ID -z ./spark-rapids-ci.tgz -t $DATABRICKS_TOKEN -p /home/svcngcc/.ssh/id_rsa -l ./jenkins/databricks/build.sh -j $CI_RAPIDS_JAR -b $DATABRICKS_VERSION -k $SPARK_VERSION -a $SCALA_VERSION -f $CUDF_VERSION -u $CUDA_VERSION -m $CI_CUDF_JAR"
+                        env.CLUSTERID = sh (
+                            script: "python3.6 ./jenkins/databricks/create.py -t $DATABRICKS_TOKEN -k $PUBLIC_KEY -r $RUNTIME -i $IDLE_TIMEOUT -n CI-GPU-databricks-${DATABRICKS_VERSION}",
+                            returnStdout: true
+                        ).trim()
+                        sh "python3.6 ./jenkins/databricks/run-tests.py -c ${env.CLUSTERID} -z ./spark-rapids-ci.tgz -t $DATABRICKS_TOKEN -p /home/svcngcc/.ssh/id_rsa -l ./jenkins/databricks/build.sh -j $CI_RAPIDS_JAR -b $DATABRICKS_VERSION -k $SPARK_VERSION_TO_INSTALL_JARS -a $SCALA_VERSION -f $CUDF_VERSION -u $CUDA_VERSION -m $CI_CUDF_JAR"
                         sh "./jenkins/databricks/deploy.sh"
                     }
                 }
@@ -88,7 +98,7 @@ pipeline {
     post {
         always {
             script {
-                sh "python3.6 ./jenkins/databricks/shutdown.py -c $CLUSTER_ID -t $DATABRICKS_TOKEN || true"
+                sh "python3.6 ./jenkins/databricks/shutdown.py -c ${env.CLUSTERID} -t $DATABRICKS_TOKEN -d || true"
                 if (currentBuild.currentResult == "SUCCESS") {
                     slack("#swrapids-spark-cicd", "Success", color: "#33CC33")
                 } else {
diff --git a/jenkins/databricks/build.sh b/jenkins/databricks/build.sh
index ac0b53eea5f..074976f402d 100755
--- a/jenkins/databricks/build.sh
+++ b/jenkins/databricks/build.sh
@@ -17,29 +17,30 @@
 
 set -e
 
-SPARKTGZ=$1
-DATABRICKS_VERSION=$2
+SPARKSRCTGZ=$1
+# this should match whatever is in the pom files for the version
+SPARK_PLUGIN_JAR_VERSION=$2
 SCALA_VERSION=$3
 CI_RAPIDS_JAR=$4
-SPARK_VERSION=$5
+# the version of spark used when we install the databricks jars in .m2
+SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS=$5
 CUDF_VERSION=$6
 CUDA_VERSION=$7
 CI_CUDF_JAR=$8
+# version of Apache Spark we are building against
 BASE_SPARK_POM_VERSION=$9
 
-echo "Spark version is $SPARK_VERSION"
+echo "Spark version is $SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS"
 echo "scala version is: $SCALA_VERSION"
 
 # this has to match the Databricks init script
-DB_JAR_LOC=/databricks/jars
-DB_RAPIDS_JAR_LOC=$DB_JAR_LOC/$CI_RAPIDS_JAR
-DB_CUDF_JAR_LOC=$DB_JAR_LOC/$CI_CUDF_JAR
-RAPIDS_BUILT_JAR=rapids-4-spark_$SCALA_VERSION-$DATABRICKS_VERSION.jar
+DB_JAR_LOC=/databricks/jars/
+RAPIDS_BUILT_JAR=rapids-4-spark_$SCALA_VERSION-$SPARK_PLUGIN_JAR_VERSION.jar
 
 sudo apt install -y maven
 rm -rf spark-rapids
 mkdir spark-rapids
-tar -zxvf $SPARKTGZ -C spark-rapids
+tar -zxvf $SPARKSRCTGZ -C spark-rapids
 cd spark-rapids
 export WORKSPACE=`pwd`
 mvn -B '-Pdatabricks,!snapshot-shims' clean package -DskipTests || true
@@ -60,7 +61,7 @@ mvn -B install:install-file \
    -Dfile=$JARDIR/$COREJAR \
    -DgroupId=org.apache.spark \
    -DartifactId=spark-core_$SCALA_VERSION \
-   -Dversion=$SPARK_VERSION \
+   -Dversion=$SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS \
    -Dpackaging=jar \
    -DpomFile=$COREPOMPATH/$COREPOM
 
@@ -69,7 +70,7 @@ mvn -B install:install-file \
    -Dfile=$JARDIR/$CATALYSTJAR \
    -DgroupId=org.apache.spark \
    -DartifactId=spark-catalyst_$SCALA_VERSION \
-   -Dversion=$SPARK_VERSION \
+   -Dversion=$SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS \
    -Dpackaging=jar
 
 mvn -B install:install-file \
@@ -77,7 +78,7 @@ mvn -B install:install-file \
    -Dfile=$JARDIR/$SQLJAR \
    -DgroupId=org.apache.spark \
    -DartifactId=spark-sql_$SCALA_VERSION \
-   -Dversion=$SPARK_VERSION \
+   -Dversion=$SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS \
    -Dpackaging=jar
 
 mvn -B install:install-file \
@@ -85,17 +86,18 @@ mvn -B install:install-file \
    -Dfile=$JARDIR/$ANNOTJAR \
    -DgroupId=org.apache.spark \
    -DartifactId=spark-annotation_$SCALA_VERSION \
-   -Dversion=$SPARK_VERSION \
+   -Dversion=$SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS \
    -Dpackaging=jar
 
 mvn -B '-Pdatabricks,!snapshot-shims' clean package -DskipTests
+
 # Copy so we pick up the newly built jar and the latest CuDF jar. Note that the jar names have to be
 # exactly what is in the statically set up Databricks cluster we use.
-echo "Copying rapids jars: dist/target/$RAPIDS_BUILT_JAR $DB_RAPIDS_JAR_LOC"
-sudo cp dist/target/$RAPIDS_BUILT_JAR $DB_RAPIDS_JAR_LOC
-echo "Copying cudf jars: $CUDF_JAR $DB_CUDF_JAR_LOC"
-sudo cp $CUDF_JAR $DB_CUDF_JAR_LOC
+echo "Copying rapids jars: dist/target/$RAPIDS_BUILT_JAR $DB_JAR_LOC"
+sudo cp dist/target/$RAPIDS_BUILT_JAR $DB_JAR_LOC
+echo "Copying cudf jars: $CUDF_JAR $DB_JAR_LOC"
+sudo cp $CUDF_JAR $DB_JAR_LOC
 
 # tests
 export PATH=/databricks/conda/envs/databricks-ml-gpu/bin:/databricks/conda/condabin:$PATH
diff --git a/jenkins/databricks/clusterutils.py b/jenkins/databricks/clusterutils.py
new file mode 100644
index 00000000000..9f899a93ec9
--- /dev/null
+++ b/jenkins/databricks/clusterutils.py
@@ -0,0 +1,151 @@
+# Copyright (c) 2020, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import json
+import os
+import requests
+import sys
+
+class ClusterUtils(object):
+
+    @staticmethod
+    def generate_create_templ(sshKey, cluster_name, runtime, idle_timeout,
+            num_workers, driver_node_type, worker_node_type,
+            printLoc=sys.stdout):
+        timeStr = str(int(time.time()))
+        uniq_name = cluster_name + "-" + timeStr
+        templ = {}
+        templ['cluster_name'] = uniq_name
+        print("cluster name is going to be %s" % uniq_name, file=printLoc)
+        templ['spark_version'] = runtime
+        templ['aws_attributes'] = {
+                    "zone_id": "us-west-2a",
+                    "first_on_demand": 1,
+                    "availability": "SPOT_WITH_FALLBACK",
+                    "spot_bid_price_percent": 100,
+                    "ebs_volume_count": 0
+                }
+        templ['autotermination_minutes'] = idle_timeout
+        templ['enable_elastic_disk'] = 'false'
+        templ['enable_local_disk_encryption'] = 'false'
+        templ['node_type_id'] = worker_node_type
+        templ['driver_node_type_id'] = driver_node_type
+        templ['ssh_public_keys'] = [ sshKey ]
+        templ['num_workers'] = num_workers
+        return templ
+
+
+    @staticmethod
+    def create_cluster(workspace, jsonCreateTempl, token, printLoc=sys.stdout):
+        resp = requests.post(workspace + "/api/2.0/clusters/create", headers={'Authorization': 'Bearer %s' % token}, json=jsonCreateTempl)
+        print("create response is %s" % resp.text, file=printLoc)
+        clusterid = resp.json()['cluster_id']
+        print("cluster id is %s" % clusterid, file=printLoc)
+        return clusterid
+
+
+    @staticmethod
+    def wait_for_cluster_start(workspace, clusterid, token, retries=20, printLoc=sys.stdout):
+        p = 0
+        waiting = True
+        jsonout = None
+        while waiting:
+            time.sleep(30)
+            jsonout = ClusterUtils.cluster_state(workspace, clusterid, token, printLoc=printLoc)
+            current_state = jsonout['state']
+            print(clusterid + " state:" + current_state, file=printLoc)
+            if current_state in ['RUNNING']:
+                break
+            if current_state in ['INTERNAL_ERROR', 'SKIPPED', 'TERMINATED'] or p >= retries:
+                if p >= retries:
+                    print("Waited %d times already, stopping" % p, file=printLoc)
+                sys.exit(4)
+            p = p + 1
+        print("Done starting cluster", file=printLoc)
+        return jsonout
+
+
+    @staticmethod
+    def is_cluster_running(jsonout):
+        current_state = jsonout['state']
+        if current_state in ['RUNNING', 'RESIZING']:
+            return True
+        else:
+            return False
+
+
+    @staticmethod
+    def terminate_cluster(workspace, clusterid, token, printLoc=sys.stdout):
+        jsonout = ClusterUtils.cluster_state(workspace, clusterid, token, printLoc=printLoc)
+        if not ClusterUtils.is_cluster_running(jsonout):
+            print("Cluster is not running", file=printLoc)
+            sys.exit(1)
+
+        print("Stopping cluster: " + clusterid, file=printLoc)
+        resp = requests.post(workspace + "/api/2.0/clusters/delete", headers={'Authorization': 'Bearer %s' % token}, json={'cluster_id': clusterid})
+        print("stop response is %s" % resp.text, file=printLoc)
+        print("Done stopping cluster", file=printLoc)
+
+
+    @staticmethod
+    def delete_cluster(workspace, clusterid, token, printLoc=sys.stdout):
+        print("Deleting cluster: " + clusterid, file=printLoc)
+        resp = requests.post(workspace + "/api/2.0/clusters/permanent-delete", headers={'Authorization': 'Bearer %s' % token}, json={'cluster_id': clusterid})
+        print("delete response is %s" % resp.text, file=printLoc)
+        print("Done deleting cluster", file=printLoc)
+
+
+    @staticmethod
+    def start_existing_cluster(workspace, clusterid, token, printLoc=sys.stdout):
+        print("Starting cluster: " + clusterid, file=printLoc)
+        resp = requests.post(workspace + "/api/2.0/clusters/start", headers={'Authorization': 'Bearer %s' % token}, json={'cluster_id': clusterid})
+        print("start response is %s" % resp.text, file=printLoc)
+
+
+    @staticmethod
+    def cluster_state(workspace, clusterid, token, printLoc=sys.stdout):
+        clusterresp = requests.get(workspace + "/api/2.0/clusters/get?cluster_id=%s" % clusterid, headers={'Authorization': 'Bearer %s' % token})
+        clusterjson = clusterresp.text
+        print("cluster response is %s" % clusterjson, file=printLoc)
+        jsonout = json.loads(clusterjson)
+        return jsonout
+
+
+    @staticmethod
+    def get_master_addr_from_json(jsonout):
+        master_addr = None
+        if ClusterUtils.is_cluster_running(jsonout):
+            driver = jsonout['driver']
+            master_addr = driver["public_dns"]
+        return master_addr
+
+
+    @staticmethod
+    def cluster_list(workspace, token, printLoc=sys.stdout):
+        clusterresp = requests.get(workspace + "/api/2.0/clusters/list", headers={'Authorization': 'Bearer %s' % token})
+        clusterjson = clusterresp.text
+        print("cluster list is %s" % clusterjson, file=printLoc)
+        jsonout = json.loads(clusterjson)
+        return jsonout
+
+
+    @staticmethod
+    def cluster_get_master_addr(workspace, clusterid, token, printLoc=sys.stdout):
+        jsonout = ClusterUtils.cluster_state(workspace, clusterid, token, printLoc=printLoc)
+        addr = ClusterUtils.get_master_addr_from_json(jsonout)
+        print("master addr is %s" % addr, file=printLoc)
+        return addr
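The new clusterutils.py above is the shared helper imported by create.py, run-tests.py, and shutdown.py below. As a rough illustration of how the class is meant to be driven (not part of this change; the workspace URL, token, ssh key, node types, and cluster name are made-up placeholders), a caller could do:

# Minimal sketch of driving ClusterUtils directly; all values below are placeholders.
import sys
from clusterutils import ClusterUtils

workspace = 'https://example.cloud.databricks.com'   # placeholder workspace URL
token = 'REDACTED'                                   # placeholder API token
ssh_key = 'ssh-rsa AAAA... someone@example.com'      # placeholder public key

# Build the create request, start the cluster, wait for RUNNING, then clean up.
templ = ClusterUtils.generate_create_templ(ssh_key, 'example-cluster',
        '7.0.x-gpu-ml-scala2.12', 120, 1, 'g4dn.xlarge', 'g4dn.xlarge',
        printLoc=sys.stderr)
clusterid = ClusterUtils.create_cluster(workspace, templ, token, printLoc=sys.stderr)
ClusterUtils.wait_for_cluster_start(workspace, clusterid, token, printLoc=sys.stderr)
addr = ClusterUtils.cluster_get_master_addr(workspace, clusterid, token, printLoc=sys.stderr)
print("driver address: %s" % addr)
ClusterUtils.delete_cluster(workspace, clusterid, token, printLoc=sys.stderr)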
diff --git a/jenkins/databricks/create.py b/jenkins/databricks/create.py
new file mode 100644
index 00000000000..6e4429b80ee
--- /dev/null
+++ b/jenkins/databricks/create.py
@@ -0,0 +1,96 @@
+# Copyright (c) 2020, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from clusterutils import ClusterUtils
+import getopt
+import sys
+
+# This script creates and starts a Databricks cluster and waits for it to be running.
+#
+# The name parameter is meant to be a unique name used when creating the cluster. Note we
+# append the epoch time to the end of it to help prevent collisions.
+#
+# Returns the cluster id on stdout; all other logs default to stderr.
+#
+# The user is responsible for removing the cluster on failure or when done with it.
+def main():
+    workspace = 'https://dbc-9ff9942e-a9c4.cloud.databricks.com'
+    token = ''
+    sshkey = ''
+    cluster_name = 'CI-GPU-databricks-0.3.0-SNAPSHOT'
+    idletime = 240
+    runtime = '7.0.x-gpu-ml-scala2.12'
+    num_workers = 1
+    worker_type = 'g4dn.xlarge'
+    driver_type = 'g4dn.xlarge'
+
+    try:
+        opts, args = getopt.getopt(sys.argv[1:], 'hw:t:k:n:i:r:o:d:e:',
+                                   ['workspace=', 'token=', 'sshkey=', 'clustername=', 'idletime=',
+                                       'runtime=', 'workertype=', 'drivertype=', 'numworkers='])
+    except getopt.GetoptError:
+        print(
+            'create.py -w <workspace> -t <token> -k <sshkey> -n <clustername> -i <idletime> -r <runtime> -o <workertype> -d <drivertype> -e <numworkers>')
+        sys.exit(2)
+
+    for opt, arg in opts:
+        if opt == '-h':
+            print(
+                'create.py -w <workspace> -t <token> -k <sshkey> -n <clustername> -i <idletime> -r <runtime> -o <workertype> -d <drivertype> -e <numworkers>')
+            sys.exit()
+        elif opt in ('-w', '--workspace'):
+            workspace = arg
+        elif opt in ('-t', '--token'):
+            token = arg
+        elif opt in ('-k', '--sshkey'):
+            sshkey = arg
+        elif opt in ('-n', '--clustername'):
+            cluster_name = arg
+        elif opt in ('-i', '--idletime'):
+            idletime = arg
+        elif opt in ('-r', '--runtime'):
+            runtime = arg
+        elif opt in ('-o', '--workertype'):
+            worker_type = arg
+        elif opt in ('-d', '--drivertype'):
+            driver_type = arg
+        elif opt in ('-e', '--numworkers'):
+            num_workers = arg
+
+    print('-w is ' + workspace, file=sys.stderr)
+    print('-k is ' + sshkey, file=sys.stderr)
+    print('-n is ' + cluster_name, file=sys.stderr)
+    print('-i is ' + str(idletime), file=sys.stderr)
+    print('-r is ' + runtime, file=sys.stderr)
+    print('-o is ' + worker_type, file=sys.stderr)
+    print('-d is ' + driver_type, file=sys.stderr)
+    print('-e is ' + str(num_workers), file=sys.stderr)
+
+    if not sshkey:
+        print("You must specify an ssh key!", file=sys.stderr)
+        sys.exit(2)
+
+    if not token:
+        print("You must specify a token!", file=sys.stderr)
+        sys.exit(2)
+
+    templ = ClusterUtils.generate_create_templ(sshkey, cluster_name, runtime, idletime,
+            num_workers, driver_type, worker_type, printLoc=sys.stderr)
+    clusterid = ClusterUtils.create_cluster(workspace, templ, token, printLoc=sys.stderr)
+    ClusterUtils.wait_for_cluster_start(workspace, clusterid, token, printLoc=sys.stderr)
+
+    # only print the clusterid to stdout so a calling script can get it easily
+    print(clusterid, file=sys.stdout)
+
+if __name__ == '__main__':
+    main()
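Since create.py prints only the new cluster id on stdout (everything else goes to stderr), the Jenkinsfile change above can capture the id with sh(returnStdout: true).trim(). The same pattern from plain Python might look like the following sketch; the token, key, and name values are placeholders, not real credentials:

# Hypothetical wrapper around create.py: capture the cluster id from stdout,
# letting the create logs continue to flow to stderr/console.
import subprocess

cmd = ['python3.6', './jenkins/databricks/create.py',
       '-t', 'REDACTED_TOKEN',                       # placeholder token
       '-k', 'ssh-rsa AAAA... someone@example.com',  # placeholder public key
       '-r', '7.0.x-gpu-ml-scala2.12',
       '-i', '180',
       '-n', 'CI-GPU-databricks-0.3.0-SNAPSHOT']
clusterid = subprocess.check_output(cmd).decode('utf-8').strip()
print('new cluster id: %s' % clusterid)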
diff --git a/jenkins/databricks/run-tests.py b/jenkins/databricks/run-tests.py
index 8b4bd4ba753..7d94ee235bf 100644
--- a/jenkins/databricks/run-tests.py
+++ b/jenkins/databricks/run-tests.py
@@ -18,48 +18,34 @@
 import time
 import os
 import subprocess
+from clusterutils import ClusterUtils
 
-def cluster_state(workspace, clusterid, token):
-    clusterresp = requests.get(workspace + "/api/2.0/clusters/get?cluster_id=%s" % clusterid, headers={'Authorization': 'Bearer %s' % token})
-    clusterjson = clusterresp.text
-    print("cluster response is %s" % clusterjson)
-    jsonout = json.loads(clusterjson)
-    return jsonout
-
-def get_master_addr(jsonout):
-    current_state = jsonout['state']
-    if current_state in ['RUNNING']:
-        driver = jsonout['driver']
-        master_addr = driver["public_dns"]
-        return master_addr
-    else:
-        return None
 
 def main():
     workspace = 'https://dbc-9ff9942e-a9c4.cloud.databricks.com'
     token = ''
-    clusterid = '0617-140138-umiak14'
     private_key_file = "~/.ssh/id_rsa"
-    skip_start = None
     local_script = 'build.sh'
     script_dest = '/home/ubuntu/build.sh'
     source_tgz = 'spark-rapids-ci.tgz'
     tgz_dest = '/home/ubuntu/spark-rapids-ci.tgz'
     ci_rapids_jar = 'rapids-4-spark_2.12-0.1-SNAPSHOT-ci.jar'
-    db_version = '0.1-databricks-SNAPSHOT'
+    # the plugin version to use for the jar we build against databricks
+    db_version = '0.3.0-SNAPSHOT'
     scala_version = '2.12'
     spark_version = '3.0.0'
     cudf_version = '0.16-SNAPSHOT'
     cuda_version = 'cuda10-1'
     ci_cudf_jar = 'cudf-0.14-cuda10-1.jar'
     base_spark_pom_version = '3.0.0'
+    clusterid = ''
 
     try:
-        opts, args = getopt.getopt(sys.argv[1:], 'hs:t:c:p:l:nd:z:j:b:k:a:f:u:m:v:',
-                                   ['workspace=', 'token=', 'clusterid=', 'private=', 'nostart=', 'localscript=', 'dest=', 'sparktgz=', 'cirapidsjar=', 'databricksversion=', 'sparkversion=', 'scalaversion=', 'cudfversion=', 'cudaversion=', 'cicudfjar=', 'basesparkpomversion='])
+        opts, args = getopt.getopt(sys.argv[1:], 'hw:t:c:p:l:d:z:j:b:k:a:f:u:m:v:',
+                                   ['workspace=', 'token=', 'clusterid=', 'private=', 'localscript=', 'dest=', 'sparktgz=', 'cirapidsjar=', 'databricksversion=', 'sparkversion=', 'scalaversion=', 'cudfversion=', 'cudaversion=', 'cicudfjar=', 'basesparkpomversion='])
     except getopt.GetoptError:
         print(
-            'run-tests.py -s <workspace> -t <token> -c <clusterid> -p <privatekeyfile> -n <skipstartingcluster> -l <localscript> -d <scriptdestination> -z <sparktgz> -j <cirapidsjar> -b <databricksversion> -k <sparkversion> -a <scalaversion> -f <cudfversion> -u <cudaversion> -m <cicudfjar> -v <basesparkpomversion>')
+            'run-tests.py -s <workspace> -t <token> -c <clusterid> -p <privatekeyfile> -l <localscript> -d <scriptdestination> -z <sparktgz> -j <cirapidsjar> -b <databricksversion> -k <sparkversion> -a <scalaversion> -f <cudfversion> -u <cudaversion> -m <cicudfjar> -v <basesparkpomversion>')
         sys.exit(2)
 
     for opt, arg in opts:
@@ -67,7 +53,7 @@ def main():
             print(
                 'run-tests.py -s <workspace> -t <token> -c <clusterid> -p <privatekeyfile> -n <skipstartingcluster> -l <localscript> -d <scriptdestination>, -z <sparktgz> -j <cirapidsjar> -b <databricksversion> -k <sparkversion> -a <scalaversion> -f <cudfversion> -u <cudaversion> -m <cicudfjar> -v <basesparkpomversion>')
             sys.exit()
-        elif opt in ('-s', '--workspace'):
+        elif opt in ('-w', '--workspace'):
             workspace = arg
         elif opt in ('-t', '--token'):
             token = arg
@@ -75,8 +61,6 @@ def main():
            clusterid = arg
        elif opt in ('-p', '--private'):
            private_key_file = arg
-        elif opt in ('-n', '--nostart'):
-            skip_start = arg
        elif opt in ('-l', '--localscript'):
            local_script = arg
        elif opt in ('-d', '--dest'):
@@ -100,13 +84,9 @@ def main():
        elif opt in ('-v', '--basesparkpomversion'):
            base_spark_pom_version = arg
 
-    print('-s is ' + workspace)
+    print('-w is ' + workspace)
     print('-c is ' + clusterid)
     print('-p is ' + private_key_file)
-    if skip_start is not None:
-        print("-n: skip start")
-    else:
-        print("-n: don't skip start")
     print('-l is ' + local_script)
     print('-d is ' + script_dest)
     print('-z is ' + source_tgz)
@@ -119,41 +99,10 @@ def main():
     print('-m is ' + ci_cudf_jar)
     print('-v is ' + base_spark_pom_version)
 
-    if skip_start is None:
-        jsonout = cluster_state(workspace, clusterid, token)
-        current_state = jsonout['state']
-        if current_state in ['RUNNING']:
-            print("Cluster is already running - perhaps build/tests already running?")
-            sys.exit(3)
-
-        print("Starting cluster: " + clusterid)
-        resp = requests.post(workspace + "/api/2.0/clusters/start", headers={'Authorization': 'Bearer %s' % token}, json={'cluster_id': clusterid})
-        print("start response is %s" % resp.text)
-        p = 0
-        waiting = True
-        master_addr = None
-        while waiting:
-            time.sleep(30)
-            jsonout = cluster_state(workspace, clusterid, token)
-            current_state = jsonout['state']
-            print(clusterid + " state:" + current_state)
-            if current_state in ['RUNNING']:
-                master_addr = get_master_addr(jsonout)
-                break
-            if current_state in ['INTERNAL_ERROR', 'SKIPPED', 'TERMINATED'] or p >= 20:
-                if p >= 20:
-                    print("Waited %d times already, stopping" % p)
-                sys.exit(4)
-            p = p + 1
-
-        print("Done starting cluster")
-    else:
-        jsonout = cluster_state(workspace, clusterid, token)
-        master_addr = get_master_addr(jsonout)
-
+    master_addr = ClusterUtils.cluster_get_master_addr(workspace, clusterid, token)
     if master_addr is None:
         print("Error, didn't get master address")
-        sys.exit(5)
+        sys.exit(1)
     print("Master node address is: %s" % master_addr)
     print("Copying script")
     rsync_command = "rsync -I -Pave \"ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -p 2200 -i %s\" %s ubuntu@%s:%s" % (private_key_file, local_script, master_addr, script_dest)
diff --git a/jenkins/databricks/shutdown.py b/jenkins/databricks/shutdown.py
index 54252e56026..654594ae53a 100644
--- a/jenkins/databricks/shutdown.py
+++ b/jenkins/databricks/shutdown.py
@@ -11,28 +11,20 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import json
-import requests
-import sys
+from clusterutils import ClusterUtils
 import getopt
-import time
-import os
-
-def cluster_state(workspace, clusterid, token):
-    clusterresp = requests.get(workspace + "/api/2.0/clusters/get?cluster_id=%s" % clusterid, headers={'Authorization': 'Bearer %s' % token})
-    clusterjson = clusterresp.text
-    print("cluster response is %s" % clusterjson)
-    jsonout = json.loads(clusterjson)
-    return jsonout
+import sys
 
+# Shut down or delete a Databricks cluster
 def main():
     workspace = 'https://dbc-9ff9942e-a9c4.cloud.databricks.com'
     token = ''
     clusterid = '0617-140138-umiak14'
+    delete = False
 
     try:
-        opts, args = getopt.getopt(sys.argv[1:], 'hs:t:c:',
-                                   ['workspace=', 'token=', 'clusterid='])
+        opts, args = getopt.getopt(sys.argv[1:], 'hs:t:c:d',
+                                   ['workspace=', 'token=', 'clusterid=', 'delete'])
     except getopt.GetoptError:
         print(
             'shutdown.py -s <workspace> -t <token> -c <clusterid>')
@@ -49,20 +41,24 @@ def main():
            token = arg
        elif opt in ('-c', '--clusterid'):
            clusterid = arg
+        elif opt in ('-d', '--delete'):
+            delete = True
 
     print('-s is ' + workspace)
     print('-c is ' + clusterid)
 
-    jsonout = cluster_state(workspace, clusterid, token)
-    current_state = jsonout['state']
-    if current_state not in ['RUNNING', 'RESIZING']:
-        print("Cluster is not running")
+    if not clusterid:
+        print("You must specify a clusterid!")
+        sys.exit(1)
+
+    if not token:
+        print("You must specify a token!")
         sys.exit(1)
 
-    print("Stopping cluster: " + clusterid)
-    resp = requests.post(workspace + "/api/2.0/clusters/delete", headers={'Authorization': 'Bearer %s' % token}, json={'cluster_id': clusterid})
-    print("stop response is %s" % resp.text)
-    print("Done stopping cluster")
+    if delete:
+        ClusterUtils.delete_cluster(workspace, clusterid, token)
+    else:
+        ClusterUtils.terminate_cluster(workspace, clusterid, token)
 
 if __name__ == '__main__':
     main()
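With these pieces, the nightly job now creates a fresh cluster, runs the build and tests against it, and always tears it down in the post step. A teardown call equivalent to the Jenkinsfile's shutdown step, sketched in plain Python with placeholder workspace, token, and cluster id values, could look like:

# Hypothetical teardown sketch: pass -d to permanently delete the CI cluster,
# omit it to only terminate (stop) the cluster. Not raising on a non-zero exit
# mirrors the '|| true' in the pipeline step.
import subprocess

args = ['python3.6', './jenkins/databricks/shutdown.py',
        '-s', 'https://example.cloud.databricks.com',  # placeholder workspace
        '-t', 'REDACTED_TOKEN',                        # placeholder token
        '-c', '0123-456789-example',                   # placeholder cluster id
        '-d']
subprocess.run(args, check=False)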