| Viewing file:  _inputstream.py (31.77 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
from __future__ import absolute_import, division, unicode_literals
 from pip._vendor.six import text_type, binary_type
 from pip._vendor.six.moves import http_client, urllib
 
 import codecs
 import re
 
 from pip._vendor import webencodings
 
 from .constants import EOF, spaceCharacters, asciiLetters, asciiUppercase
 from .constants import ReparseException
 from . import _utils
 
 from io import StringIO
 
 try:
 from io import BytesIO
 except ImportError:
 BytesIO = StringIO
 
 # Non-unicode versions of constants for use in the pre-parser
 spaceCharactersBytes = frozenset([item.encode("ascii") for item in spaceCharacters])
 asciiLettersBytes = frozenset([item.encode("ascii") for item in asciiLetters])
 asciiUppercaseBytes = frozenset([item.encode("ascii") for item in asciiUppercase])
 spacesAngleBrackets = spaceCharactersBytes | frozenset([b">", b"<"])
 
 
 invalid_unicode_no_surrogate = "[\u0001-\u0008\u000B\u000E-\u001F\u007F-\u009F\uFDD0-\uFDEF\uFFFE\uFFFF\U0001FFFE\U0001FFFF\U0002FFFE\U0002FFFF\U0003FFFE\U0003FFFF\U0004FFFE\U0004FFFF\U0005FFFE\U0005FFFF\U0006FFFE\U0006FFFF\U0007FFFE\U0007FFFF\U0008FFFE\U0008FFFF\U0009FFFE\U0009FFFF\U000AFFFE\U000AFFFF\U000BFFFE\U000BFFFF\U000CFFFE\U000CFFFF\U000DFFFE\U000DFFFF\U000EFFFE\U000EFFFF\U000FFFFE\U000FFFFF\U0010FFFE\U0010FFFF]"  # noqa
 
 if _utils.supports_lone_surrogates:
 # Use one extra step of indirection and create surrogates with
 # eval. Not using this indirection would introduce an illegal
 # unicode literal on platforms not supporting such lone
 # surrogates.
 assert invalid_unicode_no_surrogate[-1] == "]" and invalid_unicode_no_surrogate.count("]") == 1
 invalid_unicode_re = re.compile(invalid_unicode_no_surrogate[:-1] +
 eval('"\\uD800-\\uDFFF"') +  # pylint:disable=eval-used
 "]")
 else:
 invalid_unicode_re = re.compile(invalid_unicode_no_surrogate)
 
 non_bmp_invalid_codepoints = set([0x1FFFE, 0x1FFFF, 0x2FFFE, 0x2FFFF, 0x3FFFE,
 0x3FFFF, 0x4FFFE, 0x4FFFF, 0x5FFFE, 0x5FFFF,
 0x6FFFE, 0x6FFFF, 0x7FFFE, 0x7FFFF, 0x8FFFE,
 0x8FFFF, 0x9FFFE, 0x9FFFF, 0xAFFFE, 0xAFFFF,
 0xBFFFE, 0xBFFFF, 0xCFFFE, 0xCFFFF, 0xDFFFE,
 0xDFFFF, 0xEFFFE, 0xEFFFF, 0xFFFFE, 0xFFFFF,
 0x10FFFE, 0x10FFFF])
 
 ascii_punctuation_re = re.compile("[\u0009-\u000D\u0020-\u002F\u003A-\u0040\u005B-\u0060\u007B-\u007E]")
 
 # Cache for charsUntil()
 charsUntilRegEx = {}
 
 
 class BufferedStream(object):
 """Buffering for streams that do not have buffering of their own
 
 The buffer is implemented as a list of chunks on the assumption that
 joining many strings will be slow since it is O(n**2)
 """
 
 def __init__(self, stream):
 self.stream = stream
 self.buffer = []
 self.position = [-1, 0]  # chunk number, offset
 
 def tell(self):
 pos = 0
 for chunk in self.buffer[:self.position[0]]:
 pos += len(chunk)
 pos += self.position[1]
 return pos
 
 def seek(self, pos):
 assert pos <= self._bufferedBytes()
 offset = pos
 i = 0
 while len(self.buffer[i]) < offset:
 offset -= len(self.buffer[i])
 i += 1
 self.position = [i, offset]
 
 def read(self, bytes):
 if not self.buffer:
 return self._readStream(bytes)
 elif (self.position[0] == len(self.buffer) and
 self.position[1] == len(self.buffer[-1])):
 return self._readStream(bytes)
 else:
 return self._readFromBuffer(bytes)
 
 def _bufferedBytes(self):
 return sum([len(item) for item in self.buffer])
 
 def _readStream(self, bytes):
 data = self.stream.read(bytes)
 self.buffer.append(data)
 self.position[0] += 1
 self.position[1] = len(data)
 return data
 
 def _readFromBuffer(self, bytes):
 remainingBytes = bytes
 rv = []
 bufferIndex = self.position[0]
 bufferOffset = self.position[1]
 while bufferIndex < len(self.buffer) and remainingBytes != 0:
 assert remainingBytes > 0
 bufferedData = self.buffer[bufferIndex]
 
 if remainingBytes <= len(bufferedData) - bufferOffset:
 bytesToRead = remainingBytes
 self.position = [bufferIndex, bufferOffset + bytesToRead]
 else:
 bytesToRead = len(bufferedData) - bufferOffset
 self.position = [bufferIndex, len(bufferedData)]
 bufferIndex += 1
 rv.append(bufferedData[bufferOffset:bufferOffset + bytesToRead])
 remainingBytes -= bytesToRead
 
 bufferOffset = 0
 
 if remainingBytes:
 rv.append(self._readStream(remainingBytes))
 
 return b"".join(rv)
 
 
 def HTMLInputStream(source, **kwargs):
 # Work around Python bug #20007: read(0) closes the connection.
 # http://bugs.python.org/issue20007
 if (isinstance(source, http_client.HTTPResponse) or
 # Also check for addinfourl wrapping HTTPResponse
 (isinstance(source, urllib.response.addbase) and
 isinstance(source.fp, http_client.HTTPResponse))):
 isUnicode = False
 elif hasattr(source, "read"):
 isUnicode = isinstance(source.read(0), text_type)
 else:
 isUnicode = isinstance(source, text_type)
 
 if isUnicode:
 encodings = [x for x in kwargs if x.endswith("_encoding")]
 if encodings:
 raise TypeError("Cannot set an encoding with a unicode input, set %r" % encodings)
 
 return HTMLUnicodeInputStream(source, **kwargs)
 else:
 return HTMLBinaryInputStream(source, **kwargs)
 
 
 class HTMLUnicodeInputStream(object):
 """Provides a unicode stream of characters to the HTMLTokenizer.
 
 This class takes care of character encoding and removing or replacing
 incorrect byte-sequences and also provides column and line tracking.
 
 """
 
 _defaultChunkSize = 10240
 
 def __init__(self, source):
 """Initialises the HTMLInputStream.
 
 HTMLInputStream(source, [encoding]) -> Normalized stream from source
 for use by html5lib.
 
 source can be either a file-object, local filename or a string.
 
 The optional encoding parameter must be a string that indicates
 the encoding.  If specified, that encoding will be used,
 regardless of any BOM or later declaration (such as in a meta
 element)
 
 """
 
 if not _utils.supports_lone_surrogates:
 # Such platforms will have already checked for such
 # surrogate errors, so no need to do this checking.
 self.reportCharacterErrors = None
 elif len("\U0010FFFF") == 1:
 self.reportCharacterErrors = self.characterErrorsUCS4
 else:
 self.reportCharacterErrors = self.characterErrorsUCS2
 
 # List of where new lines occur
 self.newLines = [0]
 
 self.charEncoding = (lookupEncoding("utf-8"), "certain")
 self.dataStream = self.openStream(source)
 
 self.reset()
 
 def reset(self):
 self.chunk = ""
 self.chunkSize = 0
 self.chunkOffset = 0
 self.errors = []
 
 # number of (complete) lines in previous chunks
 self.prevNumLines = 0
 # number of columns in the last line of the previous chunk
 self.prevNumCols = 0
 
 # Deal with CR LF and surrogates split over chunk boundaries
 self._bufferedCharacter = None
 
 def openStream(self, source):
 """Produces a file object from source.
 
 source can be either a file object, local filename or a string.
 
 """
 # Already a file object
 if hasattr(source, 'read'):
 stream = source
 else:
 stream = StringIO(source)
 
 return stream
 
 def _position(self, offset):
 chunk = self.chunk
 nLines = chunk.count('\n', 0, offset)
 positionLine = self.prevNumLines + nLines
 lastLinePos = chunk.rfind('\n', 0, offset)
 if lastLinePos == -1:
 positionColumn = self.prevNumCols + offset
 else:
 positionColumn = offset - (lastLinePos + 1)
 return (positionLine, positionColumn)
 
 def position(self):
 """Returns (line, col) of the current position in the stream."""
 line, col = self._position(self.chunkOffset)
 return (line + 1, col)
 
 def char(self):
 """ Read one character from the stream or queue if available. Return
 EOF when EOF is reached.
 """
 # Read a new chunk from the input stream if necessary
 if self.chunkOffset >= self.chunkSize:
 if not self.readChunk():
 return EOF
 
 chunkOffset = self.chunkOffset
 char = self.chunk[chunkOffset]
 self.chunkOffset = chunkOffset + 1
 
 return char
 
 def readChunk(self, chunkSize=None):
 if chunkSize is None:
 chunkSize = self._defaultChunkSize
 
 self.prevNumLines, self.prevNumCols = self._position(self.chunkSize)
 
 self.chunk = ""
 self.chunkSize = 0
 self.chunkOffset = 0
 
 data = self.dataStream.read(chunkSize)
 
 # Deal with CR LF and surrogates broken across chunks
 if self._bufferedCharacter:
 data = self._bufferedCharacter + data
 self._bufferedCharacter = None
 elif not data:
 # We have no more data, bye-bye stream
 return False
 
 if len(data) > 1:
 lastv = ord(data[-1])
 if lastv == 0x0D or 0xD800 <= lastv <= 0xDBFF:
 self._bufferedCharacter = data[-1]
 data = data[:-1]
 
 if self.reportCharacterErrors:
 self.reportCharacterErrors(data)
 
 # Replace invalid characters
 data = data.replace("\r\n", "\n")
 data = data.replace("\r", "\n")
 
 self.chunk = data
 self.chunkSize = len(data)
 
 return True
 
 def characterErrorsUCS4(self, data):
 for _ in range(len(invalid_unicode_re.findall(data))):
 self.errors.append("invalid-codepoint")
 
 def characterErrorsUCS2(self, data):
 # Someone picked the wrong compile option
 # You lose
 skip = False
 for match in invalid_unicode_re.finditer(data):
 if skip:
 continue
 codepoint = ord(match.group())
 pos = match.start()
 # Pretty sure there should be endianness issues here
 if _utils.isSurrogatePair(data[pos:pos + 2]):
 # We have a surrogate pair!
 char_val = _utils.surrogatePairToCodepoint(data[pos:pos + 2])
 if char_val in non_bmp_invalid_codepoints:
 self.errors.append("invalid-codepoint")
 skip = True
 elif (codepoint >= 0xD800 and codepoint <= 0xDFFF and
 pos == len(data) - 1):
 self.errors.append("invalid-codepoint")
 else:
 skip = False
 self.errors.append("invalid-codepoint")
 
 def charsUntil(self, characters, opposite=False):
 """ Returns a string of characters from the stream up to but not
 including any character in 'characters' or EOF. 'characters' must be
 a container that supports the 'in' method and iteration over its
 characters.
 """
 
 # Use a cache of regexps to find the required characters
 try:
 chars = charsUntilRegEx[(characters, opposite)]
 except KeyError:
 if __debug__:
 for c in characters:
 assert(ord(c) < 128)
 regex = "".join(["\\x%02x" % ord(c) for c in characters])
 if not opposite:
 regex = "^%s" % regex
 chars = charsUntilRegEx[(characters, opposite)] = re.compile("[%s]+" % regex)
 
 rv = []
 
 while True:
 # Find the longest matching prefix
 m = chars.match(self.chunk, self.chunkOffset)
 if m is None:
 # If nothing matched, and it wasn't because we ran out of chunk,
 # then stop
 if self.chunkOffset != self.chunkSize:
 break
 else:
 end = m.end()
 # If not the whole chunk matched, return everything
 # up to the part that didn't match
 if end != self.chunkSize:
 rv.append(self.chunk[self.chunkOffset:end])
 self.chunkOffset = end
 break
 # If the whole remainder of the chunk matched,
 # use it all and read the next chunk
 rv.append(self.chunk[self.chunkOffset:])
 if not self.readChunk():
 # Reached EOF
 break
 
 r = "".join(rv)
 return r
 
 def unget(self, char):
 # Only one character is allowed to be ungotten at once - it must
 # be consumed again before any further call to unget
 if char is not None:
 if self.chunkOffset == 0:
 # unget is called quite rarely, so it's a good idea to do
 # more work here if it saves a bit of work in the frequently
 # called char and charsUntil.
 # So, just prepend the ungotten character onto the current
 # chunk:
 self.chunk = char + self.chunk
 self.chunkSize += 1
 else:
 self.chunkOffset -= 1
 assert self.chunk[self.chunkOffset] == char
 
 
 class HTMLBinaryInputStream(HTMLUnicodeInputStream):
 """Provides a unicode stream of characters to the HTMLTokenizer.
 
 This class takes care of character encoding and removing or replacing
 incorrect byte-sequences and also provides column and line tracking.
 
 """
 
 def __init__(self, source, override_encoding=None, transport_encoding=None,
 same_origin_parent_encoding=None, likely_encoding=None,
 default_encoding="windows-1252", useChardet=True):
 """Initialises the HTMLInputStream.
 
 HTMLInputStream(source, [encoding]) -> Normalized stream from source
 for use by html5lib.
 
 source can be either a file-object, local filename or a string.
 
 The optional encoding parameter must be a string that indicates
 the encoding.  If specified, that encoding will be used,
 regardless of any BOM or later declaration (such as in a meta
 element)
 
 """
 # Raw Stream - for unicode objects this will encode to utf-8 and set
 #              self.charEncoding as appropriate
 self.rawStream = self.openStream(source)
 
 HTMLUnicodeInputStream.__init__(self, self.rawStream)
 
 # Encoding Information
 # Number of bytes to use when looking for a meta element with
 # encoding information
 self.numBytesMeta = 1024
 # Number of bytes to use when using detecting encoding using chardet
 self.numBytesChardet = 100
 # Things from args
 self.override_encoding = override_encoding
 self.transport_encoding = transport_encoding
 self.same_origin_parent_encoding = same_origin_parent_encoding
 self.likely_encoding = likely_encoding
 self.default_encoding = default_encoding
 
 # Determine encoding
 self.charEncoding = self.determineEncoding(useChardet)
 assert self.charEncoding[0] is not None
 
 # Call superclass
 self.reset()
 
 def reset(self):
 self.dataStream = self.charEncoding[0].codec_info.streamreader(self.rawStream, 'replace')
 HTMLUnicodeInputStream.reset(self)
 
 def openStream(self, source):
 """Produces a file object from source.
 
 source can be either a file object, local filename or a string.
 
 """
 # Already a file object
 if hasattr(source, 'read'):
 stream = source
 else:
 stream = BytesIO(source)
 
 try:
 stream.seek(stream.tell())
 except:  # pylint:disable=bare-except
 stream = BufferedStream(stream)
 
 return stream
 
 def determineEncoding(self, chardet=True):
 # BOMs take precedence over everything
 # This will also read past the BOM if present
 charEncoding = self.detectBOM(), "certain"
 if charEncoding[0] is not None:
 return charEncoding
 
 # If we've been overriden, we've been overriden
 charEncoding = lookupEncoding(self.override_encoding), "certain"
 if charEncoding[0] is not None:
 return charEncoding
 
 # Now check the transport layer
 charEncoding = lookupEncoding(self.transport_encoding), "certain"
 if charEncoding[0] is not None:
 return charEncoding
 
 # Look for meta elements with encoding information
 charEncoding = self.detectEncodingMeta(), "tentative"
 if charEncoding[0] is not None:
 return charEncoding
 
 # Parent document encoding
 charEncoding = lookupEncoding(self.same_origin_parent_encoding), "tentative"
 if charEncoding[0] is not None and not charEncoding[0].name.startswith("utf-16"):
 return charEncoding
 
 # "likely" encoding
 charEncoding = lookupEncoding(self.likely_encoding), "tentative"
 if charEncoding[0] is not None:
 return charEncoding
 
 # Guess with chardet, if available
 if chardet:
 try:
 from chardet.universaldetector import UniversalDetector
 except ImportError:
 pass
 else:
 buffers = []
 detector = UniversalDetector()
 while not detector.done:
 buffer = self.rawStream.read(self.numBytesChardet)
 assert isinstance(buffer, bytes)
 if not buffer:
 break
 buffers.append(buffer)
 detector.feed(buffer)
 detector.close()
 encoding = lookupEncoding(detector.result['encoding'])
 self.rawStream.seek(0)
 if encoding is not None:
 return encoding, "tentative"
 
 # Try the default encoding
 charEncoding = lookupEncoding(self.default_encoding), "tentative"
 if charEncoding[0] is not None:
 return charEncoding
 
 # Fallback to html5lib's default if even that hasn't worked
 return lookupEncoding("windows-1252"), "tentative"
 
 def changeEncoding(self, newEncoding):
 assert self.charEncoding[1] != "certain"
 newEncoding = lookupEncoding(newEncoding)
 if newEncoding is None:
 return
 if newEncoding.name in ("utf-16be", "utf-16le"):
 newEncoding = lookupEncoding("utf-8")
 assert newEncoding is not None
 elif newEncoding == self.charEncoding[0]:
 self.charEncoding = (self.charEncoding[0], "certain")
 else:
 self.rawStream.seek(0)
 self.charEncoding = (newEncoding, "certain")
 self.reset()
 raise ReparseException("Encoding changed from %s to %s" % (self.charEncoding[0], newEncoding))
 
 def detectBOM(self):
 """Attempts to detect at BOM at the start of the stream. If
 an encoding can be determined from the BOM return the name of the
 encoding otherwise return None"""
 bomDict = {
 codecs.BOM_UTF8: 'utf-8',
 codecs.BOM_UTF16_LE: 'utf-16le', codecs.BOM_UTF16_BE: 'utf-16be',
 codecs.BOM_UTF32_LE: 'utf-32le', codecs.BOM_UTF32_BE: 'utf-32be'
 }
 
 # Go to beginning of file and read in 4 bytes
 string = self.rawStream.read(4)
 assert isinstance(string, bytes)
 
 # Try detecting the BOM using bytes from the string
 encoding = bomDict.get(string[:3])         # UTF-8
 seek = 3
 if not encoding:
 # Need to detect UTF-32 before UTF-16
 encoding = bomDict.get(string)         # UTF-32
 seek = 4
 if not encoding:
 encoding = bomDict.get(string[:2])  # UTF-16
 seek = 2
 
 # Set the read position past the BOM if one was found, otherwise
 # set it to the start of the stream
 if encoding:
 self.rawStream.seek(seek)
 return lookupEncoding(encoding)
 else:
 self.rawStream.seek(0)
 return None
 
 def detectEncodingMeta(self):
 """Report the encoding declared by the meta element
 """
 buffer = self.rawStream.read(self.numBytesMeta)
 assert isinstance(buffer, bytes)
 parser = EncodingParser(buffer)
 self.rawStream.seek(0)
 encoding = parser.getEncoding()
 
 if encoding is not None and encoding.name in ("utf-16be", "utf-16le"):
 encoding = lookupEncoding("utf-8")
 
 return encoding
 
 
 class EncodingBytes(bytes):
 """String-like object with an associated position and various extra methods
 If the position is ever greater than the string length then an exception is
 raised"""
 def __new__(self, value):
 assert isinstance(value, bytes)
 return bytes.__new__(self, value.lower())
 
 def __init__(self, value):
 # pylint:disable=unused-argument
 self._position = -1
 
 def __iter__(self):
 return self
 
 def __next__(self):
 p = self._position = self._position + 1
 if p >= len(self):
 raise StopIteration
 elif p < 0:
 raise TypeError
 return self[p:p + 1]
 
 def next(self):
 # Py2 compat
 return self.__next__()
 
 def previous(self):
 p = self._position
 if p >= len(self):
 raise StopIteration
 elif p < 0:
 raise TypeError
 self._position = p = p - 1
 return self[p:p + 1]
 
 def setPosition(self, position):
 if self._position >= len(self):
 raise StopIteration
 self._position = position
 
 def getPosition(self):
 if self._position >= len(self):
 raise StopIteration
 if self._position >= 0:
 return self._position
 else:
 return None
 
 position = property(getPosition, setPosition)
 
 def getCurrentByte(self):
 return self[self.position:self.position + 1]
 
 currentByte = property(getCurrentByte)
 
 def skip(self, chars=spaceCharactersBytes):
 """Skip past a list of characters"""
 p = self.position               # use property for the error-checking
 while p < len(self):
 c = self[p:p + 1]
 if c not in chars:
 self._position = p
 return c
 p += 1
 self._position = p
 return None
 
 def skipUntil(self, chars):
 p = self.position
 while p < len(self):
 c = self[p:p + 1]
 if c in chars:
 self._position = p
 return c
 p += 1
 self._position = p
 return None
 
 def matchBytes(self, bytes):
 """Look for a sequence of bytes at the start of a string. If the bytes
 are found return True and advance the position to the byte after the
 match. Otherwise return False and leave the position alone"""
 p = self.position
 data = self[p:p + len(bytes)]
 rv = data.startswith(bytes)
 if rv:
 self.position += len(bytes)
 return rv
 
 def jumpTo(self, bytes):
 """Look for the next sequence of bytes matching a given sequence. If
 a match is found advance the position to the last byte of the match"""
 newPosition = self[self.position:].find(bytes)
 if newPosition > -1:
 # XXX: This is ugly, but I can't see a nicer way to fix this.
 if self._position == -1:
 self._position = 0
 self._position += (newPosition + len(bytes) - 1)
 return True
 else:
 raise StopIteration
 
 
 class EncodingParser(object):
 """Mini parser for detecting character encoding from meta elements"""
 
 def __init__(self, data):
 """string - the data to work on for encoding detection"""
 self.data = EncodingBytes(data)
 self.encoding = None
 
 def getEncoding(self):
 methodDispatch = (
 (b"<!--", self.handleComment),
 (b"<meta", self.handleMeta),
 (b"</", self.handlePossibleEndTag),
 (b"<!", self.handleOther),
 (b"<?", self.handleOther),
 (b"<", self.handlePossibleStartTag))
 for _ in self.data:
 keepParsing = True
 for key, method in methodDispatch:
 if self.data.matchBytes(key):
 try:
 keepParsing = method()
 break
 except StopIteration:
 keepParsing = False
 break
 if not keepParsing:
 break
 
 return self.encoding
 
 def handleComment(self):
 """Skip over comments"""
 return self.data.jumpTo(b"-->")
 
 def handleMeta(self):
 if self.data.currentByte not in spaceCharactersBytes:
 # if we have <meta not followed by a space so just keep going
 return True
 # We have a valid meta element we want to search for attributes
 hasPragma = False
 pendingEncoding = None
 while True:
 # Try to find the next attribute after the current position
 attr = self.getAttribute()
 if attr is None:
 return True
 else:
 if attr[0] == b"http-equiv":
 hasPragma = attr[1] == b"content-type"
 if hasPragma and pendingEncoding is not None:
 self.encoding = pendingEncoding
 return False
 elif attr[0] == b"charset":
 tentativeEncoding = attr[1]
 codec = lookupEncoding(tentativeEncoding)
 if codec is not None:
 self.encoding = codec
 return False
 elif attr[0] == b"content":
 contentParser = ContentAttrParser(EncodingBytes(attr[1]))
 tentativeEncoding = contentParser.parse()
 if tentativeEncoding is not None:
 codec = lookupEncoding(tentativeEncoding)
 if codec is not None:
 if hasPragma:
 self.encoding = codec
 return False
 else:
 pendingEncoding = codec
 
 def handlePossibleStartTag(self):
 return self.handlePossibleTag(False)
 
 def handlePossibleEndTag(self):
 next(self.data)
 return self.handlePossibleTag(True)
 
 def handlePossibleTag(self, endTag):
 data = self.data
 if data.currentByte not in asciiLettersBytes:
 # If the next byte is not an ascii letter either ignore this
 # fragment (possible start tag case) or treat it according to
 # handleOther
 if endTag:
 data.previous()
 self.handleOther()
 return True
 
 c = data.skipUntil(spacesAngleBrackets)
 if c == b"<":
 # return to the first step in the overall "two step" algorithm
 # reprocessing the < byte
 data.previous()
 else:
 # Read all attributes
 attr = self.getAttribute()
 while attr is not None:
 attr = self.getAttribute()
 return True
 
 def handleOther(self):
 return self.data.jumpTo(b">")
 
 def getAttribute(self):
 """Return a name,value pair for the next attribute in the stream,
 if one is found, or None"""
 data = self.data
 # Step 1 (skip chars)
 c = data.skip(spaceCharactersBytes | frozenset([b"/"]))
 assert c is None or len(c) == 1
 # Step 2
 if c in (b">", None):
 return None
 # Step 3
 attrName = []
 attrValue = []
 # Step 4 attribute name
 while True:
 if c == b"=" and attrName:
 break
 elif c in spaceCharactersBytes:
 # Step 6!
 c = data.skip()
 break
 elif c in (b"/", b">"):
 return b"".join(attrName), b""
 elif c in asciiUppercaseBytes:
 attrName.append(c.lower())
 elif c is None:
 return None
 else:
 attrName.append(c)
 # Step 5
 c = next(data)
 # Step 7
 if c != b"=":
 data.previous()
 return b"".join(attrName), b""
 # Step 8
 next(data)
 # Step 9
 c = data.skip()
 # Step 10
 if c in (b"'", b'"'):
 # 10.1
 quoteChar = c
 while True:
 # 10.2
 c = next(data)
 # 10.3
 if c == quoteChar:
 next(data)
 return b"".join(attrName), b"".join(attrValue)
 # 10.4
 elif c in asciiUppercaseBytes:
 attrValue.append(c.lower())
 # 10.5
 else:
 attrValue.append(c)
 elif c == b">":
 return b"".join(attrName), b""
 elif c in asciiUppercaseBytes:
 attrValue.append(c.lower())
 elif c is None:
 return None
 else:
 attrValue.append(c)
 # Step 11
 while True:
 c = next(data)
 if c in spacesAngleBrackets:
 return b"".join(attrName), b"".join(attrValue)
 elif c in asciiUppercaseBytes:
 attrValue.append(c.lower())
 elif c is None:
 return None
 else:
 attrValue.append(c)
 
 
 class ContentAttrParser(object):
 def __init__(self, data):
 assert isinstance(data, bytes)
 self.data = data
 
 def parse(self):
 try:
 # Check if the attr name is charset
 # otherwise return
 self.data.jumpTo(b"charset")
 self.data.position += 1
 self.data.skip()
 if not self.data.currentByte == b"=":
 # If there is no = sign keep looking for attrs
 return None
 self.data.position += 1
 self.data.skip()
 # Look for an encoding between matching quote marks
 if self.data.currentByte in (b'"', b"'"):
 quoteMark = self.data.currentByte
 self.data.position += 1
 oldPosition = self.data.position
 if self.data.jumpTo(quoteMark):
 return self.data[oldPosition:self.data.position]
 else:
 return None
 else:
 # Unquoted value
 oldPosition = self.data.position
 try:
 self.data.skipUntil(spaceCharactersBytes)
 return self.data[oldPosition:self.data.position]
 except StopIteration:
 # Return the whole remaining value
 return self.data[oldPosition:]
 except StopIteration:
 return None
 
 
 def lookupEncoding(encoding):
 """Return the python codec name corresponding to an encoding or None if the
 string doesn't correspond to a valid encoding."""
 if isinstance(encoding, binary_type):
 try:
 encoding = encoding.decode("ascii")
 except UnicodeDecodeError:
 return None
 
 if encoding is not None:
 try:
 return webencodings.lookup(encoding)
 except AttributeError:
 return None
 else:
 return None
 
 |