| Viewing file:  parser.py (55.39 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt
 
 """Code parsing for coverage.py."""
 
 from __future__ import annotations
 
 import ast
 import collections
 import os
 import re
 import sys
 import token
 import tokenize
 
 from types import CodeType
 from typing import (
 cast, Any, Callable, Dict, Iterable, List, Optional, Sequence, Set, Tuple,
 )
 
 from coverage import env
 from coverage.bytecode import code_objects
 from coverage.debug import short_stack
 from coverage.exceptions import NoSource, NotPython
 from coverage.misc import join_regex, nice_pair
 from coverage.phystokens import generate_tokens
 from coverage.types import Protocol, TArc, TLineNo
 
 
 class PythonParser:
 """Parse code to find executable lines, excluded lines, etc.
 
 This information is all based on static analysis: no code execution is
 involved.
 
 """
 def __init__(
 self,
 text: Optional[str] = None,
 filename: Optional[str] = None,
 exclude: Optional[str] = None,
 ) -> None:
 """
 Source can be provided as `text`, the text itself, or `filename`, from
 which the text will be read.  Excluded lines are those that match
 `exclude`, a regex string.
 
 """
 assert text or filename, "PythonParser needs either text or filename"
 self.filename = filename or "<code>"
 if text is not None:
 self.text: str = text
 else:
 from coverage.python import get_python_source
 try:
 self.text = get_python_source(self.filename)
 except OSError as err:
 raise NoSource(f"No source for code: '{self.filename}': {err}") from err
 
 self.exclude = exclude
 
 # The text lines of the parsed code.
 self.lines: List[str] = self.text.split("\n")
 
 # The normalized line numbers of the statements in the code. Exclusions
 # are taken into account, and statements are adjusted to their first
 # lines.
 self.statements: Set[TLineNo] = set()
 
 # The normalized line numbers of the excluded lines in the code,
 # adjusted to their first lines.
 self.excluded: Set[TLineNo] = set()
 
 # The raw_* attributes are only used in this class, and in
 # lab/parser.py to show how this class is working.
 
 # The line numbers that start statements, as reported by the line
 # number table in the bytecode.
 self.raw_statements: Set[TLineNo] = set()
 
 # The raw line numbers of excluded lines of code, as marked by pragmas.
 self.raw_excluded: Set[TLineNo] = set()
 
 # The line numbers of class definitions.
 self.raw_classdefs: Set[TLineNo] = set()
 
 # The line numbers of docstring lines.
 self.raw_docstrings: Set[TLineNo] = set()
 
 # Internal detail, used by lab/parser.py.
 self.show_tokens = False
 
 # A dict mapping line numbers to lexical statement starts for
 # multi-line statements.
 self._multiline: Dict[TLineNo, TLineNo] = {}
 
 # Lazily-created arc data, and missing arc descriptions.
 self._all_arcs: Optional[Set[TArc]] = None
 self._missing_arc_fragments: Optional[TArcFragments] = None
 
 def lines_matching(self, *regexes: str) -> Set[TLineNo]:
 """Find the lines matching one of a list of regexes.
 
 Returns a set of line numbers, the lines that contain a match for one
 of the regexes in `regexes`.  The entire line needn't match, just a
 part of it.
 
 """
 combined = join_regex(regexes)
 regex_c = re.compile(combined)
 matches = set()
 for i, ltext in enumerate(self.lines, start=1):
 if regex_c.search(ltext):
 matches.add(i)
 return matches
 
 def _raw_parse(self) -> None:
 """Parse the source to find the interesting facts about its lines.
 
 A handful of attributes are updated.
 
 """
 # Find lines which match an exclusion pattern.
 if self.exclude:
 self.raw_excluded = self.lines_matching(self.exclude)
 
 # Tokenize, to find excluded suites, to find docstrings, and to find
 # multi-line statements.
 indent = 0
 exclude_indent = 0
 excluding = False
 excluding_decorators = False
 prev_toktype = token.INDENT
 first_line = None
 empty = True
 first_on_line = True
 nesting = 0
 
 assert self.text is not None
 tokgen = generate_tokens(self.text)
 for toktype, ttext, (slineno, _), (elineno, _), ltext in tokgen:
 if self.show_tokens:                # pragma: debugging
 print("%10s %5s %-20r %r" % (
 tokenize.tok_name.get(toktype, toktype),
 nice_pair((slineno, elineno)), ttext, ltext
 ))
 if toktype == token.INDENT:
 indent += 1
 elif toktype == token.DEDENT:
 indent -= 1
 elif toktype == token.NAME:
 if ttext == "class":
 # Class definitions look like branches in the bytecode, so
 # we need to exclude them.  The simplest way is to note the
 # lines with the "class" keyword.
 self.raw_classdefs.add(slineno)
 elif toktype == token.OP:
 if ttext == ":" and nesting == 0:
 should_exclude = (elineno in self.raw_excluded) or excluding_decorators
 if not excluding and should_exclude:
 # Start excluding a suite.  We trigger off of the colon
 # token so that the #pragma comment will be recognized on
 # the same line as the colon.
 self.raw_excluded.add(elineno)
 exclude_indent = indent
 excluding = True
 excluding_decorators = False
 elif ttext == "@" and first_on_line:
 # A decorator.
 if elineno in self.raw_excluded:
 excluding_decorators = True
 if excluding_decorators:
 self.raw_excluded.add(elineno)
 elif ttext in "([{":
 nesting += 1
 elif ttext in ")]}":
 nesting -= 1
 elif toktype == token.STRING and prev_toktype == token.INDENT:
 # Strings that are first on an indented line are docstrings.
 # (a trick from trace.py in the stdlib.) This works for
 # 99.9999% of cases.  For the rest (!) see:
 # http://stackoverflow.com/questions/1769332/x/1769794#1769794
 self.raw_docstrings.update(range(slineno, elineno+1))
 elif toktype == token.NEWLINE:
 if first_line is not None and elineno != first_line:    # type: ignore[unreachable]
 # We're at the end of a line, and we've ended on a
 # different line than the first line of the statement,
 # so record a multi-line range.
 for l in range(first_line, elineno+1):              # type: ignore[unreachable]
 self._multiline[l] = first_line
 first_line = None
 first_on_line = True
 
 if ttext.strip() and toktype != tokenize.COMMENT:
 # A non-white-space token.
 empty = False
 if first_line is None:
 # The token is not white space, and is the first in a statement.
 first_line = slineno
 # Check whether to end an excluded suite.
 if excluding and indent <= exclude_indent:
 excluding = False
 if excluding:
 self.raw_excluded.add(elineno)
 first_on_line = False
 
 prev_toktype = toktype
 
 # Find the starts of the executable statements.
 if not empty:
 byte_parser = ByteParser(self.text, filename=self.filename)
 self.raw_statements.update(byte_parser._find_statements())
 
 # The first line of modules can lie and say 1 always, even if the first
 # line of code is later. If so, map 1 to the actual first line of the
 # module.
 if env.PYBEHAVIOR.module_firstline_1 and self._multiline:
 self._multiline[1] = min(self.raw_statements)
 
 def first_line(self, lineno: TLineNo) -> TLineNo:
 """Return the first line number of the statement including `lineno`."""
 if lineno < 0:
 lineno = -self._multiline.get(-lineno, -lineno)
 else:
 lineno = self._multiline.get(lineno, lineno)
 return lineno
 
 def first_lines(self, linenos: Iterable[TLineNo]) -> Set[TLineNo]:
 """Map the line numbers in `linenos` to the correct first line of the
 statement.
 
 Returns a set of the first lines.
 
 """
 return {self.first_line(l) for l in linenos}
 
 def translate_lines(self, lines: Iterable[TLineNo]) -> Set[TLineNo]:
 """Implement `FileReporter.translate_lines`."""
 return self.first_lines(lines)
 
 def translate_arcs(self, arcs: Iterable[TArc]) -> Set[TArc]:
 """Implement `FileReporter.translate_arcs`."""
 return {(self.first_line(a), self.first_line(b)) for (a, b) in arcs}
 
 def parse_source(self) -> None:
 """Parse source text to find executable lines, excluded lines, etc.
 
 Sets the .excluded and .statements attributes, normalized to the first
 line of multi-line statements.
 
 """
 try:
 self._raw_parse()
 except (tokenize.TokenError, IndentationError, SyntaxError) as err:
 if hasattr(err, "lineno"):
 lineno = err.lineno         # IndentationError
 else:
 lineno = err.args[1][0]     # TokenError
 raise NotPython(
 f"Couldn't parse '{self.filename}' as Python source: " +
 f"{err.args[0]!r} at line {lineno}"
 ) from err
 
 self.excluded = self.first_lines(self.raw_excluded)
 
 ignore = self.excluded | self.raw_docstrings
 starts = self.raw_statements - ignore
 self.statements = self.first_lines(starts) - ignore
 
 def arcs(self) -> Set[TArc]:
 """Get information about the arcs available in the code.
 
 Returns a set of line number pairs.  Line numbers have been normalized
 to the first line of multi-line statements.
 
 """
 if self._all_arcs is None:
 self._analyze_ast()
 assert self._all_arcs is not None
 return self._all_arcs
 
 def _analyze_ast(self) -> None:
 """Run the AstArcAnalyzer and save its results.
 
 `_all_arcs` is the set of arcs in the code.
 
 """
 aaa = AstArcAnalyzer(self.text, self.raw_statements, self._multiline)
 aaa.analyze()
 
 self._all_arcs = set()
 for l1, l2 in aaa.arcs:
 fl1 = self.first_line(l1)
 fl2 = self.first_line(l2)
 if fl1 != fl2:
 self._all_arcs.add((fl1, fl2))
 
 self._missing_arc_fragments = aaa.missing_arc_fragments
 
 def exit_counts(self) -> Dict[TLineNo, int]:
 """Get a count of exits from that each line.
 
 Excluded lines are excluded.
 
 """
 exit_counts: Dict[TLineNo, int] = collections.defaultdict(int)
 for l1, l2 in self.arcs():
 if l1 < 0:
 # Don't ever report -1 as a line number
 continue
 if l1 in self.excluded:
 # Don't report excluded lines as line numbers.
 continue
 if l2 in self.excluded:
 # Arcs to excluded lines shouldn't count.
 continue
 exit_counts[l1] += 1
 
 # Class definitions have one extra exit, so remove one for each:
 for l in self.raw_classdefs:
 # Ensure key is there: class definitions can include excluded lines.
 if l in exit_counts:
 exit_counts[l] -= 1
 
 return exit_counts
 
 def missing_arc_description(
 self,
 start: TLineNo,
 end: TLineNo,
 executed_arcs: Optional[Iterable[TArc]] = None,
 ) -> str:
 """Provide an English sentence describing a missing arc."""
 if self._missing_arc_fragments is None:
 self._analyze_ast()
 assert self._missing_arc_fragments is not None
 
 actual_start = start
 
 if (
 executed_arcs and
 end < 0 and end == -start and
 (end, start) not in executed_arcs and
 (end, start) in self._missing_arc_fragments
 ):
 # It's a one-line callable, and we never even started it,
 # and we have a message about not starting it.
 start, end = end, start
 
 fragment_pairs = self._missing_arc_fragments.get((start, end), [(None, None)])
 
 msgs = []
 for smsg, emsg in fragment_pairs:
 if emsg is None:
 if end < 0:
 # Hmm, maybe we have a one-line callable, let's check.
 if (-end, end) in self._missing_arc_fragments:
 return self.missing_arc_description(-end, end)
 emsg = "didn't jump to the function exit"
 else:
 emsg = "didn't jump to line {lineno}"
 emsg = emsg.format(lineno=end)
 
 msg = f"line {actual_start} {emsg}"
 if smsg is not None:
 msg += f", because {smsg.format(lineno=actual_start)}"
 
 msgs.append(msg)
 
 return " or ".join(msgs)
 
 
 class ByteParser:
 """Parse bytecode to understand the structure of code."""
 
 def __init__(
 self,
 text: str,
 code: Optional[CodeType] = None,
 filename: Optional[str] = None,
 ) -> None:
 self.text = text
 if code is not None:
 self.code = code
 else:
 assert filename is not None
 try:
 self.code = compile(text, filename, "exec", dont_inherit=True)
 except SyntaxError as synerr:
 raise NotPython(
 "Couldn't parse '%s' as Python source: '%s' at line %d" % (
 filename, synerr.msg, synerr.lineno or 0
 )
 ) from synerr
 
 def child_parsers(self) -> Iterable[ByteParser]:
 """Iterate over all the code objects nested within this one.
 
 The iteration includes `self` as its first value.
 
 """
 return (ByteParser(self.text, code=c) for c in code_objects(self.code))
 
 def _line_numbers(self) -> Iterable[TLineNo]:
 """Yield the line numbers possible in this code object.
 
 Uses co_lnotab described in Python/compile.c to find the
 line numbers.  Produces a sequence: l0, l1, ...
 """
 if hasattr(self.code, "co_lines"):
 for _, _, line in self.code.co_lines():
 if line:
 yield line
 else:
 # Adapted from dis.py in the standard library.
 byte_increments = self.code.co_lnotab[0::2]
 line_increments = self.code.co_lnotab[1::2]
 
 last_line_num = None
 line_num = self.code.co_firstlineno
 byte_num = 0
 for byte_incr, line_incr in zip(byte_increments, line_increments):
 if byte_incr:
 if line_num != last_line_num:
 yield line_num
 last_line_num = line_num
 byte_num += byte_incr
 if env.PYBEHAVIOR.negative_lnotab and line_incr >= 0x80:
 line_incr -= 0x100
 line_num += line_incr
 if line_num != last_line_num:
 yield line_num
 
 def _find_statements(self) -> Iterable[TLineNo]:
 """Find the statements in `self.code`.
 
 Produce a sequence of line numbers that start statements.  Recurses
 into all code objects reachable from `self.code`.
 
 """
 for bp in self.child_parsers():
 # Get all of the lineno information from this code.
 yield from bp._line_numbers()
 
 
 #
 # AST analysis
 #
 
 class ArcStart(collections.namedtuple("Arc", "lineno, cause")):
 """The information needed to start an arc.
 
 `lineno` is the line number the arc starts from.
 
 `cause` is an English text fragment used as the `startmsg` for
 AstArcAnalyzer.missing_arc_fragments.  It will be used to describe why an
 arc wasn't executed, so should fit well into a sentence of the form,
 "Line 17 didn't run because {cause}."  The fragment can include "{lineno}"
 to have `lineno` interpolated into it.
 
 """
 def __new__(cls, lineno: TLineNo, cause: Optional[str] = None) -> ArcStart:
 return super().__new__(cls, lineno, cause)
 
 
 class TAddArcFn(Protocol):
 """The type for AstArcAnalyzer.add_arc()."""
 def __call__(
 self,
 start: TLineNo,
 end: TLineNo,
 smsg: Optional[str] = None,
 emsg: Optional[str] = None,
 ) -> None:
 ...
 
 TArcFragments = Dict[TArc, List[Tuple[Optional[str], Optional[str]]]]
 
 class Block:
 """
 Blocks need to handle various exiting statements in their own ways.
 
 All of these methods take a list of exits, and a callable `add_arc`
 function that they can use to add arcs if needed.  They return True if the
 exits are handled, or False if the search should continue up the block
 stack.
 """
 # pylint: disable=unused-argument
 def process_break_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
 """Process break exits."""
 # Because break can only appear in loops, and most subclasses
 # implement process_break_exits, this function is never reached.
 raise AssertionError
 
 def process_continue_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
 """Process continue exits."""
 # Because continue can only appear in loops, and most subclasses
 # implement process_continue_exits, this function is never reached.
 raise AssertionError
 
 def process_raise_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
 """Process raise exits."""
 return False
 
 def process_return_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
 """Process return exits."""
 return False
 
 
 class LoopBlock(Block):
 """A block on the block stack representing a `for` or `while` loop."""
 def __init__(self, start: TLineNo) -> None:
 # The line number where the loop starts.
 self.start = start
 # A set of ArcStarts, the arcs from break statements exiting this loop.
 self.break_exits: Set[ArcStart] = set()
 
 def process_break_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
 self.break_exits.update(exits)
 return True
 
 def process_continue_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
 for xit in exits:
 add_arc(xit.lineno, self.start, xit.cause)
 return True
 
 
 class FunctionBlock(Block):
 """A block on the block stack representing a function definition."""
 def __init__(self, start: TLineNo, name: str) -> None:
 # The line number where the function starts.
 self.start = start
 # The name of the function.
 self.name = name
 
 def process_raise_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
 for xit in exits:
 add_arc(
 xit.lineno, -self.start, xit.cause,
 f"didn't except from function {self.name!r}",
 )
 return True
 
 def process_return_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
 for xit in exits:
 add_arc(
 xit.lineno, -self.start, xit.cause,
 f"didn't return from function {self.name!r}",
 )
 return True
 
 
 class TryBlock(Block):
 """A block on the block stack representing a `try` block."""
 def __init__(self, handler_start: Optional[TLineNo], final_start: Optional[TLineNo]) -> None:
 # The line number of the first "except" handler, if any.
 self.handler_start = handler_start
 # The line number of the "finally:" clause, if any.
 self.final_start = final_start
 
 # The ArcStarts for breaks/continues/returns/raises inside the "try:"
 # that need to route through the "finally:" clause.
 self.break_from: Set[ArcStart] = set()
 self.continue_from: Set[ArcStart] = set()
 self.raise_from: Set[ArcStart] = set()
 self.return_from: Set[ArcStart] = set()
 
 def process_break_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
 if self.final_start is not None:
 self.break_from.update(exits)
 return True
 return False
 
 def process_continue_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
 if self.final_start is not None:
 self.continue_from.update(exits)
 return True
 return False
 
 def process_raise_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
 if self.handler_start is not None:
 for xit in exits:
 add_arc(xit.lineno, self.handler_start, xit.cause)
 else:
 assert self.final_start is not None
 self.raise_from.update(exits)
 return True
 
 def process_return_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
 if self.final_start is not None:
 self.return_from.update(exits)
 return True
 return False
 
 
 class WithBlock(Block):
 """A block on the block stack representing a `with` block."""
 def __init__(self, start: TLineNo) -> None:
 # We only ever use this block if it is needed, so that we don't have to
 # check this setting in all the methods.
 assert env.PYBEHAVIOR.exit_through_with
 
 # The line number of the with statement.
 self.start = start
 
 # The ArcStarts for breaks/continues/returns/raises inside the "with:"
 # that need to go through the with-statement while exiting.
 self.break_from: Set[ArcStart] = set()
 self.continue_from: Set[ArcStart] = set()
 self.return_from: Set[ArcStart] = set()
 
 def _process_exits(
 self,
 exits: Set[ArcStart],
 add_arc: TAddArcFn,
 from_set: Optional[Set[ArcStart]] = None,
 ) -> bool:
 """Helper to process the four kinds of exits."""
 for xit in exits:
 add_arc(xit.lineno, self.start, xit.cause)
 if from_set is not None:
 from_set.update(exits)
 return True
 
 def process_break_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
 return self._process_exits(exits, add_arc, self.break_from)
 
 def process_continue_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
 return self._process_exits(exits, add_arc, self.continue_from)
 
 def process_raise_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
 return self._process_exits(exits, add_arc)
 
 def process_return_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool:
 return self._process_exits(exits, add_arc, self.return_from)
 
 
 class NodeList(ast.AST):
 """A synthetic fictitious node, containing a sequence of nodes.
 
 This is used when collapsing optimized if-statements, to represent the
 unconditional execution of one of the clauses.
 
 """
 def __init__(self, body: Sequence[ast.AST]) -> None:
 self.body = body
 self.lineno = body[0].lineno
 
 # TODO: some add_arcs methods here don't add arcs, they return them. Rename them.
 # TODO: the cause messages have too many commas.
 # TODO: Shouldn't the cause messages join with "and" instead of "or"?
 
 def _make_expression_code_method(noun: str) -> Callable[[AstArcAnalyzer, ast.AST], None]:
 """A function to make methods for expression-based callable _code_object__ methods."""
 def _code_object__expression_callable(self: AstArcAnalyzer, node: ast.AST) -> None:
 start = self.line_for_node(node)
 self.add_arc(-start, start, None, f"didn't run the {noun} on line {start}")
 self.add_arc(start, -start, None, f"didn't finish the {noun} on line {start}")
 return _code_object__expression_callable
 
 
 class AstArcAnalyzer:
 """Analyze source text with an AST to find executable code paths."""
 
 def __init__(
 self,
 text: str,
 statements: Set[TLineNo],
 multiline: Dict[TLineNo, TLineNo],
 ) -> None:
 self.root_node = ast.parse(text)
 # TODO: I think this is happening in too many places.
 self.statements = {multiline.get(l, l) for l in statements}
 self.multiline = multiline
 
 # Turn on AST dumps with an environment variable.
 # $set_env.py: COVERAGE_AST_DUMP - Dump the AST nodes when parsing code.
 dump_ast = bool(int(os.environ.get("COVERAGE_AST_DUMP", 0)))
 
 if dump_ast:                                # pragma: debugging
 # Dump the AST so that failing tests have helpful output.
 print(f"Statements: {self.statements}")
 print(f"Multiline map: {self.multiline}")
 ast_dump(self.root_node)
 
 self.arcs: Set[TArc] = set()
 
 # A map from arc pairs to a list of pairs of sentence fragments:
 #   { (start, end): [(startmsg, endmsg), ...], }
 #
 # For an arc from line 17, they should be usable like:
 #    "Line 17 {endmsg}, because {startmsg}"
 self.missing_arc_fragments: TArcFragments = collections.defaultdict(list)
 self.block_stack: List[Block] = []
 
 # $set_env.py: COVERAGE_TRACK_ARCS - Trace possible arcs added while parsing code.
 self.debug = bool(int(os.environ.get("COVERAGE_TRACK_ARCS", 0)))
 
 def analyze(self) -> None:
 """Examine the AST tree from `root_node` to determine possible arcs.
 
 This sets the `arcs` attribute to be a set of (from, to) line number
 pairs.
 
 """
 for node in ast.walk(self.root_node):
 node_name = node.__class__.__name__
 code_object_handler = getattr(self, "_code_object__" + node_name, None)
 if code_object_handler is not None:
 code_object_handler(node)
 
 def add_arc(
 self,
 start: TLineNo,
 end: TLineNo,
 smsg: Optional[str] = None,
 emsg: Optional[str] = None,
 ) -> None:
 """Add an arc, including message fragments to use if it is missing."""
 if self.debug:                      # pragma: debugging
 print(f"\nAdding possible arc: ({start}, {end}): {smsg!r}, {emsg!r}")
 print(short_stack(limit=10))
 self.arcs.add((start, end))
 
 if smsg is not None or emsg is not None:
 self.missing_arc_fragments[(start, end)].append((smsg, emsg))
 
 def nearest_blocks(self) -> Iterable[Block]:
 """Yield the blocks in nearest-to-farthest order."""
 return reversed(self.block_stack)
 
 def line_for_node(self, node: ast.AST) -> TLineNo:
 """What is the right line number to use for this node?
 
 This dispatches to _line__Node functions where needed.
 
 """
 node_name = node.__class__.__name__
 handler = cast(
 Optional[Callable[[ast.AST], TLineNo]],
 getattr(self, "_line__" + node_name, None)
 )
 if handler is not None:
 return handler(node)
 else:
 return node.lineno
 
 def _line_decorated(self, node: ast.FunctionDef) -> TLineNo:
 """Compute first line number for things that can be decorated (classes and functions)."""
 lineno = node.lineno
 if env.PYBEHAVIOR.trace_decorated_def or env.PYBEHAVIOR.def_ast_no_decorator:
 if node.decorator_list:
 lineno = node.decorator_list[0].lineno
 return lineno
 
 def _line__Assign(self, node: ast.Assign) -> TLineNo:
 return self.line_for_node(node.value)
 
 _line__ClassDef = _line_decorated
 
 def _line__Dict(self, node: ast.Dict) -> TLineNo:
 if node.keys:
 if node.keys[0] is not None:
 return node.keys[0].lineno
 else:
 # Unpacked dict literals `{**{"a":1}}` have None as the key,
 # use the value in that case.
 return node.values[0].lineno
 else:
 return node.lineno
 
 _line__FunctionDef = _line_decorated
 _line__AsyncFunctionDef = _line_decorated
 
 def _line__List(self, node: ast.List) -> TLineNo:
 if node.elts:
 return self.line_for_node(node.elts[0])
 else:
 return node.lineno
 
 def _line__Module(self, node: ast.Module) -> TLineNo:
 if env.PYBEHAVIOR.module_firstline_1:
 return 1
 elif node.body:
 return self.line_for_node(node.body[0])
 else:
 # Empty modules have no line number, they always start at 1.
 return 1
 
 # The node types that just flow to the next node with no complications.
 OK_TO_DEFAULT = {
 "AnnAssign", "Assign", "Assert", "AugAssign", "Delete", "Expr", "Global",
 "Import", "ImportFrom", "Nonlocal", "Pass",
 }
 
 def add_arcs(self, node: ast.AST) -> Set[ArcStart]:
 """Add the arcs for `node`.
 
 Return a set of ArcStarts, exits from this node to the next. Because a
 node represents an entire sub-tree (including its children), the exits
 from a node can be arbitrarily complex::
 
 if something(1):
 if other(2):
 doit(3)
 else:
 doit(5)
 
 There are two exits from line 1: they start at line 3 and line 5.
 
 """
 node_name = node.__class__.__name__
 handler = cast(
 Optional[Callable[[ast.AST], Set[ArcStart]]],
 getattr(self, "_handle__" + node_name, None)
 )
 if handler is not None:
 return handler(node)
 else:
 # No handler: either it's something that's ok to default (a simple
 # statement), or it's something we overlooked.
 if env.TESTING:
 if node_name not in self.OK_TO_DEFAULT:
 raise RuntimeError(f"*** Unhandled: {node}")        # pragma: only failure
 
 # Default for simple statements: one exit from this node.
 return {ArcStart(self.line_for_node(node))}
 
 def add_body_arcs(
 self,
 body: Sequence[ast.AST],
 from_start: Optional[ArcStart] = None,
 prev_starts: Optional[Set[ArcStart]] = None
 ) -> Set[ArcStart]:
 """Add arcs for the body of a compound statement.
 
 `body` is the body node.  `from_start` is a single `ArcStart` that can
 be the previous line in flow before this body.  `prev_starts` is a set
 of ArcStarts that can be the previous line.  Only one of them should be
 given.
 
 Returns a set of ArcStarts, the exits from this body.
 
 """
 if prev_starts is None:
 assert from_start is not None
 prev_starts = {from_start}
 for body_node in body:
 lineno = self.line_for_node(body_node)
 first_line = self.multiline.get(lineno, lineno)
 if first_line not in self.statements:
 maybe_body_node = self.find_non_missing_node(body_node)
 if maybe_body_node is None:
 continue
 body_node = maybe_body_node
 lineno = self.line_for_node(body_node)
 for prev_start in prev_starts:
 self.add_arc(prev_start.lineno, lineno, prev_start.cause)
 prev_starts = self.add_arcs(body_node)
 return prev_starts
 
 def find_non_missing_node(self, node: ast.AST) -> Optional[ast.AST]:
 """Search `node` looking for a child that has not been optimized away.
 
 This might return the node you started with, or it will work recursively
 to find a child node in self.statements.
 
 Returns a node, or None if none of the node remains.
 
 """
 # This repeats work just done in add_body_arcs, but this duplication
 # means we can avoid a function call in the 99.9999% case of not
 # optimizing away statements.
 lineno = self.line_for_node(node)
 first_line = self.multiline.get(lineno, lineno)
 if first_line in self.statements:
 return node
 
 missing_fn = cast(
 Optional[Callable[[ast.AST], Optional[ast.AST]]],
 getattr(self, "_missing__" + node.__class__.__name__, None)
 )
 if missing_fn is not None:
 ret_node = missing_fn(node)
 else:
 ret_node = None
 return ret_node
 
 # Missing nodes: _missing__*
 #
 # Entire statements can be optimized away by Python. They will appear in
 # the AST, but not the bytecode.  These functions are called (by
 # find_non_missing_node) to find a node to use instead of the missing
 # node.  They can return None if the node should truly be gone.
 
 def _missing__If(self, node: ast.If) -> Optional[ast.AST]:
 # If the if-node is missing, then one of its children might still be
 # here, but not both. So return the first of the two that isn't missing.
 # Use a NodeList to hold the clauses as a single node.
 non_missing = self.find_non_missing_node(NodeList(node.body))
 if non_missing:
 return non_missing
 if node.orelse:
 return self.find_non_missing_node(NodeList(node.orelse))
 return None
 
 def _missing__NodeList(self, node: NodeList) -> Optional[ast.AST]:
 # A NodeList might be a mixture of missing and present nodes. Find the
 # ones that are present.
 non_missing_children = []
 for child in node.body:
 maybe_child = self.find_non_missing_node(child)
 if maybe_child is not None:
 non_missing_children.append(maybe_child)
 
 # Return the simplest representation of the present children.
 if not non_missing_children:
 return None
 if len(non_missing_children) == 1:
 return non_missing_children[0]
 return NodeList(non_missing_children)
 
 def _missing__While(self, node: ast.While) -> Optional[ast.AST]:
 body_nodes = self.find_non_missing_node(NodeList(node.body))
 if not body_nodes:
 return None
 # Make a synthetic While-true node.
 new_while = ast.While()
 new_while.lineno = body_nodes.lineno
 new_while.test = ast.Name()
 new_while.test.lineno = body_nodes.lineno
 new_while.test.id = "True"
 assert hasattr(body_nodes, "body")
 new_while.body = body_nodes.body
 new_while.orelse = []
 return new_while
 
 def is_constant_expr(self, node: ast.AST) -> Optional[str]:
 """Is this a compile-time constant?"""
 node_name = node.__class__.__name__
 if node_name in ["Constant", "NameConstant", "Num"]:
 return "Num"
 elif isinstance(node, ast.Name):
 if node.id in ["True", "False", "None", "__debug__"]:
 return "Name"
 return None
 
 # In the fullness of time, these might be good tests to write:
 #   while EXPR:
 #   while False:
 #   listcomps hidden deep in other expressions
 #   listcomps hidden in lists: x = [[i for i in range(10)]]
 #   nested function definitions
 
 # Exit processing: process_*_exits
 #
 # These functions process the four kinds of jump exits: break, continue,
 # raise, and return.  To figure out where an exit goes, we have to look at
 # the block stack context.  For example, a break will jump to the nearest
 # enclosing loop block, or the nearest enclosing finally block, whichever
 # is nearer.
 
 def process_break_exits(self, exits: Set[ArcStart]) -> None:
 """Add arcs due to jumps from `exits` being breaks."""
 for block in self.nearest_blocks():                         # pragma: always breaks
 if block.process_break_exits(exits, self.add_arc):
 break
 
 def process_continue_exits(self, exits: Set[ArcStart]) -> None:
 """Add arcs due to jumps from `exits` being continues."""
 for block in self.nearest_blocks():                         # pragma: always breaks
 if block.process_continue_exits(exits, self.add_arc):
 break
 
 def process_raise_exits(self, exits: Set[ArcStart]) -> None:
 """Add arcs due to jumps from `exits` being raises."""
 for block in self.nearest_blocks():
 if block.process_raise_exits(exits, self.add_arc):
 break
 
 def process_return_exits(self, exits: Set[ArcStart]) -> None:
 """Add arcs due to jumps from `exits` being returns."""
 for block in self.nearest_blocks():                         # pragma: always breaks
 if block.process_return_exits(exits, self.add_arc):
 break
 
 # Handlers: _handle__*
 #
 # Each handler deals with a specific AST node type, dispatched from
 # add_arcs.  Handlers return the set of exits from that node, and can
 # also call self.add_arc to record arcs they find.  These functions mirror
 # the Python semantics of each syntactic construct.  See the docstring
 # for add_arcs to understand the concept of exits from a node.
 #
 # Every node type that represents a statement should have a handler, or it
 # should be listed in OK_TO_DEFAULT.
 
 def _handle__Break(self, node: ast.Break) -> Set[ArcStart]:
 here = self.line_for_node(node)
 break_start = ArcStart(here, cause="the break on line {lineno} wasn't executed")
 self.process_break_exits({break_start})
 return set()
 
 def _handle_decorated(self, node: ast.FunctionDef) -> Set[ArcStart]:
 """Add arcs for things that can be decorated (classes and functions)."""
 main_line: TLineNo = node.lineno
 last: Optional[TLineNo] = node.lineno
 decs = node.decorator_list
 if decs:
 if env.PYBEHAVIOR.trace_decorated_def or env.PYBEHAVIOR.def_ast_no_decorator:
 last = None
 for dec_node in decs:
 dec_start = self.line_for_node(dec_node)
 if last is not None and dec_start != last:
 self.add_arc(last, dec_start)
 last = dec_start
 assert last is not None
 if env.PYBEHAVIOR.trace_decorated_def:
 self.add_arc(last, main_line)
 last = main_line
 if env.PYBEHAVIOR.trace_decorator_line_again:
 for top, bot in zip(decs, decs[1:]):
 self.add_arc(self.line_for_node(bot), self.line_for_node(top))
 self.add_arc(self.line_for_node(decs[0]), main_line)
 self.add_arc(main_line, self.line_for_node(decs[-1]))
 # The definition line may have been missed, but we should have it
 # in `self.statements`.  For some constructs, `line_for_node` is
 # not what we'd think of as the first line in the statement, so map
 # it to the first one.
 if node.body:
 body_start = self.line_for_node(node.body[0])
 body_start = self.multiline.get(body_start, body_start)
 for lineno in range(last+1, body_start):
 if lineno in self.statements:
 self.add_arc(last, lineno)
 last = lineno
 # The body is handled in collect_arcs.
 assert last is not None
 return {ArcStart(last)}
 
 _handle__ClassDef = _handle_decorated
 
 def _handle__Continue(self, node: ast.Continue) -> Set[ArcStart]:
 here = self.line_for_node(node)
 continue_start = ArcStart(here, cause="the continue on line {lineno} wasn't executed")
 self.process_continue_exits({continue_start})
 return set()
 
 def _handle__For(self, node: ast.For) -> Set[ArcStart]:
 start = self.line_for_node(node.iter)
 self.block_stack.append(LoopBlock(start=start))
 from_start = ArcStart(start, cause="the loop on line {lineno} never started")
 exits = self.add_body_arcs(node.body, from_start=from_start)
 # Any exit from the body will go back to the top of the loop.
 for xit in exits:
 self.add_arc(xit.lineno, start, xit.cause)
 my_block = self.block_stack.pop()
 assert isinstance(my_block, LoopBlock)
 exits = my_block.break_exits
 from_start = ArcStart(start, cause="the loop on line {lineno} didn't complete")
 if node.orelse:
 else_exits = self.add_body_arcs(node.orelse, from_start=from_start)
 exits |= else_exits
 else:
 # No else clause: exit from the for line.
 exits.add(from_start)
 return exits
 
 _handle__AsyncFor = _handle__For
 
 _handle__FunctionDef = _handle_decorated
 _handle__AsyncFunctionDef = _handle_decorated
 
 def _handle__If(self, node: ast.If) -> Set[ArcStart]:
 start = self.line_for_node(node.test)
 from_start = ArcStart(start, cause="the condition on line {lineno} was never true")
 exits = self.add_body_arcs(node.body, from_start=from_start)
 from_start = ArcStart(start, cause="the condition on line {lineno} was never false")
 exits |= self.add_body_arcs(node.orelse, from_start=from_start)
 return exits
 
 if sys.version_info >= (3, 10):
 def _handle__Match(self, node: ast.Match) -> Set[ArcStart]:
 start = self.line_for_node(node)
 last_start = start
 exits = set()
 had_wildcard = False
 for case in node.cases:
 case_start = self.line_for_node(case.pattern)
 pattern = case.pattern
 while isinstance(pattern, ast.MatchOr):
 pattern = pattern.patterns[-1]
 if isinstance(pattern, ast.MatchAs):
 had_wildcard = True
 self.add_arc(last_start, case_start, "the pattern on line {lineno} always matched")
 from_start = ArcStart(
 case_start,
 cause="the pattern on line {lineno} never matched",
 )
 exits |= self.add_body_arcs(case.body, from_start=from_start)
 last_start = case_start
 if not had_wildcard:
 exits.add(from_start)
 return exits
 
 def _handle__NodeList(self, node: NodeList) -> Set[ArcStart]:
 start = self.line_for_node(node)
 exits = self.add_body_arcs(node.body, from_start=ArcStart(start))
 return exits
 
 def _handle__Raise(self, node: ast.Raise) -> Set[ArcStart]:
 here = self.line_for_node(node)
 raise_start = ArcStart(here, cause="the raise on line {lineno} wasn't executed")
 self.process_raise_exits({raise_start})
 # `raise` statement jumps away, no exits from here.
 return set()
 
 def _handle__Return(self, node: ast.Return) -> Set[ArcStart]:
 here = self.line_for_node(node)
 return_start = ArcStart(here, cause="the return on line {lineno} wasn't executed")
 self.process_return_exits({return_start})
 # `return` statement jumps away, no exits from here.
 return set()
 
 def _handle__Try(self, node: ast.Try) -> Set[ArcStart]:
 if node.handlers:
 handler_start = self.line_for_node(node.handlers[0])
 else:
 handler_start = None
 
 if node.finalbody:
 final_start = self.line_for_node(node.finalbody[0])
 else:
 final_start = None
 
 # This is true by virtue of Python syntax: have to have either except
 # or finally, or both.
 assert handler_start is not None or final_start is not None
 try_block = TryBlock(handler_start, final_start)
 self.block_stack.append(try_block)
 
 start = self.line_for_node(node)
 exits = self.add_body_arcs(node.body, from_start=ArcStart(start))
 
 # We're done with the `try` body, so this block no longer handles
 # exceptions. We keep the block so the `finally` clause can pick up
 # flows from the handlers and `else` clause.
 if node.finalbody:
 try_block.handler_start = None
 if node.handlers:
 # If there are `except` clauses, then raises in the try body
 # will already jump to them.  Start this set over for raises in
 # `except` and `else`.
 try_block.raise_from = set()
 else:
 self.block_stack.pop()
 
 handler_exits: Set[ArcStart] = set()
 
 if node.handlers:
 last_handler_start: Optional[TLineNo] = None
 for handler_node in node.handlers:
 handler_start = self.line_for_node(handler_node)
 if last_handler_start is not None:
 self.add_arc(last_handler_start, handler_start)
 last_handler_start = handler_start
 from_cause = "the exception caught by line {lineno} didn't happen"
 from_start = ArcStart(handler_start, cause=from_cause)
 handler_exits |= self.add_body_arcs(handler_node.body, from_start=from_start)
 
 if node.orelse:
 exits = self.add_body_arcs(node.orelse, prev_starts=exits)
 
 exits |= handler_exits
 
 if node.finalbody:
 self.block_stack.pop()
 final_from = (                  # You can get to the `finally` clause from:
 exits |                         # the exits of the body or `else` clause,
 try_block.break_from |          # or a `break`,
 try_block.continue_from |       # or a `continue`,
 try_block.raise_from |          # or a `raise`,
 try_block.return_from           # or a `return`.
 )
 
 final_exits = self.add_body_arcs(node.finalbody, prev_starts=final_from)
 
 if try_block.break_from:
 if env.PYBEHAVIOR.finally_jumps_back:
 for break_line in try_block.break_from:
 lineno = break_line.lineno
 cause = break_line.cause.format(lineno=lineno)
 for final_exit in final_exits:
 self.add_arc(final_exit.lineno, lineno, cause)
 breaks = try_block.break_from
 else:
 breaks = self._combine_finally_starts(try_block.break_from, final_exits)
 self.process_break_exits(breaks)
 
 if try_block.continue_from:
 if env.PYBEHAVIOR.finally_jumps_back:
 for continue_line in try_block.continue_from:
 lineno = continue_line.lineno
 cause = continue_line.cause.format(lineno=lineno)
 for final_exit in final_exits:
 self.add_arc(final_exit.lineno, lineno, cause)
 continues = try_block.continue_from
 else:
 continues = self._combine_finally_starts(try_block.continue_from, final_exits)
 self.process_continue_exits(continues)
 
 if try_block.raise_from:
 self.process_raise_exits(
 self._combine_finally_starts(try_block.raise_from, final_exits)
 )
 
 if try_block.return_from:
 if env.PYBEHAVIOR.finally_jumps_back:
 for return_line in try_block.return_from:
 lineno = return_line.lineno
 cause = return_line.cause.format(lineno=lineno)
 for final_exit in final_exits:
 self.add_arc(final_exit.lineno, lineno, cause)
 returns = try_block.return_from
 else:
 returns = self._combine_finally_starts(try_block.return_from, final_exits)
 self.process_return_exits(returns)
 
 if exits:
 # The finally clause's exits are only exits for the try block
 # as a whole if the try block had some exits to begin with.
 exits = final_exits
 
 return exits
 
 def _combine_finally_starts(self, starts: Set[ArcStart], exits: Set[ArcStart]) -> Set[ArcStart]:
 """Helper for building the cause of `finally` branches.
 
 "finally" clauses might not execute their exits, and the causes could
 be due to a failure to execute any of the exits in the try block. So
 we use the causes from `starts` as the causes for `exits`.
 """
 causes = []
 for start in sorted(starts):
 if start.cause is not None:
 causes.append(start.cause.format(lineno=start.lineno))
 cause = " or ".join(causes)
 exits = {ArcStart(xit.lineno, cause) for xit in exits}
 return exits
 
 def _handle__While(self, node: ast.While) -> Set[ArcStart]:
 start = to_top = self.line_for_node(node.test)
 constant_test = self.is_constant_expr(node.test)
 top_is_body0 = False
 if constant_test:
 top_is_body0 = True
 if env.PYBEHAVIOR.keep_constant_test:
 top_is_body0 = False
 if top_is_body0:
 to_top = self.line_for_node(node.body[0])
 self.block_stack.append(LoopBlock(start=to_top))
 from_start = ArcStart(start, cause="the condition on line {lineno} was never true")
 exits = self.add_body_arcs(node.body, from_start=from_start)
 for xit in exits:
 self.add_arc(xit.lineno, to_top, xit.cause)
 exits = set()
 my_block = self.block_stack.pop()
 assert isinstance(my_block, LoopBlock)
 exits.update(my_block.break_exits)
 from_start = ArcStart(start, cause="the condition on line {lineno} was never false")
 if node.orelse:
 else_exits = self.add_body_arcs(node.orelse, from_start=from_start)
 exits |= else_exits
 else:
 # No `else` clause: you can exit from the start.
 if not constant_test:
 exits.add(from_start)
 return exits
 
 def _handle__With(self, node: ast.With) -> Set[ArcStart]:
 start = self.line_for_node(node)
 if env.PYBEHAVIOR.exit_through_with:
 self.block_stack.append(WithBlock(start=start))
 exits = self.add_body_arcs(node.body, from_start=ArcStart(start))
 if env.PYBEHAVIOR.exit_through_with:
 with_block = self.block_stack.pop()
 assert isinstance(with_block, WithBlock)
 with_exit = {ArcStart(start)}
 if exits:
 for xit in exits:
 self.add_arc(xit.lineno, start)
 exits = with_exit
 if with_block.break_from:
 self.process_break_exits(
 self._combine_finally_starts(with_block.break_from, with_exit)
 )
 if with_block.continue_from:
 self.process_continue_exits(
 self._combine_finally_starts(with_block.continue_from, with_exit)
 )
 if with_block.return_from:
 self.process_return_exits(
 self._combine_finally_starts(with_block.return_from, with_exit)
 )
 return exits
 
 _handle__AsyncWith = _handle__With
 
 # Code object dispatchers: _code_object__*
 #
 # These methods are used by analyze() as the start of the analysis.
 # There is one for each construct with a code object.
 
 def _code_object__Module(self, node: ast.Module) -> None:
 start = self.line_for_node(node)
 if node.body:
 exits = self.add_body_arcs(node.body, from_start=ArcStart(-start))
 for xit in exits:
 self.add_arc(xit.lineno, -start, xit.cause, "didn't exit the module")
 else:
 # Empty module.
 self.add_arc(-start, start)
 self.add_arc(start, -start)
 
 def _code_object__FunctionDef(self, node: ast.FunctionDef) -> None:
 start = self.line_for_node(node)
 self.block_stack.append(FunctionBlock(start=start, name=node.name))
 exits = self.add_body_arcs(node.body, from_start=ArcStart(-start))
 self.process_return_exits(exits)
 self.block_stack.pop()
 
 _code_object__AsyncFunctionDef = _code_object__FunctionDef
 
 def _code_object__ClassDef(self, node: ast.ClassDef) -> None:
 start = self.line_for_node(node)
 self.add_arc(-start, start)
 exits = self.add_body_arcs(node.body, from_start=ArcStart(start))
 for xit in exits:
 self.add_arc(
 xit.lineno, -start, xit.cause,
 f"didn't exit the body of class {node.name!r}",
 )
 
 _code_object__Lambda = _make_expression_code_method("lambda")
 _code_object__GeneratorExp = _make_expression_code_method("generator expression")
 if env.PYBEHAVIOR.comprehensions_are_functions:
 _code_object__DictComp = _make_expression_code_method("dictionary comprehension")
 _code_object__SetComp = _make_expression_code_method("set comprehension")
 _code_object__ListComp = _make_expression_code_method("list comprehension")
 
 
 # Code only used when dumping the AST for debugging.
 
 SKIP_DUMP_FIELDS = ["ctx"]
 
 def _is_simple_value(value: Any) -> bool:
 """Is `value` simple enough to be displayed on a single line?"""
 return (
 value in [None, [], (), {}, set(), frozenset(), Ellipsis] or
 isinstance(value, (bytes, int, float, str))
 )
 
 def ast_dump(
 node: ast.AST,
 depth: int = 0,
 print: Callable[[str], None] = print,   # pylint: disable=redefined-builtin
 ) -> None:
 """Dump the AST for `node`.
 
 This recursively walks the AST, printing a readable version.
 
 """
 indent = " " * depth
 lineno = getattr(node, "lineno", None)
 if lineno is not None:
 linemark = f" @ {node.lineno},{node.col_offset}"
 if hasattr(node, "end_lineno"):
 assert hasattr(node, "end_col_offset")
 linemark += ":"
 if node.end_lineno != node.lineno:
 linemark += f"{node.end_lineno},"
 linemark += f"{node.end_col_offset}"
 else:
 linemark = ""
 head = f"{indent}<{node.__class__.__name__}{linemark}"
 
 named_fields = [
 (name, value)
 for name, value in ast.iter_fields(node)
 if name not in SKIP_DUMP_FIELDS
 ]
 if not named_fields:
 print(f"{head}>")
 elif len(named_fields) == 1 and _is_simple_value(named_fields[0][1]):
 field_name, value = named_fields[0]
 print(f"{head} {field_name}: {value!r}>")
 else:
 print(head)
 if 0:
 print("{}# mro: {}".format(     # type: ignore[unreachable]
 indent, ", ".join(c.__name__ for c in node.__class__.__mro__[1:]),
 ))
 next_indent = indent + "    "
 for field_name, value in named_fields:
 prefix = f"{next_indent}{field_name}:"
 if _is_simple_value(value):
 print(f"{prefix} {value!r}")
 elif isinstance(value, list):
 print(f"{prefix} [")
 for n in value:
 if _is_simple_value(n):
 print(f"{next_indent}    {n!r}")
 else:
 ast_dump(n, depth + 8, print=print)
 print(f"{next_indent}]")
 else:
 print(prefix)
 ast_dump(value, depth + 8, print=print)
 
 print(f"{indent}>")
 
 |