| Viewing file:  appengine.py (10.78 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
"""This module provides a pool manager that uses Google App Engine's
 `URLFetch Service <https://cloud.google.com/appengine/docs/python/urlfetch>`_.
 
 Example usage::
 
     from pip._vendor.urllib3 import PoolManager
     from pip._vendor.urllib3.contrib.appengine import AppEngineManager, is_appengine_sandbox

     if is_appengine_sandbox():
         # AppEngineManager uses AppEngine's URLFetch API behind the scenes
         http = AppEngineManager()
     else:
         # PoolManager uses a socket-level API behind the scenes
         http = PoolManager()

     r = http.request('GET', 'https://google.com/')
 
 There are `limitations <https://cloud.google.com/appengine/docs/python/\
 urlfetch/#Python_Quotas_and_limits>`_ to the URLFetch service and it may not be
 the best choice for your application. There are three options for using
 urllib3 on Google App Engine:
 
 1. You can use :class:`AppEngineManager` with URLFetch. URLFetch is
 cost-effective in many circumstances as long as your usage is within the
 limitations.
 2. You can use a normal :class:`~urllib3.PoolManager` by enabling sockets.
 Sockets also have `limitations and restrictions
 <https://cloud.google.com/appengine/docs/python/sockets/\
 #limitations-and-restrictions>`_ and have a lower free quota than URLFetch.
 To use sockets, be sure to specify the following in your ``app.yaml``::
 
     env_variables:
         GAE_USE_SOCKETS_HTTPLIB : 'true'
 
 3. If you are using `App Engine Flexible
 <https://cloud.google.com/appengine/docs/flexible/>`_, you can use the standard
 :class:`PoolManager` without any configuration or special environment variables.
 """
 
 from __future__ import absolute_import
 
 import io
 import logging
 import warnings
 
 from ..exceptions import (
 HTTPError,
 HTTPWarning,
 MaxRetryError,
 ProtocolError,
 SSLError,
 TimeoutError,
 )
 from ..packages.six.moves.urllib.parse import urljoin
 from ..request import RequestMethods
 from ..response import HTTPResponse
 from ..util.retry import Retry
 from ..util.timeout import Timeout
 from . import _appengine_environ
 
 try:
 from google.appengine.api import urlfetch
 except ImportError:
 urlfetch = None
 
 
 # Module-level logger named after this module; AppEngineManager.urlopen uses
 # it to emit debug messages when following a redirect or retrying a request.
 log = logging.getLogger(__name__)
 
 
 class AppEnginePlatformWarning(HTTPWarning):
     """Warning category this module passes to ``warnings.warn`` for URLFetch caveats."""
 
 
 class AppEnginePlatformError(HTTPError):
     """Error raised by :class:`AppEngineManager` for URLFetch platform limitations.

     In this module it is raised when URLFetch is unavailable, when a request
     or response is too large, and when the request method is not supported.
     """
 
 
 class AppEngineManager(RequestMethods):
     """
     Connection manager for Google App Engine sandbox applications.

     This manager uses the URLFetch service directly instead of using the
     emulated httplib, and is subject to URLFetch limitations as described in
     the App Engine documentation `here
     <https://cloud.google.com/appengine/docs/python/urlfetch>`_.

     Notably it will raise an :class:`AppEnginePlatformError` if:

     * URLFetch is not available.
     * You attempt to use this on App Engine Flexible, as full socket
       support is available.
     * A request size is more than 10 megabytes.
     * A response size is more than 32 megabytes.
     * You use an unsupported request method such as OPTIONS.

     Beyond those cases, it will raise normal urllib3 errors.

     The manager can be used in a ``with`` statement; leaving the block does
     not suppress exceptions.
     """

     def __init__(
         self,
         headers=None,
         retries=None,
         validate_certificate=True,
         urlfetch_retries=True,
     ):
         """Create a manager backed by the URLFetch service.

         :param headers: forwarded unchanged to ``RequestMethods.__init__``.
         :param retries: default retry configuration stored on the instance;
             ``Retry.DEFAULT`` is used when this is falsy.
         :param validate_certificate: passed to ``urlfetch.fetch`` as
             ``validate_certificate`` on every request.
         :param urlfetch_retries: when true, URLFetch itself is allowed to
             follow redirects (see :meth:`urlopen`).
         :raises AppEnginePlatformError: if the ``urlfetch`` module could not
             be imported.
         """
         if not urlfetch:
             raise AppEnginePlatformError(
                 "URLFetch is not available in this environment."
             )

         warnings.warn(
             "urllib3 is using URLFetch on Google App Engine sandbox instead "
             "of sockets. To use sockets directly instead of URLFetch see "
             "https://urllib3.readthedocs.io/en/1.26.x/reference/urllib3.contrib.html.",
             AppEnginePlatformWarning,
         )

         RequestMethods.__init__(self, headers)
         self.validate_certificate = validate_certificate
         self.urlfetch_retries = urlfetch_retries

         self.retries = retries or Retry.DEFAULT

     def __enter__(self):
         """Return the manager itself so it can be bound by ``with ... as``."""
         return self

     def __exit__(self, exc_type, exc_val, exc_tb):
         """Leave the ``with`` block without suppressing any exception."""
         # Return False to re-raise any potential exceptions
         return False

     def urlopen(
         self,
         method,
         url,
         body=None,
         headers=None,
         retries=None,
         redirect=True,
         timeout=Timeout.DEFAULT_TIMEOUT,
         **response_kw
     ):
         """Perform one request through ``urlfetch.fetch`` and wrap the result.

         Redirect responses and retryable responses are handled by calling
         this method again with an incremented retry object.

         :param method: HTTP method name handed to URLFetch.
         :param url: URL to fetch.
         :param body: request payload, sent as URLFetch's ``payload``.
         :param headers: request headers; an empty dict is sent when falsy.
         :param retries: a ``Retry`` instance or a value understood by
             ``Retry.from_int`` (see :meth:`_get_retries`).
         :param redirect: whether redirect responses should be followed.
         :param timeout: a number or ``Timeout``; converted to a URLFetch
             deadline by :meth:`_get_absolute_timeout`.
         :param response_kw: extra keyword arguments forwarded to
             ``HTTPResponse``.
         :returns: the ``HTTPResponse`` built from the URLFetch response.
         :raises TimeoutError: URLFetch reported ``DeadlineExceededError``.
         :raises AppEnginePlatformError: the request or response was too
             large, or the method is not supported by URLFetch.
         :raises ProtocolError: any other ``InvalidURLError`` or
             ``DownloadError`` from URLFetch.
         :raises SSLError: URLFetch reported ``SSLCertificateError``.
         :raises MaxRetryError: URLFetch reported too many redirects, or a
             redirect was received and the retry policy says to raise.
         """
         retries = self._get_retries(retries, redirect)

         try:
             follow_redirects = redirect and retries.redirect != 0 and retries.total
             response = urlfetch.fetch(
                 url,
                 payload=body,
                 method=method,
                 headers=headers or {},
                 allow_truncated=False,
                 follow_redirects=self.urlfetch_retries and follow_redirects,
                 deadline=self._get_absolute_timeout(timeout),
                 validate_certificate=self.validate_certificate,
             )
         except urlfetch.DeadlineExceededError as e:
             raise TimeoutError(self, e)

         except urlfetch.InvalidURLError as e:
             # URLFetch signals an oversized request through InvalidURLError;
             # the message text is the only way to tell the cases apart.
             if "too large" in str(e):
                 raise AppEnginePlatformError(
                     "URLFetch request too large, URLFetch only "
                     "supports requests up to 10mb in size.",
                     e,
                 )
             raise ProtocolError(e)

         except urlfetch.DownloadError as e:
             if "Too many redirects" in str(e):
                 raise MaxRetryError(self, url, reason=e)
             raise ProtocolError(e)

         except urlfetch.ResponseTooLargeError as e:
             # Note the trailing space: the two literals are concatenated.
             raise AppEnginePlatformError(
                 "URLFetch response too large, URLFetch only supports "
                 "responses up to 32mb in size.",
                 e,
             )

         except urlfetch.SSLCertificateError as e:
             raise SSLError(e)

         except urlfetch.InvalidMethodError as e:
             raise AppEnginePlatformError(
                 "URLFetch does not support method: %s" % method, e
             )

         http_response = self._urlfetch_response_to_http_response(
             response, retries=retries, **response_kw
         )

         # Handle redirect?
         redirect_location = redirect and http_response.get_redirect_location()
         if redirect_location:
             # A redirect response reached us even though URLFetch was allowed
             # to follow redirects itself: treat it as exhausted redirects.
             if self.urlfetch_retries and retries.raise_on_redirect:
                 raise MaxRetryError(self, url, "too many redirects")

             # A 303 response is re-requested with GET.
             if http_response.status == 303:
                 method = "GET"

             try:
                 retries = retries.increment(
                     method, url, response=http_response, _pool=self
                 )
             except MaxRetryError:
                 if retries.raise_on_redirect:
                     raise MaxRetryError(self, url, "too many redirects")
                 # Not asked to raise: hand the redirect response back as-is.
                 return http_response

             retries.sleep_for_retry(http_response)
             log.debug("Redirecting %s -> %s", url, redirect_location)
             redirect_url = urljoin(url, redirect_location)
             return self.urlopen(
                 method,
                 redirect_url,
                 body,
                 headers,
                 retries=retries,
                 redirect=redirect,
                 timeout=timeout,
                 **response_kw
             )

         # Check if we should retry the HTTP response.
         has_retry_after = bool(http_response.headers.get("Retry-After"))
         if retries.is_retry(method, http_response.status, has_retry_after):
             retries = retries.increment(method, url, response=http_response, _pool=self)
             log.debug("Retry: %s", url)
             retries.sleep(http_response)
             return self.urlopen(
                 method,
                 url,
                 body=body,
                 headers=headers,
                 retries=retries,
                 redirect=redirect,
                 timeout=timeout,
                 **response_kw
             )

         return http_response

     def _urlfetch_response_to_http_response(self, urlfetch_resp, **response_kw):
         """Convert a URLFetch response object into an ``HTTPResponse``.

         The headers of ``urlfetch_resp`` are adjusted in place before the
         conversion: on production App Engine a ``deflate`` content-encoding
         header is dropped, and ``chunked`` is removed from the
         transfer-encoding header.

         :param urlfetch_resp: response object returned by ``urlfetch.fetch``.
         :param response_kw: extra keyword arguments passed to both
             ``HTTPResponse`` objects that are constructed.
         :returns: an ``HTTPResponse`` whose ``original_response`` is a second
             ``HTTPResponse`` carrying the URLFetch ``header_msg``.
         """
         if is_prod_appengine():
             # Production GAE handles deflate encoding automatically, but does
             # not remove the encoding header.
             content_encoding = urlfetch_resp.headers.get("content-encoding")

             if content_encoding == "deflate":
                 del urlfetch_resp.headers["content-encoding"]

         transfer_encoding = urlfetch_resp.headers.get("transfer-encoding")
         # We have a full response's content,
         # so let's make sure we don't report ourselves as chunked data.
         # Transfer-Encoding is a comma-separated, case-insensitive list, so
         # strip "chunked" wherever it appears and keep any other codings.
         if transfer_encoding:
             codings = [coding.strip() for coding in transfer_encoding.split(",")]
             kept = [coding for coding in codings if coding.lower() != "chunked"]
             if len(kept) != len(codings):
                 urlfetch_resp.headers["transfer-encoding"] = ",".join(kept)

         original_response = HTTPResponse(
             # In order for decoding to work, we must present the content as
             # a file-like object.
             body=io.BytesIO(urlfetch_resp.content),
             msg=urlfetch_resp.header_msg,
             headers=urlfetch_resp.headers,
             status=urlfetch_resp.status_code,
             **response_kw
         )

         return HTTPResponse(
             body=io.BytesIO(urlfetch_resp.content),
             headers=urlfetch_resp.headers,
             status=urlfetch_resp.status_code,
             original_response=original_response,
             **response_kw
         )

     def _get_absolute_timeout(self, timeout):
         """Translate a urllib3 timeout into a single URLFetch deadline.

         :param timeout: ``Timeout.DEFAULT_TIMEOUT``, a ``Timeout`` instance,
             or any other value (returned unchanged).
         :returns: ``None`` for the default sentinel, ``timeout.total`` for a
             ``Timeout`` instance, otherwise ``timeout`` itself.
         """
         if timeout is Timeout.DEFAULT_TIMEOUT:
             return None  # Defer to URLFetch's default.
         if isinstance(timeout, Timeout):
             # Only the total value can be honoured; warn if the caller set
             # separate read/connect values that will be ignored.
             if timeout._read is not None or timeout._connect is not None:
                 warnings.warn(
                     "URLFetch does not support granular timeout settings, "
                     "reverting to total or default URLFetch timeout.",
                     AppEnginePlatformWarning,
                 )
             return timeout.total
         return timeout

     def _get_retries(self, retries, redirect):
         """Normalise ``retries`` into a ``Retry`` object.

         :param retries: a ``Retry`` instance (used as-is) or any other value,
             which is converted with ``Retry.from_int`` using ``self.retries``
             as the default.
         :param redirect: forwarded to ``Retry.from_int``.
         :returns: the resulting ``Retry`` object. An
             :class:`AppEnginePlatformWarning` is emitted when its
             ``connect``, ``read`` or ``redirect`` setting is truthy.
         """
         if not isinstance(retries, Retry):
             retries = Retry.from_int(retries, redirect=redirect, default=self.retries)

         if retries.connect or retries.read or retries.redirect:
             warnings.warn(
                 "URLFetch only supports total retries and does not "
                 "recognize connect, read, or redirect retry parameters.",
                 AppEnginePlatformWarning,
             )

         return retries
 
 
 # Alias methods from _appengine_environ to maintain public API interface.
 # Each name below is bound to the function of the same name in
 # ``_appengine_environ``; ``is_prod_appengine`` is also called by
 # ``AppEngineManager._urlfetch_response_to_http_response`` above.
 
 is_appengine = _appengine_environ.is_appengine
 is_appengine_sandbox = _appengine_environ.is_appengine_sandbox
 is_local_appengine = _appengine_environ.is_local_appengine
 is_prod_appengine = _appengine_environ.is_prod_appengine
 is_prod_appengine_mvms = _appengine_environ.is_prod_appengine_mvms
 
 |