diff --git a/tasktiger/stats.py b/tasktiger/stats.py index 0f455f26..4a94fe79 100644 --- a/tasktiger/stats.py +++ b/tasktiger/stats.py @@ -1,58 +1,72 @@ import threading import time -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Callable, Optional -from ._internal import g_fork_lock +from ._internal import g_fork_lock, import_attribute if TYPE_CHECKING: - from .worker import Worker + from .tasktiger import TaskTiger -class StatsThread(threading.Thread): - def __init__(self, tiger: "Worker") -> None: - super(StatsThread, self).__init__() +class Stats: + def __init__( + self, + tiger: "TaskTiger", + callback: Optional[Callable[[float], None]] = None, + ) -> None: + super().__init__() + self.tiger = tiger - self._stop_event = threading.Event() - self._task_running = False + self._logging_thread: Optional[StatsLoggingThread] = None + self._time_start = time.monotonic() - self._time_busy: float = 0.0 + self._time_busy = 0.0 self._task_start_time: Optional[float] = None - self.daemon = True # Exit process if main thread exits unexpectedly # Lock that protects stats computations from interleaving. For example, # we don't want report_task_start() to run at the same time as - # compute_stats(), as it might result in an inconsistent state. - self._computation_lock = threading.Lock() + # log(), as it might result in an inconsistent state. + self._lock = threading.Lock() + + # Callback to receive the duration of each completed task. + self._callback = ( + import_attribute(callback) + if (callback := self.tiger.config["STATS_CALLBACK"]) + else None + ) def report_task_start(self) -> None: now = time.monotonic() - with self._computation_lock: + with self._lock: self._task_start_time = now - self._task_running = True def report_task_end(self) -> None: + assert self._task_start_time now = time.monotonic() - with self._computation_lock: - assert self._task_start_time is not None - self._time_busy += now - self._task_start_time - self._task_running = False + + if self._callback: + self._callback(now - self._task_start_time) + + with self._lock: + self._record_time_busy(now) self._task_start_time = None - def compute_stats(self) -> None: + def log(self) -> None: now = time.monotonic() - with self._computation_lock: + with self._lock: time_total = now - self._time_start time_busy = self._time_busy + + if self._task_start_time is not None: + # Add busy time for the currently running task + self._record_time_busy(now) + + time_busy = self._time_busy + self._time_start = now self._time_busy = 0 - if self._task_running: - assert self._task_start_time is not None - time_busy += now - self._task_start_time - self._task_start_time = now - else: - self._task_start_time = None if time_total: utilization = 100.0 / time_total * time_busy @@ -64,9 +78,34 @@ def compute_stats(self) -> None: utilization=utilization, ) + def start_logging_thread(self) -> None: + if not self._logging_thread: + self._logging_thread = StatsLoggingThread(self) + self._logging_thread.start() + + def stop_logging_thread(self) -> None: + if self._logging_thread: + self._logging_thread.stop() + self._logging_thread = None + + def _record_time_busy(self, now: float) -> None: + assert self._task_start_time + self._time_busy += now - max(self._task_start_time, self._time_start) + + +class StatsLoggingThread(threading.Thread): + def __init__(self, stats: Stats) -> None: + super().__init__() + + self.tiger = stats.tiger + self._stats: Stats = stats + self._stop_event = threading.Event() + + self.daemon = True # Exit process if main thread exits unexpectedly + def run(self) -> None: while not self._stop_event.wait(self.tiger.config["STATS_INTERVAL"]): - self.compute_stats() + self._stats.log() def stop(self) -> None: self._stop_event.set() diff --git a/tasktiger/tasktiger.py b/tasktiger/tasktiger.py index 249d91b5..11f9ab7e 100644 --- a/tasktiger/tasktiger.py +++ b/tasktiger/tasktiger.py @@ -198,6 +198,12 @@ def init( "BATCH_QUEUES": {}, # How often to print stats. "STATS_INTERVAL": 60, + # The function to call with the duration of each completed task. + # This can be useful to measure the worker's utilisation %. + # For example, the worker's utilisation over the last 30 minutes + # can be obtained by dividing the sum of task durations reported + # over the last 30 minutes by 30 minutes. + "STATS_CALLBACK": None, # Single worker queues can reduce redis activity in some use cases # by locking at the queue level instead of just at the task or task # group level. These queues will only allow a single worker to diff --git a/tasktiger/worker.py b/tasktiger/worker.py index 63fa8fd7..2ac92a50 100644 --- a/tasktiger/worker.py +++ b/tasktiger/worker.py @@ -55,7 +55,7 @@ ) from .redis_semaphore import Semaphore from .runner import get_runner_class -from .stats import StatsThread +from .stats import Stats from .task import Task from .timeouts import JobTimeoutException from .utils import redis_glob_escape @@ -107,7 +107,7 @@ def __init__( self._key = tiger._key self._did_work = True self._last_task_check = 0.0 - self.stats_thread: Optional[StatsThread] = None + self.stats = Stats(self.tiger) self.id = str(uuid.uuid4()) if queues: @@ -1008,13 +1008,11 @@ def _execute_task_group( if not ready_tasks: return True, [] - if self.stats_thread: - self.stats_thread.report_task_start() + self.stats.report_task_start() success = self._execute( queue, ready_tasks, log, locks, queue_lock, all_task_ids ) - if self.stats_thread: - self.stats_thread.report_task_end() + self.stats.report_task_end() for lock in locks: try: @@ -1280,9 +1278,7 @@ def run(self, once: bool = False, force_once: bool = False) -> None: self.log.warn("using old Redis version") if self.config["STATS_INTERVAL"]: - stats_thread = StatsThread(self) - self.stats_thread = stats_thread - stats_thread.start() + self.stats.start_logging_thread() # Queue any periodic tasks that are not queued yet. self._queue_periodic_tasks() @@ -1328,9 +1324,7 @@ def run(self, once: bool = False, force_once: bool = False) -> None: raise finally: - if self.stats_thread: - self.stats_thread.stop() - self.stats_thread = None + self.stats.stop_logging_thread() # Free up Redis connection if self._pubsub: diff --git a/tests/test_stats.py b/tests/test_stats.py index 41cbcc2c..a01b5741 100644 --- a/tests/test_stats.py +++ b/tests/test_stats.py @@ -1,9 +1,10 @@ +import threading import time from unittest import mock import pytest -from tasktiger.stats import StatsThread +from tasktiger.stats import Stats from tests.utils import get_tiger @@ -11,20 +12,209 @@ @pytest.fixture def tiger(): t = get_tiger() - t.config["STATS_INTERVAL"] = 0.07 + t.config["STATS_INTERVAL"] = 0.001 return t -def test_start_and_stop(tiger): - stats = StatsThread(tiger) - stats.compute_stats = mock.Mock() - stats.start() +@pytest.fixture +def time_monotonic_mock(): + with mock.patch("time.monotonic") as m: + yield m + + +def test_stats_logging_thread_start_and_stop(tiger): + semaphore = threading.Semaphore(0) + + stats = Stats(tiger) + stats.log = mock.Mock(side_effect=semaphore.acquire) + stats.start_logging_thread() + + thread = stats._logging_thread + + time.sleep(0.05) + assert len(stats.log.mock_calls) == 1 + assert thread.is_alive() + + for __ in range(17): + semaphore.release() + + while semaphore._value: + pass + + time.sleep(0.02) + assert len(stats.log.mock_calls) == 18 + assert thread.is_alive() + + for __ in range(24): + semaphore.release() + + while semaphore._value: + pass + + time.sleep(0.02) + assert len(stats.log.mock_calls) == 42 + assert thread.is_alive() + + stats.stop_logging_thread() + + for __ in range(20): + semaphore.release() + + thread.join() + assert len(stats.log.mock_calls) == 42 + assert not thread.is_alive() + + +def test_stats_without_callback(tiger, time_monotonic_mock): + time_monotonic_mock.return_value = 376263.4195456 + + stats = Stats(tiger) + assert stats._callback is None + + stats.report_task_start() + + time_monotonic_mock.return_value += 48.34 + stats.report_task_end() + + +def test_stats_with_callback(tiger, time_monotonic_mock): + time_monotonic_mock.return_value = 376263.4195456 + + stats = Stats(tiger) + stats._callback = mock.Mock() + + stats.report_task_start() + assert stats._callback.mock_calls == [] + + time_monotonic_mock.return_value = 376375.4737221 + stats.report_task_end() + + assert stats._callback.mock_calls == [mock.call(112.05417650000891)] + + time_monotonic_mock.return_value += 34.22 + stats.report_task_start() + + time_monotonic_mock.return_value += 185.03 + stats.report_task_end() + + assert stats._callback.mock_calls == [ + mock.call(112.05417650000891), + mock.call(185.03000000002794), + ] + + time_monotonic_mock.return_value += 73.26 + stats.report_task_start() + + # Test that calling stats.log during the task's execution is not + # breaking the calculation of the runtime for the callback + time_monotonic_mock.return_value += 217.9 + stats.log() + + time_monotonic_mock.return_value += 130.5 + stats.report_task_end() + + assert stats._callback.mock_calls == [ + mock.call(112.05417650000891), + mock.call(185.03000000002794), + mock.call(348.4000000000233), + ] + + +def test_stats_logging_in_between_task_runs(tiger, time_monotonic_mock): + time_monotonic_mock.return_value = 376203.2371821 + + stats = Stats(tiger) + tiger.log = mock.Mock() + + time_monotonic_mock.return_value += 65 + stats.report_task_start() + + time_monotonic_mock.return_value += 120.7 + stats.report_task_end() + + stats.log() + assert tiger.log.mock_calls == [ + mock.call.info( + "stats", + time_total=185.70000000001164, + time_busy=120.70000000001164, + utilization=64.99730748519337, + ) + ] + + time_monotonic_mock.return_value += 0.7 + stats.report_task_start() + + time_monotonic_mock.return_value += 50.6 + stats.report_task_end() + + time_monotonic_mock.return_value += 10 + stats.report_task_start() + + time_monotonic_mock.return_value += 40 + stats.report_task_end() + + stats.log() + assert tiger.log.mock_calls == [ + mock.call.info( + "stats", + time_total=185.70000000001164, + time_busy=120.70000000001164, + utilization=64.99730748519337, + ), + mock.call.info( + "stats", + time_total=101.29999999998836, + time_busy=90.59999999997672, + utilization=89.43731490620644, + ), + ] + + +def test_stats_logging_during_task_run(tiger, time_monotonic_mock): + time_monotonic_mock.return_value = 376203.2371821 + + stats = Stats(tiger) + tiger.log = mock.Mock() + + time_monotonic_mock.return_value += 65 + stats.report_task_start() + + time_monotonic_mock.return_value += 120.7 + + stats.log() + assert tiger.log.mock_calls == [ + mock.call.info( + "stats", + time_total=185.70000000001164, + # Busy time for unfinished task is measured + time_busy=120.70000000001164, + utilization=64.99730748519337, + ) + ] + + time_monotonic_mock.return_value += 350.5 + stats.report_task_end() - time.sleep(0.22) - stats.stop() + time_monotonic_mock.return_value += 33.1 + stats.report_task_start() - assert len(stats.compute_stats.mock_calls) == 3 + time_monotonic_mock.return_value += 17 - # Stats are no longer being collected - time.sleep(0.22) - assert len(stats.compute_stats.mock_calls) == 3 + stats.log() + assert tiger.log.mock_calls == [ + mock.call.info( + "stats", + time_total=185.70000000001164, + time_busy=120.70000000001164, + utilization=64.99730748519337, + ), + mock.call.info( + "stats", + time_total=400.5999999999767, + # Sum of the remainder of the previous task's runtime + # and the current unfinished task's runtime + time_busy=367.5, + utilization=91.73739390914163, + ), + ]