| Viewing file:  overload.py (3.23 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# coding=utf-8#
 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2023 All Rights Reserved
 #
 # Licensed under CLOUD LINUX LICENSE AGREEMENT
 # http://cloudlinux.com/docs/LICENSE.TXT
 from collections import deque
 from typing import NamedTuple, Self, Protocol
 
 from ._logs import logger
 
 
 class StatsTimes(NamedTuple):
 @classmethod
 def from_proc_stat(cls, line: str) -> Self:
 # NOTE(vlebedev): First element of the line is a name of a CPU core, not time of any kind.
 return cls(*(int(v) for v in line.strip().split()[1:]))
 
 user: int
 nice: int
 system: int
 idle: int
 iowait: int
 irq: int
 softirq: int
 steal: int
 guest: int
 guest_nice: int
 
 
 class GetStats(Protocol):
 def __call__(self) -> StatsTimes:
 ...
 
 
 class OverloadCheckResult(NamedTuple):
 is_overloaded: bool
 server_load: float
 
 def __bool__(self) -> bool:
 return self.is_overloaded
 
 
 class OverloadChecker:
 def __init__(
 self,
 idle_time_threshold: float,
 get_stats: GetStats,
 max_samples_number: int,
 ) -> None:
 self._prev_stats = None
 self._idle_time_threshold = idle_time_threshold
 self._weights = _get_weights(num=max_samples_number)
 self._idle_times_history = deque(maxlen=max_samples_number)
 self._get_stats = get_stats
 self._max_samples_number = max_samples_number
 
 def __call__(self) -> OverloadCheckResult:
 stats = self._get_stats()
 
 # NOTE(vlebedev): 'delta' represents total time spent by CPU
 #                 on all kinds of activities since the previous measurement.
 delta = stats if self._prev_stats is None else StatsTimes(*(
 new - old
 for (new, old)
 in zip(stats, self._prev_stats)
 ))
 self._prev_stats = stats
 
 # NOTE(vlebedev): Fraction of total delta time spent by CPU on being 'idle'.
 try:
 idle_time = delta.idle / sum(delta)
 except ZeroDivisionError:
 idle_time = 0.0
 
 self._idle_times_history.append(idle_time)
 
 samples_num = len(self._idle_times_history)
 relevant_weights = self._weights[:samples_num]
 weighted_idle_times = [
 weight * idle_time
 for weight, idle_time
 in zip(relevant_weights, self._idle_times_history)
 ]
 weighted_idle_time = sum(weighted_idle_times)
 if samples_num < self._max_samples_number:
 weighted_idle_time /= sum(relevant_weights)
 
 logger.debug('Current idle time: %02.2f, threshold %s', weighted_idle_time, self._idle_time_threshold)
 
 return OverloadCheckResult(
 is_overloaded=weighted_idle_time < self._idle_time_threshold,
 server_load=1 - weighted_idle_time,
 )
 
 
 def _get_weights(num: int = 10, alpha: float = 0.9):
 # NOTE(vlebedev): Generate an exponentially decaying weight list
 weights = [alpha ** i for i in range(num)]
 # NOTE(vlebedev): Normalize the weights so they sum to 1
 total = sum(weights)
 return [weight / total for weight in weights]
 
 
 def read_times_from_proc() -> StatsTimes:
 with open('/proc/stat', encoding='ascii') as fd:
 return StatsTimes.from_proc_stat(fd.readline())
 
 |