From 31ff6996feb1f390c0a825455346fb374722faf7 Mon Sep 17 00:00:00 2001 From: Fabian Jakobs Date: Mon, 16 Sep 2024 11:08:43 +0200 Subject: [PATCH 1/6] Inline code from pyspark --- .../resources/python/00-databricks-init.py | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/packages/databricks-vscode/resources/python/00-databricks-init.py b/packages/databricks-vscode/resources/python/00-databricks-init.py index 3292b43f5..15541b95a 100644 --- a/packages/databricks-vscode/resources/python/00-databricks-init.py +++ b/packages/databricks-vscode/resources/python/00-databricks-init.py @@ -379,7 +379,11 @@ class Progress: SI_BYTE_SUFFIXES = ("EiB", "PiB", "TiB", "GiB", "MiB", "KiB", "B") def __init__( +<<<<<<< HEAD self +======= + self, +>>>>>>> b1ceb52 (Inline code from pyspark) ) -> None: self._ticks = None self._tick = None @@ -434,6 +438,47 @@ def _bytes_to_string(size: int) -> str: return f"{result:.1f} {Progress.SI_BYTE_SUFFIXES[i]}" + class ProgressHandler: + def __init__(self): + self.op_id = "" + + def reset(self): + self.p = Progress() + + def update_ticks( + self, + stages, + inflight_tasks: int + ) -> None: + total_tasks = sum(map(lambda x: x.num_tasks, stages)) + completed_tasks = sum(map(lambda x: x.num_completed_tasks, stages)) + if total_tasks > 0: + self._ticks = total_tasks + self._tick = completed_tasks + self._bytes_read = sum(map(lambda x: x.num_bytes_read, stages)) + if self._tick is not None and self._tick >= 0: + self.output() + self._running = inflight_tasks + + def output(self) -> None: + if self._tick is not None and self._ticks is not None: + percent_complete = (self._tick / self._ticks) * 100 + elapsed = int(time.time() - self._started) + scanned = self._bytes_to_string(self._bytes_read) + running = self._running + self.w_progress.value = percent_complete + self.w_status.value = f"{percent_complete:.2f}% Complete ({running} Tasks running, {elapsed}s, Scanned {scanned})" + + @staticmethod + def _bytes_to_string(size: int) -> str: + """Helper method to convert a numeric bytes value into a human-readable representation""" + i = 0 + while i < len(Progress.SI_BYTE_SIZES) - 1 and size < 2 * Progress.SI_BYTE_SIZES[i]: + i += 1 + result = float(size) / Progress.SI_BYTE_SIZES[i] + return f"{result:.1f} {Progress.SI_BYTE_SUFFIXES[i]}" + + class ProgressHandler: def __init__(self): self.op_id = "" From ea9d9995b59f45090d63575352b761e5b43a0dbb Mon Sep 17 00:00:00 2001 From: Fabian Jakobs Date: Mon, 16 Sep 2024 11:19:23 +0200 Subject: [PATCH 2/6] Add option to disable progress through an environment variable --- .../databricks-vscode/resources/python/00-databricks-init.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/packages/databricks-vscode/resources/python/00-databricks-init.py b/packages/databricks-vscode/resources/python/00-databricks-init.py index 15541b95a..11c6cd02e 100644 --- a/packages/databricks-vscode/resources/python/00-databricks-init.py +++ b/packages/databricks-vscode/resources/python/00-databricks-init.py @@ -379,11 +379,7 @@ class Progress: SI_BYTE_SUFFIXES = ("EiB", "PiB", "TiB", "GiB", "MiB", "KiB", "B") def __init__( -<<<<<<< HEAD self -======= - self, ->>>>>>> b1ceb52 (Inline code from pyspark) ) -> None: self._ticks = None self._tick = None From 914a97807b7b85b984cc388973d1dbe5bb5032eb Mon Sep 17 00:00:00 2001 From: Fabian Jakobs Date: Mon, 16 Sep 2024 14:15:00 +0200 Subject: [PATCH 3/6] Expose db connect progress as a settings --- .../databricks-vscode/resources/python/00-databricks-init.py | 3 ++- packages/databricks-vscode/src/utils/envVarGenerators.ts | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/databricks-vscode/resources/python/00-databricks-init.py b/packages/databricks-vscode/resources/python/00-databricks-init.py index 11c6cd02e..e16f1502e 100644 --- a/packages/databricks-vscode/resources/python/00-databricks-init.py +++ b/packages/databricks-vscode/resources/python/00-databricks-init.py @@ -386,6 +386,7 @@ def __init__( self._started = time.time() self._bytes_read = 0 self._running = 0 + self.show_progress = cfg.show_progress self.init_ui() def init_ui(self): @@ -480,7 +481,7 @@ def __init__(self): self.op_id = "" def reset(self): - self.p = Progress() + self.p = Progress(cfg) def __call__(self, stages, diff --git a/packages/databricks-vscode/src/utils/envVarGenerators.ts b/packages/databricks-vscode/src/utils/envVarGenerators.ts index 3160e0030..e119b6890 100644 --- a/packages/databricks-vscode/src/utils/envVarGenerators.ts +++ b/packages/databricks-vscode/src/utils/envVarGenerators.ts @@ -5,6 +5,7 @@ import {logging, Headers} from "@databricks/databricks-sdk"; import {ConnectionManager} from "../configuration/ConnectionManager"; import {ConfigModel} from "../configuration/models/ConfigModel"; import {TerraformMetadata} from "./terraformUtils"; +import {workspaceConfigs} from "../vscode-objs/WorkspaceConfigs"; // eslint-disable-next-line @typescript-eslint/no-var-requires const packageJson = require("../../package.json"); From 44200c7706f55dc50d75aea327ea1e89d052769c Mon Sep 17 00:00:00 2001 From: Fabian Jakobs Date: Mon, 16 Sep 2024 14:45:47 +0200 Subject: [PATCH 4/6] Reload ipy kernel on settings change --- packages/databricks-vscode/src/utils/envVarGenerators.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/databricks-vscode/src/utils/envVarGenerators.ts b/packages/databricks-vscode/src/utils/envVarGenerators.ts index e119b6890..3160e0030 100644 --- a/packages/databricks-vscode/src/utils/envVarGenerators.ts +++ b/packages/databricks-vscode/src/utils/envVarGenerators.ts @@ -5,7 +5,6 @@ import {logging, Headers} from "@databricks/databricks-sdk"; import {ConnectionManager} from "../configuration/ConnectionManager"; import {ConfigModel} from "../configuration/models/ConfigModel"; import {TerraformMetadata} from "./terraformUtils"; -import {workspaceConfigs} from "../vscode-objs/WorkspaceConfigs"; // eslint-disable-next-line @typescript-eslint/no-var-requires const packageJson = require("../../package.json"); From 37d1458a258247831a5de58b71d6ecd9c492ee38 Mon Sep 17 00:00:00 2001 From: Fabian Jakobs Date: Tue, 17 Sep 2024 10:20:24 +0200 Subject: [PATCH 5/6] Also show progress for running python files --- .../databricks-vscode/resources/python/00-databricks-init.py | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/databricks-vscode/resources/python/00-databricks-init.py b/packages/databricks-vscode/resources/python/00-databricks-init.py index e16f1502e..0883fc598 100644 --- a/packages/databricks-vscode/resources/python/00-databricks-init.py +++ b/packages/databricks-vscode/resources/python/00-databricks-init.py @@ -386,7 +386,6 @@ def __init__( self._started = time.time() self._bytes_read = 0 self._running = 0 - self.show_progress = cfg.show_progress self.init_ui() def init_ui(self): From afde66662c816f3625a33dc3fe0321a23a692134 Mon Sep 17 00:00:00 2001 From: Fabian Jakobs Date: Wed, 18 Sep 2024 16:01:53 +0200 Subject: [PATCH 6/6] Make sure we always end up at 100% --- .../resources/python/00-databricks-init.py | 59 ++++--------------- 1 file changed, 13 insertions(+), 46 deletions(-) diff --git a/packages/databricks-vscode/resources/python/00-databricks-init.py b/packages/databricks-vscode/resources/python/00-databricks-init.py index 0883fc598..4cf9bb42f 100644 --- a/packages/databricks-vscode/resources/python/00-databricks-init.py +++ b/packages/databricks-vscode/resources/python/00-databricks-init.py @@ -3,11 +3,14 @@ import json from typing import Any, Union, List import os +import sys import time import shlex import warnings import tempfile +# prevent sum from pyskaprk.sql.functions from shadowing the builtin sum +builtinSum = sys.modules['builtins'].sum def logError(function_name: str, e: Union[str, Exception]): import sys @@ -403,55 +406,20 @@ def init_ui(self): def update_ticks( self, stages, - inflight_tasks: int + inflight_tasks: int, + done: bool ) -> None: - total_tasks = sum(map(lambda x: x.num_tasks, stages)) - completed_tasks = sum(map(lambda x: x.num_completed_tasks, stages)) + total_tasks = builtinSum(map(lambda x: x.num_tasks, stages)) + completed_tasks = builtinSum(map(lambda x: x.num_completed_tasks, stages)) if total_tasks > 0: self._ticks = total_tasks self._tick = completed_tasks - self._bytes_read = sum(map(lambda x: x.num_bytes_read, stages)) - if self._tick is not None and self._tick >= 0: - self.output() - self._running = inflight_tasks + self._bytes_read = builtinSum(map(lambda x: x.num_bytes_read, stages)) - def output(self) -> None: - if self._tick is not None and self._ticks is not None: - percent_complete = (self._tick / self._ticks) * 100 - elapsed = int(time.time() - self._started) - scanned = self._bytes_to_string(self._bytes_read) - running = self._running - self.w_progress.value = percent_complete - self.w_status.value = f"{percent_complete:.2f}% Complete ({running} Tasks running, {elapsed}s, Scanned {scanned})" + if done: + self._tick = self._ticks + self._running = 0 - @staticmethod - def _bytes_to_string(size: int) -> str: - """Helper method to convert a numeric bytes value into a human-readable representation""" - i = 0 - while i < len(Progress.SI_BYTE_SIZES) - 1 and size < 2 * Progress.SI_BYTE_SIZES[i]: - i += 1 - result = float(size) / Progress.SI_BYTE_SIZES[i] - return f"{result:.1f} {Progress.SI_BYTE_SUFFIXES[i]}" - - - class ProgressHandler: - def __init__(self): - self.op_id = "" - - def reset(self): - self.p = Progress() - - def update_ticks( - self, - stages, - inflight_tasks: int - ) -> None: - total_tasks = sum(map(lambda x: x.num_tasks, stages)) - completed_tasks = sum(map(lambda x: x.num_completed_tasks, stages)) - if total_tasks > 0: - self._ticks = total_tasks - self._tick = completed_tasks - self._bytes_read = sum(map(lambda x: x.num_bytes_read, stages)) if self._tick is not None and self._tick >= 0: self.output() self._running = inflight_tasks @@ -473,14 +441,13 @@ def _bytes_to_string(size: int) -> str: i += 1 result = float(size) / Progress.SI_BYTE_SIZES[i] return f"{result:.1f} {Progress.SI_BYTE_SUFFIXES[i]}" - class ProgressHandler: def __init__(self): self.op_id = "" def reset(self): - self.p = Progress(cfg) + self.p = Progress() def __call__(self, stages, @@ -495,7 +462,7 @@ def __call__(self, self.op_id = operation_id self.reset() - self.p.update_ticks(stages, inflight_tasks) + self.p.update_ticks(stages, inflight_tasks, done) spark.clearProgressHandlers() spark.registerProgressHandler(ProgressHandler())