 
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2023 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import enum
import os
import contextlib
import gc
import time
import statistics
import typing
from pathlib import Path
from contextlib import ExitStack
from collections import defaultdict
from enum import StrEnum
from dataclasses import dataclass
from typing import NamedTuple, Generator, Callable, TypeAlias, ContextManager, Self, Sequence, Iterable, Protocol

import sqlalchemy as sa

import psutil
import pytest
from _pytest.terminal import TerminalReporter
 
_db_metadata = sa.MetaData()
_measurements_table = sa.Table(
    'measurements',
    _db_metadata,
    sa.Column('sequence_id', sa.String, primary_key=True),
    sa.Column('timestamp', sa.Float, primary_key=True),
    sa.Column('cpu_usage', sa.Float, nullable=False),
    sa.Column('run_time_seconds', sa.Float, nullable=False),
    sa.Column('memory_allocated_bytes', sa.Integer, nullable=False),
    sa.Column('total_memory_bytes', sa.Integer, nullable=False),
)
 
 
class Measurement(NamedTuple):
    timestamp: float
    cpu_usage: float
    run_time_seconds: float
    memory_allocated_bytes: int
    total_memory_bytes: int

    def __str__(self) -> str:
        memory_allocations_mb = _bytes_to_mb(self.memory_allocated_bytes)
        total_memory_mb = _bytes_to_mb(self.total_memory_bytes)
        return '\n'.join([
            f'Time             : {self.run_time_seconds} sec',
            f'CPU usage        : {self.cpu_usage:.2f}%',
            f'Memory allocated : {memory_allocations_mb:.2f} mb',
            f'Total memory     : {total_memory_mb:.2f} mb',
        ])


assert {c.name for c in _measurements_table.columns}.issuperset(Measurement._fields)
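# The assertion above guarantees that every Measurement field maps onto a column of the
# 'measurements' table. One possible way to inspect a database produced with
# --save-measurements afterwards (a sketch; the file lives next to the test module that
# produced it, and the path below is illustrative):
#
#     sqlite3 path/to/tests/measurements.db \
#         'SELECT sequence_id, AVG(run_time_seconds) FROM measurements GROUP BY sequence_id'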
 
 
@dataclass
class Statistic:
    @classmethod
    def from_data(cls, data: Sequence[float]) -> Self:
        return cls(
            mean=statistics.mean(data),
            std_dev=statistics.stdev(data) if len(data) > 1 else 0,
            max_value=max(data),
            min_value=min(data),
        )

    mean: float
    std_dev: float
    max_value: float
    min_value: float
 
 
@dataclass(frozen=True, slots=True)
class StatisticSummarySection:
    @classmethod
    def from_statistic(cls, title: str, statistic: Statistic, units: str) -> Self:
        return cls(
            title=title,
            mean=f'{statistic.mean:.2f}{units}',
            std_dev=f'{statistic.std_dev:.2f}{units}',
            max_value=f'{statistic.max_value:.2f}{units}',
            min_value=f'{statistic.min_value:.2f}{units}',
        )

    @classmethod
    def empty(cls, title: str) -> Self:
        return cls(
            title=title,
            mean='N/A',
            std_dev='N/A',
            max_value='N/A',
            min_value='N/A',
        )

    title: str
    mean: str
    std_dev: str
    max_value: str
    min_value: str
 
 
@enum.unique
class _SummarySection(StrEnum):
    CPU_USAGE = 'CPU Usage'
    RUN_TIME = 'Run Time'
    MEMORY_ALLOCATED = 'Memory Allocated'
    TOTAL_MEMORY = 'Total Memory'
 
 
def _create_summary(sections: Iterable[StatisticSummarySection]) -> str:
    summary = ['Measurement Statistics Summary']
    summary.append('------------------------------')
    for section in sections:
        summary.append(section.title)
        summary.append(f'  Mean: {section.mean}')
        summary.append(f'  Std Dev: {section.std_dev}')
        summary.append(f'  Max: {section.max_value}')
        summary.append(f'  Min: {section.min_value}')

    return '\n'.join(summary)
 
 
def _measurements_to_summary(measurements: Iterable[Measurement]) -> str:
    title_to_data = defaultdict[_SummarySection, list[float]](list)
    for measurement in measurements:
        title_to_data[_SummarySection.CPU_USAGE].append(measurement.cpu_usage)
        title_to_data[_SummarySection.RUN_TIME].append(measurement.run_time_seconds)
        title_to_data[_SummarySection.MEMORY_ALLOCATED].append(_bytes_to_mb(measurement.memory_allocated_bytes))
        title_to_data[_SummarySection.TOTAL_MEMORY].append(_bytes_to_mb(measurement.total_memory_bytes))

    sections = [StatisticSummarySection.from_statistic(
        title=title,
        statistic=Statistic.from_data(data),
        units={
            _SummarySection.CPU_USAGE: '%',
            _SummarySection.RUN_TIME: ' sec',
            _SummarySection.MEMORY_ALLOCATED: ' mb',
            _SummarySection.TOTAL_MEMORY: ' mb',
        }[title],
    ) for title, data in title_to_data.items()]
    return _create_summary(sections)
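
# The rendered summary looks roughly like this (values are illustrative), with one
# block per _SummarySection in the units chosen above:
#
#     Measurement Statistics Summary
#     ------------------------------
#     CPU Usage
#       Mean: 12.34%
#       Std Dev: 1.02%
#       Max: 14.50%
#       Min: 10.80%
#     Run Time
#       Mean: 0.52 sec
#       ...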
 
 
_empty_summary = _create_summary(StatisticSummarySection.empty(s.value) for s in _SummarySection)
 
 
Profiled: TypeAlias = Callable[[], ContextManager[None]]
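# A ``Profiled`` value is what the ``profiled`` fixture (defined below) injects into
# tests. A hypothetical test would use it like this (``expensive_operation`` is just a
# placeholder for the code under measurement):
#
#     def test_expensive_operation(profiled: Profiled) -> None:
#         with profiled():
#             expensive_operation()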
 
 
def _get_uss_memory(process: psutil.Process) -> int:
    # NOTE(vlebedev): USS ("unique set size") is the memory that would be freed if the
    #                 process exited right now, which makes it a better per-process
    #                 metric than RSS. For background on why USS is used, see
    #                 https://gmpy.dev/blog/2016/real-process-memory-and-environ-in-python
    key = 'memory_full_info'
    result = process.as_dict(attrs=[key])[key].uss
    return result
 
 
def _bytes_to_mb(bytes_: int) -> float:
    # NOTE: decimal megabytes (10**6 bytes), matching the "mb" labels used in the summaries.
    return bytes_ / 1_000_000
 
 
class _Profiler:
    def __init__(self, measurements: list[Measurement]) -> None:
        self._pid = os.getpid()
        self._measurements = measurements

    @contextlib.contextmanager
    def __call__(self) -> Generator[None, None, None]:
        process = psutil.Process(self._pid)

        # NOTE: GC is disabled for the duration of the measured block so that collector
        # pauses do not distort the run time and memory deltas.
        gc.disable()
        start_time = time.perf_counter()
        memory_before = _get_uss_memory(process)
        # NOTE: the first cpu_percent() call returns 0.0 and only primes psutil's
        # internal counters; the call in the finally block reports the CPU usage
        # accumulated since this point.
        process.cpu_percent()
        try:
            yield
        finally:
            end_time = time.perf_counter()
            memory_after = _get_uss_memory(process)
            cpu_usage_after = process.cpu_percent()
            memory_delta_bytes = memory_after - memory_before
            measurement = Measurement(
                timestamp=time.time(),
                cpu_usage=cpu_usage_after,
                run_time_seconds=end_time - start_time,
                memory_allocated_bytes=memory_delta_bytes,
                total_memory_bytes=memory_after,
            )
            self._measurements.append(measurement)
            gc.enable()
            gc.collect()
 
 
class _WriteMeasurements(Protocol):
    def __call__(self, measurements: Iterable[Measurement]) -> None:
        ...


_MEASUREMENTS_KEY = 'measurements'
_MEASUREMENTS_ENABLED_FLAG = '--with-measurements'
_SAVE_MEASUREMENTS_FLAG = '--save-measurements'
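# Typical invocations (a sketch; the plugin itself must already be registered, see the
# note at the end of this module):
#
#     pytest --with-measurements -v                    # collect and print per-test summaries
#     pytest --with-measurements --save-measurements   # additionally persist to measurements.db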
 
 
class PytestProfilerPlugin:
    def pytest_configure(self, config: pytest.Config):
        if not config.getoption(_MEASUREMENTS_ENABLED_FLAG):
            # Measurements are disabled: expose a no-op ``profiled`` fixture so tests
            # that request it keep working unchanged.
            @self._fixture(scope='function', name='profiled')
            def create_profiler_stub() -> Profiled:
                @contextlib.contextmanager
                def stub() -> Generator[None, None, None]:
                    yield
                return stub
            return
 
        if config.getoption(_SAVE_MEASUREMENTS_FLAG):
            @self._fixture(scope='function', name='_write_measurements')
            def create_measurements_writer(
                request: pytest.FixtureRequest,
            ) -> Generator[_WriteMeasurements, None, None]:
                # Measurements are stored in an SQLite file next to the test module,
                # keyed by the test's node id.
                db_file = Path(request.path).parent / 'measurements.db'
                sequence_id = request.node.nodeid

                with ExitStack() as defer:
                    engine = sa.create_engine(f'sqlite+pysqlite:///{db_file}', echo=__debug__)
                    defer.callback(engine.dispose)

                    conn = defer.enter_context(engine.connect())

                    _db_metadata.create_all(conn)

                    def write_measurements(measurements: Iterable[Measurement]) -> None:
                        stmt = _measurements_table.insert().values([{
                            'sequence_id': sequence_id,
                            **m._asdict(),
                        } for m in measurements])
                        conn.execute(stmt)
                        # NOTE: with SQLAlchemy 2.0-style connections the insert (and the
                        # CREATE TABLE above) stays in an open transaction until it is
                        # committed explicitly.
                        conn.commit()

                    yield write_measurements
        else:
            @self._fixture(scope='function', name='_write_measurements')
            def create_measurements_writer_stub() -> _WriteMeasurements:
                def stub(measurements: Iterable[Measurement]) -> None:
                    # NOTE(vlebedev): Make pylance happy by "using" all arguments.
                    del measurements
                return stub
 
        @self._fixture(scope='function', name='profiled')
        def profiler_created(
            record_property: Callable[[str, object], None],
            _write_measurements: _WriteMeasurements,
        ) -> Generator[Profiled, None, None]:
            measurements: list[Measurement] = []
            record_property(_MEASUREMENTS_KEY, measurements)
            yield _Profiler(measurements)
            _write_measurements(measurements)
 
    def pytest_addoption(self, parser: pytest.Parser):
        parser.addoption(
            _MEASUREMENTS_ENABLED_FLAG,
            action='store_true',
            default=False,
            help='Enable collection of measurements during each test',
        )
        parser.addoption(
            _SAVE_MEASUREMENTS_FLAG,
            action='store_true',
            default=False,
            help='Save measurements to the database',
        )
 
    def pytest_report_teststatus(self, report: pytest.TestReport, config: pytest.Config):
        if not (config.getoption(_MEASUREMENTS_ENABLED_FLAG) and config.option.verbose >= 1):
            return
        if report.when != 'teardown':
            return

        tr: TerminalReporter = config.pluginmanager.get_plugin('terminalreporter')
        if tr is None:
            return

        try:
            measurements = typing.cast(
                list[Measurement],
                next(v for k, v in report.user_properties if k == _MEASUREMENTS_KEY),
            )
        except StopIteration:
            return

        try:
            summary = _measurements_to_summary(measurements)
        except ValueError:
            # NOTE: Statistic.from_data raises ValueError for an empty measurement list
            # (statistics.StatisticsError is a ValueError subclass), so fall back to the
            # placeholder summary.
            summary = _empty_summary

        tr.ensure_newline()
        tr.write_line(summary)
 
    def _fixture(self, *args, **kwargs):
        def wrapper(func):
            fixture = pytest.fixture(*args, **kwargs)(func)
            # NOTE(vlebedev): Without assigning fixture to some property,
            #                 pytest won't be able to find and use it.
            setattr(self, kwargs.get('name', func.__name__), fixture)
            return fixture
        return wrapper
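

# This module only defines the plugin; it still has to be registered with pytest. A
# minimal sketch of one way to do that from a conftest.py (assuming this file is
# importable as ``profiler``; registering inside ``pytest_addoption`` works because
# both ``pytest_addoption`` and ``pytest_configure`` are historic hooks that are
# replayed for late-registered plugins):
#
#     # conftest.py
#     from profiler import PytestProfilerPlugin
#
#     def pytest_addoption(parser, pluginmanager):
#         pluginmanager.register(PytestProfilerPlugin())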
 