| Viewing file:  pluginlib.py (7.57 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# -*- coding: utf-8 -*-
 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved
 #
 # Licensed under CLOUD LINUX LICENSE AGREEMENT
 # http://cloudlinux.com/docs/LICENSE.TXT
 """
 Library to work with plug-ins
 """
 
 import os
 import py_compile
 import subprocess
 import sys
 import pwd
 import glob
 import shutil
 import syslog
 
 from importlib import import_module
 from configparser import ConfigParser
 
 from clcommon.clcustomscript import lvectl_custompanel_script, CLSYSCONFIG
 from .const import CACHE_CPNAME, UNKNOWN_CP_NAME, UNKNOWN_CP_IMPORT
 from .cpapiexceptions import PluginImportError
 
 clsysconfig = ConfigParser(interpolation=None, strict=False)
 clsysconfig.read(CLSYSCONFIG)
 
 CONTROLPANELNAME_VAR = '__cpname__'
 DETECTFUNCNAME_VAR = 'detect'
 
 PLUGINS_PKG_PREFIX = 'clcommon.cpapi'
 
 PLUGINS_DIR = None
 if clsysconfig.has_option(section='cpapi', option='plugindir'):
 PLUGINS_DIR = clsysconfig.get(section='cpapi', option='plugindir')
 if not os.path.exists(PLUGINS_DIR):
 print("WARNING: plugindir is configured, but doesn't exists!")
 API_LINK_PATH = os.path.join(os.path.dirname(__file__), 'apilink.py')
 
 CPANEL_NAME = 'cPanel'
 DIRECTADMIN_NAME = 'DirectAdmin'
 PLESK_NAME = 'Plesk'
 ISPMANAGER_NAME = 'ISPManager'
 INTERWORX_NAME = 'InterWorx'
 UNKNOWN_NAME = 'Unknown'
 INTEGRATED_NAME = 'Integrated'
 
 OFFICIAL_PLUGINS = {
 CPANEL_NAME: 'cpanel',
 DIRECTADMIN_NAME: 'directadmin',
 PLESK_NAME: 'plesk',
 ISPMANAGER_NAME: 'ispmanager',
 INTERWORX_NAME: 'interworx',
 UNKNOWN_NAME: 'backward_plugin',
 INTEGRATED_NAME: 'vendors'
 }
 
 
 def detect_panel_fast():
 """
 This function will try to detect our officially supported control panels
 :return: tuple of CP name and CP plugin name to import
 """
 panel_name, official_plugin_name = None, None
 if os.path.isfile('/opt/cpvendor/etc/integration.ini'):
 panel_name, official_plugin_name = INTEGRATED_NAME, OFFICIAL_PLUGINS[INTEGRATED_NAME]
 elif os.path.isfile('/usr/local/cpanel/cpanel'):
 panel_name, official_plugin_name = CPANEL_NAME, OFFICIAL_PLUGINS[CPANEL_NAME]
 elif os.path.isfile('/usr/local/directadmin/directadmin') or\
 os.path.isfile('/usr/local/directadmin/custombuild/build'):
 panel_name, official_plugin_name = DIRECTADMIN_NAME, OFFICIAL_PLUGINS[DIRECTADMIN_NAME]
 elif os.path.isfile('/usr/local/psa/version'):
 panel_name, official_plugin_name = PLESK_NAME, OFFICIAL_PLUGINS[PLESK_NAME]
 # ispmanager must have:
 # v5: /usr/local/mgr5/ directory,
 # v4: /usr/local/ispmgr/bin/ispmgr file
 elif os.path.isfile('/usr/local/ispmgr/bin/ispmgr') or os.path.isdir('/usr/local/mgr5'):
 panel_name, official_plugin_name = ISPMANAGER_NAME, OFFICIAL_PLUGINS[ISPMANAGER_NAME]
 elif os.path.isdir('/usr/local/interworx'):
 panel_name, official_plugin_name = INTERWORX_NAME, OFFICIAL_PLUGINS[INTERWORX_NAME]
 elif lvectl_custompanel_script() is not None:       # Check custom panel script to use backward plugin
 panel_name, official_plugin_name = UNKNOWN_NAME, OFFICIAL_PLUGINS[UNKNOWN_NAME]
 return panel_name, official_plugin_name
 
 
 def get_cp_plugin_module(plugin_name=None):
 if plugin_name:
 # Case when some "official" plugin was already detected
 return import_module(f'{PLUGINS_PKG_PREFIX}.plugins.{plugin_name}')
 # Use cached module if detect_panel_fast() can't detect anything
 # Actual import statement in file is updated on each rebuild_cache()
 from .apilink import api as _apiplugin  # pylint: disable=import-outside-toplevel
 return _apiplugin
 
 
 def detectplugin(plugin_pack, ignore_errors=False):
 """
 Scan directory for presence of plugins.  Plugin is a file that has
 the extension ".py" and a non-empty variable "__cpname__" (the name
 of the control panel).  The file must contain a function "detect"
 which returns True in case of presence of the corresponding
 control panel.
 
 >>> detectplugin('plugins')
 ('from .plugins import cpanel as api', 'cPanel')
 
 :param plugin_pack: package name or the name of the plug-ins directory
 ('cache' - cache plugins users; 'plugins' - officially supported plug-ins)
 :rtype: tuple
 :return: a pair of values: (line to import the package, the name of the control panel)
 """
 plugin_dir = os.path.join(os.path.dirname(__file__), plugin_pack)
 plugin_path_pattern = os.path.join(plugin_dir, '*.py')
 modules = [
 os.path.splitext(os.path.basename(py_full_path))[0]
 for py_full_path in glob.glob(plugin_path_pattern)
 ]
 
 for mod_name in modules:
 if mod_name == '__init__':
 continue
 absolute_module_name = f'{PLUGINS_PKG_PREFIX}.{plugin_pack}.{mod_name}'
 import_string = f'from .{plugin_pack} import {mod_name} as api'
 try:
 api = import_module(absolute_module_name)
 except ImportError as e:
 if ignore_errors:
 continue
 raise PluginImportError(f'Can not import {absolute_module_name} plugin') from e
 try:
 panel_class = api.PanelPlugin()
 # Try to detect panel
 panel_data = panel_class.get_cp_description()
 return import_string, panel_data['name']
 except (AttributeError, TypeError, KeyError) as exc:
 syslog.syslog(
 syslog.LOG_WARNING,
 f'cpapi: Plugin {mod_name} does not satisfy cpapi requirements:\n{exc}',
 )
 return None, None
 
 
 def getuser():
 """
 Get current user's username based on os.getuid()
 :rtype: str
 """
 uid = os.getuid()
 if uid == 0:
 return 'root'
 return pwd.getpwuid(uid)[0]
 
 
 def _reinit_module(init_path):
 """create (or rewrite) an empty __init__.py and it's .pyc/.pyo parts"""
 with open(init_path, 'w', encoding='utf-8'):
 pass
 py_compile.compile(init_path)
 # In Py2 ".pyo" files can be generated only like this:
 subprocess.run(
 f'{sys.executable} -O -m py_compile "{init_path}"',
 shell=True,
 executable='/bin/bash',
 check=False,
 )
 
 
 def rebuild_cache(plugins_dir=PLUGINS_DIR):
 
 CACHE_DIR = 'cache'
 PLUGINS_PATH = 'plugins'  # directory with official supported plugins
 
 cache_dir = os.path.join(os.path.dirname(__file__), CACHE_DIR)
 # delete all in cache dir
 shutil.rmtree(cache_dir)
 if plugins_dir and os.path.isdir(plugins_dir):
 # copy all from plugins dir to cache
 shutil.copytree(plugins_dir, cache_dir)
 else:
 os.mkdir(cache_dir)
 os.chmod(cache_dir, 0o755)
 
 init_path = os.path.join(cache_dir, '__init__.py')
 _reinit_module(init_path)
 
 try:
 import_string, cpname = detectplugin(CACHE_DIR)
 except PluginImportError as exc:
 import_string, cpname = None, None
 print(f'WARNING: {exc}')
 if cpname is None:
 shutil.rmtree(cache_dir)
 os.mkdir(cache_dir)
 _reinit_module(init_path)
 import_string, cpname = detectplugin(PLUGINS_PATH, ignore_errors=True)
 if cpname is None:
 import_string, cpname = (UNKNOWN_CP_IMPORT, UNKNOWN_CP_NAME)
 print(f'WARNING: can not detect control panel; the control panel is set to "{cpname}"')
 # write control panel name to cache file
 if cpname:
 # write control panel name to /var/cache/controlpanelname
 with open(CACHE_CPNAME, 'w', encoding='utf-8') as f:
 f.write(cpname)
 
 # Write plugin import string for fast loading.
 # Example:
 # from .plugins import nopanel as api
 with open(API_LINK_PATH, 'w', encoding='utf-8') as f:
 f.write(import_string + '\n')
 
 |