forked from NVIDIA/spark-rapids
Change databricks build to dynamically create a cluster (NVIDIA#981)
* Add some more checks to databricks build scripts Signed-off-by: Thomas Graves <tgraves@nvidia.com>
* remove extra newline
* use the right -gt for bash
* Add new python file for databricks cluster utils
* Fix up scripts
* databricks scripts working Signed-off-by: Thomas Graves <tgraves@nvidia.com>
* Pass in sshkey Signed-off-by: Thomas Graves <tgraves@nvidia.com>
* cluster creation script mods
* fix
* fix pub key
* fix missing quote
* fix $
* update public key to be param Signed-off-by: Thomas Graves <tgraves@nvidia.com>
* Add public key value
* cleanup Signed-off-by: Thomas Graves <tgraves@nvidia.com>
* modify permissions Signed-off-by: Thomas Graves <tgraves@nvidia.com>
* change loc cluster id file
* fix extra /
* quote public key
* try different setting cluster id
* debug
* try again
* try readfile
* try again
* try quotes
* cleanup
* Add option to control number of partitions when converting from CSV to Parquet (NVIDIA#915)
* Add command-line arguments for applying coalesce and repartition on a per-table basis Signed-off-by: Andy Grove <andygrove@nvidia.com>
* Move command-line validation logic and address other feedback Signed-off-by: Andy Grove <andygrove@nvidia.com>
* Update copyright years and fix import order Signed-off-by: Andy Grove <andygrove@nvidia.com>
* Update docs/benchmarks.md Co-authored-by: Jason Lowe <jlowe@nvidia.com>
* Remove withPartitioning option from TPC-H and TPC-xBB file conversion Signed-off-by: Andy Grove <andygrove@nvidia.com> Co-authored-by: Jason Lowe <jlowe@nvidia.com>
* Benchmark runner script (NVIDIA#918)
* Benchmark runner script Signed-off-by: Andy Grove <andygrove@nvidia.com>
* Add argument for number of iterations Signed-off-by: Andy Grove <andygrove@nvidia.com>
* Fix docs Signed-off-by: Andy Grove <andygrove@nvidia.com>
* add license Signed-off-by: Andy Grove <andygrove@nvidia.com>
* improve documentation for the configuration files Signed-off-by: Andy Grove <andygrove@nvidia.com>
* Add missing line-continuation symbol in example Signed-off-by: Andy Grove <andygrove@nvidia.com>
* Remove hard-coded spark-submit-template.txt and add --template argument. Also make all arguments required. Signed-off-by: Andy Grove <andygrove@nvidia.com>
* Update benchmarking guide to link to the benchmark python script Signed-off-by: Andy Grove <andygrove@nvidia.com>
* Add --template to example and fix markdown header Signed-off-by: Andy Grove <andygrove@nvidia.com>
* Add legacy config to clear active Spark 3.1.0 session in tests (NVIDIA#970) Signed-off-by: Jason Lowe <jlowe@nvidia.com>
* XFail tests until final fix can be put in (NVIDIA#968) Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
* Stop reporting totalTime metric for GpuShuffleExchangeExec (NVIDIA#973) Signed-off-by: Andy Grove <andygrove@nvidia.com>
* Add some more checks to databricks build scripts Signed-off-by: Thomas Graves <tgraves@nvidia.com>
* Pass in sshkey
* Add create script, add more parameters, etc Signed-off-by: Thomas Graves <tgraves@nvidia.com>
* add create script
* rework some scripts Signed-off-by: Thomas Graves <tgraves@nvidia.com>
* fix is_cluster_running Signed-off-by: Thomas Graves <tgraves@nvidia.com>
* put slack back in
* update text
* cleanup Signed-off-by: Thomas Graves <tgraves@nvidia.com>
* remove datetime
* send output to stderr Signed-off-by: Thomas Graves <tgraves@nvidia.com>

Co-authored-by: Andy Grove <andygrove@users.noreply.github.com>
Co-authored-by: Jason Lowe <jlowe@nvidia.com>
Co-authored-by: Robert (Bobby) Evans <bobby@apache.org>
1 parent 6626ff1 · commit 9304dc6
Showing 6 changed files with 311 additions and 107 deletions.
@@ -0,0 +1,151 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
import sys
import time

import requests

class ClusterUtils(object):

    @staticmethod
    def generate_create_templ(sshKey, cluster_name, runtime, idle_timeout,
            num_workers, driver_node_type, worker_node_type,
            printLoc=sys.stdout):
        # Build the JSON body for a cluster-create request, appending a
        # timestamp to the cluster name so each build gets a unique cluster.
        timeStr = str(int(time.time()))
        uniq_name = cluster_name + "-" + timeStr
        templ = {}
        templ['cluster_name'] = uniq_name
        print("cluster name is going to be %s" % uniq_name, file=printLoc)
        templ['spark_version'] = runtime
        templ['aws_attributes'] = {
                "zone_id": "us-west-2a",
                "first_on_demand": 1,
                "availability": "SPOT_WITH_FALLBACK",
                "spot_bid_price_percent": 100,
                "ebs_volume_count": 0
            }
        templ['autotermination_minutes'] = idle_timeout
        templ['enable_elastic_disk'] = 'false'
        templ['enable_local_disk_encryption'] = 'false'
        templ['node_type_id'] = worker_node_type
        templ['driver_node_type_id'] = driver_node_type
        templ['ssh_public_keys'] = [sshKey]
        templ['num_workers'] = num_workers
        return templ

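    # A minimal usage sketch (the key, name, runtime string, and node types
    # below are hypothetical, not from this commit); the returned dict is the
    # JSON body that create_cluster posts to the API:
    #
    #   templ = ClusterUtils.generate_create_templ(
    #       "ssh-rsa AAAA... builder@host", "ci-gpu", "7.0.x-gpu-ml-scala2.12",
    #       idle_timeout=120, num_workers=1,
    #       driver_node_type="g4dn.xlarge", worker_node_type="g4dn.xlarge")
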
    @staticmethod
    def create_cluster(workspace, jsonCreateTempl, token, printLoc=sys.stdout):
        resp = requests.post(workspace + "/api/2.0/clusters/create",
                headers={'Authorization': 'Bearer %s' % token}, json=jsonCreateTempl)
        print("create response is %s" % resp.text, file=printLoc)
        clusterid = resp.json()['cluster_id']
        print("cluster id is %s" % clusterid, file=printLoc)
        return clusterid

    @staticmethod
    def wait_for_cluster_start(workspace, clusterid, token, retries=20, printLoc=sys.stdout):
        # Poll the cluster state every 30 seconds until it reports RUNNING,
        # giving up after `retries` polls or as soon as the cluster hits a
        # terminal state it will not recover from.
        p = 0
        jsonout = None
        while True:
            time.sleep(30)
            jsonout = ClusterUtils.cluster_state(workspace, clusterid, token, printLoc=printLoc)
            current_state = jsonout['state']
            print(clusterid + " state:" + current_state, file=printLoc)
            if current_state in ['RUNNING']:
                break
            if current_state in ['INTERNAL_ERROR', 'SKIPPED', 'TERMINATED'] or p >= retries:
                print("Waited %d times already, stopping" % p, file=printLoc)
                sys.exit(4)
            p = p + 1
        print("Done starting cluster", file=printLoc)
        return jsonout

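    # Together with create_cluster above, this gives the dynamic
    # create-and-wait flow the build scripts rely on (sketch; `workspace`,
    # `templ`, and `token` are assumed to come from the caller):
    #
    #   clusterid = ClusterUtils.create_cluster(workspace, templ, token)
    #   ClusterUtils.wait_for_cluster_start(workspace, clusterid, token)
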
    @staticmethod
    def is_cluster_running(jsonout):
        current_state = jsonout['state']
        return current_state in ['RUNNING', 'RESIZING']

    @staticmethod
    def terminate_cluster(workspace, clusterid, token, printLoc=sys.stdout):
        jsonout = ClusterUtils.cluster_state(workspace, clusterid, token, printLoc=printLoc)
        if not ClusterUtils.is_cluster_running(jsonout):
            print("Cluster is not running", file=printLoc)
            sys.exit(1)

        print("Stopping cluster: " + clusterid, file=printLoc)
        resp = requests.post(workspace + "/api/2.0/clusters/delete",
                headers={'Authorization': 'Bearer %s' % token}, json={'cluster_id': clusterid})
        print("stop response is %s" % resp.text, file=printLoc)
        print("Done stopping cluster", file=printLoc)

    @staticmethod
    def delete_cluster(workspace, clusterid, token, printLoc=sys.stdout):
        print("Deleting cluster: " + clusterid, file=printLoc)
        resp = requests.post(workspace + "/api/2.0/clusters/permanent-delete",
                headers={'Authorization': 'Bearer %s' % token}, json={'cluster_id': clusterid})
        print("delete response is %s" % resp.text, file=printLoc)
        print("Done deleting cluster", file=printLoc)

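    # Note the two different endpoints used above: /api/2.0/clusters/delete
    # terminates a cluster but keeps its configuration so it can be started
    # again later, while /api/2.0/clusters/permanent-delete removes the
    # cluster entirely. That is why both terminate_cluster and delete_cluster
    # exist.
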
    @staticmethod
    def start_existing_cluster(workspace, clusterid, token, printLoc=sys.stdout):
        print("Starting cluster: " + clusterid, file=printLoc)
        resp = requests.post(workspace + "/api/2.0/clusters/start",
                headers={'Authorization': 'Bearer %s' % token}, json={'cluster_id': clusterid})
        print("start response is %s" % resp.text, file=printLoc)

    @staticmethod
    def cluster_state(workspace, clusterid, token, printLoc=sys.stdout):
        clusterresp = requests.get(workspace + "/api/2.0/clusters/get?cluster_id=%s" % clusterid,
                headers={'Authorization': 'Bearer %s' % token})
        clusterjson = clusterresp.text
        print("cluster response is %s" % clusterjson, file=printLoc)
        jsonout = json.loads(clusterjson)
        return jsonout

    @staticmethod
    def get_master_addr_from_json(jsonout):
        master_addr = None
        if ClusterUtils.is_cluster_running(jsonout):
            driver = jsonout['driver']
            master_addr = driver["public_dns"]
        return master_addr

    @staticmethod
    def cluster_list(workspace, token, printLoc=sys.stdout):
        clusterresp = requests.get(workspace + "/api/2.0/clusters/list",
                headers={'Authorization': 'Bearer %s' % token})
        clusterjson = clusterresp.text
        print("cluster list is %s" % clusterjson, file=printLoc)
        jsonout = json.loads(clusterjson)
        return jsonout

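    # cluster_list returns the raw API response, whose entries live under the
    # 'clusters' key; a caller could look up a cluster by name like this
    # (sketch, not part of this commit):
    #
    #   jsonout = ClusterUtils.cluster_list(workspace, token)
    #   matches = [c['cluster_id'] for c in jsonout.get('clusters', [])
    #              if c['cluster_name'] == wanted_name]
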
    @staticmethod
    def cluster_get_master_addr(workspace, clusterid, token, printLoc=sys.stdout):
        jsonout = ClusterUtils.cluster_state(workspace, clusterid, token, printLoc=printLoc)
        addr = ClusterUtils.get_master_addr_from_json(jsonout)
        print("master addr is %s" % addr, file=printLoc)
        return addr
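Pieced together, a driver script for the dynamic Databricks build could use the class like this. This is a minimal sketch, not part of the commit: the module name, environment variable names, runtime string, and node types are assumptions for illustration. Status output goes to stderr so only the master address lands on stdout, in line with the commit's "send output to stderr" change.

```python
# Hypothetical end-to-end driver for ClusterUtils (illustration only; the
# environment variable names and node types are assumptions, not from this commit).
import os
import sys

from clusterutils import ClusterUtils  # assumes the file above is saved as clusterutils.py

workspace = os.environ['DB_WORKSPACE']  # e.g. the workspace URL
token = os.environ['DB_TOKEN']          # Databricks API token
ssh_key = os.environ['DB_SSHKEY']       # public key to install on the nodes

# Build the creation template and stand up a uniquely named cluster.
templ = ClusterUtils.generate_create_templ(ssh_key, "ci-gpu", "7.0.x-gpu-ml-scala2.12",
        idle_timeout=120, num_workers=1,
        driver_node_type="g4dn.xlarge", worker_node_type="g4dn.xlarge",
        printLoc=sys.stderr)
clusterid = ClusterUtils.create_cluster(workspace, templ, token, printLoc=sys.stderr)

try:
    # Block until the cluster reports RUNNING, then grab the driver address.
    ClusterUtils.wait_for_cluster_start(workspace, clusterid, token, printLoc=sys.stderr)
    master_addr = ClusterUtils.cluster_get_master_addr(workspace, clusterid, token,
            printLoc=sys.stderr)
    print(master_addr)  # only the address goes to stdout
finally:
    # Tear the cluster down once the build is done.
    ClusterUtils.delete_cluster(workspace, clusterid, token, printLoc=sys.stderr)
```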