| Viewing file:  php_manager.py (12.45 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# coding: utf-8
 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
 #
 # Licensed under CLOUD LINUX LICENSE AGREEMENT
 # http://cloudlinux.com/docs/LICENSE.TXT
 
 from __future__ import print_function
 from __future__ import division
 from __future__ import absolute_import
 import json
 import os
 from pwd import getpwuid
 from future.utils import iteritems
 
 import cldetectlib as detect
 from clcommon.clconfig import get_param, replace_param
 from clcommon.ui_config import UIConfig
 from clselect.clselectctlphp import format_summary, parse_extensions, API_1
 
 from clselect import ClSelect
 
 from clselect import ClUserSelect, ClExtSelect, ClUserExtSelect, ClUserOptSelect
 from clselect.baseclselect import BaseSelectorError
 from clselect.clselectexcept import ClSelectExcept as ClSelectExceptions
 from clcommon.cpapi import get_main_username_by_uid
 
 
 class PhpManager(object):
 """Responsible for actual PhpSelector selector high-level API"""
 
 interpreter = 'php'
 # DirectAdmin parameters
 DA_PATH = "/usr/local/directadmin/plugins/phpselector/plugin.conf"
 DA_PARAM = "active"
 
 def __init__(self, cfg=None, pkg=None):
 euid = os.geteuid()
 self.is_root_user = euid == 0
 self.user_name = get_main_username_by_uid(euid)
 if self.is_root_user:
 self.cl_select_lib = ClSelect(self.interpreter)
 else:
 self.cl_select_lib = ClUserSelect(self.interpreter)
 
 @property
 def selector_enabled(self):
 """
 Get current status for panel
 :return: bool
 """
 if detect.is_da():
 # DirectAdmin
 return get_param(self.DA_PATH, self.DA_PARAM) == "yes"
 else:
 return not UIConfig().get_param('hidePhpApp', 'uiSettings')
 
 @selector_enabled.setter
 def selector_enabled(self, status):
 """
 Enable/disable selector for each panel
 :param status: bool
 :return:
 """
 if detect.is_da():
 try:
 replace_param(self.DA_PATH, self.DA_PARAM, "yes" if status else "no")
 except (OSError, IOError) as e:
 raise BaseSelectorError("Can not process config file %s with reason: %s" % (
 self.DA_PATH, str(e)))
 UIConfig().set_config({'uiSettings': {'hidePhpApp': not status}})
 else:
 UIConfig().set_config({'uiSettings': {'hidePhpApp': not status}})
 
 
 @property
 def extensions_is_hidden(self):
 """
 Extensions was hidden by admin
 :return: bool
 """
 return UIConfig().get_param('hidePHPextensions', 'uiSettings') is True
 
 @property
 def domains_tab_is_hidden(self):
 """
 Domains tab was hidden by admin
 :return: bool
 """
 return UIConfig().get_param('hideDomainsTab', 'uiSettings') is True
 
 def switch_current_version(self, version):
 """
 Set current version for user
 :param version: str
 :return: None (succes)
 """
 if self.is_root_user:
 raise BaseSelectorError("Not supported as root user")
 # cloudlinux-selector utility automatically drops privileges
 # so there is no way to know here that we are executed from root
 # keep it this way until someone asks to change it
 elif message := self._get_version_selection_disabled_msg():
 raise ClSelectExceptions.VersionModificationBlocked(message)
 
 c = ClUserSelect(self.interpreter)
 c.set_version(self.user_name, str(version))
 c.clean_crui_images([self.user_name])
 
 def reset_extensions(self, version):
 """
 Reset extension for selected version
 :param version:  str
 :return: List of enabled extensions after reset
 """
 if self.is_root_user:
 raise BaseSelectorError("Not supported as root user")
 ClSelect.check_multiphp_system_default_version()
 c = ClUserExtSelect(self.interpreter)
 extensions = c.reset_extensions(self.user_name, str(version))
 c.clean_crui_images([self.user_name])
 return {"extensions": extensions}
 
 def set_extensions(self, version, extensions):
 """
 Set extensions for php version for admin and user
 :param version: str
 :param extensions: list
 :return: response for user. Error or warning list or no error
 """
 to_enable = []
 for k, v in iteritems(extensions):
 if v == "enabled":
 to_enable.append(k)
 elif v != "disabled":
 return {"status": "ERROR: %s is not a valid state of an extension" % v}
 if self.is_root_user:
 ClExtSelect(self.interpreter).replace_extensions(str(version), to_enable)
 return {}
 else:
 if self.extensions_is_hidden:
 raise BaseSelectorError("Extensions was disabled by admin")
 c = ClUserExtSelect(self.interpreter)
 extensions, resolved_dependencies, conflict_dependencies = c\
 .bulk_replace_extensions(self.user_name, str(version), to_enable)
 c.clean_crui_images([self.user_name])
 if resolved_dependencies:
 resolved_dependencies = ['{} enabled as dependency ({})'.format(*ext) for ext in resolved_dependencies]
 # ToDo: add ability to show several warnings and split messages
 warnings = (', '.join(resolved_dependencies)) if resolved_dependencies else ''
 if conflict_dependencies:
 if warnings:
 warnings = warnings + '. '
 warnings = warnings + '{} skipped as conflicting.'.format(', '.join(conflict_dependencies))
 if warnings:
 return {'warning': warnings}
 return {}
 
 def get_summary(self):
 """
 Return all information of php selector
 All extensions(user, admin) and options(for user), selected version (for user)
 
 :return: dict
 """
 summary_data = self._fetch_summary_data()
 summary_json = self._parse_summary_data_to_json(summary_data)
 
 summary_json.update(self._get_common_summary_data())
 if not self.is_root_user:
 summary_json.update(self._get_user_specific_summary_data(summary_data))
 
 if 'available_versions' not in summary_json:
 if 'message' in summary_json:
 raise BaseSelectorError(summary_json['message'])
 return {'status': 'ERROR', 'data': summary_json}
 
 self._process_versions(summary_json['available_versions'])
 return summary_json
 
 def _fetch_summary_data(self):
 """
 Fetch summary data for user or root
 
 :return: tuple containing the summary data
 """
 if self.is_root_user:
 return self.cl_select_lib.get_summary(False)
 try:
 return self.cl_select_lib.get_summary(self.user_name, True)
 except ClSelectExceptions.NotCageFSUser as e:
 raise BaseSelectorError({
 'message': 'User %(user)s not in CageFs',
 'context': {'user': self.user_name}
 }) from e
 
 def _parse_summary_data_to_json(self, summary_data):
 """
 Parse summary data to JSON format
 
 :param summary_data: tuple
 :return: dict
 """
 json_data = format_summary(summary_data, format='json', api_version=API_1)
 return json.loads(json_data)
 
 def _get_common_summary_data(self):
 """
 Get summary data applicable for both root and non-root users
 
 :return: dict containing common summary data
 """
 return {
 'selector_enabled': self.selector_enabled,
 'extensions_is_hidden': self.extensions_is_hidden,
 'domains_tab_is_hidden': self.domains_tab_is_hidden,
 }
 
 def _get_user_specific_summary_data(self, summary_data):
 """
 Get summary data specific to non-root users
 
 :param summary_data: tuple containing the summary data
 :return: dict with user-specific summary data
 """
 selected_version = self._find_selected_version(summary_data)
 version_selection_disabled_msg = self._get_version_selection_disabled_msg()
 
 data = {}
 if selected_version:
 data['selected_version'] = selected_version
 if version_selection_disabled_msg:
 data['version_selection_disabled_msg'] = version_selection_disabled_msg
 return data
 
 def _find_selected_version(self, summary_data):
 """
 Determine the PHP version selected by the user based on the summary data.
 
 :param summary_data: tuple
 A tuple containing version information
 :return: str or None
 The selected PHP version as a string if found, otherwise None
 """
 for version_info in summary_data:
 version, (_, _, is_selected) = version_info
 if is_selected:
 return version
 return None
 
 def _get_version_selection_disabled_msg(self):
 """
 Retrieve the message displayed when PHP version selection is disabled.
 This method is only applicable for non-root users
 
 :return: str - The message indicating why PHP version selection is disabled for the user
 """
 return self.cl_select_lib.get_version_selection_disabled_msg(self.user_name)
 
 def _process_versions(self, versions_list):
 """
 Process versions list and add extensions and options to it
 :param versions_list: list of versions
 """
 for item in versions_list:
 # get extensions for users only and exclude native version
 if not self.is_root_user and item['version'].startswith('native'):
 continue
 try:
 extensions_json = self._get_extensions(item['version'])
 item.update(extensions_json)
 except ClSelectExceptions.UnableToGetExtensions as e:
 raise BaseSelectorError({
 'message': str(e),
 'details': e.details,
 }) from e
 
 # get options for every version for users exclude native
 if not self.is_root_user and not item['version'].startswith('native'):
 try:
 options_json = ClUserOptSelect(self.interpreter).get_options(
 self.user_name,
 item['version'],
 )
 item['options'] = options_json
 except ClSelectExceptions.UnableToGetExtensions as e:
 raise BaseSelectorError({
 'message': str(e),
 'details': e.details,
 }) from e
 
 def set_options(self, version, options):
 """
 Save php options
 :param version: version which save options for
 :type version: str
 :param options: dict of options for save
 :type options: dict
 :return: None (success)
 """
 if self.is_root_user:
 raise BaseSelectorError("Not supported as root user")
 ClSelect.check_multiphp_system_default_version()
 
 c = ClUserOptSelect(self.interpreter)
 c.bulk_insert_options(
 self.user_name,
 str(version),
 options)
 c.clean_crui_images([self.user_name])
 
 def switch_default_version(self, version):
 ClSelect(self.interpreter).set_version(str(version))
 
 def get_current_version(self, user_names = None):
 """
 Return current versions for list of users or all users
 :param user_names: array of users
 :return: information with users and versions
 """
 user_map = ClUserSelect(self.interpreter).get_version_user_map(user_names)
 result = {}
 for version, users in user_map.items():
 result.update({user: {'version': version} for user in users})
 return {'users': result}
 
 def _get_extensions(self, version):
 """
 return list of exnensions for selected version
 (for user and admin)
 :param version: version get extensions for
 :type version: str
 :return:
 """
 if self.is_root_user:
 # Recreate ClExtSelect for noncache
 ext_list = ClExtSelect(self.interpreter).list_extensions(version)
 else:
 ext_list = ClUserExtSelect(self.interpreter).list_all_extensions(self.user_name, version)
 return parse_extensions(ext_list, version, fmt='json')
 
 |