| Viewing file:  config.py (7.96 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# coding=utf-8#
 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2023 All Rights Reserved
 #
 # Licensed under CLOUD LINUX LICENSE AGREEMENT
 # http://cloudlinux.com/docs/LICENSE.TXT
 import typing
 import dataclasses
 from pathlib import Path
 from logging import Logger
 from datetime import timedelta
 from dataclasses import dataclass
 from collections import ChainMap
 from typing import Any, Generator, TypedDict, Mapping, Self, NamedTuple, Iterable, NotRequired
 
 import sqlalchemy as sa
 
 from ._logs import logger
 
 
 # TODO(vlebedev): Extract from package metadata instead of hardcoding?
 CONFIG_FILE = Path('/etc/sysconfig/lvestats.config/LveLimitsBurster.cfg')
 FEATURE_FLAG_FILE = Path('/opt/cloudlinux/flags/enabled-flags.d/burstable-limits.flag')
 
 
 class PluginConfig(TypedDict):
 bursting_enabled: NotRequired[str]
 server_id: NotRequired[str]
 bursting_quota_sec: str
 bursting_quota_window_sec: str
 bursting_idle_time_threshold: str
 bursting_cpu_multiplier: str
 bursting_io_multiplier: str
 bursting_database_dump_period_sec: str
 bursting_idle_time_samples_num: str
 bursting_debug_mode: NotRequired[str]
 
 
 _all_burster_plugin_config_keys = frozenset(PluginConfig.__annotations__.keys())
 
 
 @dataclass(frozen=True)
 class Config:
 server_id: str
 bursting_quota: timedelta
 bursting_quota_window: timedelta
 bursting_cpu_multiplier: float
 bursting_io_multiplier: float
 idle_time_threshold: float
 db_dump_period: timedelta
 idle_time_samples: int
 fail_fast: bool = True
 
 def __post_init__(self) -> None:
 if self.bursting_quota > self.bursting_quota_window:
 raise ValueError('Bursting quota must be less than or equal to bursting quota window!')
 
 
 _all_config_keys = frozenset(f.name for f in dataclasses.fields(Config))
 
 
 def is_bursting_enabled(config_file=CONFIG_FILE) -> bool:
 try:
 raw_config = read_raw_config(config_file)
 except FileNotFoundError:
 return False
 
 raw_key = 'bursting_enabled'
 assert raw_key in _all_burster_plugin_config_keys
 try:
 raw_value = raw_config[raw_key]
 except KeyError:
 return False
 
 try:
 return get_boolean(raw_value)
 except ValueError:
 return False
 
 
 def is_bursting_supported(feature_flag_file: Path = FEATURE_FLAG_FILE) -> bool:
 # NOTE(vlebedev): These imports requires some shared library to be present in order to succeed,
 #                 so deffer it until it's really needed to make unittests writing/running easier.
 from clcommon.utils import get_cl_version, is_ubuntu  # pylint: disable=import-outside-toplevel
 from clcommon.cpapi import Feature, is_panel_feature_supported  # pylint: disable=import-outside-toplevel
 
 if not is_panel_feature_supported(Feature.LVE):
 return False
 
 if is_ubuntu():
 return False
 
 cl_version = get_cl_version()
 if cl_version is None:
 return False
 
 try:
 if int(cl_version.removeprefix('cl').removesuffix('h')) < 8:
 return False
 except ValueError:
 return False
 
 return feature_flag_file.exists()
 
 
 def _identity(raw_value: str) -> str:
 return raw_value
 
 
 def get_boolean(raw_value: str) -> bool:
 value = raw_value.lower()
 
 if value not in {'true', 'false'}:
 raise ValueError(f'Unexpected value: {value}')
 
 return value == 'true'
 
 
 def _get_timedelta_from_seconds(raw_value: str) -> timedelta:
 seconds = int(raw_value)
 return timedelta(seconds=seconds)
 
 
 _raw_key_to_spec = {
 'bursting_enabled': ('enabled', get_boolean),
 'server_id': ('server_id', _identity),
 'bursting_debug_mode': ('fail_fast', get_boolean),
 'bursting_quota_sec': ('bursting_quota', _get_timedelta_from_seconds),
 'bursting_quota_window_sec': ('bursting_quota_window', _get_timedelta_from_seconds),
 'bursting_cpu_multiplier': ('bursting_cpu_multiplier', float),
 'bursting_io_multiplier': ('bursting_io_multiplier', float),
 'bursting_idle_time_threshold': ('idle_time_threshold', float),
 'bursting_database_dump_period_sec': ('db_dump_period', _get_timedelta_from_seconds),
 'bursting_idle_time_samples_num': ('idle_time_samples', int),
 }
 _config_to_raw_key = {v[0]: k for k, v in _raw_key_to_spec.items()}
 
 assert _raw_key_to_spec.keys() == _all_burster_plugin_config_keys
 assert {k for k, _ in _raw_key_to_spec.values()}.issuperset(_all_config_keys)
 
 
 def _process_raw_config(raw_config: Mapping[str, str]) -> dict[str, Any]:
 cfg_key_to_parsed_value, errors_by_cfg_key = {}, {}
 for config_key, raw_value in raw_config.items():
 try:
 _, extractor = _raw_key_to_spec[config_key]
 except KeyError:
 # NOTE(vlebedev): Currently config dict contains all the keys from _all_ .cfg files parsed by
 #                 lvestats. So there is no point as report fields not present in `Confg` typing
 #                 as "unknown" or something like that - they might well belong to some other plugin =/
 # errors_by_cfg_key[config_key] = f'Unknown config key'
 continue
 
 try:
 value = extractor(raw_value)
 except ValueError as e:
 errors_by_cfg_key[config_key] = str(e)
 continue
 
 cfg_key_to_parsed_value[config_key] = value
 
 if len(errors_by_cfg_key) > 0:
 logger.warning(
 "Failed to parse some config keys: \n%s",
 "\n".join(f"* {k}: {e}" for k, e in errors_by_cfg_key.items()),
 )
 
 result = {_raw_key_to_spec[k][0]: v for k, v in cfg_key_to_parsed_value.items()}
 return result
 
 
 class MissingKeysInRawConfig(ValueError):
 def __init__(self, missing_raw_keys: Iterable[str]) -> None:
 missing_raw_keys = frozenset(missing_raw_keys)
 msg = "Missing config keys: " + ", ".join(missing_raw_keys) + "!"
 super().__init__(msg, missing_raw_keys)
 
 @property
 def missing_raw_keys(self) -> frozenset[str]:
 return typing.cast(frozenset[str], self.args[1])
 
 
 class ConfigUpdate(NamedTuple):
 @classmethod
 def from_plugin_config(cls, config: PluginConfig) -> Self:
 assert all(isinstance(v, str) for v in config.values())
 external_params = _process_raw_config(typing.cast(Mapping[str, str], config))
 default_params = {
 'enabled': False,
 'server_id': 'localhost',
 'fail_fast': False,
 }
 
 if (defaults_used := default_params.keys() - external_params.keys()):
 logger.info('Using default values for: %s', defaults_used)
 
 params = ChainMap(external_params, default_params)
 
 missing_config_keys = _all_config_keys - params.keys()
 if missing_config_keys:
 raise MissingKeysInRawConfig(_config_to_raw_key[k] for k in missing_config_keys)
 
 return cls(
 enabled=params['enabled'],
 config=Config(**{k: params[k] for k in _all_config_keys})
 )
 
 enabled: bool
 config: Config
 
 
 class StartupParams(NamedTuple):
 @classmethod
 def wait(cls) -> Generator[None, ConfigUpdate | sa.engine.Engine, Self]:
 required_keys = frozenset(cls._fields)
 result = {}
 
 enabled = False
 while enabled is False or result.keys() != required_keys:
 match (yield):
 case sa.engine.Engine() as engine:
 result['engine'] = engine
 case ConfigUpdate(enabled=enabled, config=config):
 result['config'] = config
 return cls(**result)
 
 engine: sa.engine.Engine
 config: Config
 
 
 def read_raw_config(file: Path = CONFIG_FILE, _logger: Logger = logger) -> Mapping[str, str]:
 result = {}
 for line in file.read_text(encoding='utf-8').splitlines():
 try:
 key, value = line.split('=', maxsplit=1)
 except ValueError:
 _logger.warning('Failed to parse config line: %s', line)
 continue
 if key in result:
 _logger.warning('Duplicate key %s - latest value will be used', key)
 result[key] = value
 return result
 
 |