| Viewing file:  environments.py (6.36 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# -*- coding: utf-8 -*-
 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
 #
 # Licensed under CLOUD LINUX LICENSE AGREEMENT
 # http://cloudlinux.com/docs/LICENSE.TXT
 
 from __future__ import absolute_import
 from __future__ import print_function
 from __future__ import division
 import os
 import pwd
 from distutils.version import StrictVersion
 
 from clselect.clselectctl import get_directory
 from clselect.utils import check_call, check_output, list_dirs
 from .extensions import EXTENSION_PATTERN, ExtensionInfo
 from .interpreters import Interpreter
 
 
 DEFAULT_PREFIX = 'rubyvenv'
 RUBYVENV_BIN = os.path.join(os.path.dirname(__file__), 'rubyvenv.py')
 VERSION_DELIMITER = '#'
 PYTHON_PATH = '/opt/cloudlinux/venv/bin/python3'
 
 
 class Environment(object):
 
 def __init__(self, name, user=None, prefix=None):
 self.name = name
 if user:
 self.user = user
 else:
 self.user = pwd.getpwuid(os.getuid()).pw_name
 if prefix is None:
 self.prefix = DEFAULT_PREFIX
 else:
 self.prefix = prefix
 self.path = os.path.join(_abs_prefix(self.user, self.prefix), name)
 self._interpreter = None
 self._gem = None
 self.interpreter_name = 'ruby' + name
 
 def __repr__(self):
 return ("%s.%s(name='%s', user='%s', prefix='%s')" % (
 self.__class__.__module__, self.__class__.__name__,
 self.name, self.user, self.prefix))
 
 def _demote(self):
 user_pwd = pwd.getpwnam(self.user)
 
 def func():
 os.setgid(user_pwd.pw_gid)
 os.setuid(user_pwd.pw_uid)
 
 return func
 
 def as_dict(self, key=None):
 e = {
 'name': self.name,
 'interpreter': self.interpreter(),
 'extensions': self.extensions(),
 }
 if key:
 del e[key]
 return {getattr(self, key): e}
 return e
 
 def as_deepdict(self, key=None):
 e = {
 'name': self.name,
 'interpreter': self.interpreter().as_dict(),
 'extensions': self.extensions(),
 }
 if key:
 del e[key]
 return {getattr(self, key): e}
 return e
 
 def create(self, interpreter=None):
 if not interpreter:
 interpreter = Interpreter(target_user=self.user)
 prompt = '(' + \
 get_directory(os.path.basename(self.prefix)) + ':' + self.name + \
 ')'
 check_call(
 PYTHON_PATH, RUBYVENV_BIN,
 '--prompt', prompt,
 '--ruby', interpreter.binary,
 self.path,
 preexec_fn=self._demote())
 
 def destroy(self):
 check_call('/bin/rm', '-r', '--interactive=never', self.path, preexec_fn=self._demote())
 
 def exists(self):
 return os.path.exists(self.path)
 
 def interpreter(self):
 if not self._interpreter:
 self._interpreter = Interpreter(prefix=self.path, target_user=self.user)
 return self._interpreter
 
 def gem(self):
 if not self._gem:
 self._gem = os.path.join(self.path, 'bin', 'gem')
 return self._gem
 
 def extension_install(self, extension):
 locked_extensions = ExtensionInfo.get_locked_extensions(self.interpreter_name)
 t = extension.split(VERSION_DELIMITER)
 extension, version = t[0], t[1:] or ''
 if StrictVersion(self.name) >= StrictVersion('2.6'):
 command = (self.gem(), 'install', '--no-document', extension)
 else:
 command = (self.gem(), 'install', '--no-rdoc', '--no-ri', extension)
 if version:
 version = version[0]
 command += ('-v', version)
 if ExtensionInfo.is_extensions_locked(locked_extensions, extension, version):
 raise ValueError("Extension '%s' install is prohibited. System extension" % extension)
 check_call(args=command, preexec_fn=self._demote())
 
 def extension_update(self, extension):
 check_call(self.gem(), 'update', extension, preexec_fn=self._demote())
 
 def extension_uninstall(self, extension):
 locked_extensions = ExtensionInfo.get_locked_extensions(self.interpreter_name)
 t = extension.split(VERSION_DELIMITER)
 extension, version = t[0], t[1:] or ''
 command = (self.gem(), 'uninstall', extension, '-x', '-a')
 if version:
 version = version[0]
 command += ('-v', version)
 if ExtensionInfo.is_extensions_locked(locked_extensions, extension, version):
 raise ValueError("Extension '%s' removal is prohibited" % extension)
 check_call(args=command, preexec_fn=self._demote())
 
 def extensions(self):
 result = {}
 locked_extensions = ExtensionInfo.get_locked_extensions(self.interpreter_name)
 output = check_output(self.gem(), 'list', '--local', preexec_fn=self._demote())
 extensions = EXTENSION_PATTERN.findall(output)
 docs = (ExtensionInfo.extension_doc(extension)
 for extension, _ in extensions)
 for (name, version), doc in zip(extensions, docs):
 if ExtensionInfo.is_extensions_locked(locked_extensions, name, version):
 version_diff = list(set([v.strip() for v in version.split(',')])
 - set(locked_extensions.get(name)))
 if version_diff and len(locked_extensions.get(name)) != 0:
 result[name] = {'doc': doc, 'version': ', '.join(version_diff)}
 else:
 result[name] = {'doc': doc, 'version': version}
 
 return result
 
 
 def _abs_prefix(user=None, prefix=None):
 if not prefix:
 prefix = DEFAULT_PREFIX
 if user:
 return os.path.join(pwd.getpwnam(user).pw_dir, prefix)
 else:
 return os.path.join(pwd.getpwuid(os.getuid()).pw_dir, prefix)
 
 
 def environments(user=None, prefix=None):
 venv_path = _abs_prefix(user, prefix)
 try:
 env_list = list_dirs(venv_path)
 except OSError:
 return []
 envs = []
 for env_name in env_list:
 envs.append(Environment(env_name, user, prefix))
 return envs
 
 
 def environments_dict(key, user=None, prefix=None):
 return dict(list(e.as_dict(key=key).items()) for e in environments(user, prefix))
 
 
 def environments_deepdict(key, user=None, prefix=None):
 return dict(list(e.as_deepdict(key=key).items())
 for e in environments(user, prefix))
 
 |