| Viewing file:  tornadoweb.py (2.09 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# Copyright 2017 Elisey Zanko#
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at
 #
 # http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
 import sys
 import typing
 
 from pip._vendor.tenacity import BaseRetrying
 from pip._vendor.tenacity import DoAttempt
 from pip._vendor.tenacity import DoSleep
 from pip._vendor.tenacity import RetryCallState
 
 from tornado import gen
 
 if typing.TYPE_CHECKING:
 from tornado.concurrent import Future
 
 _RetValT = typing.TypeVar("_RetValT")
 
 
 class TornadoRetrying(BaseRetrying):
 def __init__(self, sleep: "typing.Callable[[float], Future[None]]" = gen.sleep, **kwargs: typing.Any) -> None:
 super().__init__(**kwargs)
 self.sleep = sleep
 
 @gen.coroutine
 def __call__(  # type: ignore  # Change signature from supertype
 self,
 fn: "typing.Callable[..., typing.Union[typing.Generator[typing.Any, typing.Any, _RetValT], Future[_RetValT]]]",
 *args: typing.Any,
 **kwargs: typing.Any,
 ) -> "typing.Generator[typing.Any, typing.Any, _RetValT]":
 self.begin()
 
 retry_state = RetryCallState(retry_object=self, fn=fn, args=args, kwargs=kwargs)
 while True:
 do = self.iter(retry_state=retry_state)
 if isinstance(do, DoAttempt):
 try:
 result = yield fn(*args, **kwargs)
 except BaseException:  # noqa: B902
 retry_state.set_exception(sys.exc_info())
 else:
 retry_state.set_result(result)
 elif isinstance(do, DoSleep):
 retry_state.prepare_for_next_attempt()
 yield self.sleep(do)
 else:
 raise gen.Return(do)
 
 |