| Viewing file:  utils.py (1.63 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# coding=utf-8#
 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2023 All Rights Reserved
 #
 # Licensed under CLOUD LINUX LICENSE AGREEMENT
 # http://cloudlinux.com/docs/LICENSE.TXT
 """Utility stuff which is not directly related to the plugin domain itself."""""
 import functools
 import itertools
 from typing import Any, Callable, Generator, Iterator, Iterable, TypeVar
 
 from ._logs import logger
 
 _T = TypeVar('_T')
 
 
 # NOTE(vlebedev): Taken from https://docs.python.org/3/library/itertools.html#itertools.batched
 # TODO(vlebedev): Remove once codebase is updated to Python >= 3.12
 def batched(iterable: Iterable[_T], n: int) -> Iterator[tuple[_T, ...]]:
 """batched('ABCDEFG', 3) --> ABC DEF G
 
 Batch data into tuples of length n. The last batch may be shorter."""
 
 if n < 1:
 raise ValueError('n must be at least one')
 it = iter(iterable)
 while batch := tuple(itertools.islice(it, n)):
 yield batch
 
 
 def repeast_last(upstream: Iterable[_T]) -> Iterator[_T]:
 for item in upstream:
 last = item
 yield item
 while True:
 try:
 yield last
 except NameError:
 return
 
 
 def bootstrap_gen(func: Callable[..., Generator[Any, Any, Any]]):
 @functools.wraps(func)
 def wrapper(*args, **kwargs):
 g = func(*args, **kwargs)
 g.send(None)
 return g
 return wrapper
 
 
 def log_exceptions(func):
 @functools.wraps(func)
 def wrapper(*args, **kwargs):
 try:
 return func(*args, **kwargs)
 except Exception:
 logger.exception('Function %s failed!', func.__name__)
 raise
 return wrapper
 
 |