| Viewing file:  cluserextselect.py (15.19 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# -*- coding: utf-8 -*-
 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
 #
 # Licensed under CLOUD LINUX LICENSE AGREEMENT
 # http://cloudlinux.com/docs/LICENSE.TXT
 
 from __future__ import absolute_import
 from __future__ import print_function
 from __future__ import division
 import os
 
 from clcommon import clcagefs
 
 from .clextselect import ClExtSelect
 from .cluserselect import ClUserSelect
 from .clselectexcept import ClSelectExcept
 from .clselectprint import clprint
 from .utils import apply_for_at_least_one_user
 
 
 # Per-user extension selector: combines ClUserSelect and ClExtSelect to list
 # the extensions recorded in a user's ini file and to rewrite that file when
 # extensions are enabled, disabled, replaced or reset.
 class ClUserExtSelect(ClUserSelect, ClExtSelect):
 # File name of the per-user ini file; _compose_user_ini_path appends it to
 # the per-version directory when the CageFS path layout is used.
 USER_INI = 'alt_php.ini'
 
 def __init__(self, item='php', exclude_pid_list=None):
     """
     Initialises the selector and its per-instance caches
     Only ClUserSelect.__init__ is invoked explicitly
     @param item: string, selector item name forwarded to ClUserSelect
     @param exclude_pid_list: list or None, forwarded to ClUserSelect
     """
     ClUserSelect.__init__(self, item, exclude_pid_list)
     # Not touched by the methods of this class shown here; presumably used
     # by the inherited conflict handling -- TODO confirm
     self._conflicts = []
     # Cache filled by _load_user_extensions
     self._user_extensions = []
     # Cache filled lazily by _get_all_extensions (None means "not loaded")
     self._alt_extensions = None
 
 def list_enabled_extensions(self, user, version=None):
     """
     Lists the extensions a user has enabled for a version
     Each entry is an (extension name, True) pair; entries are sorted by name
     @param user: string
     @param version: string or None; when None the first element of
                     get_version(user) is used
     @return: tuple
     @raise ClSelectExcept.UnableToGetExtensions: when version is 'native'
     """
     self._check_user_in_cagefs(user)
     if version is None:
         version = self.get_version(user)[0]
     if version == 'native':
         raise ClSelectExcept.UnableToGetExtensions(version)
     enabled_names = sorted(self._get_enabled_extensions(user, version))
     return tuple((name, True) for name in enabled_names)
 
 def list_all_extensions(self, user, version=None):
     """
     Lists every extension known for a version together with its state
     Each entry is an (extension name, state) pair sorted by name, where
     state is None for compiled-in extensions, True for extensions the
     user has enabled and False for all others
     @param user: string
     @param version: string or None; when None the first element of
                     get_version(user) is used
     @return: tuple
     @raise ClSelectExcept.UnableToGetExtensions: when version is 'native'
     """
     self._check_user_in_cagefs(user)
     if version is None:
         version = self.get_version(user)[0]
     if version == 'native':
         raise ClSelectExcept.UnableToGetExtensions(version)

     enabled_by_user = set(self._get_enabled_extensions(user, version))
     compiled_in = set(self._get_builtins(version))
     loadable = set(self._get_all_extensions(version))

     def state_of(name):
         # Compiled-in wins over "enabled": such extensions cannot be toggled
         if name in compiled_in:
             return None
         return name in enabled_by_user

     return tuple((name, state_of(name))
                  for name in sorted(compiled_in | loadable))
 
 def _get_enabled_extensions(self, user, version):
 """
 Returns list of enabled user extensions
 @param user: string
 @param version: string
 @return: list
 """
 if len(self._user_extensions) == 0:
 self._load_user_extensions(user, version)
 return self._user_extensions
 
 def _get_all_extensions(self, version):
 """
 Returns list of all extensions for a version
 except compiled-in ones
 @param user: string
 @param version: string
 @return: list
 """
 if self._alt_extensions is None:
 self._alt_extensions = self._load_extensions_list(version)
 return self._alt_extensions
 
 def _check_extensions(self, ext_list, version):
 """
 validation extensions name
 @param ext_list: list
 @param version: string
 """
 all_extensions = self._get_all_extensions(version)
 bad_extensions = set(ext_list).difference(set(all_extensions))
 if bad_extensions:
 raise ClSelectExcept.NoSuchExtension(ext_list=bad_extensions, all_extensions=all_extensions)
 
 def bulk_enable_extensions(self, user, version, ext_list, check_ext=False):
     """
     Runs enable_extensions through bulk_handle_extensions
     @param user: string
     @param version: string
     @param ext_list: list
     @param check_ext: bool, forwarded to enable_extensions
     @return: whatever bulk_handle_extensions returns
     """
     return self.bulk_handle_extensions(
         user,
         self.enable_extensions,
         version,
         ext_list,
         check_ext=check_ext,
     )
 
 def bulk_handle_extensions(self, user, func, *args, **kwargs):
     """
     Hands an extension operation to apply_for_at_least_one_user together
     with the names that self._clpwd reports for the uid of the given user
     (presumably all accounts sharing that uid -- confirm against ClPwd)
     @param user: string
     @param func: callable, the per-user operation to run
     @param args: positional arguments forwarded to the helper
     @param kwargs: keyword arguments forwarded to the helper
     @return: result of apply_for_at_least_one_user
     """
     uid = self._clpwd.get_uid(user)
     names = self._clpwd.get_names(uid)
     return apply_for_at_least_one_user(
         func, names, ClSelectExcept.UnableToSaveData, *args, **kwargs)
 
 def enable_extensions(self, user, version, ext_list, check_ext=False):
     """
     Adds extensions to user php.ini
     The user ini file is parsed, conflicts and dependencies are resolved
     (and reported through the _print_* helpers), then the file is
     rewritten, settings are backed up and processes are reloaded
     @param user: string
     @param version: string
     @param ext_list: list
     @param check_ext: bool, validate names with _check_extensions first
     @return: None
     """
     self._check_user_in_cagefs(user)
     if check_ext:
         self._check_extensions(ext_list=ext_list, version=version)
     ini_path = self._compose_user_ini_path(user, version)
     alt_path = self._compose_alt_path(version)

     contents, current, ext_data = self._load_ini_contents(ini_path)

     # Conflicts are evaluated over every extension block already present
     # in the file plus the requested ones
     requested = list(ext_data.keys()) + ext_list
     allowed = self._check_for_conflicts(requested)
     self._print_conflicts_info(
         ClExtSelect.get_conflicts_info(requested, allowed))

     candidates = [name for name in current + ext_list if name in allowed]

     resolved_all = []
     for name in candidates:
         with_deps = self._include_dependencies([name], alt_path, ext_data)
         # Keep only what earlier candidates have not already pulled in
         fresh = [dep for dep in with_deps if dep not in resolved_all]
         self._print_dependencies_info(
             ClExtSelect.get_dependencies_list(name, fresh, ext_list))
         resolved_all.extend(fresh)

     for name in resolved_all:
         contents.extend(self._smooth_data(ext_data[name]))

     contents = self._move_ioncube_ext(contents)
     self._write_to_file(user, '\n'.join(contents) + '\n', ini_path)
     self._backup_settings(user)
     self._reload_processes(user)
 
 def bulk_replace_extensions(self, user, version, ext_list):
     """
     Runs replace_extensions_with_dependenses through
     bulk_handle_extensions
     @param user: string
     @param version: string
     @param ext_list: list
     @return: whatever bulk_handle_extensions returns
     """
     return self.bulk_handle_extensions(
         user,
         self.replace_extensions_with_dependenses,
         version,
         ext_list,
     )
 
 def replace_extensions(self, user, version, ext_list):
     """
     (deprecated)
     Replaces extensions in user php.ini with the supplied ones and prints
     information about dependencies and conflicts for the old php selector
     @param user: string
     @param version: string
     @param ext_list: list
     @return: list
     """
     outcome = self.bulk_replace_extensions(user, version, ext_list)
     extensions_list, dependencies_list, conflict_dependencies = outcome
     self._print_dependencies_info(dependencies_list)
     self._print_conflicts_info(conflict_dependencies)
     return extensions_list
 
 def replace_extensions_with_dependenses(self, user, version, ext_list):
     """
     Replaces extensions in user php.ini with the supplied ones
     Nothing is printed here; dependency and conflict information is
     returned to the caller instead
     @param user: string
     @param version: string
     @param ext_list: list
     @return: (extensions_list, dependencies_list, conflict_dependencies),
              a tuple of three lists
     """
     self._check_user_in_cagefs(user)
     ini_path = self._compose_user_ini_path(user, version)
     alt_path = self._compose_alt_path(version)

     # Only the custom-options lines of the current file are kept; the
     # extension blocks are rebuilt from scratch because we REPLACE them
     contents = self._load_ini_contents(ini_path)[0]
     ext_data = {}

     allowed = self._check_for_conflicts(ext_list)
     conflict_dependencies = ClExtSelect.get_conflicts_info(ext_list, allowed)

     resolved_all = []
     dependencies_list = []
     for name in [ext for ext in ext_list if ext in allowed]:
         with_deps = self._include_dependencies([name], alt_path, ext_data)
         # Keep only what earlier extensions have not already pulled in
         fresh = [dep for dep in with_deps if dep not in resolved_all]
         dependencies_list = dependencies_list + self.get_dependencies_list(
             name, fresh, ext_list)
         resolved_all.extend(fresh)

     for name in resolved_all:
         contents.extend(self._smooth_data(ext_data[name]))

     contents = self._move_ioncube_ext(contents)
     self._write_to_file(user, '\n'.join(contents) + '\n', ini_path)
     self._backup_settings(user)
     self._reload_processes(user)
     return list(ext_data.keys()), dependencies_list, conflict_dependencies
 
 def bulk_disable_extensions(self, user, version, ext_list):
     """
     Runs disable_extensions through bulk_handle_extensions
     @param user: string
     @param version: string
     @param ext_list: list
     @return: whatever bulk_handle_extensions returns
     """
     return self.bulk_handle_extensions(
         user,
         self.disable_extensions,
         version,
         ext_list,
     )
 
 def disable_extensions(self, user, version, ext_list):
     """
     Removes extensions from user php.ini
     An extension that _is_dependency reports as needed by the remaining
     ones is kept (with a WARN diagnostic) unless every remaining extension
     is itself in ext_list. Afterwards the file is rewritten, settings are
     backed up and processes are reloaded
     NOTE(review): extensions are visited in set order, and each removal
     changes what "remaining" means for the next one, so the outcome can
     depend on iteration order -- confirm whether that matters
     @param user: string
     @param version: string
     @param ext_list: list
     @return: None
     """
     self._check_user_in_cagefs(user)
     user_ini_path = self._compose_user_ini_path(user, version)
     alt_path = self._compose_alt_path(version)
     contents, extensions, extensions_data = self._load_ini_contents(
         user_ini_path)

     # Built once: the set does not change inside the loop, and a one-shot
     # iterable would otherwise be exhausted after the first use
     to_disable = set(ext_list)
     for item in to_disable:
         if item not in extensions_data:
             continue
         rest_of_set = set(extensions_data) - {item}
         if (self._is_dependency(item, rest_of_set, alt_path)
                 and not rest_of_set.issubset(to_disable)):
             clprint.print_diag(
                 'text',
                 {'status': 'WARN',
                  'message': '%s left as dependency' % (item,)})
             continue
         extensions_data.pop(item, None)

     # Re-emit the surviving blocks in their original file order
     for ext in extensions:
         if ext in extensions_data:
             contents.extend(self._smooth_data(extensions_data[ext]))

     self._write_to_file(user, '\n'.join(contents) + '\n', user_ini_path)
     self._backup_settings(user)
     self._reload_processes(user)
 
 def reset_extensions(self, user, version):
     """
     Replaces extensions in user php.ini with the default ones, i.e. the
     list returned by ClExtSelect._get_enabled_extensions for the version
     @param user: string
     @param version: string
     @return: list
     @raise ClSelectExcept.EmptyParam: when version is empty
     """
     if not version:
         raise ClSelectExcept.EmptyParam('Version')
     # The parent implementation is called explicitly because this class
     # overrides _get_enabled_extensions with a (user, version) signature
     defaults = ClExtSelect._get_enabled_extensions(self, version)
     # replace_extensions is already adapted for multiple users
     return self.replace_extensions(user, version, defaults)
 
 def _move_ioncube_ext(contents):
 """
 PHP ioncube extensions must be at the beginning of extensions list
 @param contents: list
 @return: list
 """
 found = False
 fixed_contents = []
 stripped_contents = []
 for item in contents:
 if item.startswith(';---ioncube'):
 found = True
 fixed_contents.append(item)
 continue
 if found:
 if item.startswith(';---'):
 found = False;
 else:
 fixed_contents.append(item)
 continue
 stripped_contents.append(item)
 fixed_contents.extend(stripped_contents)
 return fixed_contents
 _move_ioncube_ext = staticmethod(_move_ioncube_ext)
 
 @staticmethod
 def _load_ini_contents(path):
 """
 Parses user ini file contents
 @param path: string
 @return: tuple
 """
 contents = []
 extensions = []
 extensions_data = {}
 is_content = False
 is_extension = False
 ext_name = None
 try:
 ini = open(path, 'r')
 for line in ini:
 line = line.rstrip()
 
 # Check if it is content block (Custom PHP options)
 if line.startswith(';>==='):
 is_extension = False
 is_content = True
 contents.append(line)
 
 # Check if it is extension block
 elif line.startswith(';---'):
 tmp_ext_name = line.strip(';- ')
 if tmp_ext_name == "":
 continue
 ext_name = tmp_ext_name
 is_extension = True
 # Create the key-value pair
 if ext_name not in extensions_data:
 extensions_data[ext_name] = [line]
 
 # Processing content
 elif is_content:
 # Skip comments
 if line.startswith(';') and not line.startswith(';<==='):
 continue
 # Append until the end of block ('<===')
 contents.append(line)
 if line.startswith(';<==='):
 is_content = False
 
 # Processing extensions
 elif ext_name and is_extension:
 # Skip comments and empty lines
 if line.startswith(';') or line == "":
 continue
 # Append extension data
 if ext_name not in extensions:
 extensions.append(ext_name)
 
 if ext_name in extensions_data:
 extensions_data[ext_name].append(line)
 ini.close()
 except (OSError, IOError):
 pass
 return contents, extensions, extensions_data
 
 def _load_user_extensions(self, user, version):
     """
     Reads the user's extensions for a version from the user ini file
     (via _skim_over_extensions) and appends them to the instance cache
     @param user: string
     @param version: string
     @return: None
     """
     found = self._skim_over_extensions(
         self._compose_user_ini_path(user, version))
     self._user_extensions.extend(found)
 
 def _compose_user_ini_path(self, user, version):
 """
 Composes user ini file path
 @param user: string
 @param version: string
 @return: string
 """
 if self.without_cagefs:
 homedir = self._clpwd.get_homedir(user)
 php_dir = 'php%s' % version.replace('.', '')
 return homedir + '/.cl.selector/alt_' + php_dir + '.ini'
 uid = str(self._clpwd.get_uid(user))
 user_prefix = '/' if clcagefs.in_cagefs() else os.path.join(self.CAGEFS_PATH, uid[-2:], user)
 path = os.path.join(user_prefix,
 'etc',
 'cl.%s.d' % (self._item,),
 'alt-%s%s' % (self._item, version.replace('.', '')),
 self.USER_INI)
 return path
 
 |