| Viewing file:  ustate.py (8.62 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# coding=utf-8#
 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
 #
 # Licensed under CLOUD LINUX LICENSE AGREEMENT
 # http://cloudlinux.com/docs/LICENSE.TXT
 
 import logging
 from typing import Dict, List, Optional  # NOQA
 
 from clcommon import mysql_lib
 from clcommon.cpapi import cpusers, db_access, dblogin_cplogin_pairs
 from clcommon.utils import run_command
 
 log = logging.getLogger('ustate')
 
 
 class MySQLOperationalError(Exception):
 pass
 
 
 def _parse_lveps_output(lveps_output):
 """
 parse /usr/sbin/lveps -c 1 -p -d -n -x result
 example returned data:
 {504:
 {'CPU': '26%', 'IO': '0', 'MEM': '1', 'EP': '0', 'IOPS': 'N/A', 'PNO': '3', 'TNO': '3', 'TID':
 {4400:
 {'CPU': '26%', 'IO': 'N/A', 'MEM': '1', 'CMD': 'md5sum', 'IOPS': 'N/A'},
 4381:
 {'CPU': '0%', 'IO': 'N/A', 'MEM': '1', 'CMD': 'su', 'IOPS': 'N/A'},
 4382:
 {'CPU': '0%', 'IO': 'N/A', 'MEM': '1', 'CMD': 'bash', 'IOPS': 'N/A'}}},
 500:
 {'CPU': '13%', 'IO': '0', 'MEM': '1', 'EP': '0', 'IOPS': 'N/A', 'PNO': '3', 'TNO': '3', 'TID':
 {4266:
 {'CPU': '0%', 'IO': 'N/A', 'MEM': '1', 'CMD': 'su', 'IOPS': 'N/A'},
 4299:
 {'CPU': '13%', 'IO': 'N/A', 'MEM': '1', 'CMD': 'cat', 'IOPS': 'N/A'},
 4267:
 {'CPU': '0%', 'IO': 'N/A', 'MEM': '1', 'CMD': 'bash', 'IOPS': 'N/A'}}}}
 
 example of data manipulation:
 getting a list of user id
 >>> lveps_data = _parse_lveps_output()
 >>> user_id_list = lveps_data.keys()
 a list of processes tid particular user id 504
 >>> user_tid_list = lveps_data[504]['TID'].keys()
 getting CPU load user
 >>> user_cpu = lveps_data[504]['CPU']
 getting CPU load specific process
 >>> lveps_data[504]['TID'][4400]
 """
 lveps_lines = lveps_output.split('\n')
 header_line = lveps_lines.pop(0)
 
 columns_name = header_line.split()
 # replace columns name to standart
 replace_col = {'SPEED': 'CPU', 'COM': 'CMD'}
 columns_name = [replace_col.get(col_name, col_name) for col_name in columns_name]
 
 lveps_data = {}
 user_id = 0
 for lveps_line_index, lveps_line in enumerate(lveps_lines):
 if not lveps_line:
 continue
 lveps_line_splited = lveps_line.split(None, len(columns_name) - 1)
 # if the first one is number - use it as lve_id
 has_id = (len(lveps_line_splited) == len(columns_name)) and lveps_line_splited[0].isdigit()
 if not has_id:
 lveps_line_splited = lveps_line.split(None, len(columns_name) - 2)
 lveps_line_splited.insert(0, '')
 if len(lveps_line_splited) != len(columns_name):
 log.error(
 "lveps output was incorrect: %s",
 lveps_line,
 extra={"data": {"lveps_lines": lveps_lines[max(0, lveps_line_index - 5):lveps_line_index + 15]}},
 )
 break
 lveps_dict_line = dict(zip(columns_name, lveps_line_splited))
 if has_id:
 # FIXME: need to add a filter to a minimum uid
 user_id = int(lveps_dict_line.pop('ID'))
 try:
 del lveps_dict_line['CMD']
 except KeyError:
 pass
 lveps_data[user_id] = lveps_dict_line
 lveps_data[user_id]['TID'] = {}
 lveps_data[user_id].pop('PID', None)
 else:
 lveps_dict_line.pop('EP', None)
 lveps_dict_line.pop('PNO', None)
 lveps_dict_line.pop('TNO', None)
 lveps_dict_line.pop('ID', None)
 
 try:
 lveps_data[user_id]['TID'][int(lveps_dict_line.pop('TID'))] = lveps_dict_line
 except (ValueError, KeyError) as e:
 log.error("Can't parse lveps output: %s", str(e))
 return lveps_data
 
 
 def get_lveps():
 lveps_output = _get_lveps_output()
 return _parse_lveps_output(lveps_output)
 
 
 def _get_lveps_output():
 lveps_output: str | bytes = run_command(
 [
 '/usr/sbin/lveps',
 '-c',
 '1',
 '-p',
 '-d',
 '-n',
 '-x',  # tells lveps to put at least one space between columns for correct parsing
 '-o',
 'id:10,ep:10,pno:10,pid:15,tno:5,tid:15,cpu:7,mem:15,com:256',
 ],
 convert_to_str=False,
 )
 # Ignore any non-utf8 symbols from the output.
 # We only show the output to the user, so dropping some symbols is acceptable.
 
 # Convert bytes to string if needed.
 if isinstance(lveps_output, bytes):
 lveps_output = lveps_output.decode('utf-8', 'replace')
 else:
 lveps_output = lveps_output.encode('utf-8', 'replace').decode('utf-8')
 
 return lveps_output
 
 
 class SQLSnapshot(object):
 def __init__(self):
 self._mysql_conn = None
 self._dblogin_cplogin_map = {}
 self._db_users = set()  # attribute to detect new database users
 
 def __enter__(self):
 self.connect()
 return self
 
 def __exit__(self, exc_type, exc_val, exc_tb):
 self.close()
 
 def connect(self):
 """
 Obtain access data and connect to mysql database
 """
 access = db_access()
 mysql_login = access['login']
 mysql_pass = access['pass']
 mysql_host = access.get('host', 'localhost')
 connector = mysql_lib.MySQLConnector(
 host=mysql_host,
 user=mysql_login,
 passwd=mysql_pass,
 use_unicode=True,
 charset="utf8mb4",
 )
 try:
 self._mysql_conn = connector.connect()
 except mysql_lib.MySQLError as e:
 raise MySQLOperationalError(str(e)) from e
 
 def _refresh_map(self):
 """
 Refresh <database user>:<system user> map
 """
 self._dblogin_cplogin_map = dict(dblogin_cplogin_pairs())
 
 def close(self):
 """
 Close Mysql connection
 """
 self._mysql_conn.close()
 
 def _raw_processlist(self):
 result = tuple()
 try:
 cursor = self._mysql_conn.cursor()
 cursor.execute('SHOW FULL PROCESSLIST')
 result = cursor.fetchall()
 except (UnicodeDecodeError, mysql_lib.MySQLError) as e:
 log.warning('Error occurred during executing the `SHOW FULL PROCESSLIST` command', exc_info=e)
 return result
 
 def _get_sql_process_list(self):
 """
 Group processlist by database user name
 :rtype: dict
 """
 process_snapshot = {}
 for sql_tuple in self._raw_processlist():
 db_username = sql_tuple[1]
 sql_cmd = sql_tuple[4]  # (CMD) type/state of the request, in this case we are only interested 'Query'
 sql_time = sql_tuple[5]  # (Time) Time
 sql_query = sql_tuple[7] or ''  # (SQL-query) sql query, None if no sql query
 if sql_cmd == 'Sleep':  # filter 'Sleep' state
 continue
 snapshot_line = [sql_cmd, sql_time, sql_query]
 # group by database user name
 grouped_by_user = process_snapshot.get(db_username, [])
 grouped_by_user.append(snapshot_line)
 process_snapshot[db_username] = grouped_by_user
 return process_snapshot
 
 def get(self, cplogin_lst=None):
 # type: (Optional[List[str]]) -> Dict[str, List]
 """
 :param cplogin_lst: a list of users to retrieve data;
 None if the data is returned for all users registered in the control panel
 :return: sql queries for each user
 """
 process_snapshot = self._get_sql_process_list()
 
 # refresh map if new database users detect
 new_db_users = set(process_snapshot.keys()) - self._db_users
 if new_db_users:
 self._refresh_map()
 log.debug(
 'New database user(s) %s detected; database users map refreshed',
 str(list(new_db_users))[1:-1],
 )
 # refresh self._db_users to detect new users
 for new_db_user in new_db_users:
 self._db_users.add(new_db_user)
 
 # group and filter by control panel users
 sql_snapshot = {}
 cplogin_lst_ = cplogin_lst or cpusers() or []
 for db_username, sql_snap in list(process_snapshot.items()):
 cp_username = self._dblogin_cplogin_map.get(db_username)
 if cp_username is not None and cp_username in cplogin_lst_:
 # group sql snapshots by control panel user
 sql_snap_ = sql_snapshot.get(cp_username, [])
 sql_snap_.extend(sql_snap)
 sql_snapshot[cp_username] = sql_snap_
 return sql_snapshot
 
 |