# coding=utf-8
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2023 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import logging
import typing
import pprint
from dataclasses import dataclass
from functools import cached_property
from datetime import timedelta
from types import MappingProxyType
from typing import Callable, Mapping
from collections.abc import Set

from ._logs import logger
from .notify import Emitter, Signal
from .common import (
    BurstingMultipliers,
    LveId,
    Timestamp,
    LveLimits,
    ApplyLveSettings,
    SerializedLveId,
    get_deserialized_lve_id,
    LveStats,
    LveUsage,
    LveState,
    empty_usage,
)
from .history import IntervalType, LveHistory
from .lve_sm import LveStateManager, LveStateSummary
 
 
@dataclass(frozen=True)
class LveStateManagerFactory:
    """Creates an LveStateManager per LVE, seeding it with its persisted history
    (trimmed to the quota window) and the shared quota/bursting settings."""

    _lve_to_history: dict[LveId, LveHistory]
    _apply_lve_settings: ApplyLveSettings
    _quota: timedelta
    _quota_window: timedelta
    _bursting_multipliers: BurstingMultipliers
    _fail_fast: bool = True

    def __post_init__(self):
        lves_with_broken_history = {
            str(lve_id)
            for lve_id, history
            in self._lve_to_history.items()
            if history.ongoing_interval_type == IntervalType.OVERUSING
        }
        if lves_with_broken_history:
            raise ValueError(
                'LVEs ' + ', '.join(lves_with_broken_history) + ' are marked as "overusing" '
                'in initial history loaded from persistent storage!'
            )

    def __call__(
        self,
        lve_id: LveId,
        now: Timestamp,
        normal_limits: LveLimits,
        stats: LveStats,
        usage: LveUsage,
    ) -> LveStateManager:
        try:
            history = self._lve_to_history.pop(lve_id)
        except KeyError:
            history = LveHistory()
        else:
            cutoff = typing.cast(Timestamp, now - self._quota_window.total_seconds())
            history = history.trim(cutoff)

        return LveStateManager(
            now=now,
            lve_id=lve_id,
            initial_history=history,
            bursting_multipliers=self._bursting_multipliers,
            initial_normal_limits=normal_limits,
            initial_stats=stats,
            initial_usage=usage,
            quota=self._quota,
            quota_window=self._quota_window,
            apply_lve_settings=self._apply_lve_settings,
            fail_fast=self._fail_fast,
        )
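
# NOTE: Minimal usage sketch for the factory (illustrative only; `loaded_history`,
# `apply_settings` and `multipliers` are assumed to be supplied by the surrounding
# application, and the timedelta values are arbitrary examples):
#
#   factory = LveStateManagerFactory(
#       _lve_to_history=loaded_history,       # per-LVE history restored from persistent storage
#       _apply_lve_settings=apply_settings,   # ApplyLveSettings implementation (assumed)
#       _quota=timedelta(minutes=5),
#       _quota_window=timedelta(hours=24),
#       _bursting_multipliers=multipliers,
#   )
#   manager = factory(lve_id=lve_id, now=now, normal_limits=limits, stats=stats, usage=usage)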
 
 
class LvesTracker:
    """Keeps one LveStateManager per known LVE: creates managers for newly appeared LVEs,
    feeds all managers fresh readings on every update() call and maintains the
    bursted/unbursted/overusing/quota-exceeded sets."""

    def __init__(
        self,
        create_lve_manager: LveStateManagerFactory,
        fail_fast: bool = True,
        deserialize_lve_id: Callable[[SerializedLveId], LveId] = get_deserialized_lve_id,
    ) -> None:
        self._create_lve_manager = create_lve_manager
        self._fail_fast = fail_fast
        self._deserialize_lve_id = deserialize_lve_id

        self._serialized_id_to_manager = dict[SerializedLveId, LveStateManager]()

        # FIXME(vlebedev): Remove these state-sets and replace them with signals and external
        #                  handlers that are interested in state switches.
        self._state_sets = (
            self._bursted,
            self._unbursted,
            self._overusing,
            self._exceeded,
        ) = (
            set[LveStateManager](),
            set[LveStateManager](),
            set[LveStateManager](),
            set[LveStateManager](),
        )

        self._on_manager_added = Emitter()

    @cached_property
    def serialized_id_to_manager(self) -> Mapping[SerializedLveId, LveStateManager]:
        return MappingProxyType(self._serialized_id_to_manager)

    @property
    def bursted(self) -> Set[LveStateManager]:
        return self._bursted

    @property
    def unbursted(self) -> Set[LveStateManager]:
        return self._unbursted

    @property
    def overusing(self) -> Set[LveStateManager]:
        return self._overusing

    @property
    def quota_exceeded(self) -> Set[LveStateManager]:
        return self._exceeded

    @property
    def on_manager_added(self) -> Signal:
        return self._on_manager_added
 
    def update(
        self,
        now: Timestamp,
        normal_limits_by_id: Mapping[LveId, LveLimits],
        stats_by_id: Mapping[SerializedLveId, LveStats],
        usages_by_id: Mapping[SerializedLveId, LveUsage],
    ) -> None:
        """Process a snapshot of readings taken at `now`: create managers for newly appeared
        LVEs, push the new readings to every known manager, refresh the state sets and forget
        managers of LVEs that disappeared without any overusing history."""
        # TODO(vlebedev): Filter out users belonging to resellers (LVEStat contains a reseller_id field).
        currently_existing_ids = stats_by_id.keys()

        newly_appeared_raw_ids = currently_existing_ids - self._serialized_id_to_manager.keys()
        for serialized_lve_id in newly_appeared_raw_ids:
            if serialized_lve_id == 0:
                continue
            # NOTE(vlebedev): Users under resellers are not supported for now.
            if stats_by_id[serialized_lve_id].reseller_id != 0:
                continue

            lve_id: LveId = self._deserialize_lve_id(serialized_lve_id)
            # TODO(vlebedev): Can it happen that no normal limits and/or stats are available?
            #                 What should be done in that case?
            errors = []
            try:
                normal_limits = normal_limits_by_id[lve_id]
            except KeyError:
                errors.append('normal limits')

            try:
                stats = stats_by_id[serialized_lve_id]
            except KeyError:
                errors.append('stats')

            if errors:
                # TODO(vlebedev): Raise an exception when `fail_fast` is set.
                logger.warning(
                    'LVE "%s": some "get_initial_readings" listeners failed: %s readings are absent!',
                    lve_id,
                    ' and '.join(f'"{e}"' for e in errors),
                )
                continue

            manager = self._create_lve_manager(
                now=now,
                lve_id=lve_id,
                normal_limits=normal_limits,
                stats=stats,
                usage=usages_by_id.get(serialized_lve_id, empty_usage),
            )
            self._serialized_id_to_manager[serialized_lve_id] = manager
            logger.debug('LVE "%s": unknown LVE appeared - created a manager for it', lve_id)

        if logger.isEnabledFor(logging.DEBUG):
            logger.debug('LVEs known to adjuster: \n%s', pprint.pformat({
                slid: str(LveStateSummary.for_lve(m))
                for slid, m
                in self._serialized_id_to_manager.items()
            }, width=-1))

        disappeared_exc, to_delete = LveStateManager.Disappered(now=now), set()
        for serialized_lve_id, manager in self._serialized_id_to_manager.items():
            lve_id = manager.lve_id

            manager.trim_history(now)

            manager.step(LveStateManager.UpdateReadings(
                now=now,
                normal_limits=normal_limits_by_id.get(lve_id),
                stats=stats_by_id.get(serialized_lve_id),
                usage=usages_by_id.get(serialized_lve_id, empty_usage),
            ))

            if serialized_lve_id not in currently_existing_ids and manager.state != LveState.EXISTED:
                manager.step(disappeared_exc)
                for state_set in self._state_sets:
                    state_set.discard(manager)
                if not manager.history_contains_overusing:
                    to_delete.add(serialized_lve_id)
                continue

            lve_quota_exceeded = manager.check_quota_exceeded(now)

            (self._overusing.add if manager.is_overusing else self._overusing.discard)(manager)
            (self._bursted.add if manager.is_bursted else self._bursted.discard)(manager)
            (self._unbursted.add if manager.is_unbursted else self._unbursted.discard)(manager)
            (self._exceeded.add if lve_quota_exceeded else self._exceeded.discard)(manager)

        if self._fail_fast:
            if self._bursted.intersection(self._unbursted):
                raise AssertionError("LVEs can't be both bursted and unbursted!")
            if self._overusing.intersection(self._unbursted):
                raise AssertionError("LVEs can't be both overusing and unbursted!")

        for serialized_lve_id in to_delete:
            manager = self._serialized_id_to_manager.pop(serialized_lve_id)
            logger.debug(
                'LVE "%s": LVE is no longer active and has no overusing history - forgetting its manager',
                manager.lve_id,
            )

        # NOTE(vlebedev): Trigger signal listeners only after the internal state update is finished.
        for serialized_id in newly_appeared_raw_ids:
            try:
                manager = self._serialized_id_to_manager[serialized_id]
            except KeyError:
                continue

            try:
                self._on_manager_added(manager)
            except Exception:
                if self._fail_fast:
                    raise
                logger.exception('LVE "%s": some "on_manager_created" listeners failed!', manager.lve_id)