From afde66662c816f3625a33dc3fe0321a23a692134 Mon Sep 17 00:00:00 2001 From: Fabian Jakobs Date: Wed, 18 Sep 2024 16:01:53 +0200 Subject: [PATCH] Make sure we always end up at 100% --- .../resources/python/00-databricks-init.py | 59 ++++--------------- 1 file changed, 13 insertions(+), 46 deletions(-) diff --git a/packages/databricks-vscode/resources/python/00-databricks-init.py b/packages/databricks-vscode/resources/python/00-databricks-init.py index 0883fc598..4cf9bb42f 100644 --- a/packages/databricks-vscode/resources/python/00-databricks-init.py +++ b/packages/databricks-vscode/resources/python/00-databricks-init.py @@ -3,11 +3,14 @@ import json from typing import Any, Union, List import os +import sys import time import shlex import warnings import tempfile +# prevent sum from pyspark.sql.functions from shadowing the builtin sum +builtinSum = sys.modules['builtins'].sum def logError(function_name: str, e: Union[str, Exception]): import sys @@ -403,55 +406,20 @@ def init_ui(self): def update_ticks( self, stages, - inflight_tasks: int + inflight_tasks: int, + done: bool ) -> None: - total_tasks = sum(map(lambda x: x.num_tasks, stages)) - completed_tasks = sum(map(lambda x: x.num_completed_tasks, stages)) + total_tasks = builtinSum(map(lambda x: x.num_tasks, stages)) + completed_tasks = builtinSum(map(lambda x: x.num_completed_tasks, stages)) if total_tasks > 0: self._ticks = total_tasks self._tick = completed_tasks - self._bytes_read = sum(map(lambda x: x.num_bytes_read, stages)) - if self._tick is not None and self._tick >= 0: - self.output() - self._running = inflight_tasks + self._bytes_read = builtinSum(map(lambda x: x.num_bytes_read, stages)) - def output(self) -> None: - if self._tick is not None and self._ticks is not None: - percent_complete = (self._tick / self._ticks) * 100 - elapsed = int(time.time() - self._started) - scanned = self._bytes_to_string(self._bytes_read) - running = self._running - self.w_progress.value = percent_complete - 
self.w_status.value = f"{percent_complete:.2f}% Complete ({running} Tasks running, {elapsed}s, Scanned {scanned})" + if done: + self._tick = self._ticks + self._running = 0 - @staticmethod - def _bytes_to_string(size: int) -> str: - """Helper method to convert a numeric bytes value into a human-readable representation""" - i = 0 - while i < len(Progress.SI_BYTE_SIZES) - 1 and size < 2 * Progress.SI_BYTE_SIZES[i]: - i += 1 - result = float(size) / Progress.SI_BYTE_SIZES[i] - return f"{result:.1f} {Progress.SI_BYTE_SUFFIXES[i]}" - - - class ProgressHandler: - def __init__(self): - self.op_id = "" - - def reset(self): - self.p = Progress() - - def update_ticks( - self, - stages, - inflight_tasks: int - ) -> None: - total_tasks = sum(map(lambda x: x.num_tasks, stages)) - completed_tasks = sum(map(lambda x: x.num_completed_tasks, stages)) - if total_tasks > 0: - self._ticks = total_tasks - self._tick = completed_tasks - self._bytes_read = sum(map(lambda x: x.num_bytes_read, stages)) if self._tick is not None and self._tick >= 0: self.output() self._running = inflight_tasks @@ -473,14 +441,13 @@ def _bytes_to_string(size: int) -> str: i += 1 result = float(size) / Progress.SI_BYTE_SIZES[i] return f"{result:.1f} {Progress.SI_BYTE_SUFFIXES[i]}" - class ProgressHandler: def __init__(self): self.op_id = "" def reset(self): - self.p = Progress(cfg) + self.p = Progress() def __call__(self, stages, @@ -495,7 +462,7 @@ def __call__(self, self.op_id = operation_id self.reset() - self.p.update_ticks(stages, inflight_tasks) + self.p.update_ticks(stages, inflight_tasks, done) spark.clearProgressHandlers() spark.registerProgressHandler(ProgressHandler())