| Viewing file:  main_loop.py (6.1 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# coding=utf-8
 #
 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
 #
 # Licensed under CLOUD LINUX LICENSE AGREEMENT
 # http://cloudlinux.com/docs/LICENSE.TXT
 
 import logging
 import time
 
 import psutil
 
 from lvestats.eventloop.plugin_executors import (
 DbRecoveredException,
 DbRecoverFailedException,
 PluginExecutionException,
 PluginTimeoutException,
 SameProcessPluginExecutor,
 SeparateProcessPluginExecutor,
 )
 from lvestats.lib.config import ConfigError
 
 __author__ = 'iseletsk'
 
 
 class MainLoop(object):
     """
     Execute the configured plugins cycle after cycle through a plugin executor.

     Every cycle passes the same timestamp to all plugins, then sleeps for the
     remainder of ``self.interval``.  The interval grows by 0.5s whenever a cycle
     overruns it (up to ``aggregation_period``) and shrinks back by 0.5s towards
     the initial value when cycles finish in time.
     """

     def __init__(self,
                  config,
                  interval=5,
                  plugins=None,
                  plugin_timeout=5,
                  multiproc=True,
                  profiling_log=None):
         """
         :param config: configuration object; ``aggregation_period`` is read from it
             with ``get`` (default 60) and it is passed on to the plugin executor
         :param interval: update interval in seconds
         :param plugins: list of plugins; None is treated as an empty list
         :param plugin_timeout: default time in seconds to let a user plugin be executed
             (used when the plugin has no ``timeout`` attribute of its own)
         :param multiproc: if true, plugins are run by SeparateProcessPluginExecutor,
             otherwise by SameProcessPluginExecutor
         :param str profiling_log: if not None -- file to write profiling info
         :return:
         """
         self.config = config
         self.interval = interval
         # Lower bound the adaptive interval shrinks back to in _sleep().
         self.default_interval = interval
         self.do_exit = False
         if plugins is None:
             self.plugins = []
         else:
             self.plugins = plugins

         # Upper bound for the adaptive interval in _sleep().
         # NOTE(review): compared against a number there -- assumes config yields a numeric value.
         self.aggregation_period = config.get('aggregation_period', 60)

         self.default_user_plugin_timeout = plugin_timeout
         self.log = logging.getLogger('main.loop')
         if multiproc:
             self.executor = SeparateProcessPluginExecutor(config, profiling_log)
         else:
             self.executor = SameProcessPluginExecutor(config)

         self.profiling_log = profiling_log

     def exit(self):
         """Ask run() to stop; the flag is honoured at the next cycle or plugin boundary."""
         self.log.debug('Want to exit')
         self.do_exit = True

     @staticmethod
     def _plugin_path_name(plugin_class):
         """
         Locate the source file of a plugin class.

         >>> from lvestats.plugins.generic.aggregators import LveUsageAggregator
         >>> MainLoop._plugin_path_name(LveUsageAggregator)  # doctest: +SKIP
         ('.../lvestats/plugins/generic/aggregators.py', 'LveUsageAggregator')

         :param plugin_class: plugin class
         :return: Filesystem path to plugin and its name
         """
         import inspect  # pylint: disable=import-outside-toplevel
         path = inspect.getfile(plugin_class)
         # Map a compiled file to its source name; only the suffix is touched so
         # that '.pyc' occurring elsewhere in the path is left alone.
         if path.endswith('.pyc'):
             path = path[:-1]
         return path, plugin_class.__name__

     def get_debug_info(self, stat_getter):
         """
         Best-effort call of ``stat_getter`` for diagnostic logging.

         :param stat_getter: callable without arguments
         :return: ``str()`` of the value returned by the getter, or '' if it raised
         """
         try:
             stat_value = stat_getter()
             return str(stat_value)
         except Exception as e:
             # Diagnostics must never break the main loop, hence the broad catch.
             self.log.debug('Error while getting info: %s', str(e))
             return ''

     def _plugin_timeout(self, plugin):
         """Return the plugin's own ``timeout``; user plugins fall back to the default, others to None."""
         if getattr(plugin, '__is_user_plugin__', False):
             return getattr(plugin, 'timeout', self.default_user_plugin_timeout)
         return getattr(plugin, 'timeout', None)

     def _load_snapshot(self):
         """Collect system load figures (as strings) to attach to a plugin timeout report."""
         return {
             'process_count': self.get_debug_info(lambda: len(psutil.pids())),
             'cpu': self.get_debug_info(psutil.cpu_times_percent),
             'mem': self.get_debug_info(lambda: psutil.virtual_memory().percent),
             'io': self.get_debug_info(psutil.disk_io_counters)
         }

     def _write_profile(self, profiler):
         """Append the top 20 entries by internal and by cumulative time to the profiling log."""
         self.log.info('Profiling finished')
         import pstats  # pylint: disable=import-outside-toplevel
         with open(self.profiling_log, 'a+', encoding='utf-8') as f:
             f.write('Main loop profile, internal time:\n')
             stats = pstats.Stats(profiler, stream=f)
             stats.sort_stats('time').print_stats(20)

             f.write('Main loop profile, cumulative time:\n')
             stats.sort_stats('cumulative').print_stats(20)

     def _shutdown(self):
         """Handle an exit request: log it and terminate the executor."""
         self.log.debug('Exiting')
         self.executor.terminate()

     def run(self, times=None):
         """
         Run plugin cycles until the cycle budget is exhausted or exit() is requested.

         :param times: number of cycles to run; a falsy value means run without limit,
             except that with profiling enabled 10 cycles are run
         :return: None
         """
         cnt = times if times else None

         profiler = None
         if self.profiling_log:
             import cProfile  # pylint: disable=import-outside-toplevel
             profiler = cProfile.Profile()
             if not cnt:
                 cnt = 10
             self.log.info('Profiling server for %s plugin cycles', cnt)
             # The profiler is deliberately not enabled here: runcall() below
             # enables it around every plugin call and disables it afterwards.
             # Enabling it up front as well is redundant, and on Python 3.12+
             # enabling an already enabled profiler raises ValueError.

         while True:
             # Checked once per cycle as well, so that an exit request is honoured
             # even when the plugin list is empty.
             if self.do_exit:
                 self._shutdown()
                 return

             now = time.time()
             for plugin in self.plugins:
                 if self.do_exit:
                     self._shutdown()
                     return

                 timeout = self._plugin_timeout(plugin)

                 try:
                     if profiler:
                         profiler.runcall(self.executor.execute, plugin, now, timeout)
                     else:
                         self.executor.execute(plugin, now, timeout)
                 except DbRecoveredException:
                     self.log.warning("Db was corrupt, recovered")
                     continue
                 except DbRecoverFailedException:
                     self.log.exception("Unable to recover database")
                     # NOTE(review): unlike the other exit paths this one returns
                     # without calling self.executor.terminate() -- confirm the
                     # caller cleans the executor up.
                     return
                 except PluginTimeoutException:
                     load = self._load_snapshot()
                     path, name = MainLoop._plugin_path_name(plugin)
                     self.log.error("Plugin %s:%s timed out", path, name, extra={'data': load})
                     continue
                 except PluginExecutionException:
                     path, name = MainLoop._plugin_path_name(plugin)
                     self.log.exception("Error during execution of %s:%s", path, name)
                     continue
                 except ConfigError:
                     path, name = MainLoop._plugin_path_name(plugin)
                     self.log.exception("Unable to init plugin %s:%s", path, name)

             if cnt:
                 cnt -= 1
                 if cnt == 0:
                     break
             self._sleep(now)

         if profiler:
             self._write_profile(profiler)

         self.executor.terminate()

     def _sleep(self, now):
         """
         Sleep for what is left of the current interval and adapt the interval.

         :param now: timestamp (``time.time()``) taken at the start of the cycle
         """
         time_passed = time.time() - now
         pause = self.interval - time_passed
         if pause > 0:
             # Cycle finished in time: step a previously enlarged interval back
             # towards the initial one, then wait out the remainder.
             if self.interval > self.default_interval:
                 self.interval -= 0.5
             time.sleep(pause)
         elif self.interval < self.aggregation_period:
             # Cycle overran the interval: do not sleep, enlarge the interval.
             self.interval += 0.5
 
 |