| Viewing file:  common.py (4.56 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# coding=utf-8#
 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2023 All Rights Reserved
 #
 # Licensed under CLOUD LINUX LICENSE AGREEMENT
 # http://cloudlinux.com/docs/LICENSE.TXT
 import math
 import enum
 import json
 import typing
 from enum import StrEnum
 from types import SimpleNamespace
 from collections.abc import Sequence, Mapping
 from typing import NewType, TypedDict, Protocol, NamedTuple, TYPE_CHECKING
 
 if TYPE_CHECKING:
 from lveapi import PyLve
 
 
 # LVE id in its serialized form; get_deserialized_lve_id() converts it to an LveId.
 SerializedLveId = NewType('SerializedLveId', int)
 # LVE id in the form handed to the settings applier and used to key normal limits.
 LveId = NewType('LveId', int)
 # Integer point in time (see AdjustStepData.now); presumably epoch seconds -- TODO confirm.
 Timestamp = NewType('Timestamp', int)
 
 
 class BurstingMultipliers(NamedTuple):
     """Pair of factors used to scale normal limits into bursted limits.

     calc_bursted_limits() multiplies the 'cpu' and 'io' entries of the
     normal limits by the matching field of this tuple.
     """

     cpu: float  # factor applied to the 'cpu' limit
     io: float  # factor applied to the 'io' limit
 
 
 @enum.unique
 class LveState(StrEnum):
 EXISTED = enum.auto()
 UNBURSTED = enum.auto()
 BURSTED = enum.auto()
 OVERUSING = enum.auto()
 
 def __str__(self) -> str:
 return self.name
 
 
 class LveStats(Protocol):
     """Structural type of a per-LVE statistics record.

     infer_lve_state() reads ``cpu`` and ``io`` as the limits currently in
     effect for the LVE and compares them with the normal limits.
     """

     cpu: int  # CPU limit in effect; compared directly with LveLimits['cpu']
     io: int  # IO limit in effect, in KB
     reseller_id: int  # presumably the id of the owning reseller -- TODO confirm
 
 
 # All-zero stand-in typed as LveStats; besides the protocol fields it also
 # carries zeroed ``cpu_usage`` and ``io_usage`` attributes.
 empty_stats = typing.cast(LveStats, SimpleNamespace(**dict.fromkeys(
     ('cpu', 'cpu_usage', 'io', 'io_usage', 'reseller_id'),
     0,
 )))
 
 
 class LveUsage(Protocol):
     """Structural type of a per-LVE resource usage record.

     infer_lve_state() compares these values against the normal limits.
     """

     cpu_usage: int  # CPU usage; compared directly with LveLimits['cpu']
     io_usage: int  # IO usage, in bytes
 
 
 # All-zero stand-in typed as LveUsage.
 empty_usage = typing.cast(LveUsage, SimpleNamespace(**dict.fromkeys(
     ('cpu_usage', 'io_usage'),
     0,
 )))
 
 
 class AdjustStepData(NamedTuple):
     """Bundle of inputs, presumably for one limits-adjustment step -- TODO confirm."""

     # Point in time this data refers to.
     now: Timestamp
     # Serialized ids of LVEs; presumably those active at ``now`` -- TODO confirm.
     lve_active_ids: Sequence[SerializedLveId]
     # Statistics record per serialized LVE id.
     stats: Mapping[SerializedLveId, LveStats]
     # Usage record per serialized LVE id.
     lve_usages_by_id: Mapping[SerializedLveId, LveUsage]
 
 
 class LveLimits(TypedDict):
 cpu: int
 io: int
 
 
 class InvalidStateError(RuntimeError):
     """RuntimeError subclass with no added behaviour.

     Presumably raised when an LVE is found in an unexpected state; the
     raising code is not in this part of the file -- TODO confirm.
     """
 
 
 class ApplyLveSettings(Protocol):
     """Structural type of a callable that applies limits to one LVE."""

     def __call__(self, lve_id: LveId, lve_limits: LveLimits) -> None:
         """Apply ``lve_limits`` to the LVE identified by ``lve_id``."""
 
 
 class GetNormalLimits(Protocol):
     """Structural type of a zero-argument callable providing normal limits."""

     def __call__(self) -> Mapping[LveId, LveLimits]:
         """Return the normal limits keyed by LVE id."""
 
 
 def read_normal_limits_from_proc() -> Mapping[LveId, LveLimits]:
     """Load the normal limits of all LVEs from the effective-limits file.

     The file is parsed as a JSON object. JSON object keys are strings, so
     each key is converted to an integer LveId; values are passed through
     unchanged.

     Returns:
         Mapping from LVE id to the value stored for that id in the file.

     Raises:
         OSError: if the file cannot be opened.
         json.JSONDecodeError: if the file is not valid JSON.
         ValueError: if a key is not an integer literal.
     """
     limits_path = '/var/run/cloudlinux/effective-normal-limits'
     with open(limits_path, 'r', encoding='utf-8') as limits_file:
         raw_limits = json.load(limits_file)
     return {LveId(int(raw_id)): limits for raw_id, limits in raw_limits.items()}
 
 
 class PyLveSettingsApplier:
     """Callable that pushes a set of limits to an LVE through a PyLve object."""

     # (liblve settings attribute, LveLimits key) pairs, in assignment order.
     _SETTINGS_FIELDS = (
         ('ls_io', 'io'),
         ('ls_cpu', 'cpu'),
         ('ls_cpus', 'ncpu'),
         ('ls_memory', 'mem'),
         ('ls_enters', 'ep'),
         ('ls_memory_phy', 'pmem'),
         ('ls_nproc', 'nproc'),
         ('ls_iops', 'iops'),
     )

     def __init__(self, pylve: 'PyLve') -> None:
         """Remember the PyLve object used to talk to the LVE subsystem."""
         self._pylve = pylve

     def __call__(self, lve_id: LveId, lve_limits: LveLimits) -> None:
         """Apply ``lve_limits`` to the LVE with id ``lve_id``.

         A fresh settings object is obtained from the PyLve object, filled
         with the integer-converted limits and passed to ``lve_setup``.

         Raises:
             RuntimeError: if ``lve_id`` is 0.
             KeyError: if ``lve_limits`` lacks one of the keys read here.
         """
         if lve_id == 0:
             raise RuntimeError('Cannot alter LVE with id 0')

         settings = self._pylve.liblve_settings()
         for attr_name, limit_key in self._SETTINGS_FIELDS:
             setattr(settings, attr_name, int(lve_limits[limit_key]))

         # '{{code}}' leaves a literal '{code}' placeholder in the message;
         # presumably lve_setup fills it in on failure -- TODO confirm.
         self._pylve.lve_setup(
             lve_id,
             settings,
             err_msg=f'Can`t setup lve with id {lve_id}; error code {{code}}',
         )
 
 
 def infer_lve_state(
     stats: LveStats,
     normal_limits: LveLimits,
     usage: LveUsage,
 ) -> LveState:
     """Classify an LVE from its limits in effect, normal limits and usage.

     Args:
         stats: record whose ``cpu``/``io`` are the limits currently in effect
             (``io`` in KB).
         normal_limits: the normal limits of the LVE (``io`` in KB).
         usage: current usage (``io_usage`` in bytes).

     Returns:
         UNBURSTED when the limits in effect match the normal ones,
         BURSTED when they differ and both usages are strictly below the
         normal limits, OVERUSING otherwise.

     Raises:
         AssertionError: when the limits in effect match the normal ones but
             usage exceeds a normal limit (only while assertions are enabled).
     """
     bytes_per_kb = 1024

     cpu_limit = normal_limits['cpu']
     io_limit_bytes = normal_limits['io'] * bytes_per_kb

     effective_cpu_limit = stats.cpu
     effective_io_limit_bytes = stats.io * bytes_per_kb

     cpu_used = usage.cpu_usage
     io_used_bytes = usage.io_usage

     limits_are_normal = (
         math.isclose(effective_cpu_limit, cpu_limit)
         and math.isclose(effective_io_limit_bytes, io_limit_bytes)
     )
     if limits_are_normal:
         assert cpu_used <= cpu_limit, 'CPU usage is not expected to be greater than CPU limit!'
         assert io_used_bytes <= io_limit_bytes, 'IO usage is not expected to be greater than IO limit!'
         return LveState.UNBURSTED

     # Limits in effect differ from the normal ones from here on.
     if cpu_used < cpu_limit and io_used_bytes < io_limit_bytes:
         return LveState.BURSTED
     return LveState.OVERUSING
 
 
 def get_deserialized_lve_id(serialized_lve_id: SerializedLveId) -> LveId:
     """Convert a serialized LVE id into a plain LveId.

     Delegates to ``lvestats.lib.commons.func.deserialize_lve_id``, which
     returns a pair; only the first element is used.
     """
     # NOTE(vlebedev): This import requires some shared library to be present in order to succeed,
     #                 so defer it until it's really needed to make unittests writing/running easier.
     from lvestats.lib.commons.func import deserialize_lve_id  # pylint: disable=import-outside-toplevel

     plain_id, _unused = deserialize_lve_id(serialized_lve_id)
     return typing.cast(LveId, plain_id)
 
 
 def calc_bursted_limits(normal_limits: LveLimits, bursting_multipliers: BurstingMultipliers) -> LveLimits:
     """Return a copy of ``normal_limits`` with 'cpu' and 'io' scaled up.

     Both values are multiplied by the matching bursting multiplier and
     truncated to int; every other key is copied unchanged. The input
     mapping is not modified.
     """
     bursted: LveLimits = {**normal_limits}
     bursted['cpu'] = int(normal_limits['cpu'] * bursting_multipliers.cpu)
     bursted['io'] = int(normal_limits['io'] * bursting_multipliers.io)
     return bursted
 
 |