| Viewing file:  extensions.py (3.81 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# -*- coding: utf-8 -*-
 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
 #
 # Licensed under CLOUD LINUX LICENSE AGREEMENT
 # http://cloudlinux.com/docs/LICENSE.TXT
 
 from __future__ import print_function
 from __future__ import division
 from __future__ import absolute_import
 import glob
 import os
 import re
 from future.moves import configparser as ConfigParser
 from future.moves.configparser import SafeConfigParser
 
 from clselect.utils import check_output
 from clselect.utils import file_writelines, file_readlines
 
 
 EXTENSION_PATTERN = re.compile('^(?P<name>.+) \((?P<version>.+)\)$',
 re.MULTILINE)
 
 CACHE = '/var/lve/rubygems'
 CACHE_BINARY = '/usr/share/l.v.e-manager/utils/cache_rubygems.py'
 
 
 class ExtensionInfo(object):
 
 url = 'http://rubygems.org/gems'
 
 def __init__(self, gem=None):
 if gem:
 self.gem = gem
 else:
 self.gem = sorted(glob.glob('/opt/alt/ruby??/bin/gem'))[-1]
 
 def _list_gems(self):
 output = check_output(self.gem, 'list', '--remote')
 return EXTENSION_PATTERN.findall(output)
 
 def _list_gem_version(self, gem):
 output = check_output(self.gem, 'list', '^' + gem + '$', '--remote', '--all')
 versions = [version.split(',') for extension, version in EXTENSION_PATTERN.findall(output)]
 return sorted(list(set(version.strip().split(" ")[0] for version in versions[0])))
 
 def list_extensions(self):
 extensions = [extension for extension, _ in self._list_gems()]
 return ExtensionInfo.extensions_docs(extensions)
 
 def list_extensions_cached(self):
 if not os.path.isfile(CACHE):
 extensions = self.write_cache()
 else:
 extensions = [i.strip() for i in file_readlines(CACHE)]
 return ExtensionInfo.extensions_docs(extensions)
 
 def write_cache(self):
 extensions = [extension for extension, _ in self._list_gems()]
 try:
 file_writelines(CACHE, ['%s\n' % extension for extension in extensions], 'w')
 os.chmod(CACHE, 0o644)
 except IOError:
 pass
 return extensions
 
 def list_extensions_version(self, extensions):
 return dict((extension, {'versions': self._list_gem_version(extension)})
 for extension in extensions)
 
 @staticmethod
 def delete_cache():
 try:
 os.unlink(CACHE)
 except OSError:
 pass
 
 @staticmethod
 def extension_doc(extension):
 return str.join('/', (ExtensionInfo.url, extension))
 
 @staticmethod
 def extensions_docs(extensions):
 docs = (ExtensionInfo.extension_doc(extension)
 for extension in extensions)
 return dict((extension, {'doc': doc})
 for extension, doc in zip(extensions, docs))
 
 @staticmethod
 def get_locked_extensions(interpreter):
 alt_ver = interpreter.replace('.','')
 file_path = os.path.join('/opt/alt', alt_ver ,'etc', 'locked_extensions.ini')
 if not os.path.exists(file_path):
 file_path = os.path.join(os.path.dirname(__file__), '..', 'locked_extensions.ini')
 parser = SafeConfigParser(interpolation=None, strict=False)
 parser.read(file_path)
 try:
 items = parser.items(interpreter)
 except ConfigParser.NoSectionError:
 items = ()
 
 return dict((extension, [v.strip() for v in versions.split(',') if v])
 for extension, versions in items)
 
 @staticmethod
 def is_extensions_locked(locked_extensions, extension, version):
 return (extension in locked_extensions and (
 list(set([v.strip() for v in version.split(',') if len(version) > 0])
 & set(locked_extensions.get(extension))) or
 len(locked_extensions.get(extension)) == 0
 ))
 |