| Viewing file:  stat_sender.py (7.9 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# -*- coding: utf-8 -*-
 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
 #
 # Licensed under CLOUD LINUX LICENSE AGREEMENT
 # http://cloudlinux.com/docs/LICENSE.TXT
 
 """
 This module contains SSA classes for sending e-mails with report
 and sending reports to ClickHouse
 """
 __package__ = 'ssa.modules'
 
 import json
 import logging
 import subprocess
 from email.message import EmailMessage
 from functools import partial
 from typing import Optional, Any
 from socket import gethostname
 
 from clcommon.cpapi import get_admin_email
 from requests import Session, Response
 from requests.adapters import HTTPAdapter
 from requests.exceptions import RequestException
 from requests.packages.urllib3.util.retry import Retry
 
 from .common import Common
 from .decision_maker import DecisionMaker
 from ..internal.constants import stat_server
 from ..internal.exceptions import SSAError
 from ..internal.mailer import Mailer, render_report_table
 from ..internal.utils import read_sys_id, sentry_init, duration_cast, \
 format_date
 
 
 class StatisticsSender(Common):
     """
     Send report to ClickHouse and over e-mails
     """

     def __init__(self, ch_endpoint: str = f'https://{stat_server}/api/clos-ssa'):
         """
         Prepare logger, mailer and an HTTP session with retries

         :param ch_endpoint: URL the ClickHouse report is POSTed to
         """
         super().__init__()
         self.logger = logging.getLogger('stat_sender')
         self.logger.info('StatisticsSender enabled: %s', __package__)
         self.sys_id = read_sys_id()
         self.ch_endpoint = ch_endpoint
         self.mail_sender = Mailer()

         # retry POST requests up to 3 times on gateway errors,
         # with exponential backoff between attempts
         retry_conf = Retry(total=3,
                            allowed_methods=frozenset(['POST']),
                            status_forcelist=frozenset([502, 503, 504]),
                            backoff_factor=3)
         adapter = HTTPAdapter(max_retries=retry_conf)
         self.session = Session()
         # retries are configured for https:// URLs only
         self.session.mount('https://', adapter)
         # make every request of this session use a 10s timeout by default
         self.session.request = partial(self.session.request, timeout=10)

     def send(self, report: Optional[dict] = None) -> None:
         """
         Send given report over e-mail and to ClickHouse
         If no report given, get it from DecisionMaker API

         :param report: report to send or None
         :raises SSAError: if POST request to ClickHouse endpoint failed
         """
         if report is None:
             report = DecisionMaker().get_json_report()

         # e-mail is optional, sending to ClickHouse is attempted always
         if self.summary_notification_enabled:
             self.email_report(report)
         self.clickhouse_report(report)

     def email_report(self, report_view: dict) -> Optional[EmailMessage]:
         """
         Create and send e-mail with report

         :param report_view: report, its 'domains' and 'date' items are used
         :return: sent message or None, if there is no recipient
                  or report contains no domains
         """
         mail_to = self.get_mail_recipient()
         if not mail_to or not report_view.get('domains'):
             # nobody to send to or nothing to send
             return None

         report_date = report_view['date']
         report_table, mail = render_report_table(report_view)
         msg = self.mail_sender._message(
             recipient=mail_to,
             template='ssa_report',
             date=format_date(report_date),
             hostname=gethostname(),
             html=mail
         )
         # attach both raw report and rendered table
         msg.add_attachment(json.dumps(report_view),
                            subtype='json',
                            filename=f"report_{report_date}.json")
         msg.add_attachment(report_table,
                            subtype='html',
                            filename=f"report_{report_date}.html")

         self.mail_sender._send(msg)
         return msg

     def get_mail_recipient(self) -> Optional[str]:
         """
         Retrieve a recipient's e-mail:
         1. get address from a wmt-api utility
         2. if command failed or address is empty, get address of server admin

         :return: e-mail address or None in case of any error
         """
         try:
             # get_admin_email could return '', None, or throw unexpected errors
             return self.wmt_api_report_email() or get_admin_email()
         except Exception as e:
             self.logger.error('get_admin_email failed with: %s', str(e))
             return None

     def wmt_api_report_email(self) -> Optional[str]:
         """
         Retrieve a recipient's e-mail address from WMT API

         :return: value of config.report_email from utility's JSON output
                  or None, if utility failed or its output is unexpected
         """
         _util = 'wmt-api'
         try:
             api_response = subprocess.run(
                 [f'/usr/share/web-monitoring-tool/wmtbin/{_util}',
                  '--config-get'],
                 check=True, text=True,
                 capture_output=True).stdout.strip()
         except (subprocess.CalledProcessError, AttributeError, OSError,
                 ValueError) as e:
             self.logger.error('wmt-api utility failed: %s', str(e))
             return None

         try:
             return json.loads(api_response).get('config').get('report_email')
         except json.JSONDecodeError as e:
             self.logger.error('wmt-api returned invalid json: %s',
                               str(e))
         except AttributeError:
             # parsed JSON is not a dict or has no 'config' dict inside
             self.logger.error('wmt-api returned unexpected response: %s',
                               api_response)
         return None

     def clickhouse_report(self, report_view: dict) -> bool:
         """
         Send report to ClickHouse

         :param report_view: report, is sent only if it has 'domains'
         :return: True if report was sent and accepted, False otherwise
         :raises SSAError: if POST request failed
         """
         if not report_view.get('domains'):
             self.logger.info('Report is empty, not sending to ClickHouse')
             return False

         self.logger.info('Sending POST request to %s',
                          self.ch_endpoint)
         payload = self._ch_pack(self.clickhouse_format(report_view))
         try:
             resp = self.session.post(self.ch_endpoint, json=payload)
         except RequestException as e:
             self.logger.error('POST failed with %s', e,
                               extra={'endpoint': self.ch_endpoint})
             raise SSAError(
                 f'Failed to POST data to SSA API server: {str(e)}') from e
         return self._process_response(resp)

     @staticmethod
     def _ch_pack(value: Any) -> dict:
         """
         Pack given value into data field of a dict
         """
         return {'data': value}

     def clickhouse_format(self, original_report: dict) -> list:
         """
         Format local report for sending to ClickHouse
         (required structures differ)

         :param original_report: report with 'domains' list
         :return: list of per-domain dicts, empty if there are no domains
         """
         # missing or None 'domains'/'urls' are treated as empty lists
         return [
             {
                 'system_id': self.sys_id,
                 'domain': domain.get('name'),
                 'count_slow_urls': domain.get('slow_urls'),
                 'count_slow_requests': domain.get('slow_reqs'),
                 'total_requests': domain.get('total_reqs'),
                 'details': [
                     {
                         "url": u.get('name'),
                         "count_requests": u.get('reqs_num'),
                         "avg_duration": duration_cast(
                             u.get('average_duration')),
                         "correlation": float(u.get('correlation', 0))
                     } for u in domain.get('urls') or []
                 ]
             } for domain in original_report.get('domains') or []
         ]

     def _process_response(self, response: Response) -> bool:
         """
         Check received response
         :param response: a requests.Response object
         :return: True in case of success, False otherwise
         """
         if not response.ok:
             self.logger.error('Unable to connect to server with %s:%s',
                               response.status_code, response.reason,
                               extra={'resp_text': response.text})
             return False

         self.logger.info('[%s:%s] Response received %s',
                          response.status_code, response.reason,
                          response.url)

         # successful HTTP status does not guarantee a JSON body
         try:
             result = response.json()
         except ValueError as e:
             self.logger.error('Received response with invalid json: %s',
                               str(e), extra={'resp_text': response.text})
             return False

         # body could be valid JSON but not an object with 'status' key
         status = result.get('status') if isinstance(result, dict) else None
         if status != 'ok':
             self.logger.error('Received response with status %s',
                               status, extra={'response': result})
             return False

         self.logger.info('Sent to ClickHouse successfully')
         return True
 
 
 if __name__ == "__main__":
     # Standalone run: init Sentry, log into a file in the current directory
     sentry_init()
     logging.basicConfig(level=logging.INFO,
                         filename='stat_sender_standalone.log')
     try:
         StatisticsSender().send()
     except SSAError as send_error:
         # show the failure to the user and exit with non-zero code
         print(send_error)
         raise SystemExit(1)
 
 |