| Viewing file:  plugin_executors.py (15.33 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# coding=utf-8
 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
 #
 # Licensed under CLOUD LINUX LICENSE AGREEMENT
 # http://cloudlinux.com/docs/LICENSE.TXT
 
 import cProfile
 import logging
 import multiprocessing
 import os
 import pstats
 import signal
 import sys
 import threading
 import time
 import traceback
 
 from sqlalchemy.exc import (
 NoReferenceError,
 NoSuchColumnError,
 NoSuchTableError,
 SQLAlchemyError,
 )
 
 from lvestats.core.plugin import LveStatsPluginTerminated
 from lvestats.core.plugin_context import PluginContext
 from lvestats.lib.config import ConfigError
 
 # Module authorship tag.
 __author__ = "shaman"
 
 
 class PluginExecutionException(Exception):
     """Raised by an executor when running a plugin fails."""
 
 
 class DbRecoveredException(Exception):
     """Raised after a plugin hit a missing table/column/reference error and the DB was recovered."""
 
 
 class DbRecoverFailedException(Exception):
     """Raised when the database recovery attempt itself fails; wraps the recovery error."""
 
 
 class PluginTimeoutException(Exception):
     """Raised when a plugin does not finish within the allotted timeout."""
 
 
 class PluginExecutor(object):
     """Base class for strategies that run lvestats plugins.

     ``initial_lve_data`` is the template from which executors build the
     ``lve_data`` dict handed to plugins.  Constructing any executor replaces
     it with a fresh template that also carries ``LVE_VERSION``.
     """

     # Template for plugin ``lve_data``; rebuilt (with LVE_VERSION) on every
     # executor construction.
     initial_lve_data = {'stats': {}}

     def __init__(self, config):
         self.log = logging.getLogger('main_loop.plugin_executor')
         self.config = config

         # Imported at construction time rather than at module import time.
         from lvestats.lib.commons.func import get_lve_version  # pylint: disable=import-outside-toplevel
         lve_version = get_lve_version()
         PluginExecutor.initial_lve_data = {'stats': {}, 'LVE_VERSION': lve_version}

     def execute(self, plugin_class, now, timeout):
         """Run ``plugin_class`` once if it is due at time ``now``.

         :param plugin_class: plugin class to instantiate and run
         :param now: current timestamp, compared against the plugin's ``period``
         :param timeout: maximum run time in seconds (an implementation may ignore it)
         :return: None.  Failures are reported by raising
             PluginExecutionException, PluginTimeoutException,
             DbRecoveredException or DbRecoverFailedException.
         """
         raise NotImplementedError

     def terminate(self):
         """Release whatever resources the executor holds."""
         raise NotImplementedError
 
 
 # Seconds SeparateProcessPluginExecutor waits for the plugin subprocess, first
 # after setting ``have_to_exit`` and again after Process.terminate(), before
 # escalating to the next, harsher step.
 KILL_PROCESS_TIMEOUT = 20
 
 
 class SameProcessPluginExecutor(PluginExecutor):
     """Run plugins synchronously in the calling process.

     Plugin instances come from one PluginContext and all runs share a single
     ``lve_data`` dict.  ``timeout`` is accepted for interface compatibility
     but is not enforced by this executor.
     """

     def __init__(self, config):
         super().__init__(config)
         self.plugin_context = PluginContext(config)
         # plugin class -> ``now`` value of its last successful run
         self.last_execution_time = {}
         # NOTE(review): shallow copy -- the nested 'stats' dict is the same
         # object as in PluginExecutor.initial_lve_data.
         self.lve_data = PluginExecutor.initial_lve_data.copy()

     def execute(self, plugin_class, now, timeout):
         """Run ``plugin_class`` if its ``period`` has elapsed since its last successful run.

         A plugin without a ``period`` runs on every call.  Errors raised while
         obtaining the plugin instance propagate unchanged.

         :param plugin_class: plugin class to run
         :param now: current timestamp; recorded as the last run time on success
         :param timeout: ignored by this executor
         :return: None
         :raises DbRecoveredException: the plugin raised NoSuchColumnError,
             NoSuchTableError or NoReferenceError and recover_db() succeeded
         :raises DbRecoverFailedException: recover_db() itself raised
         :raises PluginExecutionException: any other failure inside the plugin
         """
         log = logging.getLogger('main_loop.plugin_executor.same_process')
         plugin_instance = self.plugin_context.get_instance(plugin_class)
         plugin_instance.now = now

         time_passed = None

         period = getattr(plugin_instance, 'period', getattr(plugin_class, 'period', None))

         # Test against None (not truthiness) and normalise to float, as the
         # separate-process executor does.  With ``if period:`` a period of 0
         # left time_passed as None and the comparison below raised TypeError
         # (``0 <= None``) outside the try block.
         if period is not None:
             period = float(period)
             time_passed = now - self.last_execution_time.get(plugin_class, 0)

         if period is None or period <= time_passed:
             try:
                 plugin_instance.execute(self.lve_data)
                 self.last_execution_time[plugin_class] = now
                 return
             except (NoSuchColumnError, NoSuchTableError, NoReferenceError) as ex:
                 log.exception('Database error during executing plugin that leads to need of db recreation db')

                 try:
                     self.plugin_context.recover_db()
                 except BaseException as exception_during_recover:
                     raise DbRecoverFailedException(exception_during_recover) from exception_during_recover

                 raise DbRecoveredException() from ex
             except SQLAlchemyError as ex:
                 log.exception('Database error during executing plugin that may indicate the need to recreate db. '
                               'Please check it manually.')
                 raise PluginExecutionException(ex) from ex
             except BaseException as ex:
                 # NOTE(review): this also wraps KeyboardInterrupt/SystemExit
                 # raised by the plugin -- confirm that is intended.
                 raise PluginExecutionException(ex) from ex
 
     def terminate(self):
         """Nothing to release: plugins run in this process."""
 
 
 class ProcessContext(object):
     """State shared between the main process and the plugin subprocess.

     Every member is created by one multiprocessing.Manager, so both processes
     observe the same namespace and events through manager proxies.
     """

     def __init__(self):
         self.mgr = multiprocessing.Manager()
         self.ns = self.mgr.Namespace()
         # plugin class -> ``now`` of its last successful run; the subprocess
         # reads it, updates its copy and assigns it back.
         self.ns.last_execution_time = {}
         # Last failure reported by the subprocess.  Initialised so that a
         # reader of a fresh context gets None rather than AttributeError.
         self.ns.exception = None
         # main -> subprocess: ns.plugin_class / ns.now are set, run the plugin
         self.plugin_ready_event = self.mgr.Event()
         # subprocess -> main: the requested run is over (success or failure)
         self.plugin_execution_finished_event = self.mgr.Event()
         # subprocess -> main: the run succeeded (or the plugin was not due yet)
         self.plugin_execution_success_event = self.mgr.Event()
         # subprocess -> main: a missing table/column/reference DB error occurred
         self.db_error_event = self.mgr.Event()

         # main -> subprocess: please exit
         self.have_to_exit = self.mgr.Event()
         # subprocess -> main: exit request acknowledged
         self.exited = self.mgr.Event()

     def terminate(self):
         """Shut down the manager process that backs this context."""
         self.mgr.shutdown()
 
 
 class SeparateProcessPluginExecutor(PluginExecutor):
     """Run plugins in a dedicated, long-lived subprocess.

     The subprocess is started lazily by ``execute`` and talks to the main
     process only through a ProcessContext.  It is restarted after a timeout,
     stopped after a failure and recreated after a database recovery.  When
     ``profiling_log`` is set, the subprocess runs under cProfile, ``execute``
     waits without a timeout, and profile statistics are appended to
     ``profiling_log`` when the subprocess exits on request.
     """

     def __init__(self, config, profiling_log=None):
         super().__init__(config)
         self.profiling_log = profiling_log
         self.process_context = None
         self.process = None

     # -- code that runs inside the plugin subprocess -------------------------

     @staticmethod
     def _in_separate_process(config, process_context, profiler=None, profiling_log=None):
         """Main loop of the plugin subprocess.

         Waits for ``plugin_ready_event``, runs the plugin described by
         ``process_context.ns`` and reports the outcome through the context's
         events (plus ``ns.exception`` on failure).  Returns when the main
         process sets ``have_to_exit``, or after reporting an unexpected error.
         """
         # NOTE(review): presumably detaches the plugin process from signals
         # sent to the parent's process group -- confirm.
         os.setpgrp()
         signal.signal(signal.SIGTERM, SeparateProcessPluginExecutor._child_process_sigterm_handler)
         signal.signal(signal.SIGUSR1, SeparateProcessPluginExecutor._sigusr1_handler)
         log = logging.getLogger('main_loop.plugin_executor.process')
         try:
             plugin_context = PluginContext(config)
             lve_data = PluginExecutor.initial_lve_data.copy()

             while True:
                 # Poll once per second so an exit request is noticed while idle.
                 while not process_context.plugin_ready_event.wait(timeout=1):
                     if process_context.have_to_exit.is_set():
                         process_context.plugin_execution_finished_event.set()
                         if profiler:
                             SeparateProcessPluginExecutor._dump_profile(profiler, profiling_log)
                         process_context.exited.set()
                         return

                 process_context.plugin_ready_event.clear()

                 plugin_class = process_context.ns.plugin_class
                 now = process_context.ns.now

                 try:
                     plugin_instance = plugin_context.get_instance(plugin_class)
                 except ConfigError as ce:
                     # NOTE(review): nothing is reported to the main process
                     # here, so it waits for its full timeout and raises
                     # PluginTimeoutException -- confirm this is intended.
                     log.exception(str(ce))
                     continue
                 except Exception as ex:
                     log.exception("Error during instantiating plugin")
                     process_context.ns.exception = ex
                     process_context.plugin_execution_finished_event.set()
                     continue

                 plugin_instance.now = now

                 period = getattr(plugin_instance, 'period', getattr(plugin_class, 'period', None))
                 if period is not None:
                     period = float(period)

                 time_passed = None

                 # Reading a Namespace attribute returns a copy, so the updated
                 # dict is assigned back after a successful run.
                 last_execution_time = process_context.ns.last_execution_time

                 if period is not None:
                     time_passed = now - last_execution_time.get(plugin_class, 0)

                 if period is None or period <= time_passed:
                     try:
                         # While the plugin runs, SIGUSR2 aborts it via _stop_plugin.
                         signal.signal(signal.SIGUSR2, SeparateProcessPluginExecutor._stop_plugin)
                         log.debug("Executing plugin %s", plugin_class)

                         t0 = time.time()
                         plugin_instance.execute(lve_data)
                         log.debug("Executing plugin %s took %f sec", plugin_class, time.time() - t0)

                         last_execution_time[plugin_class] = now
                         process_context.ns.last_execution_time = last_execution_time

                         process_context.plugin_execution_success_event.set()
                         process_context.plugin_execution_finished_event.set()
                     except (NoSuchColumnError, NoSuchTableError, NoReferenceError) as ex:
                         log.exception('Database error during executing plugin that leads to need of db recreation db')
                         process_context.ns.exception = ex
                         process_context.db_error_event.set()
                         process_context.plugin_execution_finished_event.set()
                         continue
                     except SQLAlchemyError as ex:
                         log.exception('Database error during executing %s plugin that may indicate the need to '
                                       'recreate db. '
                                       'Please check it manually.', plugin_class)
                         process_context.ns.exception = ex
                         process_context.plugin_execution_finished_event.set()
                         continue
                     except LveStatsPluginTerminated:
                         log.info("Plugin %s was terminated.", plugin_class)
                         sys.exit(0)
                     except IOError as ioe:
                         # NOTE(review): as with ConfigError above, the main
                         # process is not notified and will time out.
                         log.exception("IO Error: %s", str(ioe))
                         continue
                     except Exception as ex:
                         log.exception("Other exception during execution of plugin %s", plugin_class)
                         process_context.ns.exception = ex
                         process_context.plugin_execution_finished_event.set()
                         continue
                     finally:
                         # Ignore SIGUSR2 again on every exit path, not only after
                         # success; otherwise a late SIGUSR2 would raise
                         # LveStatsPluginTerminated while the loop is idle.
                         signal.signal(signal.SIGUSR2, signal.SIG_IGN)
                 else:
                     log.debug("plugin %s will be launched in %f sec", plugin_class, period - time_passed)
                     process_context.plugin_execution_success_event.set()
                     process_context.plugin_execution_finished_event.set()
         except Exception as ex:
             try:
                 process_context.ns.exception = ex
             except IOError as e:
                 # Hide "IOError: [Errno 32] Broken pipe" on KeyboardInterrupt
                 if e.errno == 32 and isinstance(ex, EOFError):
                     return
                 raise  # don't hide any other error
             log.exception("Exception during execution in separate process")
             process_context.plugin_execution_finished_event.set()

     @staticmethod
     def _dump_profile(profiler, profiling_log):
         """Append the top 20 profile entries, by internal and by cumulative time, to profiling_log."""
         with open(profiling_log, 'a+', encoding='utf-8') as f:
             f.write('Plugin process profile, internal time:\n')
             stats = pstats.Stats(profiler, stream=f)
             stats.sort_stats('time').print_stats(20)

             f.write('Plugin process profile, cumulative time:\n')
             stats.sort_stats('cumulative').print_stats(20)

     @staticmethod
     def _do_profile_in_separate_process(config, process_context, profiling_log):
         """Run the subprocess main loop under cProfile."""
         profiler = cProfile.Profile()
         profiler.runcall(SeparateProcessPluginExecutor._in_separate_process,
                          config,
                          process_context,
                          profiler,
                          profiling_log)

     @staticmethod
     def _stop_plugin(signum, frame):
         """SIGUSR2 handler installed while a plugin runs: abort the plugin."""
         log = logging.getLogger('plugin_sigusr2_handler')
         log.info('Shutting down plugin')
         raise LveStatsPluginTerminated()

     @staticmethod
     def _child_process_sigterm_handler(signum, frame):
         """SIGTERM handler of the subprocess: leave via SystemExit."""
         log = logging.getLogger('subprocess_sigterm_handler')
         log.info('Shutting down child process')
         sys.exit(0)

     @staticmethod
     def _sigusr1_handler(signum, frame):
         """SIGUSR1 handler of the subprocess: log the stack of every thread."""
         log = logging.getLogger('stack tracer')
         log.info("--- Threads stack traces, while plugin is considered stuck ---")
         id2name = {th.ident: th.name for th in threading.enumerate()}
         code = []
         for thread_id, stack in sys._current_frames().items():  # pylint: disable=protected-access
             code.append(f"\n# Thread: {id2name.get(thread_id, '')}({thread_id})")
             for filename, lineno, name, line in traceback.extract_stack(stack):
                 code.append(f'File: "{filename}", line {lineno}, in {name}')
                 if line:
                     code.append(f"  {line.strip()}")
         log.info("\n".join(code))

     # -- code that runs in the main process ----------------------------------

     def _make_sub_process(self):
         """Create a fresh ProcessContext and start the plugin subprocess."""
         self.process_context = ProcessContext()
         if self.profiling_log:
             target = SeparateProcessPluginExecutor._do_profile_in_separate_process
             args = (self.config, self.process_context, self.profiling_log)
         else:
             target = SeparateProcessPluginExecutor._in_separate_process
             # The third positional parameter of this target is ``profiler``,
             # so pass only what it expects.
             args = (self.config, self.process_context)

         self.process = multiprocessing.Process(target=target, name='plugin_process', args=args)
         self.process.start()

     def _kill_subprocess(self, pid):
         """Send SIGKILL to ``pid``; a process that is already gone is not an error."""
         if not pid:
             return
         try:
             os.kill(pid, signal.SIGKILL)
         except ProcessLookupError:
             self.log.debug("subprocess %s already exited", pid)
             return
         self.log.debug("subprocess killed")

     def _wait_for_graceful_exit(self, process, context):
         """Ask the subprocess to exit; return True if it is gone or confirmed within the timeout."""
         if not process.is_alive():
             # A dead subprocess can never set ``exited``; waiting would only
             # burn KILL_PROCESS_TIMEOUT seconds.
             return True
         try:
             context.have_to_exit.set()
             return context.exited.wait(KILL_PROCESS_TIMEOUT)
         except Exception:
             # The manager is unreachable: fall back to signals.
             self.log.exception('Could not ask subprocess to exit')
             return False

     def _terminate_sub_process(self):
         """Stop the plugin subprocess and shut down its ProcessContext.

         Escalates from an exit request (``have_to_exit``) to SIGTERM
         (Process.terminate) and then SIGKILL, waiting up to
         KILL_PROCESS_TIMEOUT seconds for each of the first two steps.  The
         executor's handles are cleared even if one of the steps fails.
         """
         if not self.process or not self.process_context:
             return

         process, context = self.process, self.process_context
         # Forget the handles first so a failure below cannot leave this
         # executor pointing at a half-stopped subprocess/manager.
         self.process = None
         self.process_context = None

         self.log.debug('Terminating subprocess')
         try:
             if self._wait_for_graceful_exit(process, context):
                 return

             self.log.debug('Killing subprocess')
             pid = process.pid
             try:
                 process.terminate()
                 process.join(KILL_PROCESS_TIMEOUT)
                 if process.is_alive():
                     self._kill_subprocess(pid)
             except Exception as ex:
                 self.log.exception("Wasn't able to kill subprocess because of the error: %s", str(ex))
                 self._kill_subprocess(pid)
         finally:
             context.terminate()

     def terminate(self):
         """Stop the plugin subprocess, if one is running."""
         self._terminate_sub_process()

     def _restart_sub_process(self):
         """Replace the current subprocess (if any) with a fresh one."""
         self._terminate_sub_process()
         self._make_sub_process()

     def _get_process_and_context(self):
         """Return the running subprocess and its context, (re)starting it if needed."""
         if not self.process or not self.process_context:
             self._make_sub_process()
         elif not self.process.is_alive():
             # The subprocess exited on its own (SIGTERM, SIGUSR2 or an
             # unexpected error); start a fresh one instead of sending work to
             # a dead process and waiting for the full timeout.
             self.log.warning('Plugin subprocess is not running; restarting it')
             self._restart_sub_process()
         return self.process, self.process_context

     def execute(self, plugin_class, now, timeout):
         """Run ``plugin_class`` in the subprocess, waiting up to ``timeout`` seconds.

         :param plugin_class: plugin class to run, passed to the subprocess via the manager namespace
         :param now: current timestamp, compared against the plugin's ``period``
         :param timeout: seconds to wait for the run; ignored (wait forever) when profiling
         :return: None
         :raises DbRecoveredException: the plugin hit NoSuchColumnError,
             NoSuchTableError or NoReferenceError and recover_db() succeeded
         :raises DbRecoverFailedException: recover_db() itself raised
         :raises PluginTimeoutException: no result within ``timeout``; the
             subprocess is restarted
         :raises PluginExecutionException: the plugin failed; the subprocess
             is stopped and recreated on the next call
         """
         if self.profiling_log:
             timeout = None  # wait indefinitely while profiling

         process, context = self._get_process_and_context()

         t0 = time.time()
         context.ns.plugin_class = plugin_class
         context.ns.now = now

         context.plugin_execution_finished_event.clear()
         context.plugin_execution_success_event.clear()

         context.plugin_ready_event.set()
         self.log.debug('Executor (main process) data exchange with plugin took %f sec', time.time() - t0)
         context.plugin_execution_finished_event.wait(timeout)

         if context.db_error_event.is_set():
             # Stop the subprocess before touching the database, then start a fresh one.
             self._terminate_sub_process()
             try:
                 recreator = PluginContext(self.config)
                 recreator.recover_db()
             except BaseException as exception_during_recover:
                 raise DbRecoverFailedException(exception_during_recover) from exception_during_recover
             self._make_sub_process()
             raise DbRecoveredException()

         if not context.plugin_execution_finished_event.is_set():
             try:
                 # Ask the stuck subprocess to log its thread stacks, and give it time to do so.
                 os.kill(process.pid, signal.SIGUSR1)
             except ProcessLookupError:
                 # Already gone and reaped: nothing to dump, just restart.
                 self.log.warning('Plugin subprocess disappeared before the stack dump')
             else:
                 time.sleep(timeout / 2.0)
             self._restart_sub_process()
             raise PluginTimeoutException()

         if not context.plugin_execution_success_event.is_set():
             # Read before the manager backing ``context`` is shut down.
             ex = getattr(context.ns, 'exception', None)
             self._terminate_sub_process()
             if ex:
                 raise PluginExecutionException(ex)
             raise PluginExecutionException()
 
 
 # Public API of this module.
 __all__ = [
     'PluginExecutor',
     'SameProcessPluginExecutor',
     'SeparateProcessPluginExecutor',
     'PluginExecutionException',
     'PluginTimeoutException',
     'DbRecoveredException',
     'DbRecoverFailedException',
 ]
 
 |