| Viewing file:  resolvers.py (14.67 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
import collections
 from .compat import collections_abc
 from .providers import AbstractResolver
 from .structs import DirectedGraph
 
 
 RequirementInformation = collections.namedtuple(
 "RequirementInformation", ["requirement", "parent"]
 )
 
 
 class ResolverException(Exception):
 """A base class for all exceptions raised by this module.
 
 Exceptions derived by this class should all be handled in this module. Any
 bubbling pass the resolver should be treated as a bug.
 """
 
 
 class RequirementsConflicted(ResolverException):
 def __init__(self, criterion):
 super(RequirementsConflicted, self).__init__(criterion)
 self.criterion = criterion
 
 def __str__(self):
 return "Requirements conflict: {}".format(
 ", ".join(repr(r) for r in self.criterion.iter_requirement()),
 )
 
 
 class InconsistentCandidate(ResolverException):
 def __init__(self, candidate, criterion):
 super(InconsistentCandidate, self).__init__(candidate, criterion)
 self.candidate = candidate
 self.criterion = criterion
 
 def __str__(self):
 return "Provided candidate {!r} does not satisfy {}".format(
 self.candidate,
 ", ".join(repr(r) for r in self.criterion.iter_requirement()),
 )
 
 
 class Criterion(object):
 """Representation of possible resolution results of a package.
 
 This holds three attributes:
 
 * `information` is a collection of `RequirementInformation` pairs.
 Each pair is a requirement contributing to this criterion, and the
 candidate that provides the requirement.
 * `incompatibilities` is a collection of all known not-to-work candidates
 to exclude from consideration.
 * `candidates` is a collection containing all possible candidates deducted
 from the union of contributing requirements and known incompatibilities.
 It should never be empty, except when the criterion is an attribute of a
 raised `RequirementsConflicted` (in which case it is always empty).
 
 .. note::
 This class is intended to be externally immutable. **Do not** mutate
 any of its attribute containers.
 """
 
 def __init__(self, candidates, information, incompatibilities):
 self.candidates = candidates
 self.information = information
 self.incompatibilities = incompatibilities
 
 def __repr__(self):
 requirements = ", ".join(
 "({!r}, via={!r})".format(req, parent)
 for req, parent in self.information
 )
 return "Criterion({})".format(requirements)
 
 @classmethod
 def from_requirement(cls, provider, requirement, parent):
 """Build an instance from a requirement.
 """
 candidates = provider.find_matches([requirement])
 if not isinstance(candidates, collections_abc.Sequence):
 candidates = list(candidates)
 criterion = cls(
 candidates=candidates,
 information=[RequirementInformation(requirement, parent)],
 incompatibilities=[],
 )
 if not candidates:
 raise RequirementsConflicted(criterion)
 return criterion
 
 def iter_requirement(self):
 return (i.requirement for i in self.information)
 
 def iter_parent(self):
 return (i.parent for i in self.information)
 
 def merged_with(self, provider, requirement, parent):
 """Build a new instance from this and a new requirement.
 """
 infos = list(self.information)
 infos.append(RequirementInformation(requirement, parent))
 candidates = provider.find_matches([r for r, _ in infos])
 if not isinstance(candidates, collections_abc.Sequence):
 candidates = list(candidates)
 criterion = type(self)(candidates, infos, list(self.incompatibilities))
 if not candidates:
 raise RequirementsConflicted(criterion)
 return criterion
 
 def excluded_of(self, candidate):
 """Build a new instance from this, but excluding specified candidate.
 
 Returns the new instance, or None if we still have no valid candidates.
 """
 incompats = list(self.incompatibilities)
 incompats.append(candidate)
 candidates = [c for c in self.candidates if c != candidate]
 if not candidates:
 return None
 criterion = type(self)(candidates, list(self.information), incompats)
 return criterion
 
 
 class ResolutionError(ResolverException):
 pass
 
 
 class ResolutionImpossible(ResolutionError):
 def __init__(self, causes):
 super(ResolutionImpossible, self).__init__(causes)
 # causes is a list of RequirementInformation objects
 self.causes = causes
 
 
 class ResolutionTooDeep(ResolutionError):
 def __init__(self, round_count):
 super(ResolutionTooDeep, self).__init__(round_count)
 self.round_count = round_count
 
 
 # Resolution state in a round.
 State = collections.namedtuple("State", "mapping criteria")
 
 
 class Resolution(object):
 """Stateful resolution object.
 
 This is designed as a one-off object that holds information to kick start
 the resolution process, and holds the results afterwards.
 """
 
 def __init__(self, provider, reporter):
 self._p = provider
 self._r = reporter
 self._states = []
 
 @property
 def state(self):
 try:
 return self._states[-1]
 except IndexError:
 raise AttributeError("state")
 
 def _push_new_state(self):
 """Push a new state into history.
 
 This new state will be used to hold resolution results of the next
 coming round.
 """
 try:
 base = self._states[-1]
 except IndexError:
 state = State(mapping=collections.OrderedDict(), criteria={})
 else:
 state = State(
 mapping=base.mapping.copy(), criteria=base.criteria.copy(),
 )
 self._states.append(state)
 
 def _merge_into_criterion(self, requirement, parent):
 self._r.adding_requirement(requirement, parent)
 name = self._p.identify(requirement)
 try:
 crit = self.state.criteria[name]
 except KeyError:
 crit = Criterion.from_requirement(self._p, requirement, parent)
 else:
 crit = crit.merged_with(self._p, requirement, parent)
 return name, crit
 
 def _get_criterion_item_preference(self, item):
 name, criterion = item
 try:
 pinned = self.state.mapping[name]
 except KeyError:
 pinned = None
 return self._p.get_preference(
 pinned, criterion.candidates, criterion.information,
 )
 
 def _is_current_pin_satisfying(self, name, criterion):
 try:
 current_pin = self.state.mapping[name]
 except KeyError:
 return False
 return all(
 self._p.is_satisfied_by(r, current_pin)
 for r in criterion.iter_requirement()
 )
 
 def _get_criteria_to_update(self, candidate):
 criteria = {}
 for r in self._p.get_dependencies(candidate):
 name, crit = self._merge_into_criterion(r, parent=candidate)
 criteria[name] = crit
 return criteria
 
 def _attempt_to_pin_criterion(self, name, criterion):
 causes = []
 for candidate in criterion.candidates:
 try:
 criteria = self._get_criteria_to_update(candidate)
 except RequirementsConflicted as e:
 causes.append(e.criterion)
 continue
 
 # Check the newly-pinned candidate actually works. This should
 # always pass under normal circumstances, but in the case of a
 # faulty provider, we will raise an error to notify the implementer
 # to fix find_matches() and/or is_satisfied_by().
 satisfied = all(
 self._p.is_satisfied_by(r, candidate)
 for r in criterion.iter_requirement()
 )
 if not satisfied:
 raise InconsistentCandidate(candidate, criterion)
 
 # Put newly-pinned candidate at the end. This is essential because
 # backtracking looks at this mapping to get the last pin.
 self._r.pinning(candidate)
 self.state.mapping.pop(name, None)
 self.state.mapping[name] = candidate
 self.state.criteria.update(criteria)
 
 return []
 
 # All candidates tried, nothing works. This criterion is a dead
 # end, signal for backtracking.
 return causes
 
 def _backtrack(self):
 # Drop the current state, it's known not to work.
 del self._states[-1]
 
 # We need at least 2 states here:
 # (a) One to backtrack to.
 # (b) One to restore state (a) to its state prior to candidate-pinning,
 #     so we can pin another one instead.
 
 while len(self._states) >= 2:
 # Retract the last candidate pin.
 prev_state = self._states.pop()
 try:
 name, candidate = prev_state.mapping.popitem()
 except KeyError:
 continue
 self._r.backtracking(candidate)
 
 # Create a new state to work on, with the newly known not-working
 # candidate excluded.
 self._push_new_state()
 
 # Mark the retracted candidate as incompatible.
 criterion = self.state.criteria[name].excluded_of(candidate)
 if criterion is None:
 # This state still does not work. Try the still previous state.
 del self._states[-1]
 continue
 self.state.criteria[name] = criterion
 
 return True
 
 return False
 
 def resolve(self, requirements, max_rounds):
 if self._states:
 raise RuntimeError("already resolved")
 
 self._push_new_state()
 for r in requirements:
 try:
 name, crit = self._merge_into_criterion(r, parent=None)
 except RequirementsConflicted as e:
 raise ResolutionImpossible(e.criterion.information)
 self.state.criteria[name] = crit
 
 self._r.starting()
 
 for round_index in range(max_rounds):
 self._r.starting_round(round_index)
 
 self._push_new_state()
 curr = self.state
 
 unsatisfied_criterion_items = [
 item
 for item in self.state.criteria.items()
 if not self._is_current_pin_satisfying(*item)
 ]
 
 # All criteria are accounted for. Nothing more to pin, we are done!
 if not unsatisfied_criterion_items:
 del self._states[-1]
 self._r.ending(curr)
 return self.state
 
 # Choose the most preferred unpinned criterion to try.
 name, criterion = min(
 unsatisfied_criterion_items,
 key=self._get_criterion_item_preference,
 )
 failure_causes = self._attempt_to_pin_criterion(name, criterion)
 
 # Backtrack if pinning fails.
 if failure_causes:
 result = self._backtrack()
 if not result:
 causes = [
 i for crit in failure_causes for i in crit.information
 ]
 raise ResolutionImpossible(causes)
 
 self._r.ending_round(round_index, curr)
 
 raise ResolutionTooDeep(max_rounds)
 
 
 def _has_route_to_root(criteria, key, all_keys, connected):
 if key in connected:
 return True
 if key not in criteria:
 return False
 for p in criteria[key].iter_parent():
 try:
 pkey = all_keys[id(p)]
 except KeyError:
 continue
 if pkey in connected:
 connected.add(key)
 return True
 if _has_route_to_root(criteria, pkey, all_keys, connected):
 connected.add(key)
 return True
 return False
 
 
 Result = collections.namedtuple("Result", "mapping graph criteria")
 
 
 def _build_result(state):
 mapping = state.mapping
 all_keys = {id(v): k for k, v in mapping.items()}
 all_keys[id(None)] = None
 
 graph = DirectedGraph()
 graph.add(None)  # Sentinel as root dependencies' parent.
 
 connected = {None}
 for key, criterion in state.criteria.items():
 if not _has_route_to_root(state.criteria, key, all_keys, connected):
 continue
 if key not in graph:
 graph.add(key)
 for p in criterion.iter_parent():
 try:
 pkey = all_keys[id(p)]
 except KeyError:
 continue
 if pkey not in graph:
 graph.add(pkey)
 graph.connect(pkey, key)
 
 return Result(
 mapping={k: v for k, v in mapping.items() if k in connected},
 graph=graph,
 criteria=state.criteria,
 )
 
 
 class Resolver(AbstractResolver):
 """The thing that performs the actual resolution work.
 """
 
 base_exception = ResolverException
 
 def resolve(self, requirements, max_rounds=100):
 """Take a collection of constraints, spit out the resolution result.
 
 The return value is a representation to the final resolution result. It
 is a tuple subclass with three public members:
 
 * `mapping`: A dict of resolved candidates. Each key is an identifier
 of a requirement (as returned by the provider's `identify` method),
 and the value is the resolved candidate.
 * `graph`: A `DirectedGraph` instance representing the dependency tree.
 The vertices are keys of `mapping`, and each edge represents *why*
 a particular package is included. A special vertex `None` is
 included to represent parents of user-supplied requirements.
 * `criteria`: A dict of "criteria" that hold detailed information on
 how edges in the graph are derived. Each key is an identifier of a
 requirement, and the value is a `Criterion` instance.
 
 The following exceptions may be raised if a resolution cannot be found:
 
 * `ResolutionImpossible`: A resolution cannot be found for the given
 combination of requirements. The `causes` attribute of the
 exception is a list of (requirement, parent), giving the
 requirements that could not be satisfied.
 * `ResolutionTooDeep`: The dependency tree is too deeply nested and
 the resolver gave up. This is usually caused by a circular
 dependency, but you can try to resolve this by increasing the
 `max_rounds` argument.
 """
 resolution = Resolution(self.provider, self.reporter)
 state = resolution.resolve(requirements, max_rounds=max_rounds)
 return _build_result(state)
 
 |