Expose stats #279

Open · wants to merge 1 commit into master

93 changes: 66 additions & 27 deletions tasktiger/stats.py
@@ -1,58 +1,72 @@
import threading
import time
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Callable, Optional

from ._internal import g_fork_lock
from ._internal import g_fork_lock, import_attribute

if TYPE_CHECKING:
from .worker import Worker
from .tasktiger import TaskTiger


class StatsThread(threading.Thread):
def __init__(self, tiger: "Worker") -> None:
super(StatsThread, self).__init__()
class Stats:
def __init__(
self,
tiger: "TaskTiger",
callback: Optional[Callable[[float], None]] = None,
) -> None:
super().__init__()

self.tiger = tiger
self._stop_event = threading.Event()

self._task_running = False
self._logging_thread: Optional[StatsLoggingThread] = None

self._time_start = time.monotonic()
self._time_busy: float = 0.0
self._time_busy = 0.0
self._task_start_time: Optional[float] = None
self.daemon = True # Exit process if main thread exits unexpectedly

# Lock that protects stats computations from interleaving. For example,
# we don't want report_task_start() to run at the same time as
# compute_stats(), as it might result in an inconsistent state.
self._computation_lock = threading.Lock()
# log(), as it might result in an inconsistent state.
self._lock = threading.Lock()

# Callback to receive the duration of each completed task.
self._callback = (
import_attribute(callback)
if (callback := self.tiger.config["STATS_CALLBACK"])
else None
)

def report_task_start(self) -> None:
now = time.monotonic()
with self._computation_lock:
with self._lock:
self._task_start_time = now
self._task_running = True

def report_task_end(self) -> None:
assert self._task_start_time
now = time.monotonic()
with self._computation_lock:
assert self._task_start_time is not None
self._time_busy += now - self._task_start_time
self._task_running = False

if self._callback:
self._callback(now - self._task_start_time)

with self._lock:
self._record_time_busy(now)
self._task_start_time = None

def compute_stats(self) -> None:
def log(self) -> None:
now = time.monotonic()

with self._computation_lock:
with self._lock:
time_total = now - self._time_start
time_busy = self._time_busy

if self._task_start_time is not None:
# Add busy time for the currently running task
self._record_time_busy(now)

time_busy = self._time_busy

self._time_start = now
self._time_busy = 0
if self._task_running:
assert self._task_start_time is not None
time_busy += now - self._task_start_time
self._task_start_time = now
else:
self._task_start_time = None

if time_total:
utilization = 100.0 / time_total * time_busy
@@ -64,9 +78,34 @@ def compute_stats(self) -> None:
utilization=utilization,
)

def start_logging_thread(self) -> None:
if not self._logging_thread:
self._logging_thread = StatsLoggingThread(self)
self._logging_thread.start()

def stop_logging_thread(self) -> None:
if self._logging_thread:
self._logging_thread.stop()
self._logging_thread = None

def _record_time_busy(self, now: float) -> None:
assert self._task_start_time
self._time_busy += now - max(self._task_start_time, self._time_start)


class StatsLoggingThread(threading.Thread):
def __init__(self, stats: Stats) -> None:
super().__init__()

self.tiger = stats.tiger
Member:

Should this ideally just say self._stats_interval = stats.tiger.config["STATS_INTERVAL"] rather than looking it up every time?
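A minimal sketch of that suggestion, reading the config value once at construction time (the attribute name `_stats_interval` is hypothetical):

```python
import threading


class StatsLoggingThread(threading.Thread):
    def __init__(self, stats: "Stats") -> None:
        super().__init__()
        self._stats = stats
        # Look up the interval once instead of on every wait() call.
        self._stats_interval = stats.tiger.config["STATS_INTERVAL"]
        self._stop_event = threading.Event()
        self.daemon = True  # Exit process if main thread exits unexpectedly

    def run(self) -> None:
        while not self._stop_event.wait(self._stats_interval):
            self._stats.log()

    def stop(self) -> None:
        self._stop_event.set()
```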

self._stats: Stats = stats
self._stop_event = threading.Event()

self.daemon = True # Exit process if main thread exits unexpectedly

def run(self) -> None:
while not self._stop_event.wait(self.tiger.config["STATS_INTERVAL"]):
self.compute_stats()
self._stats.log()

def stop(self) -> None:
self._stop_event.set()
6 changes: 6 additions & 0 deletions tasktiger/tasktiger.py
@@ -198,6 +198,12 @@ def init(
"BATCH_QUEUES": {},
# How often to print stats.
"STATS_INTERVAL": 60,
# The function to call with the duration of each completed task.
# This can be useful to measure the worker's utilisation %.
# For example, the worker's utilisation over the last 30 minutes
# can be obtained by dividing the sum of task durations reported
# over the last 30 minutes by 30 minutes.
"STATS_CALLBACK": None,
Member @thomasst commented on Apr 20, 2023:

Since the point of the stats thread is to print out metrics periodically, I would have also expected this to be called periodically rather than at the end of the task, and report the same metrics that we log (time total, time busy, utilization). Otherwise this could be solved via something like CHILD_CONTEXT_MANAGERS, although that runs in the child process. Should this be called TASK_END_STATS_CALLBACK then?

> the worker's utilisation over the last 30 minutes can be obtained by dividing the sum of task durations reported over the last 30 minutes by 30 minutes.

This is not actually correct. Example: if a task runs from 0m to 5m, and another task runs from 29m59s to 34m59s, and the second task's stats callback looks at any reports in the past 30 minutes (i.e. 5m to 34m59s), it would show 10m of task durations over 30m (~33% utilization), rather than the actual 5m1s (~17% utilization).
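A rough sketch of that periodic alternative (all names here are hypothetical, not part of this PR): the callback would be invoked from log() once per STATS_INTERVAL and receive the same values that are written to the log.

```python
from typing import Callable

# Hypothetical signature for a periodic stats callback:
# (time_total, time_busy, utilization) -> None, mirroring what log() prints.
PeriodicStatsCallback = Callable[[float, float, float], None]


def push_stats(time_total: float, time_busy: float, utilization: float) -> None:
    # Example consumer: forward the numbers to whatever metrics backend is in
    # use; printing stands in for that here.
    print(
        f"stats time_total={time_total:.1f}s "
        f"time_busy={time_busy:.1f}s utilization={utilization:.1f}%"
    )
```

In that variant, log() would call the configured callback right after (or instead of) self.tiger.log.info(...), so external consumers see exactly the logged numbers.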

Contributor:

We should definitely have both start and end callbacks to measure and report utilization accurately while the task is still running. With our configuration, we often collect metrics at sub-minute intervals, and a single task can run for more than a few minutes.

Contributor (author):

> Since the point of the stats thread is to print out metrics periodically, I would have also expected this to be called periodically rather than at the end of the task, and report the same metrics that we log (time total, time busy, utilization).

Yep, that sounds better than what I did.

> This is not actually correct. Example: if a task runs from 0m to 5m, and another task runs from 29m59s to 34m59s, and the second task's stats callback looks at any reports in the past 30 minutes (i.e. 5m to 34m59s), it would show 10m of task durations over 30m (~33% utilization), rather than the actual 5m1s (~17% utilization).

Good point.

> We should definitely have both start and end callbacks to measure and report utilization accurately while the task is still running. With our configuration, we often collect metrics at sub-minute intervals, and a single task can run for more than a few minutes.

Couldn't we achieve that by simply moving the callback to the log method, so it's called at an interval?
I don't think we need to get notified of the start and stop of every task.

Contributor:

> get notified of the start and stop of every task.

Having that interface allows for reporting much more than what we do now. We could report utilization grouped by some other dimension, for example task duration bucket, task name, or success/failure condition.

The interval of metric collection should be owned by the metric collection setup, not by TaskTiger. TaskTiger provides simple log-only "monitoring" via periodic prints, but in production we could use something much more sophisticated, like OpenTelemetry. Reporting metrics on an interval is also not great if your metric collection is itself pull-based and periodic and the two periods don't align well.

For example, say a task ran between 1s and 16s. At 30s TaskTiger reports utilization of 50% (15s busy out of the last 30s) and then reports nothing for the next minute. At 59s (just before the next log call at 60s) the monitoring pulls the last reported value of 50%. The monitoring shows 50% utilization at 59s while in reality it was 0%. The metric lags and can be very confusing when debugging performance issues. In contrast, with start/end notifications we can compute the utilization metric right at collection time, or better yet, report the up-to-date (not lagging) totals of busy/idle time and let the monitoring compute utilization.
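A rough sketch of that idea (the class and method names are hypothetical, not part of this PR): a collector fed by start/end notifications keeps running totals, so a pull-based scraper can compute utilization for its own window at scrape time instead of reading a stale, periodically logged value.

```python
import threading
import time
from typing import Optional, Tuple


class BusyTimeCollector:
    """Accumulates busy time from task start/end notifications."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._started_at = time.monotonic()
        self._busy_total = 0.0
        self._task_started_at: Optional[float] = None

    def on_task_start(self) -> None:
        with self._lock:
            self._task_started_at = time.monotonic()

    def on_task_end(self) -> None:
        with self._lock:
            if self._task_started_at is not None:
                self._busy_total += time.monotonic() - self._task_started_at
                self._task_started_at = None

    def totals(self) -> Tuple[float, float]:
        """Return (elapsed_seconds, busy_seconds), including any running task."""
        with self._lock:
            now = time.monotonic()
            busy = self._busy_total
            if self._task_started_at is not None:
                busy += now - self._task_started_at
            return now - self._started_at, busy
```

The monitoring side reads totals() on every scrape and diffs successive readings, counter-style: utilization over a scrape interval is 100 * Δbusy / Δelapsed, which is always current rather than the last logged snapshot.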

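Separately, for anyone wiring up the option as it currently stands in the diff: STATS_CALLBACK takes a dotted path that Stats resolves via import_attribute(), so the callback itself is a plain function receiving the task duration in seconds. A minimal, hypothetical example (module path and logger name are assumptions):

```python
# my_project/metrics.py (hypothetical module)
import logging

logger = logging.getLogger("myapp.tasktiger_stats")


def report_task_duration(duration: float) -> None:
    """Receive the duration, in seconds, of one completed task."""
    # Forward to a metrics backend of your choice; plain logging stands in here.
    logger.info("task completed", extra={"duration_seconds": duration})
```

The config entry would then be "STATS_CALLBACK": "my_project.metrics.report_task_duration".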
# Single worker queues can reduce redis activity in some use cases
# by locking at the queue level instead of just at the task or task
# group level. These queues will only allow a single worker to
18 changes: 6 additions & 12 deletions tasktiger/worker.py
@@ -55,7 +55,7 @@
)
from .redis_semaphore import Semaphore
from .runner import get_runner_class
from .stats import StatsThread
from .stats import Stats
from .task import Task
from .timeouts import JobTimeoutException
from .utils import redis_glob_escape
@@ -107,7 +107,7 @@ def __init__(
self._key = tiger._key
self._did_work = True
self._last_task_check = 0.0
self.stats_thread: Optional[StatsThread] = None
self.stats = Stats(self.tiger)
self.id = str(uuid.uuid4())

if queues:
@@ -1008,13 +1008,11 @@ def _execute_task_group(
if not ready_tasks:
return True, []

if self.stats_thread:
self.stats_thread.report_task_start()
self.stats.report_task_start()
success = self._execute(
queue, ready_tasks, log, locks, queue_lock, all_task_ids
)
if self.stats_thread:
self.stats_thread.report_task_end()
self.stats.report_task_end()

for lock in locks:
try:
@@ -1280,9 +1278,7 @@ def run(self, once: bool = False, force_once: bool = False) -> None:
self.log.warn("using old Redis version")

if self.config["STATS_INTERVAL"]:
stats_thread = StatsThread(self)
self.stats_thread = stats_thread
stats_thread.start()
self.stats.start_logging_thread()

# Queue any periodic tasks that are not queued yet.
self._queue_periodic_tasks()
@@ -1328,9 +1324,7 @@ def run(self, once: bool = False, force_once: bool = False) -> None:
raise

finally:
if self.stats_thread:
self.stats_thread.stop()
self.stats_thread = None
self.stats.stop_logging_thread()

# Free up Redis connection
if self._pubsub: