Skip to content

Commit

Permalink
Make sure we always end up at 100%
Browse files Browse the repository at this point in the history
  • Loading branch information
fjakobs committed Sep 18, 2024
1 parent 37d1458 commit afde666
Showing 1 changed file with 13 additions and 46 deletions.
59 changes: 13 additions & 46 deletions packages/databricks-vscode/resources/python/00-databricks-init.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
import json
from typing import Any, Union, List
import os
import sys
import time
import shlex
import warnings
import tempfile

# prevent sum from pyspark.sql.functions from shadowing the builtin sum
builtinSum = sys.modules['builtins'].sum

def logError(function_name: str, e: Union[str, Exception]):
import sys
Expand Down Expand Up @@ -403,55 +406,20 @@ def init_ui(self):
def update_ticks(
self,
stages,
inflight_tasks: int
inflight_tasks: int,
done: bool
) -> None:
total_tasks = sum(map(lambda x: x.num_tasks, stages))
completed_tasks = sum(map(lambda x: x.num_completed_tasks, stages))
total_tasks = builtinSum(map(lambda x: x.num_tasks, stages))
completed_tasks = builtinSum(map(lambda x: x.num_completed_tasks, stages))
if total_tasks > 0:
self._ticks = total_tasks
self._tick = completed_tasks
self._bytes_read = sum(map(lambda x: x.num_bytes_read, stages))
if self._tick is not None and self._tick >= 0:
self.output()
self._running = inflight_tasks
self._bytes_read = builtinSum(map(lambda x: x.num_bytes_read, stages))

def output(self) -> None:
# Refresh the progress widget and the status text from the current counters.
# Nothing is written unless both self._tick and self._ticks are set.
if self._tick is not None and self._ticks is not None:
# NOTE(review): divides by self._ticks; assumes it is never 0 here — confirm
# against wherever _ticks is initialised (not visible in this view).
percent_complete = (self._tick / self._ticks) * 100
# Whole seconds elapsed since self._started.
elapsed = int(time.time() - self._started)
scanned = self._bytes_to_string(self._bytes_read)
running = self._running
# w_progress receives the numeric percentage; w_status receives the formatted summary line.
self.w_progress.value = percent_complete
self.w_status.value = f"{percent_complete:.2f}% Complete ({running} Tasks running, {elapsed}s, Scanned {scanned})"
if done:
self._tick = self._ticks
self._running = 0

@staticmethod
def _bytes_to_string(size: int) -> str:
"""Helper method to convert a numeric bytes value into a human-readable representation"""
# Step through Progress.SI_BYTE_SIZES while `size` is below twice the current
# entry, stopping at the last index at the latest.
# presumably SI_BYTE_SIZES is ordered largest unit first — defined outside this view; confirm.
i = 0
while i < len(Progress.SI_BYTE_SIZES) - 1 and size < 2 * Progress.SI_BYTE_SIZES[i]:
i += 1
# Scale by the selected unit and render with one decimal place plus the
# suffix found at the same index in Progress.SI_BYTE_SUFFIXES.
result = float(size) / Progress.SI_BYTE_SIZES[i]
return f"{result:.1f} {Progress.SI_BYTE_SUFFIXES[i]}"


class ProgressHandler:
def __init__(self):
self.op_id = ""

def reset(self):
self.p = Progress()

def update_ticks(
self,
stages,
inflight_tasks: int
) -> None:
total_tasks = sum(map(lambda x: x.num_tasks, stages))
completed_tasks = sum(map(lambda x: x.num_completed_tasks, stages))
if total_tasks > 0:
self._ticks = total_tasks
self._tick = completed_tasks
self._bytes_read = sum(map(lambda x: x.num_bytes_read, stages))
if self._tick is not None and self._tick >= 0:
self.output()
self._running = inflight_tasks
Expand All @@ -473,14 +441,13 @@ def _bytes_to_string(size: int) -> str:
i += 1
result = float(size) / Progress.SI_BYTE_SIZES[i]
return f"{result:.1f} {Progress.SI_BYTE_SUFFIXES[i]}"


class ProgressHandler:
def __init__(self):
self.op_id = ""

def reset(self):
self.p = Progress(cfg)
self.p = Progress()

def __call__(self,
stages,
Expand All @@ -495,7 +462,7 @@ def __call__(self,
self.op_id = operation_id
self.reset()

self.p.update_ticks(stages, inflight_tasks)
self.p.update_ticks(stages, inflight_tasks, done)

# Call spark.clearProgressHandlers() first, then register a new ProgressHandler instance.
# presumably the clear step keeps handlers from piling up when this init script runs again — confirm.
spark.clearProgressHandlers()
spark.registerProgressHandler(ProgressHandler())
Expand Down

0 comments on commit afde666

Please sign in to comment.