| Viewing file:  __init__.py (8.53 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# coding=utf-8#
 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2023 All Rights Reserved
 #
 # Licensed under CLOUD LINUX LICENSE AGREEMENT
 # http://cloudlinux.com/docs/LICENSE.TXT
 import atexit
 import contextlib
 import time
 import typing
 from datetime import timedelta
 from contextlib import ExitStack
 from typing import TypedDict, Callable, TYPE_CHECKING, Generator
 
 import sqlalchemy as sa
 
 if TYPE_CHECKING:
 from lvestat import LVEStat
 from lvestats.plugins.generic.analyzers import LVEUsage
 
 from lvestats.orm import BurstingEventType
 
 from ._logs import logger
 from .utils import bootstrap_gen
 from .config import (
 StartupParams,
 PluginConfig,
 Config,
 ConfigUpdate,
 is_bursting_supported,
 MissingKeysInRawConfig,
 )
 from .common import (
 BurstingMultipliers,
 LveState,
 SerializedLveId,
 GetNormalLimits,
 ApplyLveSettings,
 AdjustStepData,
 read_normal_limits_from_proc,
 Timestamp,
 PyLveSettingsApplier,
 )
 from .overload import OverloadChecker, GetStats, read_times_from_proc
 from .storage import (
 init_db_schema,
 load_bursting_enabled_intervals_from_db,
 events_saver_running,
 cleanup_running,
 InBurstingEventRow,
 )
 from .adjust import Adjuster, StepCalculator
 from .lve_sm import LveStateManager
 from .lves_tracker import LvesTracker, LveStateManagerFactory
 
 
 class _ExecutePayload(TypedDict):
 lve_active_ids: list[SerializedLveId]
 stats: dict[SerializedLveId, 'LVEStat']
 lve_usage_5s: dict[SerializedLveId, 'LVEUsage']
 
 
 class LveLimitsBurster:
 """
 Limits Burster plugin
 """
 
 def __init__(self, *args, **kwargs) -> None:
 super().__init__(*args, **kwargs)
 
 if is_bursting_supported():
 driver_gen = self._create_driver_gen()
 step = driver_gen.send
 
 # NOTE(vlebedev): It seems that plugins interface of lvestats does not contain any sane way
 #                 to get norified about server being stopped. Let's resort to hacks =/
 @atexit.register
 def cleanup():
 with contextlib.suppress(StopIteration, GeneratorExit):
 driver_gen.close()
 else:
 logger.info('Bursting Limits feature is not supported in current environment')
 
 def step(_, /):
 pass
 self._step: Callable[[sa.engine.Engine | ConfigUpdate | _ExecutePayload], None] = step
 
 @bootstrap_gen
 def _create_driver_gen(self):
 # NOTE(vlebedev): This import requires some shared library to be present in order to succeed,
 #                 so deffer it until it's really needed to make unittests writing/running easier.
 from lveapi import PyLve  # pylint: disable=import-outside-toplevel
 
 # NOTE(vlebedev): This is supposed to be a composition root.
 # NOTE(vlebedev): Wait until all data required for proper startup is received.
 engine, initial_config = yield from StartupParams.wait()
 
 pylve = PyLve()
 if not pylve.initialize():
 raise RuntimeError('Failed to initialize PyLve!')
 
 with adjuster_machinery_running(
 initial_config=initial_config,
 engine=engine,
 get_normal_limits=read_normal_limits_from_proc,
 apply_lve_settings=PyLveSettingsApplier(
 pylve=pylve,
 ),
 read_stats=read_times_from_proc,
 ) as adjuster:
 logger.info('LveLimitsBurster initialized')
 
 while True:
 msg = yield
 if not isinstance(msg, dict):
 logger.warning('Unexpected message type: %s', type(msg))
 continue
 now = Timestamp(int(time.time()))
 lve_active_ids = msg.get('lve_active_ids', [])
 stats = msg.get('stats', {})
 try:
 lve_usage_by_id = msg["lve_usages_5s"][-1]
 except (KeyError, IndexError):
 lve_usage_by_id = {}
 adjuster.step(AdjustStepData(
 now=now,
 lve_active_ids=lve_active_ids,
 stats=stats,
 lve_usages_by_id=lve_usage_by_id,
 ))
 
 def set_config(self, config: PluginConfig) -> None:
 # NOTE(vlebedev): Currently config dict contains all the keys from _all_ .cfg files parsed by
 #                 lvestats. So there is no point as report fields not present in `Confg` typing
 #                 as "unknown" or something like that - they might well belong to some other plugin =/
 try:
 config_update = ConfigUpdate.from_plugin_config(config)
 except MissingKeysInRawConfig as e:
 logger.info('Missing config keys: %s', e.missing_raw_keys)
 else:
 self._step(config_update)
 
 def set_db_engine(self, engine: sa.engine.Engine) -> None:
 # NOTE(vlebedev): 'Engine' is thread safe, so there is no problem in requesting connections
 #                 from it on different threads. For more info have a look at this:
 #                 https://groups.google.com/g/sqlalchemy/c/t8i3RSKZGb0/m/QxWshAS3iKgJ
 self._step(engine)
 
 def execute(self, lve_data: _ExecutePayload) -> None:
 self._step(lve_data)
 
 
 @contextlib.contextmanager
 def adjuster_machinery_running(
 initial_config: Config,
 engine: sa.engine.Engine,
 apply_lve_settings: ApplyLveSettings,
 get_normal_limits: GetNormalLimits,
 read_stats: GetStats,
 ) -> Generator[Adjuster, None, None]:
 now = Timestamp(int(time.time()))
 cutoff = Timestamp(int(now - initial_config.bursting_quota_window.total_seconds()))
 init_db_schema(engine)
 lve_to_history = load_bursting_enabled_intervals_from_db(
 engine=engine,
 cutoff=typing.cast(Timestamp, cutoff),
 server_id=initial_config.server_id,
 )
 logger.debug('Loaded intervals: %s', len(lve_to_history))
 
 with ExitStack() as deffer:
 deffer.enter_context(cleanup_running(
 engine=engine,
 server_id=initial_config.server_id,
 cleanup_interval=timedelta(days=1),
 history_window=timedelta(days=30),
 fail_fast=initial_config.fail_fast,
 ))
 
 write_event = deffer.enter_context(events_saver_running(
 engine=engine,
 server_id=initial_config.server_id,
 dump_interval=initial_config.db_dump_period,
 ))
 
 adjuster = Adjuster(
 lves_tracker=(lves_tracker := LvesTracker(
 create_lve_manager=LveStateManagerFactory(
 _lve_to_history=lve_to_history,
 _apply_lve_settings=apply_lve_settings,
 _quota=initial_config.bursting_quota,
 _quota_window=initial_config.bursting_quota_window,
 _bursting_multipliers=BurstingMultipliers(
 initial_config.bursting_cpu_multiplier,
 initial_config.bursting_io_multiplier,
 ),
 _fail_fast=initial_config.fail_fast,
 ),
 fail_fast=initial_config.fail_fast,
 )),
 get_normal_limits=get_normal_limits,
 step_calculator=StepCalculator(
 overload_threshold=1.0 - initial_config.idle_time_threshold,
 ),
 is_server_overloaded=OverloadChecker(
 idle_time_threshold=initial_config.idle_time_threshold,
 get_stats=read_stats,
 max_samples_number=initial_config.idle_time_samples,
 ),
 fail_fast=initial_config.fail_fast,
 )
 
 @lves_tracker.on_manager_added.register
 def on_new_lve_manager_created(manager: LveStateManager) -> None:
 lve_id = manager.lve_id
 
 @manager.on_state_changed.register
 def on_lve_state_chagned(old_state: LveState, new_state: LveState) -> None:
 assert old_state != new_state
 
 now = Timestamp(int(time.time()))
 
 if new_state == LveState.OVERUSING:
 write_event(InBurstingEventRow(
 lve_id=lve_id,
 timestamp=now,
 event_type=BurstingEventType.STARTED,
 ))
 elif old_state == LveState.OVERUSING:
 write_event(InBurstingEventRow(
 lve_id=lve_id,
 timestamp=now,
 event_type=BurstingEventType.STOPPED,
 ))
 
 yield adjuster
 
 |