| Viewing file:  environments.py (17.03 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# -*- coding: utf-8 -*-
 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
 #
 # Licensed under CLOUD LINUX LICENSE AGREEMENT
 # http://cloudlinux.com/docs/LICENSE.TXT
 
 from __future__ import print_function
 from __future__ import absolute_import
 from __future__ import division
 import os
 import sys
 import pwd
 import subprocess
 import shutil
 import filecmp
 
 from datetime import datetime
 from clselect.clselectctl import get_directory
 from clselect.utils import check_call, check_output, list_dirs, run_command
 from .extensions import EXTENSION_PATTERN, ExtensionInfo
 from .interpreters import Interpreter, interpreters
 import simplejson as json
 from simplejson import JSONDecodeError
 from clselect.clselectexcept import ClSelectExcept
 DEFAULT_PREFIX = 'virtualenv'
 BACKUP_PREFIX = '.virtualenv.backup'
 # only this binary it brought by alt-python-virtualenv package
 VIRTUALENV_BIN = '/opt/cloudlinux/venv/bin/virtualenv'
 VERSION_DELIMITER = '#'
 WRAPPERS_PATH = '/usr/share/l.v.e-manager/utils'
 PYTHON_WRAPPER = 'python_wrapper'
 SET_ENV_VARS_SCRIPT = 'set_env_vars.py'
 
 
 class Environment(object):
 
 def __init__(self, name, user=None, prefix=None):
 self.name = name
 if user:
 self.user = user
 else:
 self.user = pwd.getpwuid(os.getuid()).pw_name
 self.homepath = pwd.getpwnam(self.user).pw_dir
 self.pip_logfile = os.path.join(self.homepath, '.pip/pip.log')
 if prefix is None:
 self.prefix = DEFAULT_PREFIX
 else:
 self.prefix = prefix
 self.path = os.path.join(_abs_prefix(self.user, self.prefix), name)
 self.backup_path = os.path.join(
 _abs_prefix(self.user, BACKUP_PREFIX), self.name)
 self._requirements = None
 self._interpreter = None
 self._pip = None
 self.interpreter_name = 'python' + name
 # Create extenstion remap table
 self._extension_remap = {'MySQLdb': 'MySQL-python'}
 
 def __repr__(self):
 return ("%s.%s(name='%s', user='%s', prefix='%s')" % (
 self.__class__.__module__, self.__class__.__name__,
 self.name, self.user, self.prefix))
 
 def _demote(self):
 user_pwd = pwd.getpwnam(self.user)
 
 def func():
 os.setgid(user_pwd.pw_gid)
 os.setuid(user_pwd.pw_uid)
 os.environ['USER'] = self.user
 os.environ['HOME'] = user_pwd.pw_dir
 
 return func
 
 def as_dict(self, key=None):
 e = {
 'name': self.name,
 'interpreter': self.interpreter(),
 'extensions': self.extensions(),
 }
 if key:
 del e[key]
 return {getattr(self, key): e}
 return e
 
 def as_deepdict(self, key=None, with_extensions=True):
 e = {
 'name': self.name,
 'interpreter': self.interpreter().as_dict(),
 }
 if with_extensions:
 e.update({
 'extensions': self.extensions(),
 })
 if key:
 del e[key]
 return {getattr(self, key): e}
 return e
 
 def create(self, interpreter=None, version=None, wait=None):
 if not interpreter:
 interpreter = Interpreter(target_user=self.user)
 path = self.path
 if version:
 path = os.path.join(path, version)
 prompt = "({}:{})".format(
 get_directory(os.path.basename(self.prefix)), self.name,
 )
 args = [
 VIRTUALENV_BIN,
 '--prompt', prompt,
 '--python', interpreter.binary,
 path,
 ]
 kwargs = {"preexec_fn": self._demote(), "cwd": self.homepath, "wait": wait}
 try:
 check_call(*args, **kwargs)
 except ClSelectExcept.ExternalProgramFailed as err:
 err = str(err)
 err_trace = None
 # Change error text and add help if disk quota exceeded
 if "Disk quota exceeded" in err:
 err_text = "Disk quota exceeded.\n " \
 "Contact system administrator to increase disk quota."
 elif "Traceback" in err:
 # Find second ":" character. First is "Traceback :"
 err_char = err.find(":", err.find(":")+1)
 # Find last row of trace
 err_trace_end = err[:err_char].rfind('\n')  # pylint: disable=indexing-exception
 if err_trace_end == -1 or err_char == -1:
 err_text = err
 else:
 # Trace row without error
 err_trace = err[:err_trace_end]  # pylint: disable=indexing-exception
 # Only error without first trace
 err_text = err[err_trace_end+1:]  # pylint: disable=indexing-exception
 else:
 err_text = err
 raise ClSelectExcept.ExternalProgramFailed(
 message=err_text,
 details=err_trace,
 )
 
 self.configure_environment()
 
 def detect_python_binary(self, bin_path):
 files_to_check = [
 'python',
 self.interpreter_name.split('.')[0],
 self.interpreter_name,
 ]
 for file in files_to_check:
 path = os.path.join(bin_path, file)
 if not os.path.islink(path) or os.readlink(path).startswith('/opt/alt/python'):
 return path
 return None
 
 def configure_environment(self, auto_restore=False):
 """
 Configures environment:
 1. Rename binary to pythonX.Y_bin
 2. Makes symlink from python binary to python_wrapper
 """
 bin_path = os.path.join(self.path, 'bin')
 new_interpreter_path = os.path.join(bin_path, self.interpreter_name) + '_bin'
 interpreter_path = self.detect_python_binary(bin_path)
 if interpreter_path is None:
 return
 
 if os.path.exists(new_interpreter_path):
 os.remove(new_interpreter_path)
 
 os.rename(interpreter_path, new_interpreter_path)
 try:
 if not os.path.exists(interpreter_path):
 os.symlink(os.path.join(WRAPPERS_PATH, PYTHON_WRAPPER), interpreter_path)
 except (IOError, OSError):
 if auto_restore:
 os.rename(new_interpreter_path, interpreter_path)
 raise
 
 if not os.path.exists(os.path.join(bin_path, SET_ENV_VARS_SCRIPT)):
 os.symlink(os.path.join(WRAPPERS_PATH, SET_ENV_VARS_SCRIPT),
 os.path.join(bin_path, SET_ENV_VARS_SCRIPT))
 
 def destroy(self, version=None):
 path = self.path
 if version:
 path = os.path.join(path, version)
 if os.path.exists(path):
 check_call('/bin/rm', '-r', '--interactive=never', path,
 preexec_fn=self._demote())
 
 def _get_extension_name(self, extension_name):
 """
 Returns extensions name considering extension remap table
 :param extension_name: Input extension name
 :return: Result extension name
 """
 if extension_name in self._extension_remap:
 return self._extension_remap[extension_name]
 else:
 return extension_name
 
 def _recreate(self, version):
 """
 Recreate python virtual environment with requirements
 :return:
 """
 # if virtual environment does not exists, just create it
 # unfortunately, we don't have requirements
 env_path = os.path.join(self.path, version)
 if not os.path.exists(self.pip(version=version)):
 return
 print('Re-create python virtualenv:', env_path)
 # pip freeze, save last requirements into the file
 self._pip_freeze(version)
 # remember the requirements in the memory
 requirements_path = self.pip_requirements(version)
 requirements = []
 if os.path.exists(requirements_path):
 reqs_file = open(requirements_path, 'r')
 requirements = reqs_file.readlines()
 reqs_file.close()
 # destroy python virtual environment
 self.destroy(version=version)
 # create python virtual environment
 self.create(version=version, wait=True)
 # put remembered requirements into the file
 reqs_file = open(requirements_path, 'w')
 reqs_file.writelines(requirements)
 reqs_file.close()
 # pip install -r requirements, install requirements
 check_call(
 self.pip(version=version), 'install', '-r',
 self.pip_requirements(version))
 
 def recreate(self):
 for version in interpreters(key='version').keys():
 self._recreate(version)
 
 def exists(self):
 return os.path.exists(self.path)
 
 def interpreter(self):
 if not self._interpreter:
 self._interpreter = Interpreter(prefix=self.path, target_user=self.user)
 return self._interpreter
 
 def extension_install(self, extension_name):
 extension_name = self._get_extension_name(extension_name)
 locked_extensions = ExtensionInfo.get_locked_extensions(self.interpreter_name)
 t = extension_name.split(VERSION_DELIMITER)
 extension, version = t[0], t[1:] or ''
 command = ('/bin/bash', '-l', '-c', self.pip() + ' --log-file=' + self.pip_logfile +
 ' install ' + extension_name)
 if version:
 version = version[0]
 command = ('/bin/bash', '-l', '-c', self.pip() + ' --log-file=' + self.pip_logfile +
 ' install ' + extension + '==' + version)
 if ExtensionInfo.is_extensions_locked(locked_extensions, extension_name, version):
 raise ValueError("Extension '%s' install is prohibited. System extension" % extension_name)
 check_call(args=command, preexec_fn=self._demote(), cwd=self.homepath)
 self._pip_freeze()
 
 def extension_install_requirements(self, requirements_path):
 command = ('/bin/bash', '-l', '-c', self.pip() + ' --log-file=' + self.pip_logfile +
 ' install -r {}'.format(requirements_path))
 check_call(args=command, preexec_fn=self._demote(), cwd=self.homepath)
 self._pip_freeze()
 
 def extension_update(self, extension):
 check_call(self.pip(), '--log-file='+self.pip_logfile,
 'install', '--upgrade', extension,
 preexec_fn=self._demote(), cwd=self.homepath)
 self._pip_freeze()
 
 def extension_uninstall(self, extension):
 locked_extensions = ExtensionInfo.get_locked_extensions(self.interpreter_name)
 t = extension.split(VERSION_DELIMITER)
 extension, version = t[0], t[1:] or ''
 if version:
 version = version[0]
 if ExtensionInfo.is_extensions_locked(locked_extensions, extension, version):
 raise ValueError("Extension '%s' removal is prohibited" % extension)
 p = subprocess.Popen(
 (self.pip(), '--log-file='+self.pip_logfile, 'uninstall', extension), preexec_fn=self._demote(),
 stdin=subprocess.PIPE, stderr=subprocess.PIPE,
 stdout=subprocess.PIPE, cwd=self.homepath, text=True)
 stdout, stderr = p.communicate('y')
 if p.returncode:
 raise Exception(stderr or stdout)
 self._pip_freeze()
 
 def extensions(self):
 result = {}
 locked_extensions = ExtensionInfo.get_locked_extensions(self.interpreter_name)
 try:
 output = check_output(self.pip(), 'list', '--log-file='+self.pip_logfile, '--format=json', preexec_fn=self._demote(), cwd=self.homepath)
 extensions = [(x['name'], x['version']) for x in json.loads(output)]
 except (JSONDecodeError, KeyError, ValueError, ClSelectExcept.FileProcessError, ClSelectExcept.ExternalProgramFailed):
 output = check_output(self.pip(), 'list', '--log-file='+self.pip_logfile, preexec_fn=self._demote(), cwd=self.homepath)
 extensions = EXTENSION_PATTERN.findall(output)
 docs = (ExtensionInfo().extension_doc(extension)
 for extension, _ in extensions)
 for (name, version), doc in zip(extensions, docs):
 if ExtensionInfo.is_extensions_locked(locked_extensions, name, version):
 version_diff = list(set([v.strip() for v in version.split(',')])
 - set(locked_extensions.get(name)))
 if version_diff and len(locked_extensions.get(name)) != 0:
 result[name] = {'doc': doc, 'version': ', '.join(version_diff)}
 else:
 result[name] = {'doc': doc, 'version': version}
 
 return result
 
 def pip(self, version=None):
 if version is not None:
 return os.path.join(self.path, version, 'bin', 'pip')
 if not self._pip:
 self._pip = os.path.join(self.path, 'bin', 'pip')
 return self._pip
 
 def pip_requirements(self, version=None):
 if version is not None:
 return os.path.join(self.path, version, 'requirement.pip')
 return os.path.join(self.path, 'requirement.pip')
 
 def update_python_interpreter(self, backup=False, force=False, verbose=True, _alt_interpreters_dict=None):
 """
 copy binary python from /opt/alt/pythonXY/bin/pythonX.Y to virtualenvdir/bin/pythonX.Y
 :param backup: make backup old python interpreter
 :param force: force rewrite python interpreter without check
 :param verbose: print actions details to stdout
 :return: True - updating success; False - updating fail
 """
 update_result = False
 interpreter = self.interpreter()
 if _alt_interpreters_dict:
 main_interpreter = _alt_interpreters_dict[interpreter.version]
 else:
 main_interpreter = interpreters(key='version')[interpreter.version]  # path to original /opt/alt/pythonXY/bin/pythonX.Y
 
 updated_list = list()  # list updated interpreters
 
 if os.path.islink(interpreter.python_bin) and os.readlink(interpreter.python_bin).startswith('/opt/alt/python'):
 if verbose:
 print('Nothing to do, binary in your virtual environment is already symlink to global python!')
 return False
 
 # make backup and delete old python binary
 python_backup = interpreter.python_bin + '.orig_%s' % datetime.now().strftime("%Y-%m-%d_%H-%M")
 stat_ = os.stat(interpreter.python_bin)
 shutil.copy(interpreter.python_bin, python_backup)
 os.chown(python_backup, stat_.st_uid, stat_.st_gid)  # preserving owner
 
 try:
 for virtualenv_python_bin in interpreter.binary_list:
 if filecmp.cmp(main_interpreter.binary, interpreter.python_bin) and not force:
 update_result = False
 if verbose:
 print("    not need updating; skip '%s'" % virtualenv_python_bin)
 continue
 
 if verbose:
 sys.stdout.write("    copy '%s' -> '%s'..." % (main_interpreter.binary, virtualenv_python_bin))
 run_command(cmd=('/bin/cp', '--force', main_interpreter.binary, virtualenv_python_bin))
 updated_list.append(virtualenv_python_bin)
 print("Done")
 update_result = True
 except (OSError, IOError) as e:
 # rollback binaries python if something is wrong
 print("Fail %s" % str(e))
 for updated_python in updated_list:
 shutil.copyfile(python_backup, updated_python)  # safe copy with preserve owner and mode
 os.unlink(python_backup)
 if not backup:  # delete backup if not need
 os.unlink(python_backup)
 return update_result
 
 def _pip_freeze(self, version=None):
 """
 Output installed packages in requirements format
 :return: None
 """
 if not os.path.exists(self.pip(version)):
 return
 command = (self.pip(version), 'freeze', '-l')
 f = open(self.pip_requirements(version), 'w')
 check_call(args=command, preexec_fn=self._demote(),
 cwd=self.homepath, output=f)
 
 def pip_freeze(self):
 """
 Output installed packages in requirements format
 :return: None
 """
 for version in interpreters(key='version').keys():
 self._pip_freeze(version=version)
 
 
 def _abs_prefix(user=None, prefix=None):
 if not prefix:
 prefix = DEFAULT_PREFIX
 if user:
 return os.path.join(pwd.getpwnam(user).pw_dir, prefix)
 else:
 return os.path.join(pwd.getpwuid(os.getuid()).pw_dir, prefix)
 
 
 def environments(user=None, prefix=None):
 venv_path = _abs_prefix(user, prefix)
 try:
 env_list = list_dirs(venv_path)
 except OSError:
 return []
 envs = []
 for env_name in env_list:
 envs.append(Environment(env_name, user, prefix))
 return envs
 
 
 def environments_dict(key, user=None, prefix=None):
 return dict(list(e.as_dict(key=key).items()) for e in environments(user, prefix))
 
 
 def environments_deepdict(key, user=None, prefix=None):
 return dict(list(e.as_deepdict(key=key).items())
 for e in environments(user, prefix))
 
 |