Dbconnect progress #1355

Merged: 7 commits, Sep 17, 2024

Changes from all commits
4 changes: 3 additions & 1 deletion databricks-vscode.code-workspace
@@ -22,6 +22,8 @@
         },
         "[typescript]": {
             "editor.defaultFormatter": "esbenp.prettier-vscode"
-        }
+        },
+        "jupyter.interactiveWindow.cellMarker.codeRegex": "^# COMMAND ----------|^# Databricks notebook source|^(#\\s*%%|#\\s*\\<codecell\\>|#\\s*In\\[\\d*?\\]|#\\s*In\\[ \\])",
+        "jupyter.interactiveWindow.cellMarker.default": "# COMMAND ----------"
     }
 }
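For context, these cell markers let the Jupyter interactive window split a plain Databricks notebook source file into runnable cells. A minimal illustration (hypothetical file contents, not part of the PR): a script like the following would be treated as two cells, with the regex also accepting the classic `# %%` and `# In[...]` delimiters.

# Databricks notebook source
print("first cell")

# COMMAND ----------

print("second cell")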
8 changes: 6 additions & 2 deletions packages/databricks-vscode/package.json
@@ -856,7 +856,6 @@
                     "views.workspace"
                 ],
                 "enumDescriptions": [
-                    "Limited local notebook support using DB Connect v2.",
                     "Show cluster view in the explorer.",
                     "Show workspace browser in the explorer."
                 ],
@@ -870,6 +869,11 @@
                 "default": true,
                 "description": "Enable/disable rearranging cells in wrapper files created when using `workspace` as the sync destination. **Note:** It is recommended to NOT disable this setting. If you do disable it, you will need to manually handle sys.path for local imports in your notebooks."
             },
+            "databricks.connect.progress": {
+                "type": "boolean",
+                "default": true,
+                "description": "Show PySpark progress bar when using Databricks Connect."
+            },
             "databricks.ipythonDir": {
                 "type": "string",
                 "description": "Absolute path to a directory for storing IPython files. Defaults to IPYTHONDIR environment variable (if set) or ~/.ipython."
@@ -1008,4 +1012,4 @@
         ],
         "report-dir": "coverage"
     }
-}
+}
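The new databricks.connect.progress setting defaults to true; it is read in code through workspaceConfigs.showDatabricksConnectProgress and ultimately becomes the SPARK_CONNECT_PROGRESS_BAR_ENABLED environment variable, both shown later in this diff.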
111 changes: 110 additions & 1 deletion packages/databricks-vscode/resources/python/00-databricks-init.py
@@ -3,6 +3,7 @@
 import json
 from typing import Any, Union, List
 import os
+import time
 import shlex
 import warnings
 import tempfile
@@ -24,6 +25,7 @@ def logError(function_name: str, e: Union[str, Exception]):

 try:
     from IPython import get_ipython
+    from IPython.display import display
     from IPython.core.magic import magics_class, Magics, line_magic, needs_local_scope
 except Exception as e:
     logError("Ipython Imports", e)
@@ -101,7 +103,13 @@ def __init__(self, env_name: str, default: any = None, required: bool = False):

     def __get__(self, instance, owner):
         if self.env_name in os.environ:
-            return self.transform(os.environ[self.env_name])
+            if self.transform is not bool:
+                return self.transform(os.environ[self.env_name])
+
+            if os.environ[self.env_name].lower() == "true" or os.environ[self.env_name] == "1":
+                return True
+            elif os.environ[self.env_name].lower() == "false" or os.environ[self.env_name] == "0":
+                return False

         if self.required:
             raise AttributeError(
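Why the special case: Python's bool() is truthy for any non-empty string, so a naive transform would read "false" as True. A standalone sketch of the added parsing (parse_bool_env is illustrative, not from the PR):

assert bool("false") is True   # the pitfall: bool() on any non-empty string
assert bool("0") is True

def parse_bool_env(value: str):
    # Mirrors the branch above: accept "true"/"1" and "false"/"0",
    # case-insensitively for the words; fall through to None otherwise.
    if value.lower() == "true" or value == "1":
        return True
    if value.lower() == "false" or value == "0":
        return False

assert parse_bool_env("false") is False
assert parse_bool_env("1") is True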
@@ -117,6 +125,7 @@ def __set__(self, instance, value):
 class LocalDatabricksNotebookConfig:
     project_root: str = EnvLoader("DATABRICKS_PROJECT_ROOT", required=True)
     dataframe_display_limit: int = EnvLoader("DATABRICKS_DF_DISPLAY_LIMIT", 20)
+    show_progress: bool = EnvLoader("SPARK_CONNECT_PROGRESS_BAR_ENABLED", default=False)

     def __new__(cls):
         annotations = cls.__dict__['__annotations__']
@@ -357,6 +366,99 @@ def df_html(df):
     html_formatter.for_type(SparkConnectDataframe, df_html)
     html_formatter.for_type(DataFrame, df_html)

+@logErrorAndContinue

[Contributor comment on the line above] This will show a message saying notebook init failed. We can skip showing that message by not logging the error here.

+@disposable
+def register_spark_progress(spark, show_progress: bool):
+    try:
+        import ipywidgets as widgets
+    except Exception as e:
+        return
+
+    class Progress:
+        SI_BYTE_SIZES = (1 << 60, 1 << 50, 1 << 40, 1 << 30, 1 << 20, 1 << 10, 1)
+        SI_BYTE_SUFFIXES = ("EiB", "PiB", "TiB", "GiB", "MiB", "KiB", "B")
+
+        def __init__(
+            self
+        ) -> None:
+            self._ticks = None
+            self._tick = None
+            self._started = time.time()
+            self._bytes_read = 0
+            self._running = 0
+            self.init_ui()
+
+        def init_ui(self):
+            self.w_progress = widgets.IntProgress(
+                value=0,
+                min=0,
+                max=100,
+                bar_style='success',
+                orientation='horizontal'
+            )
+            self.w_status = widgets.Label(value="")
+            if show_progress:
+                display(widgets.HBox([self.w_progress, self.w_status]))
+
+        def update_ticks(
+            self,
+            stages,
+            inflight_tasks: int
+        ) -> None:
+            total_tasks = sum(map(lambda x: x.num_tasks, stages))
+            completed_tasks = sum(map(lambda x: x.num_completed_tasks, stages))
+            if total_tasks > 0:
+                self._ticks = total_tasks
+                self._tick = completed_tasks
+                self._bytes_read = sum(map(lambda x: x.num_bytes_read, stages))
+                if self._tick is not None and self._tick >= 0:
+                    self.output()
+                self._running = inflight_tasks
[Contributor comment on lines +414 to +416]
Suggested change:
-                if self._tick is not None and self._tick >= 0:
-                    self.output()
-                self._running = inflight_tasks
+                self._running = inflight_tasks
+                self.output()
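The suggestion updates the in-flight count before rendering, so the status line shows the current number of running tasks rather than the value from the previous callback; it also drops the None/negative guard, which is redundant at this point because self._tick was just assigned from a non-negative task count.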
+        def output(self) -> None:
+            if self._tick is not None and self._ticks is not None:
+                percent_complete = (self._tick / self._ticks) * 100
+                elapsed = int(time.time() - self._started)
+                scanned = self._bytes_to_string(self._bytes_read)
+                running = self._running
+                self.w_progress.value = percent_complete
+                self.w_status.value = f"{percent_complete:.2f}% Complete ({running} Tasks running, {elapsed}s, Scanned {scanned})"
+
+        @staticmethod
+        def _bytes_to_string(size: int) -> str:
+            """Helper method to convert a numeric bytes value into a human-readable representation"""
+            i = 0
+            while i < len(Progress.SI_BYTE_SIZES) - 1 and size < 2 * Progress.SI_BYTE_SIZES[i]:
+                i += 1
+            result = float(size) / Progress.SI_BYTE_SIZES[i]
+            return f"{result:.1f} {Progress.SI_BYTE_SUFFIXES[i]}"


+    class ProgressHandler:
+        def __init__(self):
+            self.op_id = ""
+
+        def reset(self):
+            self.p = Progress()
+
+        def __call__(self,
+                     stages,
+                     inflight_tasks: int,
+                     operation_id,
+                     done: bool
+                     ):
+            if len(stages) == 0:
+                return
+
+            if self.op_id != operation_id:
+                self.op_id = operation_id
+                self.reset()
+
+            self.p.update_ticks(stages, inflight_tasks)
+
+    spark.clearProgressHandlers()
+    spark.registerProgressHandler(ProgressHandler())
+
+
 @logErrorAndContinue
 @disposable
@@ -382,9 +484,16 @@ def make_matplotlib_inline():
 if not load_env_from_leaf(os.getcwd()):
     sys.exit(1)
 cfg = LocalDatabricksNotebookConfig()
+
+# disable the built-in progress bar; register_spark_progress provides a widget-based one honoring this setting
+show_progress = cfg.show_progress
+if "SPARK_CONNECT_PROGRESS_BAR_ENABLED" in os.environ:
+    del os.environ["SPARK_CONNECT_PROGRESS_BAR_ENABLED"]
+
 create_and_register_databricks_globals()
 register_magics(cfg)
 register_formatters(cfg)
+register_spark_progress(globals()["spark"], show_progress)
 update_sys_path(cfg)
 make_matplotlib_inline()
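To get a feel for the rendered status line, here is the same formatting logic run standalone (a sketch with made-up numbers, not part of the PR). The handler registered via spark.registerProgressHandler receives per-stage task counts and feeds them through exactly this kind of computation:

SI_BYTE_SIZES = (1 << 60, 1 << 50, 1 << 40, 1 << 30, 1 << 20, 1 << 10, 1)
SI_BYTE_SUFFIXES = ("EiB", "PiB", "TiB", "GiB", "MiB", "KiB", "B")

def bytes_to_string(size: int) -> str:
    # Walk from EiB down until size is at least twice the unit.
    i = 0
    while i < len(SI_BYTE_SIZES) - 1 and size < 2 * SI_BYTE_SIZES[i]:
        i += 1
    return f"{float(size) / SI_BYTE_SIZES[i]:.1f} {SI_BYTE_SUFFIXES[i]}"

tick, ticks, running, elapsed = 42, 120, 8, 13
scanned = bytes_to_string(3 * (1 << 30))   # "3.0 GiB"
print(f"{(tick / ticks) * 100:.2f}% Complete ({running} Tasks running, {elapsed}s, Scanned {scanned})")
# -> 35.00% Complete (8 Tasks running, 13s, Scanned 3.0 GiB)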
@@ -19,6 +19,7 @@ export class DatabricksEnvFileManager implements Disposable {
     private userEnvFileWatcherDisposables: Disposable[] = [];
     private mutex = new Mutex();
     private userEnvPath?: Uri;
+    private showDatabricksConnectProgess = true;

     get databricksEnvPath() {
         return Uri.joinPath(
@@ -44,7 +45,23 @@
         private readonly featureManager: FeatureManager,
         private readonly connectionManager: ConnectionManager,
         private readonly configModel: ConfigModel
-    ) {}
+    ) {
+        this.showDatabricksConnectProgess =
+            workspaceConfigs.showDatabricksConnectProgress;
+    }
+
+    private async updateShowDatabricksConnectProgessWatcher() {
+        if (
+            this.showDatabricksConnectProgess ===
+            workspaceConfigs.showDatabricksConnectProgress
+        ) {
+            return;
+        }
+
+        this.showDatabricksConnectProgess =
+            workspaceConfigs.showDatabricksConnectProgress;
+        await this.writeFile();
+    }

     private updateUserEnvFileWatcher() {
         const userEnvPath = workspaceConfigs.msPythonEnvFile
@@ -111,6 +128,11 @@
                 this,
                 this.disposables
             ),
+            workspace.onDidChangeConfiguration(
+                this.updateShowDatabricksConnectProgessWatcher,
+                this,
+                this.disposables
+            ),
             this.featureManager.onDidChangeState(
                 "environment.dependencies",
                 () => {
@@ -160,7 +182,8 @@
             ...(this.getDatabrickseEnvVars() || {}),
             ...((await EnvVarGenerators.getDbConnectEnvVars(
                 this.connectionManager,
-                this.workspacePath
+                this.workspacePath,
+                this.showDatabricksConnectProgess
             )) || {}),
             ...this.getIdeEnvVars(),
             ...((await this.getUserEnvVars()) || {}),
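Net effect in this file: the manager caches the setting at construction, re-checks it on every VS Code configuration change, and rewrites the generated env file only when the value actually flips, keeping the SPARK_CONNECT_PROGRESS_BAR_ENABLED entry emitted by getDbConnectEnvVars in sync with the databricks.connect.progress setting.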
@@ -43,4 +43,6 @@ export function getSimplifiedRunState(run?: Run): SimplifiedRunState {
         }
         return "Terminated";
     }
+
+    return "Unknown";
 }
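This fallback makes the function total: a run whose status matches none of the earlier branches now yields "Unknown" instead of an implicit undefined, which would violate the declared SimplifiedRunState return type.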
16 changes: 12 additions & 4 deletions packages/databricks-vscode/src/utils/envVarGenerators.test.ts
@@ -103,12 +103,14 @@ describe(__filename, () => {

         const actual = await getDbConnectEnvVars(
             instance(mockConnectionManager),
-            mockWorkspacePath
+            mockWorkspacePath,
+            true
         );

         assert.deepEqual(actual, {
             SPARK_CONNECT_USER_AGENT: "test/0.0.1",
             DATABRICKS_PROJECT_ROOT: mockWorkspacePath.fsPath,
+            SPARK_CONNECT_PROGRESS_BAR_ENABLED: "1",
         });
     });

@@ -118,12 +120,14 @@

         const actual = await getDbConnectEnvVars(
             instance(mockConnectionManager),
-            mockWorkspacePath
+            mockWorkspacePath,
+            true
         );

         assert.deepEqual(actual, {
             SPARK_CONNECT_USER_AGENT: "existing test/0.0.1",
             DATABRICKS_PROJECT_ROOT: mockWorkspacePath.fsPath,
+            SPARK_CONNECT_PROGRESS_BAR_ENABLED: "1",
         });
     });

@@ -139,12 +143,14 @@

         const actual = await getDbConnectEnvVars(
             instance(mockConnectionManager),
-            mockWorkspacePath
+            mockWorkspacePath,
+            true
         );

         assert.deepEqual(actual, {
             SPARK_CONNECT_USER_AGENT: "test/0.0.1",
             DATABRICKS_PROJECT_ROOT: mockWorkspacePath.fsPath,
+            SPARK_CONNECT_PROGRESS_BAR_ENABLED: "1",
             SPARK_REMOTE: `sc://${
                 Uri.parse(mockHost).authority
             }:443/;token=token;use_ssl=true;x-databricks-cluster-id=${mockClusterId}`,
@@ -163,12 +169,14 @@

         const actual = await getDbConnectEnvVars(
             instance(mockConnectionManager),
-            mockWorkspacePath
+            mockWorkspacePath,
+            true
         );

         assert.deepEqual(actual, {
             SPARK_CONNECT_USER_AGENT: "test/0.0.1",
             DATABRICKS_PROJECT_ROOT: mockWorkspacePath.fsPath,
+            SPARK_CONNECT_PROGRESS_BAR_ENABLED: "1",
         });
     });
 });
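All four cases pass true for the new parameter, so each expected env map gains SPARK_CONNECT_PROGRESS_BAR_ENABLED: "1"; a false case would expect "0" per the generator below.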
7 changes: 6 additions & 1 deletion packages/databricks-vscode/src/utils/envVarGenerators.ts
@@ -111,15 +111,20 @@ async function getSparkRemoteEnvVar(connectionManager: ConnectionManager) {

 export async function getDbConnectEnvVars(
     connectionManager: ConnectionManager,
-    workspacePath: Uri
+    workspacePath: Uri,
+    showDatabricksConnectProgess: boolean
 ) {
     const userAgent = getUserAgent(connectionManager);
     const existingSparkUa = process.env.SPARK_CONNECT_USER_AGENT ?? "";

     /* eslint-disable @typescript-eslint/naming-convention */
     return {
         //We append our user agent to any existing SPARK_CONNECT_USER_AGENT defined in the
         //environment of the parent process of VS Code.
         SPARK_CONNECT_USER_AGENT: [existingSparkUa, userAgent].join(" ").trim(),
+        SPARK_CONNECT_PROGRESS_BAR_ENABLED: showDatabricksConnectProgess
+            ? "1"
+            : "0",
         DATABRICKS_PROJECT_ROOT: workspacePath.fsPath,
         ...((await getSparkRemoteEnvVar(connectionManager)) || {}),
     };
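The value is emitted as the string "1" or "0" because env files carry only strings; on the Python side, the EnvLoader boolean handling shown earlier parses exactly these values back into a bool before the variable is unset.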
@@ -105,6 +105,14 @@ export const workspaceConfigs = {
         );
     },

+    get showDatabricksConnectProgress(): boolean {
+        return (
+            workspace
+                .getConfiguration("databricks")
+                .get<boolean>("connect.progress") ?? true
+        );
+    },
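The ?? true fallback mirrors the package.json default, so the progress bar stays enabled until the user explicitly sets databricks.connect.progress to false.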

     get ipythonDir(): string | undefined {
         const dir = workspace
             .getConfiguration("databricks")