 
# -*- coding: utf-8 -*-
 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
 #
 # Licensed under CLOUD LINUX LICENSE AGREEMENT
 # http://cloudlinux.com/docs/LICENSE.TXT
 
 """
 This module contains Autotracer class
 """
 __package__ = 'ssa.modules'
 
 import json
 import logging
 import os
 from collections import defaultdict
 from dataclasses import dataclass, field, asdict
 from datetime import datetime, timedelta
 from fnmatch import fnmatchcase
 from typing import Iterator, Optional, NamedTuple, List, Iterable
 from urllib.parse import urlparse
 
 import numpy as np
 
 from ssa.db import setup_database
 from ssa.internal.exceptions import SSAError
 from ssa.modules.common import Common
 from ssa.modules.storage import (
 iter_domains_data,
 iter_urls_data,
 get_url_durations
 )
 from ssa.autotracing import status, disabled_users, misconfiguration_checks
 from ssa.configuration import load_tunables
 from ssa.configuration.schemes import autotracing_tunables_schema
 from ssa.internal.constants import autotracing_stats_file
 from ssa.internal.utils import (
 is_xray_version_supported,
 is_kernel_version_supported,
 sentry_init
 )
 
 xray_import_error = None
 try:
 import xray.shared_library as xray_lib
 except ImportError as e:
 xray_import_error = e
 xray_lib = None
 
 
@dataclass
class URL:
    """Aggregated daily statistics for a single URL.

    Durations are plain ints as stored by the collector
    (unit not visible here — presumably milliseconds; TODO confirm).
    """
    uri: str  # full URL including domain and URI
    avg_duration: int  # average duration during the day
    max_duration: int  # maximum duration during the day
    total_slow_reqs: int  # total SLOW request count during the day
    total_reqs: int  # total request count during the day
    belongs_to: str  # domain
    belongs_to_user: str  # domain owner
 
 
@dataclass
class RulesRejects:
    """Per-rule reject counters for one autotracing iteration.

    Each field counts URLs skipped by the correspondingly numbered
    filtering rule (see urls_computationally_filtered / urls_selected /
    urls_scheduled).
    """
    non_wp: int = 0  # number of URLs, skipped due to non-WP site
    throttled: int = 0  # number of URLs, skipped due to throttling
    density: int = 0  # number of URLs, skipped due to low density
    slowness: int = 0  # number of URLs, skipped because they are considered fast
    max_slowness: int = 0  # number of URLs, skipped because they are considered too slow
    disabled: int = 0  # number of URLs, skipped due to disabled autotracing
    nginx: int = 0  # number of URLs, skipped due to nginx caching
    frequency: int = 0  # number of URLs, skipped due to min_retracing_interval
    server_limit: int = 0  # number of URLs, skipped due to per-server limit
    domain_limit: int = 0  # number of URLs, skipped due to per-domain limit
    no_domain: int = 0  # number of URLs, skipped due to unavailable domain info (e.g. no domain)
 
 
@dataclass
class Stats:
    """AutoTracer statistics container, serialized to/from
    autotracing_stats_file between iterations."""
    rules_version: str  # version of rules applied from autotracing.json
    urls_processed: int = 0  # number of URLs, processed within the latest iteration
    urls_selected: int = 0  # number of URLs, selected to launch autotracing tasks
    rejects: RulesRejects = field(default_factory=RulesRejects)  # rejects counters
 
 
 class URLS(NamedTuple):
 """
 Representation of an URL
 """
 domain_name: str
 uri_path: str
 
 
 def url_split(url: str) -> URLS:
 """
 Split URL into domain_name and uripath including query string
 :param url: URL of format protocol://domain/path;parameters?query#fragment
 :return: namedtuple URL(domain_name, uripath)
 """
 fragments = urlparse(url)
 qs = f'?{fragments.query}' if fragments.query else ''
 uri = f'{fragments.path}{qs}' if fragments.path else '/'
 _no_www_netloc = fragments.netloc.replace('www.', '')
 _no_port_netloc = _no_www_netloc.split(':')[0]
 return URLS(_no_port_netloc, uri)
 
 
class AutoTracer(Common):
    """
    SSA autotracing module implementation.
    """

    def __init__(self, engine=None):
        """
        :param engine: optional pre-configured database engine;
            a default one is created via setup_database() when omitted
        """
        super().__init__()
        self.logger = logging.getLogger('auto_tracer')
        self.logger.info('AutoTracer enabled: %s', __package__)
        self.disabled_users = list()  # users with autotracing switched off
        self.tasks_list = list()  # recent X-Ray tasks, filled in urls_scheduled
        self.stats = None  # initial value for an iteration statistics

        self.engine = engine if engine else setup_database()
 
    def __call__(self):
        """
        Run one autotracing iteration: verify the environment, reload
        config, select suitable URLs and start an X-Ray autotracing
        task for each, then persist iteration statistics to file.
        """
        try:
            misconfiguration_checks()
        except SSAError as exc:
            self.logger.info('AutoTracer skipped: %s', exc.reason)
            return

        if xray_lib is None:
            # to send sentry event only if x-ray really not installed
            if os.path.exists('/usr/sbin/cloudlinux-xray-agent'):
                self.logger.error('AutoTracer skipped: X-ray module import error: %s', str(xray_import_error))
            else:
                self.logger.info('AutoTracer skipped: X-Ray not installed')
            return

        start_tool = xray_lib.start_autotracing

        if start_tool is not None:
            self.logger.info('AutoTracer started')
            # reload config
            super().__init__()
            self.logger.debug('AutoTracer loaded config: %s', self.config)
            # start gather own statistics for a current iteration
            self.stats = Stats(self.rules_version)
            for url in self.urls_scheduled():
                self.logger.info("Starting auto task for %s", url.uri)
                self.stats.urls_selected += 1
                try:
                    start_tool(url=url.uri, tracing_count=self.request_number)
                except Exception as e:
                    # most likely XRayError
                    self.logger.error('Failed to start task: %s', str(e))
            self.save_iteration_stats()
 
 @staticmethod
 def load_conf() -> dict:
 """
 Load configuration
 """
 return load_tunables('autotracing.json',
 autotracing_tunables_schema)
 
 @staticmethod
 def gets_domaininfo(domain_name: str) -> Optional[object]:
 """
 Gets domain info for the specified domain
 """
 if xray_lib is None:
 return None
 return xray_lib.domain_info(domain_name)
 
 def nginx_is_enabled(self, user_name: str) -> bool:
 """
 Says if nginx is enabled for a specific user
 """
 if self.skip_nginx:
 if xray_lib is None:
 return False
 return xray_lib.NginxUserCache(user_name).is_enabled
 return False
 
 def per_server_limit_recalculated(self) -> int:
 """Recalculate limit per server taking into account running tasks"""
 running_count = len([task for task in self.tasks_list if
 task['status'] == 'running' and task[
 'user'] == '*autotracing*'])
 self.logger.debug('Number of running autotracing tasks %s',
 running_count)
 return self.per_server_limit - running_count
 
 def excludes_old_tasks(self, full_list: Optional[list] = None) -> list:
 """
 Excludes tasks older than N days from the general list of tasks
 """
 if full_list is None:
 full_list = xray_lib.tasks_list()
 self.logger.debug('Task list loaded %s', full_list)
 
 n_days_ago = datetime.now() - timedelta(
 days=self.min_retracing_interval)
 return [task for task in full_list
 if (task.get('createtime') if task.get('createtime') is not None else 0) > int(n_days_ago.timestamp())]
 
 def exclude_thesame_urls(self, current_url: str) -> bool:
 """
 Excludes url from the list if it completely matches the current url
 or if domain names match and "*" follows the domain name in the list
 """
 # c - current parsed url
 c = url_split(current_url)
 for task_data in self.tasks_list:
 # t - task parsed url
 t = url_split(task_data['url'])
 direct_match = c.domain_name == t.domain_name and c.uri_path == t.uri_path
 wildcard_match = fnmatchcase(
 c.domain_name, t.domain_name) and fnmatchcase(c.uri_path,
 t.uri_path)
 if direct_match or wildcard_match:
 self.logger.debug(
 'Skipped: URL %s was traced recently. Matched by %s',
 current_url, task_data['url'])
 return True
 return False
 
 def pass_by_density(self, url_total_reqs: list,
 domain_total_reqs: list) -> bool:
 """Check that URL density passes given threshold"""
 if self.density:
 url_density = np.amin(
 np.corrcoef(url_total_reqs, domain_total_reqs))
 self.logger.debug('Calculated density %s', url_density)
 return url_density > self.density_threshold
 return True
 
 def pass_by_slowness_percentile(self, url_durations: Iterable[int]) -> bool:
 """
 The measure of "slowness" for URL is:
 at least N% of its requests take more than X seconds.
 N% -- self.slow_duration_percentage
 X -- self.slow_duration_threshold
 """
 reversed_percentile = 100 - self.slow_duration_percentage
 reversed_percentile_value = np.percentile(url_durations,
 reversed_percentile)
 self.logger.debug('Calculated %sth percentile %s for min duration',
 reversed_percentile,
 reversed_percentile_value)
 return reversed_percentile_value >= self.slow_duration_threshold
 
 def pass_by_max_slowness_percentile(self,
 url_durations: Iterable[int]) -> bool:
 """
 The opposite to pass_by_slowness_percentile method.
 The measure of "much slowness" for URL is:
 at least N% of its requests must take less than X seconds.
 N% -- self.max_slow_duration_percentage
 X -- self.max_slow_duration_threshold
 """
 percentile_value = np.percentile(url_durations,
 self.max_slow_duration_percentage)
 self.logger.debug('Calculated %sth percentile %s for max duration',
 self.max_slow_duration_percentage,
 percentile_value)
 return percentile_value <= self.max_slow_duration_threshold
 
 def pass_by_allowed_throttling(self, url_throttled_reqs: Optional[list],
 url_total_reqs: list) -> bool:
 """
 Check that percent of throttled requests per URL passes given threshold
 """
 if url_throttled_reqs is None:
 # skip URL with unavailable throttling info
 return False
 throttled_percent = (sum(url_throttled_reqs)/sum(url_total_reqs))*100
 self.logger.debug('Calculated throttled percent %s', throttled_percent)
 return throttled_percent <= self.allowed_throttling_percentage
 
 def pass_by_engine(self, wp_status: Optional[bool]) -> bool:
 """
 Check that URLs of a particular domain should be analyzed.
 For now we skip non-wordpress sites
 """
 if self.only_wp:
 # turn unavailable wp_status, aka None, into False
 return bool(wp_status)
 return True
 
    def urls_computationally_filtered(self) -> Iterator[URL]:
        """
        Select all URLs suitable for auto tracing by very basic rules:
        - WP site
        - suitable throttling
        - suitable density
        - measure of "slow" URL
        ORDER OF RULES MUST NOT BE CHANGED: IT AFFECTS STATISTICS COUNTERS
        """
        for domain_data in iter_domains_data(self.engine):
            # domain owner is resolved lazily, once per domain, and
            # only for URLs that pass all computational checks
            domain_owner = None
            if not self.pass_by_engine(domain_data.is_a_wordpress_domain):
                # rule ordering #1: skip non-wordpress sites
                # corresponds to non_wp RulesRejects counter
                self.logger.debug('Skipped by engine: non-wordpress')
                # all URLs of non-WP site are skipped then,
                # thus counters (reject and total) are to be increased
                # on number of these URLs ignoring non-URL data fields
                skipped_count = domain_data.urls_number
                self.stats.rejects.non_wp += skipped_count
                self.stats.urls_processed += skipped_count
                continue

            # map of url -> request durations collected for this domain
            domain_url_durations = dict(get_url_durations(
                self.engine, domain_data.domain_name))

            for url, data in iter_urls_data(self.engine,
                                            domain_data.domain_name,
                                            list(domain_url_durations.keys())):
                if url in self.non_url_fields:
                    # skip entry of domain requests counter
                    continue

                self.logger.debug('Processing URL %s', url)
                self.stats.urls_processed += 1
                if not self.pass_by_allowed_throttling(
                        data.get('url_throttled_reqs'), data['url_total_reqs']):
                    # rule ordering #2: percent of throttled requests
                    # corresponds to throttled RulesRejects counter
                    self.logger.debug('Skipped by throttled percent')
                    self.stats.rejects.throttled += 1
                    continue

                if not self.pass_by_density(data['url_total_reqs'],
                                            domain_data.domain_total_reqs):
                    # rule ordering #3: density threshold
                    # corresponds to density RulesRejects counter
                    self.logger.debug('Skipped by density')
                    self.stats.rejects.density += 1
                    continue

                durations = domain_url_durations[url]
                if not self.pass_by_slowness_percentile(durations):
                    # rule ordering #4: slowness assessment
                    # corresponds to slowness RulesRejects counter
                    self.logger.debug('Skipped by slowness percentile')
                    self.stats.rejects.slowness += 1
                    continue

                if not self.pass_by_max_slowness_percentile(durations):
                    # rule ordering #5: maximum allowed slowness assessment
                    # corresponds to max_slowness RulesRejects counter
                    self.logger.debug('Skipped by max slowness percentile')
                    self.stats.rejects.max_slowness += 1
                    continue

                # the URL has passed all computational checks
                # from here we already need username
                if domain_owner is None:
                    domain_info = self.gets_domaininfo(domain_data.domain_name)
                    if domain_info is None:
                        # this generally indicates "Domain does not exist"
                        # error, we should skip such URLs
                        self.logger.debug('Skipped by unavailable domain info')
                        self.stats.rejects.no_domain += 1
                        continue
                    else:
                        domain_owner = domain_info.user

                yield URL(url,
                          avg_duration=int(np.mean(durations)),
                          total_slow_reqs=sum(data['url_slow_reqs']),
                          total_reqs=sum(data['url_total_reqs']),
                          max_duration=max(durations),
                          belongs_to=domain_data.domain_name,
                          belongs_to_user=domain_owner)
 
 def urls_selected(self,
 stats_collected: Optional[dict] = None) -> Iterator[URL]:
 """
 From selected by computed thresholds URLs take those for which:
 - autotracing enabled
 - nginx disabled
 ORDER OF RULES MUST NOT BE CHANGED: IT AFFECTS STATISTICS COUNTERS
 """
 # fill disabled users list, initially empty
 self.fill_in_disabled_users()
 
 for url in self.urls_computationally_filtered():
 if url.belongs_to_user in self.disabled_users:
 # rule ordering #6: skip users for whom autotracing is disabled
 # corresponds to disabled RulesRejects counter
 self.logger.debug('Skipped: autotracing is disabled for %s',
 url.belongs_to_user)
 self.stats.rejects.disabled += 1
 continue
 if self.nginx_is_enabled(url.belongs_to_user):
 # rule ordering #7: skip url the owner of which uses nginx
 # corresponds to nginx RulesRejects counter
 self.logger.debug('Skipped: nginx is enabled for %s',
 url.belongs_to_user)
 self.stats.rejects.nginx += 1
 continue
 yield url
 
 def urls_scheduled(self) -> Iterator[URL]:
 """
 Schedule autotracing by sorted list taking into account the limits:
 - no same task for 10 days
 - limit per server
 - limit per domain
 ORDER OF RULES MUST NOT BE CHANGED: IT AFFECTS STATISTICS COUNTERS
 Return resulting list of URLs scheduled for auto tracing
 """
 # initial limit counters
 general_tasks_counter = 0
 tasks_counter_per_domain = defaultdict(int)
 # sort selected URLs
 sorted_urls = self.urls_sorted()
 self.logger.debug('Sorted scheduled list %s', sorted_urls)
 # get task list and filter out all old tasks in one turn
 self.tasks_list = self.excludes_old_tasks()
 per_server_smart_limit = self.per_server_limit_recalculated()
 for url in sorted_urls:
 if self.exclude_thesame_urls(url.uri):
 # rule ordering #8: limit of days for url (min_retracing_interval)
 # corresponds to frequency RulesRejects counter
 self.stats.rejects.frequency += 1
 continue
 if general_tasks_counter < per_server_smart_limit:
 if tasks_counter_per_domain[url.belongs_to] < self.per_domain_limit:
 general_tasks_counter += 1
 tasks_counter_per_domain[url.belongs_to] += 1
 yield url
 else:
 # rule ordering #9: domain limit is hit
 # corresponds to domain_limit RulesRejects counter
 self.logger.debug('Skipped URL %s by domain limit (%s)',
 url.uri,
 self.per_domain_limit)
 self.stats.rejects.domain_limit += 1
 else:
 # rule ordering #10: server limit is hit
 # corresponds to server_limit RulesRejects counter
 self.logger.debug('Skipped URL %s by server limit (%s)',
 url.uri,
 per_server_smart_limit)
 self.stats.rejects.server_limit += 1
 
 def urls_sorted(self) -> List[URL]:
 """
 Sort URLs by total number of requests first
 and by average duration second
 """
 first_serie = sorted(list(self.urls_selected()),
 key=lambda u: u.avg_duration, reverse=True)
 return sorted(first_serie, key=lambda u: u.total_reqs,
 reverse=True)
 
    def fill_in_disabled_users(self) -> None:
        """
        Fill internal list of users with autotracing disabled
        (initially empty, refreshed here on each iteration).
        """
        self.disabled_users = disabled_users()
 
 def save_iteration_stats(self) -> None:
 """
 Save collected statistics for current iteration to file
 """
 if self.stats is not None:
 try:
 with open(autotracing_stats_file, 'w') as stats_file:
 json.dump(asdict(self.stats), stats_file)
 except OSError as e:
 self.logger.warning(
 'Unable to save iteration stats to file: %s', str(e))
 
 def load_iteration_stats(self) -> dict:
 """
 Load statistics for latest iteration from file
 """
 try:
 with open(autotracing_stats_file) as stats_file:
 _data = json.load(stats_file)
 stat_data = Stats(_data['rules_version'],
 _data['urls_processed'],
 _data['urls_selected'],
 RulesRejects(**_data['rejects']))
 except (OSError, json.JSONDecodeError, KeyError):
 stat_data = Stats(self.rules_version)
 return asdict(stat_data)
 
 def get_stats(self) -> dict:
 """"""
 # load saved stats
 stats_loaded = self.load_iteration_stats()
 stats_loaded.update(dict(
 status=status()[0],
 disabled_users_quantity=len(disabled_users())
 ))
 return stats_loaded
 
 
if __name__ == "__main__":
    # standalone entry point: report crashes to Sentry and run a
    # single autotracing iteration with verbose file logging
    sentry_init()
    logging.basicConfig(filename='auto_tracer_standalone.log',
                        level=logging.DEBUG)
    t = AutoTracer()
    t()
 