| Viewing file:  cleanup.py (2.04 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# coding=utf-8
 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2023 All Rights Reserved
 #
 # Licensed under CLOUD LINUX LICENSE AGREEMENT
 # http://cloudlinux.com/docs/LICENSE.TXT
 import contextlib
 import time
 from datetime import timedelta
 from typing import Generator
 from threading import Event
 
 import sqlalchemy as sa
 import sqlalchemy.exc
 
 from lvestats.orm import bursting_events_table
 
 from ..common import Timestamp
 from .._logs import logger
 from .base import thread_running
 
 
 def cleanup_old_events(
     engine: sa.engine.Engine,
     server_id: str,
     cutoff: Timestamp,
 ) -> None:
     """Delete stored bursting events of one server up to *cutoff*.

     A row of ``bursting_events_table`` is removed when its ``timestamp`` is
     less than or equal to ``cutoff`` and its ``server_id`` equals
     ``server_id``.

     Args:
         engine: SQLAlchemy engine the DELETE statement is executed on.
         server_id: Only rows belonging to this server id are deleted.
         cutoff: Inclusive upper bound for the ``timestamp`` of deleted rows.

     Raises:
         Nothing is caught here; any error raised by SQLAlchemy while
         executing the statement propagates to the caller.
     """
     stmt = sa.delete(bursting_events_table).where(sa.and_(
         bursting_events_table.c.timestamp <= cutoff,
         bursting_events_table.c.server_id == server_id,
     ))
     # Connectionless ``Engine.execute()`` is deprecated since SQLAlchemy 1.4
     # and removed in 2.0. An explicit transaction block works on every
     # version and commits on success / rolls back on error.
     with engine.begin() as connection:
         connection.execute(stmt)
 
 
 @contextlib.contextmanager
 def cleanup_running(
     engine: sa.engine.Engine,
     server_id: str,
     cleanup_interval: timedelta,
     history_window: timedelta,
     run_period: timedelta = timedelta(seconds=5),
     fail_fast: bool = True,
 ) -> Generator[None, None, None]:
     """Context manager that keeps periodic cleanup of old bursting events running.

     While the ``with`` body executes, ``main`` is handed to
     ``thread_running('bursting-cleanup', main)``.
     NOTE(review): presumably that helper runs ``main`` in a background thread
     and sets the ``terminate`` event on exit — confirm against
     ``.base.thread_running``.

     Args:
         engine: SQLAlchemy engine passed to ``cleanup_old_events``.
         server_id: Server id whose events are cleaned up.
         cleanup_interval: Minimum time between two successful cleanups.
         history_window: Events whose timestamp is older than
             ``now - history_window`` (inclusive) are deleted.
         run_period: How long the loop idles between two checks.
         fail_fast: When true, a ``sqlalchemy.exc.DBAPIError`` raised by the
             cleanup is re-raised from the loop; otherwise it is logged and
             the cleanup is retried on the next loop iteration.

     Yields:
         ``None``; the cleanup loop is active for the duration of the block.
     """
     def main(terminate: Event) -> None:
         # Starting from 0.0 makes the very first iteration perform a cleanup.
         last_cleanup_time = 0.0
         while not terminate.is_set():
             now = time.time()
             if (now - last_cleanup_time) > cleanup_interval.total_seconds():
                 cutoff = Timestamp(int(now - history_window.total_seconds()))
                 try:
                     cleanup_old_events(engine, server_id, cutoff)
                 except sqlalchemy.exc.DBAPIError as e:
                     if fail_fast:
                         # Bare ``raise`` re-raises the active exception unchanged.
                         raise
                     logger.error('Failed to cleanup bursting events from DB!', exc_info=e)
                 else:
                     # Only a successful cleanup resets the interval, so a
                     # failed one is retried after ``run_period``.
                     last_cleanup_time = now
             # Idle on the event instead of ``time.sleep()``: ``wait()``
             # returns as soon as termination is requested, so shutdown is
             # not delayed by up to ``run_period``.
             terminate.wait(run_period.total_seconds())
         logger.debug('Stopping events cleanup thread.')

     with thread_running('bursting-cleanup', main):
         yield
 
 |