| Viewing file:  lveinfolib.py (41.82 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# coding=utf-8#
 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
 #
 # Licensed under CLOUD LINUX LICENSE AGREEMENT
 # http://cloudlinux.com/docs/LICENSE.TXT
 
 # pylint: disable=too-many-lines
 
 import copy
 import datetime
 import logging
 import pwd
 import time
 import warnings
 
 from sqlalchemy import Float, Integer, text
 from sqlalchemy import exc as sa_exc
 from sqlalchemy.sql import and_, asc, desc, label, not_, or_, select
 from sqlalchemy.sql.expression import ColumnElement, alias, case, cast, func
 
 from lvestats.core.plugin import LveStatsPluginTerminated
 from lvestats.lib.commons.dateutil import (
 gm_datetime_to_unixtimestamp,
 gm_to_local,
 local_to_gm,
 round_1m,
 str_to_timedelta,
 unixtimestamp_to_gm_datetime,
 )
 from lvestats.lib.commons.func import (
 get_current_max_lve_id,
 skip_user_by_maxuid,
 )
 from lvestats.lib.commons.sizeutil import convert_bytes, convert_powers_of_1000, mempages_to_bytes
 from lvestats.lib.config import (
 HIDE_MAX_UID_LVE_PARAMETER,
 read_config,
 str_to_bool,
 )
 from lvestats.orm import LVE_STATS_2_TABLENAME_PREFIX, history, history_x60, servers
 
 __all__ = ('HistoryShow', 'HistoryShowUnion', 'OutputFormatter', 'get_lve_version')
 
 
 def get_lve_version(dbengine, server_id):
 sql_query_lve_version = select([servers.lve_version]).where(servers.server_id == server_id)
 with dbengine.begin() as transaction_:
 cursor_ = transaction_.execute(sql_query_lve_version)
 if cursor_.returns_rows:
 res = cursor_.fetchone()
 if res is None:
 return 6
 else:
 return int(res['lve_version'])
 else:
 return 6
 
 
 def servers_info(dbengine):
 sql_query_lve_version = select([servers.server_id, servers.lve_version])
 return dbengine.execute(sql_query_lve_version)
 
 
 def convert_key_to_label(key_):
 return str.lower(key_)
 
 
 def add_labes_to_column(func_dict):
 """
 Add label (on SQL add ".. as ..") to aggregate_func_dict
 key of dict convert to lowercase and add as label
 :param dict func_dict:
 :return dict:
 """
 func_dict_labeled = {}
 for key_, sqlalchemy_func in list(func_dict.items()):
 if issubclass(sqlalchemy_func.__class__, ColumnElement):
 func_labeled = label(convert_key_to_label(key_), sqlalchemy_func)
 else:
 func_labeled = sqlalchemy_func
 func_dict_labeled[key_] = func_labeled
 return func_dict_labeled
 
 
 def convert_to_list(arg):
 if isinstance(arg, (str, int)):
 return [arg]
 return arg
 
 
 class OutputFormatter(object):
 def __init__(self, fields, rows=None, orders=None):
 self.rows = rows or []
 self.fields = list(fields)
 self._fields_lowered = [_.lower() for _ in fields]  # self.fields use for output
 self._orders = []
 self._hidden_fields = set()  # self._fields indexes list fields  to hide
 
 if orders:
 for field, order in orders:
 self.add_order(field, order)
 
 def get_fields(self):
 if self._hidden_fields:
 return [f_ for f_ in self.fields if f_ not in self._hidden_fields]
 else:
 return self.fields
 
 def set_rows(self, rows):
 """
 Use this method if you used hide_fields
 number items in row must bee same as in fields after hide
 """
 self.rows = rows
 
 def hide_fields(self, h_fields):
 """
 :param tuple|list h_fields:
 :return:
 """
 self._hidden_fields = self._hidden_fields.union(set(h_fields))
 
 def add_order(self, fields, order):
 """
 :param list|tuples fields:
 :param order: field to use to order result
 :return:
 """
 if isinstance(order, str):
 try:
 order = getattr(self, order)
 except AttributeError as e:
 raise ValueError(f'Non such {order} order') from e
 if not hasattr(order, '__call__'):
 raise ValueError(f'input object {order} must be callable')
 self._orders.append(({_.lower() for _ in fields}, order))
 
 def _convert_line(self, row_):
 """
 :param iterable row_:
 :return:
 """
 row_ = list(row_)
 row_out = []
 for field_, r_ in zip(self._fields_lowered, row_):
 if field_ in self._hidden_fields:  # continue if field must be hide
 continue
 # use many orders to one cell
 for order_fields, order in self._orders:
 if field_ in order_fields:
 try:
 r_ = order(r_)
 except (TypeError, ValueError, KeyError, IndexError):
 pass
 row_out.append(r_)
 return row_out
 
 def __iter__(self):
 for row_ in self.rows:
 yield self._convert_line(row_)
 
 def __getitem__(self, index):
 if isinstance(index, slice):
 return list(map(self._convert_line, self.rows[index.start: index.stop]))
 return self._convert_line(self.rows[index])
 
 def __len__(self):
 return len(self.rows)
 
 def get_corrected_list(self):
 return list(self)
 
 # build-in orders
 @staticmethod
 def strftime(value, format_='%m-%d %H:%M'):
 return value.strftime(format_)
 
 @staticmethod
 def percentage(value):
 if value is None:
 return '-'
 try:
 float(value)
 except ValueError:
 return value
 return str(value * 100) + '%'
 
 @staticmethod
 def bytes(value):
 if value is None:
 return '-'
 return convert_bytes(value)
 
 @staticmethod
 def powers_of_1000(value):
 if value is None:
 return '-'
 return convert_powers_of_1000(value)
 
 @staticmethod
 def username(value):
 try:
 return pwd.getpwuid(int(value)).pw_name
 except KeyError:
 return value
 
 @staticmethod
 def datetime(value):
 """
 Convert unix timestamp to datetime (local timezone)
 """
 return datetime.datetime.fromtimestamp(value)
 
 
 def enumerate_duplicate_columns(columns):
 """
 Enumerate if columns name or functions are duplicated
 Use for force add duplicate columns to select operator
 :param list|tuple columns:
 :return list|tuple: enumerated columns
 """
 output = []
 output_str = []  # for controlling columns count
 for c_ in columns:
 c_str = str(c_)
 dubl_count = output_str.count(c_str)
 if dubl_count >= 1:  # check whether there is a duplicate; "c_ in columns" for sqlalchemy classes not work
 # check if column is string or function and get column name
 if isinstance(c_, str):
 c_name = c_
 else:
 c_name = c_.name
 
 # numbering duplicate columns
 c_name += '_' + str(dubl_count + 1)
 
 c_ = label(c_name, c_)  # rename column
 output.append(c_)
 output_str.append(c_str)
 return output
 
 
 usage_to_limit_dict = {
 'aCPU'.lower(): 'lCPU'.lower(),
 'mCPU'.lower(): 'lCPU'.lower(),
 'aVMem'.lower(): 'lVMem'.lower(),
 'mVMem'.lower(): 'lVMem'.lower(),
 'aEP'.lower(): 'lEP'.lower(),
 'mEP'.lower(): 'lEP'.lower(),
 'aPMem'.lower(): 'lPMem'.lower(),
 'mPMem'.lower(): 'lPMem'.lower(),
 'aNproc'.lower(): 'lNproc'.lower(),
 'mNproc'.lower(): 'lNproc'.lower(),
 'aIO'.lower(): 'lIO'.lower(),
 'mIO'.lower(): 'lIO'.lower(),
 'aIOPS'.lower(): 'lIOPS'.lower(),
 'mIOPS'.lower(): 'lIOPS'.lower(),
 }
 
 FIELD_TO_TABLE_COLUMN = {
 'ID': 'id',
 'aCPU': 'cpu',
 'aVMem': 'mem',
 'aEP': 'mep',
 'aPMem': 'memphy',
 'aIO': 'io',
 'aNproc': 'nproc',
 'aIOPS': 'iops',
 'lCPU': 'cpu_limit',
 'lEP': 'mep_limit',
 'lVMem': 'mem_limit',
 'lPMem': 'lmemphy',
 'lIO': 'io_limit',
 'lNproc': 'lnproc',
 'lIOPS': 'liops',
 'VMemF': 'mem_fault',
 'PMemF': 'memphy_fault',
 'EPf': 'mep_fault',
 'NprocF': 'nproc_fault',
 'CPUf': 'cpu_fault',
 'IOf': 'io_fault',
 'IOPSf': 'iops_fault',
 'uCPU': 'cpu',
 'uEP': 'mep',
 'uVMem': 'mem',
 'uPMem': 'memphy',
 'uIO': 'io',
 'uNproc': 'nproc',
 'uIOPS': 'iops',
 'mCPU': 'cpu',
 'mEP': 'mep',
 'mVMem': 'mem',
 'mPMem': 'memphy',
 'mNproc': 'nproc',
 'mIO': 'io',
 'mIOPS': 'iops',
 }
 
 FIELD_AVERAGE = ['aCPU', 'aVMem', 'aPMem', 'aEP', 'aNproc', 'aIO', 'aIOPS']
 FIELD_LIMIT = ['lCPU', 'lVMem', 'lPMem', 'lEP', 'lNproc', 'lIO', 'lIOPS']
 FIELD_FAULT = ['CPUf', 'VMemF', 'PMemF', 'EPf', 'NprocF', 'IOf', 'IOPSf']
 FIELD_USAGE = ['uCPU', 'uVMem', 'uPMem', 'uEP', 'uNproc', 'uIO', 'uIOPS']
 FIELD_MAX = ['mCPU', 'mVMem', 'mPMem', 'mEP', 'mNproc', 'mIO', 'mIOPS']
 
 KEYS_NORMALIZATION_LOOKUP_TABLE = {'FROM': 'From', 'TO': 'To', 'ANYF': 'anyF'}
 for key in list(FIELD_TO_TABLE_COLUMN.keys()):
 KEYS_NORMALIZATION_LOOKUP_TABLE[key.upper()] = key
 
 
 def normalize_optional_column_names(names):
 if names is None:
 return None
 return normalize_column_names(names)
 
 
 def normalize_column_names(names):
 result = []
 for name in names:
 result.append(normalize_column_name(name))
 return result
 
 
 def normalize_column_name(name):
 if name:
 return KEYS_NORMALIZATION_LOOKUP_TABLE[name.upper()]
 return None
 
 
 time_unit_orders = [
 ('10m', lambda dt: dt - datetime.timedelta(minutes=dt.minute % 10 + 10)),
 ('1h', lambda dt: dt.replace(minute=0) if dt.minute else dt - datetime.timedelta(hours=1)),  # round to hour
 ('1d', lambda dt: dt.replace(hour=0, minute=0)),  # round to day
 ]
 
 
 def dyn_time_unit_groups(period_from, period_to):
 period_groups = [round_1m(period_from), round_1m(period_to + datetime.timedelta(minutes=1))]
 time_unit_list = ['1m']
 _from_order_fun = round_1m
 for time_unit, order_fun in time_unit_orders:
 from_to_point = order_fun(period_groups[1])
 if from_to_point - period_groups[0] >= str_to_timedelta(time_unit):
 _from_order_fun = order_fun
 period_groups.insert(1, from_to_point)
 time_unit_list.append(time_unit)
 period_groups[0] = _from_order_fun(period_groups[0])
 # prepare to output as list of tuples
 # (<From|datetime>, <To|datetime>, <time-unit|int>)
 from_to_groups = []
 for index, time_unit in enumerate(time_unit_list):
 from_to_groups.append(
 (period_groups[-index - 2], period_groups[-index - 1], int(str_to_timedelta(time_unit).total_seconds()))
 )
 return from_to_groups
 
 
 class HistoryShow(object):
 def __init__(
 self,
 dbengine,
 period_from,
 period_to,
 uid=None,
 show_columns=None,
 server_id='localhost',
 time_unit=None,
 order_by=None,
 by_usage=None,
 by_usage_percentage=0.9,
 by_fault=None,
 threshold=1,
 limit=0,
 table=None,
 log=None,
 time_count=None,
 show_idle=False,
 ):
 """
 Show different statistics from history table
 :param sqlalchemy.engine.base.Engine dbengine: database engine to use
 :param datetime.datetime|float|int period_from:  start time retrieve data
 :param datetime.datetime|float|int period_to:    end time retrieve data
 :param int|None|list|tuple uid:                  filter the output information to the user uid
 :param tuple|list show_columns:        display columns in the order specified. If not, show all supported
 valid column names: 'aCPU', 'lPMem', 'uIO', 'uEP', 'lEP', 'aVMem', 'PMemF', 'lVMem', 'NprocF', 'anyF',
 'aNproc', 'VMemF', 'ID', 'lCPU', 'aIOPS', 'aEP', 'aPMem', 'uPMem', 'lIO', 'lIOPS', 'uCPU',
 'lNproc', 'aIO', 'uIOPS', 'EPf', 'uVMem', 'uNproc'
 
 :param str server_id:                  filtering the output for "server id"
 :param int time_unit:                  grouping output over an interval of time (in seconds)
 :param str|None order_by:              sorting output by column name (supported by columns)
 :param str|tuple|list by_usage:        filtering are grouped data for the percentage of the use of resources
 :param float by_usage_percentage:      percent for the parameter setting 'by_usage'
 :param tuple|list|None by_fault:       filtering data are grouped for quantity faults
 (None if it is not filtered)
 valid names: 'aCPU', 'lPMem', 'uIO', 'uEP', 'lEP', 'aVMem', 'PMemF', 'lVMem', 'NprocF', 'anyF', 'aNproc',
 'VMemF', 'ID', 'lCPU', 'aIOPS', 'aEP', 'aPMem', 'uPMem', 'lIO', 'lIOPS', 'uCPU', 'lNproc', 'aIO', 'uIOPS',
 'EPf', 'uVMem', 'uNproc'
 :param threshold:                      number faults for filtering the data are grouped
 (used together with by_fault)
 :param int|None limit:                 limit on the number of output data
 (if 0 or None, then the limit is not set)
 :return generator:                     returns a list/generator of data with the order set out
 in the 'show_columns'
 """
 self.dbengine = dbengine
 self.uid = uid
 self._is_multi_uids = not isinstance(uid, int)
 if show_columns is None:
 show_columns = get_supported_columns(lve_version=get_lve_version(dbengine=dbengine, server_id=server_id))
 show_columns.insert(0, 'ID')
 self.show_columns = normalize_column_names(show_columns)
 self.server_id = server_id
 self.time_unit = time_unit
 self.by_fault = normalize_optional_column_names(by_fault)
 if order_by:
 self.order_by = normalize_column_name(order_by)
 elif self.by_fault and normalize_column_name('anyF') in self.by_fault:
 self.order_by = normalize_column_name('CPUf')
 else:
 self.order_by = self.by_fault and self.by_fault[0]
 self.by_usage = normalize_optional_column_names(by_usage)
 self.by_usage_percentage = by_usage_percentage
 self.threshold = threshold
 self.limit = limit
 self.log = log or logging.getLogger('SQL')
 
 self.table = table if table is not None else history.__table__  # "or" not supported in this
 self._table_alive = alias(self.table, 'alive')  # alias of main table to using in self join
 self.period_from = (
 period_from if isinstance(period_from, (int, float)) else gm_datetime_to_unixtimestamp(period_from)
 )
 self.period_to = period_to if isinstance(period_to, (int, float)) else gm_datetime_to_unixtimestamp(period_to)
 self.time_count = time_count or self.get_time_count()
 # correct cpu/100
 # we still have to round it, as it seems <[27.333/100] = [0.27332999999999996] - who knows why :(
 self.result_corrector = OutputFormatter(
 fields=self.show_columns,
 orders=[
 [FIELD_MAX + FIELD_AVERAGE + FIELD_LIMIT + FIELD_USAGE, lambda x: round(x, 3)],
 [['aCPU', 'lCPU', 'mCPU'], lambda item: round(float(item) / 100.0, 5)],
 ],
 )
 self.hide_maxuid_lve = str_to_bool(read_config().get(HIDE_MAX_UID_LVE_PARAMETER, 'true'))
 
 def set_normalised_output(self):
 # correct data obtained from database
 # round EP IOPS Nproc output
 self.result_corrector.add_order(
 fields=['aEP', 'mEP', 'lEP', 'aNproc', 'mNproc', 'lNproc', 'aIOPS', 'mIOPS', 'lIOPS'],
 order=lambda x: int(round(x)),
 )
 if self.dbengine.url.drivername != "sqlite":
 self.result_corrector.add_order(
 fields=[
 'aVMem',
 'mVMem',
 'lVMem',
 'aPMem',
 'mPMem',
 'lPMem',
 'aIO',
 'mIO',
 'lIO',
 'uCPU',
 'uEP',
 'uVMem',
 'uPMem',
 'uIO',
 'uNproc',
 'uIOPS',
 ],
 order=float,
 )
 self.result_corrector.add_order(
 fields=['EPf', 'VMemF', 'CPUf', 'PMemF', 'NprocF', 'IOf', 'IOPSf'], order=int
 )
 
 # convert Mem to bytes
 self.result_corrector.add_order(
 fields=['aVMem', 'mVMem', 'lVMem', 'aPMem', 'mPMem', 'lPMem'], order=mempages_to_bytes
 )
 
 def _where_time_period(self, table=None):
 """
 Generate WHERE created BETWEEN xxxxxxxxx AND yyyyyyyy
 :return:
 """
 # filtering condition by time period
 if table is None:
 table = self.table
 return table.c.created.between(self.period_from, self.period_to)
 
 def _where_server_id(self):
 """
 Generate WHERE server_id = 'server_name'
 :return:
 """
 return self.table.c.server_id == self.server_id
 
 def _where_uid(self, uid=-1, table=None):
 """Generate WHERE id = 'user_uid'"""
 if table is None:
 table = self.table
 if uid == -1:
 uid = self.uid
 
 if uid is None:
 if self.hide_maxuid_lve:
 # skip ids in range(MAX_UID, MAX_LVE_ID), because ids > MAX_LVE_ID may contain info
 # about reseller`s limits
 return and_(
 table.c.id > 0, or_(not_(skip_user_by_maxuid(table.c.id)), table.c.id > get_current_max_lve_id())
 )
 return table.c.id > 0
 
 elif isinstance(uid, (list, tuple)):
 if self.dbengine.url.drivername == 'sqlite':
 # little workaround for sqlite's limit of 999 variables
 # let's compile query manually
 return text(table.c.id.in_(list(uid)).expression.compile(compile_kwargs={"literal_binds": True}).string)
 else:
 # mysql or postgresql do not have such limit
 return table.c.id.in_(list(uid))
 else:
 return table.c.id == uid
 
 def get_time_count(self):
 """
 SELECT count(*) FROM lve_stats2_history WHERE id = 0 AND created BETWEN xxxx AND yyyy server_id = 'localhost'
 """
 where = and_(
 history.created.between(self.period_from, self.period_to),
 history.id == 0,
 history.server_id == self.server_id,
 )
 query = select([text('count(*)')]).where(where)
 time_start = time.time()
 q = str(query.compile(compile_kwargs={"literal_binds": True})).replace('\n', ' ')
 self.log.debug(q)
 data = self.dbengine.execute(query)
 self.log.debug('query time: %s', time.time() - time_start)
 return data.fetchall()[0][0]
 
 def _fun_avg(self, item):
 """
 Generate aggregate function for calculate average
 for example sum(lve_stats2_history.cpu) / 60
 :param item:
 :return:
 """
 return cast(func.sum(item) / cast(self._fun_time_count(), Float), Float)
 
 def _fun_limit(self, item):
 """
 Generate aggregate function for calculate limit
 if column_limit is zero its mean no limit
 for example CASE WHEN (min(lve_stats2_history.cpu_limit) > 0) THEN max(lve_stats2_history.cpu_limit) ELSE 0 END
 :param item:
 :return:
 """
 return case([(func.min(item) > 0, func.max(item))], else_=0)
 
 def _fun_fault(self, item):
 """
 Generate aggregate function for calculate fault
 for example sum(lve_stats2_history.cpu_fault)
 :param item:
 :return:
 """
 return func.sum(item)
 
 def _fun_usage(self, item, item_limit):
 """
 Generate aggregate function for calculate resource usage equivalent average/limit
 for example
 CASE
 WHEN (
 CASE
 WHEN min(lve_stats2_history.cpu_limit) > 0 THEN max(lve_stats2_history.cpu_limit)
 ELSE 0
 END IS NULL
 ) THEN NULL
 WHEN (
 CASE
 WHEN min(lve_stats2_history.cpu_limit) > 0 THEN max(lve_stats2_history.cpu_limit)
 ELSE 0
 END > 0
 ) THEN (
 sum(lve_stats2_history.cpu) / 1422
 ) / CASE
 WHEN min(lve_stats2_history.cpu_limit) > 0 THEN max(lve_stats2_history.cpu_limit)
 ELSE 0
 END
 END
 :param item:
 :return:
 """
 # noinspection PyComparisonWithNone
 return case(
 [
 # Don't use "is None" here.
 (self._fun_limit(item_limit) == None, None),  # NOQA
 (self._fun_limit(item_limit) > 0, self._fun_avg(item) / self._fun_limit(item_limit)),
 ],
 else_=None,
 )
 
 def _fun_max(self, item, item_limit, item_fault):
 """
 Generate aggregate function for calculate maximum resource usage; for backward capability with lve-stats 0.x
 for example:
 CASE
 WHEN
 (sum(lve_stats2_history.cpu_fault) > 0)
 THEN
 max(lve_stats2_history.cpu_limit)
 ELSE
 max(lve_stats2_history.cpu) END
 :param item:
 :param item_limit:
 :param item_fault:
 :return:
 """
 return case([(func.sum(item_fault) > 0, func.max(item_limit))], else_=func.max(item))
 
 def _fun_time_count(self):
 if self._check_need_join():
 return text('count(*)')
 else:
 return self.time_count
 
 def _fun_time_from(self):
 if self._is_multi_uids:
 return self.period_from
 else:
 if self._check_need_join():
 _table = self._table_alive
 else:
 _table = self.table
 if self.dbengine.url.drivername == 'sqlite':
 # cast(..., Integer) using for compatibility with lve-stats-2.1-8 database; 'created' saved as float
 return (
 cast((_table.c.created - self.period_from) / self.time_unit, Integer) * self.time_unit
 + self.period_from
 )
 else:
 return (
 func.floor((_table.c.created - self.period_from) / self.time_unit) * self.time_unit
 + self.period_from
 )
 
 def _fun_time_to(self):
 # in case of changes here don't forget to check _group_by_query
 if self._is_multi_uids:
 return self.period_to
 else:
 return self._fun_time_from() + self.time_unit
 
 def _fun_user_id(self):
 if self._is_multi_uids:
 return label('ID', self.table.c.id)
 else:
 return label('ID', text(str(self.uid)))
 
 def _aggregate_fun_case(self, item):
 """
 Function for obtain aggregate function (or column name) by column name
 :param item: 'aCPU', 'aVMem', 'aPMem', 'aEP', 'aNproc',  'aIO', 'aIOPS', 'lCPU', 'lVMem', 'lPMem', 'lEP',
 'lNproc',  'lIO', 'lIOPS', 'CPUf', 'VMemF', 'PMemF', 'EPf', 'NprocF',  'IOf', 'IOPSf', 'uCPU',
 'uVMem', 'uPMem', 'uEP', 'uNproc',  'uIO', 'uIOPS', 'mCPU', 'mVMem', 'mPMem', 'mEP',  'mNproc',
 'mIO', 'mIOPS', 'anyF', 'ID', 'From', 'To'
 :type item: str
 :return:
 """
 if item == 'anyF':
 fun_ = func.sum(
 self.table.c.mem_fault
 + self.table.c.memphy_fault
 + self.table.c.mep_fault
 + self.table.c.nproc_fault
 + self.table.c.cpu_fault
 + self.table.c.io_fault
 + self.table.c.iops_fault
 )
 elif item == 'ID':
 fun_ = self._fun_user_id()
 elif item == 'From':
 fun_ = self._fun_time_from()
 elif item == 'To':
 fun_ = self._fun_time_to()
 else:
 column_name = FIELD_TO_TABLE_COLUMN[item]
 table_column = getattr(self.table.c, column_name)
 if item in FIELD_AVERAGE:
 fun_ = self._fun_avg(table_column)
 elif item in FIELD_LIMIT:
 fun_ = self._fun_limit(table_column)
 elif item in FIELD_FAULT:
 fun_ = self._fun_fault(table_column)
 elif item in FIELD_USAGE:
 column_name_limit = FIELD_TO_TABLE_COLUMN[FIELD_LIMIT[FIELD_USAGE.index(item)]]
 table_column_limit = getattr(self.table.c, column_name_limit)
 fun_ = self._fun_usage(table_column, table_column_limit)
 elif item in FIELD_MAX:
 column_name_limit = FIELD_TO_TABLE_COLUMN[FIELD_LIMIT[FIELD_MAX.index(item)]]
 column_name_fault = FIELD_TO_TABLE_COLUMN[FIELD_FAULT[FIELD_MAX.index(item)]]
 table_column_limit = getattr(self.table.c, column_name_limit)
 table_column_fault = getattr(self.table.c, column_name_fault)
 fun_ = self._fun_max(table_column, table_column_limit, table_column_fault)
 return fun_
 
 def _group_by_query(self, select_query):
 by_usage_list = convert_to_list(self.by_usage)
 by_fault_list = convert_to_list(self.by_fault)
 if self._is_multi_uids:
 query = select_query.group_by(self.table.c.id)
 else:
 # it is correct because in query result all records having the same _fun_time_from()
 # also have same _fun_time_to()
 # so .group_by(_fun_time_to()) doesn't create new groups after .group_by(_fun_time_from())
 # but we need both this group conditions because mysql throws an error otherwise
 query = select_query.group_by(self._fun_time_from()).group_by(self._fun_time_to())
 if self.order_by:
 order_by_aggregate_func = self._aggregate_fun_case(self.order_by)
 query.append_order_by(desc(order_by_aggregate_func))
 else:
 if self._is_multi_uids:
 query.append_order_by(asc(self.table.c.id))
 else:
 query.append_order_by(self._fun_time_from())
 
 # add filter for having grouped data
 if by_usage_list:  # add filtering by user id
 filter_fun_list = []
 for item in by_usage_list:
 if item in FIELD_AVERAGE:
 index = FIELD_AVERAGE.index(item)
 filter_fun = self.by_usage_percentage <= self._aggregate_fun_case(FIELD_USAGE[index])
 else:
 index = FIELD_MAX.index(item)
 filter_fun = self.by_usage_percentage * self._aggregate_fun_case(
 FIELD_LIMIT[index]
 ) <= self._aggregate_fun_case(item)
 filter_fun_list.append(filter_fun)
 query.append_having(or_(*filter_fun_list))
 
 if by_fault_list:
 by_fault_filter = [self.threshold <= self._aggregate_fun_case(funk_key_) for funk_key_ in by_fault_list]
 query.append_having(or_(*by_fault_filter))
 if self.limit != 0 and self.limit is not None:
 query = query.limit(self.limit)
 
 return query
 
 def _columns_query(self):
 """
 Generate output columns for SELECT <_columns_query(self)> FROM ...
 :return:
 """
 columns_agregate_func = []
 for column_key in self.show_columns:
 column_fun = self._aggregate_fun_case(column_key)
 if isinstance(column_fun, list):
 columns_agregate_func.extend(column_fun)
 else:
 if column_key not in ('From', 'To'):  # digest not support label
 column_fun = label(column_key, column_fun)
 columns_agregate_func.append(column_fun)  # add label
 return columns_agregate_func
 
 def _check_need_time_count(self):
 columns = {
 'aCPU',
 'uCPU',
 'aEP',
 'uEP',
 'aVMem',
 'uVMem',
 'aPMem',
 'uPMem',
 'aNproc',
 'uNproc',
 'aIO',
 'uIO',
 'aIOPS',
 'uIOPS',
 }
 return bool(columns & (set(self.show_columns) | {self.order_by} | set(self.by_usage or set())))
 
 def _check_need_join(self):
 return self._check_need_time_count() and not self._is_multi_uids
 
 def select_query(self, columns_=None, need_join=False):
 """
 :type need_join: bool
 """
 if columns_ is None:
 columns_ = self._columns_query()
 if need_join:
 where_query = and_(
 self._where_time_period(table=self._table_alive), self._where_uid(uid=0, table=self._table_alive)
 )
 else:
 where_query = and_(self._where_time_period(), self._where_uid())
 if self.server_id:  # add filtering by server id
 where_query = and_(where_query, self._where_server_id())
 query = select(columns_).where(where_query)
 if need_join:
 _table_joined = self._table_alive.outerjoin(
 self.table, and_(self._table_alive.c.created == self.table.c.created, self._where_uid(uid=self.uid))
 )
 query = query.select_from(_table_joined)
 return query
 
 def main_query(self):
 columns_ = self._columns_query()
 query = self.select_query(columns_=columns_, need_join=self._check_need_join())
 query = self._group_by_query(query)
 return query
 
 def _min_max_created(self):
 """
 SELECT
 MIN(created) AS MinCreated,
 MAX(created) AS MaxCreated
 FROM
 lve_stats2_history
 WHERE
 id = <ID> AND
 created BETWEEN 'xxxx' AND 'yyyy' AND
 server_id = 'localhost';
 """
 where_query = and_(self._where_time_period(), self._where_uid(), self._where_server_id())
 query = select([func.min(self.table.c.created), func.max(self.table.c.created)]).where(where_query)
 time_start = time.time()
 q = str(query.compile(compile_kwargs={"literal_binds": True})).replace('\n', ' ')
 self.log.debug(q)
 data = self.dbengine.execute(query)
 self.log.debug('query time: %s', time.time() - time_start)
 return data.fetchall()[0]
 
 def proceed_dyn_time_unit(self):
 min_created, max_created = self._min_max_created()
 if max_created is None:  # no data
 return self.result_corrector
 # we need manipulate with datetime data in local timezone
 period_from = gm_to_local(unixtimestamp_to_gm_datetime(min_created))
 period_to = gm_to_local(unixtimestamp_to_gm_datetime(max_created))
 time_unit_groups = dyn_time_unit_groups(period_from, period_to)
 
 rows = []
 for _from, _to, _time_unit in reversed(time_unit_groups):
 # create instance copy for modify some attributes
 self_copy = copy.copy(self)
 self_copy.period_from = gm_datetime_to_unixtimestamp(local_to_gm(_from))
 self_copy.period_to = gm_datetime_to_unixtimestamp(local_to_gm(_to)) - 1
 self_copy.time_unit = _time_unit
 self_copy.limit = 0
 rows.extend(self_copy.proceed())
 return rows
 
 def proceed(self):
 # check and return some data without run sql query
 if self.uid == tuple() or self.uid == []:
 return []
 if self.uid is not None and not isinstance(self.uid, (list, tuple)) and self.uid <= 0:
 return []
 
 query = self.main_query()
 time_start = time.time()
 q = str(query.compile(compile_kwargs={"literal_binds": True}))
 self.log.debug(q.replace('\n', ' '))
 conn = self.dbengine.connect()
 try:
 cursor = conn.execute(query)
 self.log.debug('query time: %s', time.time() - time_start)
 self.result_corrector.rows = cursor.fetchall()
 except LveStatsPluginTerminated as e:
 conn.close()
 raise LveStatsPluginTerminated() from e
 else:
 conn.close()
 return self.result_corrector
 
 def proceed_dict(self):
 return [dict(zip(self.show_columns, items_val)) for items_val in self.proceed()]
 
 
 class _HistoryShowX1(HistoryShow):
 def __init__(self, *args, **kwargs):
 HistoryShow.__init__(self, *args, **kwargs)
 if 'ID' not in self.show_columns:
 self.show_columns = ['ID'] + self.show_columns
 self._labels = []  # variable for control duplicated labels
 
 def _aggregate_fun_case(self, item):
 """
 :type item: str
 """
 if item == 'anyF':
 fun_ = [
 self.table.c.mem_fault,
 self.table.c.memphy_fault,
 self.table.c.mep_fault,
 self.table.c.nproc_fault,
 self.table.c.cpu_fault,
 self.table.c.io_fault,
 self.table.c.iops_fault,
 ]
 elif item == 'ID':
 fun_ = label('id', self.table.c.id)
 elif item == 'From':
 fun_ = self.period_from
 elif item == 'To':
 fun_ = self.period_to
 else:
 column_name = FIELD_TO_TABLE_COLUMN[item]
 table_column = getattr(self.table.c, column_name)
 if item in (FIELD_AVERAGE + FIELD_LIMIT + FIELD_FAULT):
 fun_ = label(column_name, table_column)
 elif item in FIELD_USAGE:
 column_name_limit = FIELD_TO_TABLE_COLUMN[FIELD_LIMIT[FIELD_USAGE.index(item)]]
 table_column_limit = label(column_name_limit, getattr(self.table.c, column_name_limit))
 fun_ = [table_column, table_column_limit]
 elif item in FIELD_MAX:
 column_name_limit = FIELD_TO_TABLE_COLUMN[FIELD_LIMIT[FIELD_MAX.index(item)]]
 column_name_fault = FIELD_TO_TABLE_COLUMN[FIELD_FAULT[FIELD_MAX.index(item)]]
 table_column_fault = label(column_name_fault, getattr(self.table.c, column_name_fault))
 table_column_limit = label(column_name_limit, getattr(self.table.c, column_name_limit))
 table_column_max = label(column_name + '_max', table_column)
 fun_ = [table_column, table_column_limit, table_column_fault, table_column_max]
 return fun_
 
 def _columns_query(self):
 columns_agregate_func = []
 
 show_columns = self.show_columns + (self.by_fault or [])
 if self.by_usage:
 for item in convert_to_list(self.by_usage):
 if item in FIELD_AVERAGE:
 index = FIELD_AVERAGE.index(item)
 show_columns.append(FIELD_USAGE[index])
 else:
 index = FIELD_MAX.index(item)
 show_columns.extend([FIELD_FAULT[index], item])
 
 if self.order_by:
 show_columns.append(self.order_by)
 
 for column_key in show_columns:
 column_fun = self._aggregate_fun_case(column_key)
 if isinstance(column_fun, list):
 for fun_ in column_fun:
 if hasattr(fun_, 'name') and fun_.name not in self._labels:  # prevent alias duplication
 columns_agregate_func.append(fun_)
 self._labels.append(fun_.name)
 else:
 if hasattr(column_fun, 'name') and column_fun.name not in self._labels:  # prevent alias duplication
 columns_agregate_func.append(column_fun)
 self._labels.append(column_fun.name)
 return columns_agregate_func
 
 
 class _HistoryShowX60(_HistoryShowX1):
 AGGREGATE_PERIOD = 60 * 60
 
 def __init__(self, *args, **kwargs):
 _HistoryShowX1.__init__(self, *args, table=history_x60.__table__, **kwargs)
 
 # correct and rewrite time count and period
 self.period_from, self.period_to = self.get_history_x60_from_to()
 self.time_count = kwargs.get('time_count') or self.get_time_count()
 
 def get_time_count(self):
 if (self.period_from, self.period_to) == (None, None):
 return 0
 return _HistoryShowX1.get_time_count(self)
 
 def get_history_x60_from_to(self):
 """
 calculate present in aggregate table from and to time
 """
 if self.period_to - self.period_from <= self.AGGREGATE_PERIOD:
 return None, None
 between_query = self.table.c.created.between(self.period_from + self.AGGREGATE_PERIOD, self.period_to)
 query = select([func.min(self.table.c.created), func.max(self.table.c.created)]).where(
 and_(between_query, self._where_server_id())
 )
 time_start = time.time()
 self.log.debug(str(query.compile(compile_kwargs={"literal_binds": True})).replace('\n', ' '))
 result = self.dbengine.execute(query).fetchall()[0]
 self.log.debug('query time: %s', time.time() - time_start)
 create_min, create_max = result
 if create_max is not None:
 return create_min - self.AGGREGATE_PERIOD + 1, create_max  # "+1" for exclude from timestamp
 else:
 return result
 
 # rewrite average function generating
 def _aggregate_fun_case(self, item):
 """
 :type item: str
 """
 if item in FIELD_AVERAGE:
 column_name = FIELD_TO_TABLE_COLUMN[item]
 table_column = getattr(self.table.c, column_name)
 return label(column_name, table_column * self.table.c.time)
 else:
 return _HistoryShowX1._aggregate_fun_case(self, item)
 
 
 class HistoryShowUnion(HistoryShow):
 """
 Class for retrieve statistics data using two tables
 """
 
 def __init__(self, *args, **kwargs):
 HistoryShow.__init__(self, *args, **kwargs)
 self._alias = LVE_STATS_2_TABLENAME_PREFIX + 'union'
 kwargs.update({"time_count": self.time_count})
 self.x60 = _HistoryShowX60(*args, **kwargs)
 self.x1 = _HistoryShowX1(*args, **kwargs)
 self._need_union = self.x60.period_to is not None and self._is_multi_uids  # detect need union tables
 if self._need_union:
 self.table = self._select_union_query()
 
 # rewrite '_aggregate_fun_case' for correct calculate maximum
 def _aggregate_fun_case(self, item):
 """
 :type item: str
 """
 if self._need_union and item in FIELD_MAX:
 column_name = FIELD_TO_TABLE_COLUMN[item]
 column_name_limit = FIELD_TO_TABLE_COLUMN[FIELD_LIMIT[FIELD_MAX.index(item)]]
 column_name_fault = FIELD_TO_TABLE_COLUMN[FIELD_FAULT[FIELD_MAX.index(item)]]
 column_limit = getattr(self.table.c, column_name_limit)
 column_fault = getattr(self.table.c, column_name_fault)
 column_max = getattr(self.table.c, column_name + '_max')
 fun_ = self._fun_max(column_max, column_limit, column_fault)
 return fun_
 else:
 return HistoryShow._aggregate_fun_case(self, item)
 
 def _select_union_query(self):
 """
 union two tables
 """
 with warnings.catch_warnings():
 warnings.simplefilter("ignore", category=sa_exc.SAWarning)
 union_query = self.x1.select_query().where(
 not_(self.x1.table.c.created.between(self.x60.period_from, self.x60.period_to))
 )
 union_query = union_query.union_all(self.x60.select_query())
 union_query = alias(union_query, self._alias)
 return union_query
 
 def select_query(self, columns_=None, need_join=None):
 if self._need_union:
 return select(columns_)
 else:
 return HistoryShow.select_query(self, columns_=columns_, need_join=need_join)
 
 
 def get_supported_columns(lve_version=None, mode=None):
 """
 preparation list columns depending of the lve version
 :type mode: Union[None, str]
 :type lve_version: Union[None, int]
 """
 columns = []
 if mode == 'v1':
 columns = [
 'aCPU',
 'mCPU',
 'lCPU',
 'aEP',
 'mEP',
 'lEP',
 'aVMem',
 'mVMem',
 'lVMem',
 'VMemF',
 'EPf',
 'aPMem',
 'mPMem',
 'lPMem',
 'aNproc',
 'mNproc',
 'lNproc',
 'PMemF',
 'NprocF',
 'aIO',
 'mIO',
 'lIO',
 ]
 if lve_version is None or lve_version > 6:
 columns.extend(['aIOPS', 'mIOPS', 'lIOPS'])
 elif mode == 'v2':
 columns = [
 'aCPU',
 'lCPU',
 'CPUf',
 'aEP',
 'lEP',
 'EPf',
 'aVMem',
 'lVMem',
 'VMemF',
 'aPMem',
 'lPMem',
 'PMemF',
 'aNproc',
 'lNproc',
 'NprocF',
 'aIO',
 'lIO',
 'IOf',
 ]
 if lve_version is None or lve_version > 6:
 columns.extend(['aIOPS', 'lIOPS', 'IOPSf'])
 elif mode is None:  # show all columns, v1 and v2
 columns = [
 'aCPU',
 'uCPU',
 'mCPU',
 'lCPU',
 'CPUf',
 'aEP',
 'uEP',
 'mEP',
 'lEP',
 'EPf',
 'aVMem',
 'uVMem',
 'mVMem',
 'lVMem',
 'VMemF',
 'aPMem',
 'uPMem',
 'mPMem',
 'lPMem',
 'PMemF',
 'aNproc',
 'uNproc',
 'mNproc',
 'lNproc',
 'NprocF',
 'aIO',
 'uIO',
 'mIO',
 'lIO',
 'IOf',
 ]
 if lve_version is None or lve_version > 6:
 columns.extend(['aIOPS', 'mIOPS', 'uIOPS', 'lIOPS', 'IOPSf'])
 return columns
 
 |