Viewing file: api.py (6.98 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
"""Gather information from Plesk via DB querries."""
import logging from collections import defaultdict from typing import Dict, List, Sequence
from defence360agent.subsys.panels.base import DomainData, PanelException from defence360agent.utils import ( CheckRunError, async_lru_cache, check_run, retry_on, )
logger = logging.getLogger(__name__)
async def raise_panel_exception(*args, **kwargs): raise PanelException(*args, **kwargs)
@retry_on(CheckRunError, max_tries=3, on_error=raise_panel_exception) async def _run_query(query: str) -> str: return (await check_run(["plesk", "db", "-N", "-e", query])).decode()
async def get_user_to_domain() -> Dict[str, List[str]]: """Return mapping: user -> user's domains."""
result = ( await _run_query( "select login, name from domains " " left join hosting on dom_id = domains.id " " right join sys_users on hosting.sys_user_id = sys_users.id" ) ).split() result_mapping = defaultdict(list) for user, domain in zip(result[0::2], result[1::2]): if domain != "NULL": result_mapping[user].append(domain) return result_mapping
async def get_domain_to_user() -> Dict[str, List[str]]: """Return mapping: domain -> user."""
result = ( await _run_query( "select name, login " "from domains " " left join hosting on dom_id = domains.id " " left join sys_users on hosting.sys_user_id = sys_users.id" ) ).split() return {domain: [user] for domain, user in zip(result[0::2], result[1::2])}
async def get_user_details() -> Dict[str, Dict[str, str]]: """ Returns dict with user to email and locale pairs
Not used, because MyImunify implemented for cPanel only yet """ user_details = {}
results = await _run_query( "SELECT CONCAT(login, ';',email, ';',locale) FROM clients;" ) for record in results.split("\n"): if not record: continue user, email, locale = record.split(";") user_details[user] = { "email": email, "locale": locale.replace("-", "_"), }
return user_details
async def get_domains() -> List[str]: """Return: list of domains"""
return (await _run_query("select name from domains")).split()
async def get_users() -> List[str]: """Return: users that created by Plesk"""
return ( await _run_query( "SELECT sys_users.login FROM sys_users JOIN hosting ON" " hosting.sys_user_id=sys_users.id JOIN domains ON" " hosting.dom_id=domains.id AND domains.webspace_id=0" ) ).split()
async def get_users_for_patchman() -> list[list[str]]: """ Returns [ ['admin', 'john.smith@tardis.gal', 'NULL', 'en-US', 'admin', 'NULL', 'NULL', 'NULL', '0'], ['user0', 'NULL', 'admin', 'en-US', 'client', '1', 'user0.com', '/var/www/vhosts/user0.com/httpdocs', '1'] ] There is only 1 return type. NULL is converted to 'NULL' Each possible empty string should be covered with IF(clients.email='', NULL, clients.email) or it will break data structure """ raw_data = await _run_query( """ SELECT clients.login, IF(clients.email='', NULL, clients.email), parent.login, IF(clients.locale='', NULL, clients.locale), clients.type, domains.name, hosting.www_root, clients.status=16 suspended FROM clients LEFT JOIN clients parent ON parent.id=clients.parent_id LEFT JOIN domains ON domains.cl_id=clients.id LEFT JOIN hosting ON domains.id=hosting.dom_id; """ ) tuples = [string.split() for string in raw_data.split("\n") if string] return tuples
async def count_customers_with_subscriptions() -> int: # pragma: no cover """Return: count active customers with at least one (any) domain"""
return int( await _run_query( "select count(distinct cl_id) from clients c join domains d on " "d.cl_id = c.id where c.status = 0" ) )
async def get_admin_emails() -> list: emails = await _run_query("SELECT email FROM clients WHERE type='admin';") return [email for email in emails.split("\n") if email]
async def list_docroots() -> Sequence[str]: query = "SELECT DISTINCT hosting.www_root FROM hosting;" return (await _run_query(query)).split()
async def list_docroots_domains_users(): sql = ( "select hosting.www_root, domains.name, sys_users.login" " from hosting" " inner join domains on hosting.dom_id = domains.id" " inner join sys_users on hosting.sys_user_id=sys_users.id" ) data = await _run_query(sql) retval = [string.split() for string in data.split("\n") if string] return retval
@async_lru_cache(maxsize=1, ttl=60) async def get_user_domains_details() -> list[DomainData]: sql = """ SELECT 'domain' AS object_type, d.id AS object_id, d.name AS domain_name, NULL AS target_domain_name, c.id AS client_id, c.login AS client_login, CASE WHEN d.parentDomainId != 0 THEN 'subdomain' WHEN EXISTS ( SELECT 1 FROM `Subscriptions` s WHERE s.object_type = 'domain' AND s.object_id = d.id ) THEN 'primary' WHEN d.parentDomainId = 0 THEN 'addon' ELSE 'unknown' END AS domain_type, h.www_root AS docroot FROM domains d LEFT JOIN clients c ON d.cl_id = c.id LEFT JOIN hosting h ON d.id = h.dom_id
UNION ALL
SELECT 'subdomain' AS object_type, sd.id AS object_id, CONCAT(sd.name, '.', pd.name) AS domain_name, NULL AS target_domain_name, c.id AS client_id, c.login AS client_login, 'subdomain' AS domain_type, h.www_root AS docroot FROM subdomains sd JOIN domains pd ON sd.dom_id = pd.id LEFT JOIN hosting h ON pd.id = h.dom_id LEFT JOIN clients c ON pd.cl_id = c.id
UNION ALL
SELECT 'alias' AS object_type, da.id AS object_id, da.name AS domain_name, pd.name AS target_domain_name, c.id AS client_id, c.login AS client_login, 'alias' AS domain_type, h.www_root AS docroot FROM domain_aliases da JOIN domains pd ON da.dom_id = pd.id LEFT JOIN hosting h ON pd.id = h.dom_id LEFT JOIN clients c ON pd.cl_id = c.id
ORDER BY client_login, domain_type, domain_name; """ data = await _run_query(sql) raw_data = [string.split() for string in data.split("\n") if string] return [ DomainData(docroot=row[7], domain=row[2], type=row[0], username=row[5]) for row in raw_data ]
|