| Viewing file:  history.py (9.79 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# coding=utf-8#
 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2023 All Rights Reserved
 #
 # Licensed under CLOUD LINUX LICENSE AGREEMENT
 # http://cloudlinux.com/docs/LICENSE.TXT
 import enum
 import itertools
 from enum import StrEnum
 from typing import Self, Iterator, Iterable, NamedTuple, Any, TypeAlias
 
 from lvestats.orm import BurstingEventType
 
 
 _InTimestamp: TypeAlias = int | float
 
 
 @enum.unique
 class IntervalType(StrEnum):
 OVERUSING = enum.auto()
 NORMAL = enum.auto()
 
 
 class Interval(NamedTuple):
 start: float
 end: float
 
 @property
 def duration(self) -> float:
 return self.end - self.start
 
 def __str__(self) -> str:
 return _format_pair(round(self.start), round(self.end))
 
 
 class _Intervals(tuple[Interval, ...]):
 def __str__(self) -> str:
 return _format_joined_intervals(self)
 
 
 class LveHistory:
 # NOTE(vlebedev): Empty history is special case: `.first_interval_type` == `.ongoing_interval_type`
 __slots__ = ('_first_interval_type', '_timestamps', '_finished_overusing_duration')
 
 def __init__(
 self,
 first_interval_type: IntervalType | None = None,
 timestamps: tuple[float, ...] = tuple(),
 closed_overusing_duration: float | None = None,
 ) -> None:
 if not (
 (first_interval_type, timestamps) == (None, tuple()) or
 first_interval_type is not None and len(timestamps) > 0
 ):
 raise ValueError('Either both `first_interval_type` and `timestamps` must be set or both unset!')
 
 self._first_interval_type = first_interval_type
 self._timestamps = timestamps
 
 if closed_overusing_duration is None:
 closed_overusing_duration = sum(i.duration for i in _overusing_intervals_iter(
 first_interval_type, timestamps,
 )) if first_interval_type is not None else 0.0
 self._finished_overusing_duration = closed_overusing_duration
 
 @property
 def first_interval_type(self) -> IntervalType | None:
 return self._first_interval_type
 
 @property
 def timestamps(self) -> tuple[float, ...]:
 return self._timestamps
 
 @property
 def empty(self) -> bool:
 if len(self._timestamps) == 0:
 assert self._first_interval_type is None
 return True
 return False
 
 @property
 def contains_overusing(self) -> bool:
 ts_len = len(self._timestamps)
 if ts_len == 0:
 return False
 if self._first_interval_type == IntervalType.OVERUSING:
 return True
 if ts_len > 1:
 return True
 return False
 
 @property
 def ongoing_interval_type(self) -> IntervalType | None:
 ts_len = len(self._timestamps)
 if ts_len == 0:
 return None
 assert self._first_interval_type is not None
 return get_interval_type_after(self._first_interval_type, ts_len)
 
 def trim(self, cutoff: _InTimestamp) -> Self:
 cutoff = float(cutoff)
 if len(self._timestamps) == 0 or cutoff <= self._timestamps[0]:
 return self
 cls = type(self)
 if cutoff >= self._timestamps[-1]:
 return cls(self.ongoing_interval_type, (cutoff,))
 assert self._first_interval_type is not None
 trimmed_duration, cutoff_pos = _get_trimmed_overusing_duration_and_position(
 self._first_interval_type,
 self._timestamps,
 cutoff,
 )
 new_first_interval_type = self._first_interval_type
 new_timestamps = self._timestamps[cutoff_pos:]
 if self._timestamps[cutoff_pos] == cutoff:
 switch_new_first_interval = cutoff_pos % 2 != 0
 else:
 switch_new_first_interval = cutoff_pos % 2 == 0
 new_timestamps = (cutoff, *new_timestamps)
 if switch_new_first_interval:
 new_first_interval_type = get_other_interval_type(new_first_interval_type)
 new_overusing_duration = self._finished_overusing_duration - trimmed_duration
 return cls(new_first_interval_type, new_timestamps, new_overusing_duration)
 
 def append(self, timestamp: _InTimestamp, event_type: BurstingEventType) -> Self:
 timestamp = float(timestamp)
 cls = type(self)
 timestamps = self._timestamps
 
 new_interval_type = {
 BurstingEventType.STARTED: IntervalType.OVERUSING,
 BurstingEventType.STOPPED: IntervalType.NORMAL,
 }[event_type]
 
 try:
 if timestamp <= timestamps[-1]:
 raise ValueError('Timestamp must be greater than the latest known one!')
 except IndexError:
 return cls(new_interval_type, (timestamp,))
 
 if len(timestamps) == 0:
 return cls(new_interval_type, (timestamp,))
 
 if self.ongoing_interval_type == new_interval_type:
 # NOTE(vlebedev): Ongoing interval has not changed so history is not altered.
 return self
 
 new_overusing_duration = self._finished_overusing_duration
 if len(timestamps) > 0 and new_interval_type == IntervalType.NORMAL:
 # NOTE(vlebedev): Previous interval was of overusing type so add it to finished overusing duration.
 new_overusing_duration += timestamp - timestamps[-1]
 return cls(self._first_interval_type, (*timestamps, timestamp), new_overusing_duration)
 
 def get_overusing_duration(self, now: int | float) -> float:
 duration = self._finished_overusing_duration
 if len(self._timestamps) > 0 and self.ongoing_interval_type == IntervalType.OVERUSING:
 # NOTE(vlebedev): Overusing interval is still ongoing so add its current duration to the total one.
 duration += float(now) - self._timestamps[-1]
 return duration
 
 def get_overusing_intervals(self, now: int | float) -> _Intervals:
 now = float(now)
 return _Intervals(self.get_intervals_iter(now, IntervalType.OVERUSING))
 
 def get_intervals(self, now: int | float, intervals_type: IntervalType) -> _Intervals:
 return _Intervals(self.get_intervals_iter(now, intervals_type))
 
 def get_intervals_iter(
 self,
 now: int | float,
 intervals_type: IntervalType = IntervalType.OVERUSING,
 ) -> Iterator[Interval]:
 now = float(now)
 if len(self._timestamps) > 0 and now < self._timestamps[0]:
 raise ValueError('Final timestamp must be greater than the latest known one!')
 for start, end in self._pairs_iter(intervals_type, now):
 yield Interval(start, end)
 
 def _pairs_iter(self, intervals_type: IntervalType, final_item=None) -> Iterator[tuple[Any, Any]]:
 if len(self._timestamps) == 0:
 return
 assert self._first_interval_type is not None
 for start, type_, end in _typed_pairs_iter(
 self._first_interval_type,
 self._timestamps,
 final_item,
 ):
 if type_ != intervals_type:
 continue
 yield start, end
 
 def __eq__(self, other: object) -> bool:
 cls = type(self)
 
 if not isinstance(other, cls):
 return False
 
 if (self._first_interval_type, self._timestamps) != (other._first_interval_type, other._timestamps):
 return False
 
 assert self._finished_overusing_duration == other._finished_overusing_duration
 
 return True
 
 def __repr__(self) -> str:
 if self.empty:
 return f'{type(self).__name__}()'
 return f'{type(self).__name__}({self._first_interval_type.name}, {self._timestamps!r})'
 
 def __str__(self) -> str:
 return _format_joined_intervals(
 _format_pair(*p)
 for p in self._pairs_iter(IntervalType.OVERUSING, "-")
 )
 
 
 def get_interval_type_after(starts_with: IntervalType, ts_num: int) -> IntervalType:
 if ts_num in {0, 1} or ts_num % 2 != 0:
 return starts_with
 return get_other_interval_type(starts_with)
 
 
 def get_other_interval_type(current_event_type: IntervalType) -> IntervalType:
 return IntervalType.OVERUSING if current_event_type == IntervalType.NORMAL else IntervalType.NORMAL
 
 
 class _TrimmedDurationAndPosition(NamedTuple):
 duration: float
 position: int
 
 
 def _get_trimmed_overusing_duration_and_position(
 first_interval_type: IntervalType,
 timestamps: Iterable[float],
 cutoff: float,
 ) -> _TrimmedDurationAndPosition:
 position = 0
 
 def cutted_timestamps_iter():
 nonlocal position
 
 for position, ts in enumerate(timestamps):
 ts = min(ts, cutoff)
 yield ts
 if ts >= cutoff:
 break
 
 duration = sum(i.duration for i in _overusing_intervals_iter(first_interval_type, cutted_timestamps_iter()))
 
 return _TrimmedDurationAndPosition(duration, position)
 
 
 def _overusing_intervals_iter(
 first_interval_type: IntervalType,
 timestamps: Iterable[float],
 ) -> Iterable[Interval]:
 for start, interval_type, end in _typed_pairs_iter(first_interval_type, timestamps):
 if interval_type != IntervalType.OVERUSING:
 continue
 yield Interval(start, end)
 
 
 def _typed_pairs_iter(
 first_interval_type: IntervalType,
 timestamps: Iterable[Any],
 final_item: Any | None = None,
 ) -> Iterable[tuple[Any, IntervalType, Any]]:
 if final_item is not None:
 timestamps = itertools.chain(timestamps, (final_item,))
 intervals_iter = itertools.pairwise(timestamps)
 intervals_types = itertools.cycle([
 first_interval_type,
 get_other_interval_type(first_interval_type),
 ])
 for (start, end), type_ in zip(intervals_iter, intervals_types):
 yield start, type_, end
 
 
 def _format_pair(formatted_start, formatted_stop) -> str:
 return f'({formatted_start}-{formatted_stop})'
 
 
 def _format_joined_intervals(formatted_intervals: Iterable) -> str:
 return 'U'.join(str(i) for i in formatted_intervals) or "()"
 
 |