| Viewing file:  lve_sm.py (18.4 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# coding=utf-8#
 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2023 All Rights Reserved
 #
 # Licensed under CLOUD LINUX LICENSE AGREEMENT
 # http://cloudlinux.com/docs/LICENSE.TXT
 
 # pylint: disable=not-an-iterable
 
 import inspect
 import contextlib
 from dataclasses import dataclass
 from datetime import timedelta
 from typing import TypeAlias, NamedTuple, AbstractSet, Self
 
 from lvestats.orm import BurstingEventType
 
 from .utils import bootstrap_gen
 from ._logs import logger
 from .common import (
 BurstingMultipliers,
 LveId,
 Timestamp,
 LveLimits,
 ApplyLveSettings,
 LveStats,
 InvalidStateError,
 LveUsage,
 LveState,
 calc_bursted_limits,
 infer_lve_state,
 )
 from .history import LveHistory
 from .notify import Emitter, Signal
 
 
 class LveStateManager:
     """Request-driven state machine for the bursting state of a single LVE.

     Requests (``UpdateReadings``, ``Disappered``, ``Burst``, ``Unburst``) are
     passed to ``step``. A root handler coroutine stores the readings carried by
     the request, re-infers the LVE state and forwards the request to the
     handler coroutine of the current state. State transitions are announced
     through the ``on_state_changed`` signal.
     """

     @dataclass(frozen=True, slots=True)
     class Disappered(RuntimeError):
         # Request telling that the LVE is gone. The very same object is thrown
         # into the active state handler, hence the exception base class.
         now: Timestamp

     @dataclass(frozen=True, slots=True)
     class Burst:
         # Request to apply bursted limits.
         now: Timestamp

     @dataclass(frozen=True, slots=True)
     class Unburst:
         # Request to restore normal limits.
         now: Timestamp

     # NOTE(vlebedev): Seems that namedtuples are ~20% faster to create compared to
     #                 classes - dataclasses or usual ones.
     class UpdateReadings(NamedTuple):
         # Fields set to ``None`` leave the previously stored value untouched.
         now: Timestamp
         normal_limits: LveLimits | None
         stats: LveStats | None
         usage: LveUsage | None

     Request: TypeAlias = UpdateReadings | Disappered | Burst | Unburst

     @dataclass(frozen=True, slots=True)
     class _StateSwitched(RuntimeError):
         # Thrown by the root handler into a state handler when the inferred
         # state no longer matches the state that handler was created for.
         pass

     def __init__(
         self,
         now: Timestamp,
         lve_id: LveId,
         bursting_multipliers: BurstingMultipliers,
         initial_normal_limits: LveLimits,
         initial_stats: LveStats,
         initial_usage: LveUsage,
         quota: timedelta,
         quota_window: timedelta,
         apply_lve_settings: ApplyLveSettings,
         # NOTE(review): the default instance is shared by every manager created
         #               without an explicit history. This looks safe only because
         #               results of append()/trim() are always rebound here -
         #               confirm that LveHistory is immutable.
         initial_history: LveHistory = LveHistory(),
         fail_fast: bool = True,
     ):
         """Set up the manager and start its root handler coroutine.

         The initial state is inferred from the initial stats, limits and usage.

         :param now: timestamp the manager starts from.
         :param lve_id: id passed to ``apply_lve_settings``.
         :param bursting_multipliers: passed to ``calc_bursted_limits`` on ``Burst``.
         :param quota: overusing duration compared against in ``check_quota_exceeded``.
         :param quota_window: width of the history window kept by ``trim_history``.
         :param apply_lve_settings: callable invoked as ``(lve_id, limits)``.
         :param initial_history: history to start from.
         :param fail_fast: when true, unexpected handler failures are re-raised;
             otherwise they are logged and processing goes on.
         :raises ValueError: if ``quota`` exceeds ``quota_window``.
         """
         if quota > quota_window:
             raise ValueError('Bursting quota must not exceed bursting quota window!')

         # NOTE(vlebedev): Constants start here.
         self._lve_id = lve_id
         self._bursting_multipliers = bursting_multipliers
         self._quota_sec = quota.total_seconds()
         self._quota_window_sec = quota_window.total_seconds()
         self._fail_fast = fail_fast
         # NOTE(vlebedev): Constants end here.

         # NOTE(vlebedev): "Input" variables start here.
         #                 "Input" variables are those that are read from external source (Request) and must not
         #                  be altered by internal logic.
         self._now = now
         self._normal_limits = initial_normal_limits
         self._stats = initial_stats
         self._usage = initial_usage
         self._state = infer_lve_state(
             initial_stats,
             initial_normal_limits,
             initial_usage,
         )
         # NOTE(vlebedev): "Input" variables end here.

         # NOTE(vlebedev): "Output" variables start here.
         #                 "Output" variables are a channel that communicates internal state to external world.
         #                 Internal logic is responsible for mutating these variables.
         self._history = initial_history
         # NOTE(vlebedev): "Output" variables end here.

         # NOTE(vlebedev): Utility stuff starts here.
         #                 These variables are not supposed to be rewritten by internal logic.
         def apply_lve_settings_wrapper(cmd) -> None:
             # Translate a Burst/Unburst command into the limits to apply,
             # always based on the most recently stored normal limits.
             if isinstance(cmd, self.Burst):
                 limits = calc_bursted_limits(self._normal_limits, bursting_multipliers)
             elif isinstance(cmd, self.Unburst):
                 limits = self._normal_limits
             else:
                 raise ValueError(f'Unexpected command: {cmd}')

             apply_lve_settings(self._lve_id, limits)
         self._apply_lve_settings = apply_lve_settings_wrapper

         root_handler_gen = self._create_root_handler_gen()

         def step(request):
             """Feed ``request`` to the root handler and return its response.

             A response that is an exception instance is raised instead of
             being returned.
             """
             response = root_handler_gen.send(request)
             if isinstance(response, Exception):
                 raise response
             return response
         self.step = step

         self._on_state_changed = Emitter()

     @property
     def lve_id(self) -> LveId:
         """Id of the managed LVE."""
         return self._lve_id

     @property
     def now(self) -> Timestamp:
         """Timestamp of the latest processed request (or the initial one)."""
         return self._now

     @property
     def state(self) -> LveState:
         """Current state of the LVE."""
         return self._state

     @property
     def is_bursted(self) -> bool:
         """True in both BURSTED and OVERUSING states."""
         return self._state in {LveState.BURSTED, LveState.OVERUSING}

     @property
     def is_unbursted(self) -> bool:
         """True in UNBURSTED state."""
         return self._state == LveState.UNBURSTED

     @property
     def is_overusing(self) -> bool:
         """True in OVERUSING state."""
         return self._state == LveState.OVERUSING

     @property
     def history_contains_overusing(self) -> bool:
         """Value of ``contains_overusing`` of the current history."""
         return self._history.contains_overusing

     @property
     def history(self) -> LveHistory:
         """Current history object."""
         return self._history

     def check_quota_exceeded(self, now: float) -> bool:
         """Tell whether overusing duration reported by history at ``now`` reached the quota."""
         return self._history.get_overusing_duration(now) >= self._quota_sec

     @property
     def on_state_changed(self) -> Signal:
         """Signal invoked with ``(prev_state, next_state)`` on every transition."""
         return self._on_state_changed

     def trim_history(self, now: Timestamp) -> None:
         """Trim history using ``now`` minus the quota window as cutoff."""
         cutoff = Timestamp(int(now - self._quota_window_sec))
         self._history = self._history.trim(cutoff)

     @bootstrap_gen
     def _create_root_handler_gen(self):
         """Root coroutine: receives requests, yields responses.

         ``bootstrap_gen`` presumably advances the generator to its first
         ``yield`` so that ``send`` can be used right away - confirm in utils.
         """
         # NOTE(vlebedev): This coroutine is solely responsible for modifying "input" variables.
         #                 No mutations of "input" variables must happen outside of it.

         state_to_handler_factory = {
             LveState.EXISTED: self._create_existed_state_handler_gen,
             LveState.UNBURSTED: self._create_unbursted_state_handler_gen,
             LveState.BURSTED: self._create_bursted_state_handler_gen,
             LveState.OVERUSING: self._create_overusing_state_handler_gen,
         }

         prev_state = next_state = self._state
         subhandler = state_to_handler_factory[next_state](next_state)
         request, response = None, None
         while True:
             try:
                 request, response = (yield response), None

                 # NOTE(vlebedev): "Input" variables mutations start here.
                 #                 It's important that it happens before invoking state handler
                 #                 so that state handler sees updated state.
                 assert request.now >= self._now, "Time is not expected to go backward!"
                 self._now = request.now
                 if isinstance(request, self.UpdateReadings):
                     if request.normal_limits is not None:
                         self._normal_limits = request.normal_limits
                     if request.stats is not None:
                         self._stats = request.stats
                     if request.usage is not None:
                         self._usage = request.usage
                     next_state = infer_lve_state(
                         self._stats,
                         self._normal_limits,
                         self._usage,
                     )
                 elif isinstance(request, self.Disappered):
                     next_state = LveState.EXISTED
                 self._state = next_state
                 # NOTE(vlebedev): "Input" variables mutations end here.

                 # TODO(vlebedev): Refactor subhandler management - it turned out to be quite messy.
                 if inspect.getgeneratorstate(subhandler) != inspect.GEN_CLOSED:
                     subhandler_exc = None
                     if isinstance(request, self.Disappered):
                         subhandler_exc = request
                     elif next_state != prev_state:
                         subhandler_exc = self._StateSwitched()

                     if subhandler_exc is not None:
                         # The live subhandler was created for `prev_state`, so this is
                         # the state reported in termination logs.
                         self._terminate_subhandler(subhandler, subhandler_exc, prev_state)

                 if inspect.getgeneratorstate(subhandler) == inspect.GEN_CLOSED and next_state != prev_state:
                     subhandler = state_to_handler_factory[next_state](next_state)
                     logger.debug('LVE "%s": state subhandler for "%s" state created', self._lve_id, next_state)

                 if inspect.getgeneratorstate(subhandler) != inspect.GEN_CLOSED:
                     try:
                         response = subhandler.send(request)
                     except StopIteration:
                         logger.debug('LVE "%s": state subhandler for "%s" state finished', self._lve_id, next_state)
                     except Exception:
                         if self._fail_fast:
                             raise
                         logger.exception(
                             'LVE "%s": state subhandler for "%s" state failed while handling "%s" request!',
                             self._lve_id, next_state, request,
                         )

                 # NOTE(vlebedev): Trigger signal handlers only after all internal state has been updated.
                 if next_state != prev_state:
                     logger.debug(
                         'LVE "%s": transitioned from "%s" to "%s" state',
                         self._lve_id, prev_state, next_state,
                     )
                     try:
                         self._on_state_changed(prev_state, next_state)
                     except Exception:
                         if self._fail_fast:
                             raise
                         logger.exception('LVE "%s": some "on_state_changed" listeners failed!', self._lve_id)

                 prev_state = next_state
             except Exception as e:
                 if self._fail_fast:
                     raise
                 logger.critical('LVE "%s": top handler unexpectadly failed!', self._lve_id, exc_info=e)

     def _terminate_subhandler(self, subhandler, exc: Exception, state: LveState) -> None:
         """Throw ``exc`` into ``subhandler``; force-close it if it keeps running.

         :param subhandler: state handler generator to terminate.
         :param exc: exception delivered to the handler's pending ``yield``.
         :param state: state the handler was created for (used in logs only).
         :raises Exception: unexpected handler failures, only when ``fail_fast`` is set.
         """
         try:
             subhandler.throw(exc)
         except (StopIteration, type(exc)):
             # Either the handler returned or it let the thrown exception out:
             # in both cases the generator is finished, nothing to close.
             logger.debug(
                 'LVE "%s": state subhandler for "%s" state terminated',
                 self._lve_id, state
             )
         except Exception:
             if self._fail_fast:
                 raise
             logger.exception(
                 'LVE "%s": state subhandler for "%s" state unexpectedly failed during termination!',
                 self._lve_id, state
             )
         else:
             # throw() returned a value, i.e. the handler swallowed the exception
             # and yielded again - it has to be closed explicitly.
             try:
                 subhandler.close()
             except Exception:
                 if self._fail_fast:
                     raise
                 logger.exception(
                     'LVE "%s": state subhandler for "%s" state did not terminate properly '
                     'and failed during force-close!',
                     self._lve_id, state
                 )

     @bootstrap_gen
     def _create_unbursted_state_handler_gen(self, prev_state: LveState):
         """Handler for UNBURSTED: applies ``Burst`` and waits for it to show up in readings.

         ``Unburst`` is answered with ``InvalidStateError``; a failure while
         applying settings is sent back as the response and the handler keeps
         accepting requests.
         """
         request, response = None, None
         try:
             while True:
                 request, response = (yield response), None

                 if isinstance(request, self.Unburst):
                     response = InvalidStateError('Already unbursted!')
                 elif isinstance(request, self.Burst):
                     try:
                         self._apply_lve_settings(request)
                     except Exception as e:
                         response = e
                     else:
                         break
         except self._StateSwitched:
             if self._state == LveState.BURSTED:
                 logger.warning(
                     'LVE "%s": was set to bursted externally, without internal request from adjuster',
                     self._lve_id,
                 )
             raise

         logger.debug('LVE "%s": waiting for bursting enabled confirmation', self._lve_id)

         try:
             yield from self._wait_for_confirmation_from_readings({LveState.BURSTED, LveState.OVERUSING})
         except self.Disappered:
             logger.debug(
                 'LVE "%s": disappeared while waiting for bursting enabled or overusing started confirmation',
                 self._lve_id,
             )
             raise
         except self._StateSwitched:
             logger.warning(
                 'LVE "%s": was switched to "%s" while waiting for bursting enabled or overusing started confirmation',
                 self._lve_id, self._state,
             )
             raise

         assert self._state in {LveState.BURSTED, LveState.OVERUSING}

         logger.debug(
             'LVE "%s": confirmed from readings that bursting switched to enabled%s',
             self._lve_id,
             " and overusing has started" if self._state == LveState.OVERUSING else "",
         )

     @bootstrap_gen
     def _create_bursted_state_handler_gen(self, prev_state: LveState):
         """Handler for BURSTED: applies ``Unburst`` and waits for it to show up in readings.

         ``Burst`` is answered with ``InvalidStateError``; a failure while
         applying settings is sent back as the response and the handler keeps
         accepting requests.
         """
         request, response = None, None
         try:
             while True:
                 request, response = (yield response), None

                 if isinstance(request, self.Unburst):
                     try:
                         self._apply_lve_settings(request)
                     except Exception as e:
                         response = e
                     else:
                         break
                 elif isinstance(request, self.Burst):
                     response = InvalidStateError('Already bursted!')
         except self._StateSwitched:
             if self._state == LveState.UNBURSTED:
                 logger.warning(
                     'LVE "%s": was set to unbursted externally, without internal request from adjuster',
                     self._lve_id,
                 )
             raise

         logger.debug('LVE "%s": waiting for bursting disabled confirmation', self._lve_id)

         # Same waiting/logging sequence as used by the overusing handler.
         yield from self._wait_for_unbursted_confirmation_from_readings(prev_state)

     @bootstrap_gen
     def _create_overusing_state_handler_gen(self, prev_state: LveState):
         """Handler for OVERUSING: like the bursted one, plus history bookkeeping.

         A STARTED event is appended to history on creation and a STOPPED event
         whenever the handler finishes, whatever the reason.
         """
         # TODO(vlebedev): Do not use absolute time but relative to the latest event with `now`.
         self._history = self._history.append(self._now, BurstingEventType.STARTED)

         request, response = None, None
         try:
             try:
                 while True:
                     request, response = (yield response), None

                     if isinstance(request, self.Unburst):
                         try:
                             self._apply_lve_settings(request)
                         except Exception as e:
                             response = e
                         else:
                             break
                     elif isinstance(request, self.Burst):
                         response = InvalidStateError('LVE is overusing i.e. is also already bursted!')
             except self._StateSwitched:
                 if self._state == LveState.UNBURSTED:
                     logger.warning(
                         'LVE "%s": was set to unbursted externally, without internal request from adjuster',
                         self._lve_id,
                     )
                 raise

             yield from self._wait_for_unbursted_confirmation_from_readings(prev_state)
         finally:
             self._history = self._history.append(self._now, BurstingEventType.STOPPED)

     @bootstrap_gen
     def _create_existed_state_handler_gen(self, prev_state: LveState):
         """Handler for EXISTED: rejects ``Burst``/``Unburst`` until the state switches."""
         request, response = None, None
         try:
             while True:
                 try:
                     request, response = (yield response), None
                 except self.Disappered:
                     logger.warning(
                         'LVE "%s": disappeared again while waiting for LVE to re-appear',
                         self._lve_id,
                     )

                 if isinstance(request, (self.Burst, self.Unburst)):
                     response = InvalidStateError('LVE does not exist!')
         except self._StateSwitched:
             logger.debug('LVE "%s": got back from the dead', self._lve_id)
             return

     def _wait_for_unbursted_confirmation_from_readings(self, prev_state: LveState):
         """Wait until readings report UNBURSTED, logging every way the wait can end.

         ``prev_state`` is accepted but not used.
         """
         try:
             yield from self._wait_for_confirmation_from_readings({LveState.UNBURSTED})
         except self.Disappered:
             logger.debug('LVE "%s": disappeared while waiting for bursting disabled confirmation', self._lve_id)
             raise
         except self._StateSwitched:
             logger.warning(
                 'LVE "%s": was switched to "%s" while waiting for bursting disabled confirmation',
                 self._lve_id, self._state,
             )
             raise

         logger.debug('LVE "%s": confirmed from readings that bursting switched to disabled', self._lve_id)

     def _wait_for_confirmation_from_readings(self, expected_states: AbstractSet[LveState]):
         """Reject ``Burst``/``Unburst`` until a state switch is signalled.

         Returns normally when the new state is one of ``expected_states``;
         otherwise the ``_StateSwitched`` exception is propagated.
         """
         request, response = None, None
         try:
             while True:
                 request, response = (yield response), None

                 if isinstance(request, (self.Burst, self.Unburst)):
                     response = InvalidStateError('Waiting for confirmation from readings that limits have switched!')
         except self._StateSwitched:
             if self._state not in expected_states:
                 raise
 
 
 # Single-character code of every LVE state, looked up when a state
 # summary is rendered as text.
 _state_to_code = dict((
     (LveState.EXISTED, "X"),
     (LveState.UNBURSTED, "U"),
     (LveState.BURSTED, "B"),
     (LveState.OVERUSING, "O"),
 ))

 # The mapping must cover exactly the members of LveState.
 assert set(LveState) == _state_to_code.keys()
 
 
 class LveStateSummary(NamedTuple):
     """State, history and timestamp of a manager, renderable as a short string."""

     state: LveState
     history: LveHistory
     now: Timestamp

     @classmethod
     def for_lve(cls, manager: LveStateManager) -> Self:
         """Capture ``state``, ``history`` and ``now`` currently exposed by ``manager``."""
         return cls(state=manager.state, history=manager.history, now=manager.now)

     def __str__(self) -> str:
         """Render as ``<state code>;[<overusing intervals>~=<rounded duration>s]``."""
         overusing_intervals = self.history.get_overusing_intervals(self.now)
         overusing_duration = self.history.get_overusing_duration(self.now)
         code = _state_to_code[self.state]
         return "{};[{}~={}s]".format(code, overusing_intervals, round(overusing_duration))
 
 |