| Viewing file:  notify.py (1.64 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# coding=utf-8#
 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2023 All Rights Reserved
 #
 # Licensed under CLOUD LINUX LICENSE AGREEMENT
 # http://cloudlinux.com/docs/LICENSE.TXT
 from typing import Callable, Protocol
 
 
 class Signal(Protocol):
 def register(self, listener: Callable) -> Callable:
 ...
 
 # def unregister(self, listener: Callable) -> Callable:
 #     ...
 
 
 class Emitter(Signal):
 # NOTE(vlebedev): registering a listener will keep it alive (i.e. not being garbage collected)
 #                 until it's explicitly unregistered.
 
 def __init__(self):
 self._listeners = set()
 
 def register(self, listener: Callable):
 if not callable(listener):
 raise ValueError("Listener must be callable")
 self._listeners.add(listener)
 return listener
 
 # TODO(vlebedev): Might be problems with references to e.g. class methods.
 #                 It's not that obvious how to handle them properly as they are transient/ephemeral.
 #                 For more info have a look at https://docs.python.org/3/library/weakref.html#weakref.WeakMethod
 # def unregister(self, listener: Callable):
 #     self._listeners.discard(listener)
 #     return listener
 
 def __call__(self, *args, **kwargs) -> None:
 exceptions = []
 for listener in tuple(self._listeners):
 if listener is None:
 continue
 try:
 listener(*args, **kwargs)
 except Exception as e:
 exceptions.append(e)
 
 if exceptions:
 raise ExceptionGroup("Exceptions occurred in listeners", exceptions)
 
 |