| Viewing file:  test_heapyc.py (15.6 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
from guppy.heapy.test import support
 
 class TestCase(support.TestCase):
 def setUp(self):
 support.TestCase.setUp(self)
 self.sets = self.guppy.sets
 heapdefs = getattr(self.sets.setsc, '_NyHeapDefs_'),
 self.root = []
 self.heapyc = self.guppy.heapy.heapyc
 self.hv = self.heapyc.HeapView(self.root, heapdefs)
 self.nodeset = self.sets.immnodeset
 self.mutnodeset = self.sets.mutnodeset
 self.nodegraph = self.heapyc.NodeGraph
 
 
 class TestHeapView(TestCase):
 def test_hiding_tag(self):
 hiding_tag = self.hv._hiding_tag_
 
 a = []
 ns = self.mutnodeset([a])
 ng = self.nodegraph([(a, a)])
 
 self.aseq(self.hv.relimg([ns]), self.nodeset([a]))
 self.aseq(self.hv.relimg([ng]), self.nodeset([a]))
 
 ns._hiding_tag_ = hiding_tag
 self.aseq(self.hv.relimg([ns]), self.nodeset([]))
 
 ng._hiding_tag_ = hiding_tag
 self.aseq(self.hv.relimg([ng]), self.nodeset([]))
 
 self.hv._hiding_tag_ = []
 
 self.aseq(self.hv.relimg([ns]), self.nodeset([a, None]))
 self.aseq(self.hv.relimg([ng]), self.nodeset([a, None]))
 
 def test_inheritance_from_heapview(self):
 # I am not using inheritance from HeapView,
 # but it would be kinda weird if it didn't work.
 
 HeapView = self.guppy.heapy.heapyc.HeapView
 x = 'x'
 newroot = [x]
 
 class HV(HeapView):
 def __new__(self):
 return HeapView.__new__(HV, newroot, ())
 
 hv = HV()
 assert hv.heap() == self.nodeset([x, newroot])
 
 def test_inheritance_from_special_types(self):
 # Test that relate, size & traverse function correctly for inherited types
 # as discussed in Notes Apr 14 2005.
 # Testing with a standard type (list) with specially size and relate definitions,
 # and a heapdef'd type (mutnodeset) with size, relate and traverse defs.
 # Test includes more than 1 level of inheritance, since the generic
 # method needs to go over all bases and not just the (first) base.
 
 hv = self.hv
 hv._hiding_tag_ = []    # Different from default nodeset's hiding_tag
 immnodeset = self.sets.immnodeset
 mutnodeset = self.sets.mutnodeset
 
 for base in (list, mutnodeset):
 
 class T(base):
 __slots__ = 't',
 
 class U(T):
 __slots__ = 'u',
 
 a = base()
 t = T()
 t.t = []
 u = U()
 u.t = []
 u.u = []
 data = list(range(16))
 for x in data:
 a.append(x)
 t.append(x)
 u.append(x)
 
 # Test size
 
 za = hv.indisize_sum([a])
 zt = hv.indisize_sum([t])
 zu = hv.indisize_sum([u])
 
 self.assertTrue(za < zt < zu)
 
 # Test traverse
 
 self.aseq(hv.relimg([a]), immnodeset(data))
 self.aseq(hv.relimg([t]), immnodeset(data+[T, t.t]))
 self.aseq(hv.relimg([u]), immnodeset(data+[U, u.t, u.u]))
 
 # Test relate
 
 def rel(src, tgt):
 r = hv.relate(src, tgt)
 self.assertTrue(r != ((),)*len(r))
 return r
 
 self.aseq(rel(t, data[1]), rel(a, data[1]))
 self.aseq(rel(u, data[1]), rel(a, data[1]))
 self.aseq(rel(u, u.t), rel(t, t.t))
 rel(u, u.u)
 
 def test_nodeset_circularity(self):
 # The traversal to function correctly for types inherited from nodeset
 # required a fix as discussed in Notes Apr 14 2005.
 # This method intends to test that this fix was harmless wrt gc & circularity.
 # To make this method fail, it was necessary to disable nodeset gc traversal.
 
 # xxx It seems I have not yet come around to write this test...
 pass
 
 def test_registered_hiding(self):
 hv = self.hv
 
 class Set(object):
 __slots__ = 'some', '_hiding_tag_', 'other'
 
 class Der(Set):
 pass
 
 hv.register__hiding_tag__type(Set)
 
 # Der is inherited and registration follows its base type.
 
 s = Set()
 d = Der()
 t = Set()
 some = []
 other = []
 dother = []
 s.some = some
 s.other = other
 d.some = some
 d.other = dother
 self.root.append([s, t, d])
 self.root.append(s)
 self.root.append(d)
 self.root.append(t)
 
 x = hv.heap()
 assert dother in x
 assert some in x
 assert other in x
 
 assert s in x
 assert d in x
 assert t in x
 
 s._hiding_tag_ = hv._hiding_tag_
 d._hiding_tag_ = hv._hiding_tag_
 
 x = hv.heap()
 assert some not in x
 assert other not in x
 
 assert s not in x
 assert d not in x
 assert t in x
 
 he = []
 hv._hiding_tag_ = he
 
 x = hv.heap()
 assert dother in x
 assert some in x
 assert other in x
 
 assert s in x
 assert d in x
 assert t in x
 
 def test_timing(self):
 # Test some timing aspects of heap traversal
 
 from time import process_time as clock
 hv = self.hv
 
 d = []
 h = [d]
 
 self.root.extend(100000*[h])
 self.root.extend(list(range(100000)))
 
 start = clock()
 
 x = hv.heap()
 
 elapsed0 = clock() - start
 print('elapsed0', elapsed0, 'len(x)', len(x))
 
 class Set(object):
 __slots__ = 'some', '_hiding_tag_', 'other'
 
 class Der(Set):
 pass
 
 hv.register__hiding_tag__type(Set)
 
 s = Set()
 s._hiding_tag_ = hv._hiding_tag_
 d = Der()
 d._hiding_tag_ = hv._hiding_tag_
 self.root[0:50000] = 25000*[s, d]
 
 start = clock()
 
 x = hv.heap()
 
 elapsed1 = clock() - start
 print('elapsed1', elapsed1, 'len(x)', len(x))
 
 # This has failed a couple of times so I remove it now, (apr 5 2008)
 # xxx should look into this later ...
 #self.assert_(elapsed1 < 3.0 * elapsed0)
 
 
 class TestLeak(support.TestCase):
 
 def test_1(self):
 import gc
 from sys import getrefcount as grc
 
 support.TestCase.setUp(self)
 sets = self.guppy.sets
 heapdefs = getattr(sets.setsc, '_NyHeapDefs_'),
 root = []
 heapyc = self.guppy.heapy.heapyc
 nodeset = sets.mutnodeset
 nodegraph = heapyc.NodeGraph
 
 class T(object):
 __slots__ = 'a', '_hiding_tag_', 'tonly'
 pass
 
 class U(T):
 __slots__ = 'b',
 pass
 
 class V(object):
 __slots__ = 'c',
 
 gc.collect()
 
 ns = nodeset()
 a = [ns]
 a.append(a)
 b = []
 he = []
 c = []
 t = T()
 tonly = []
 
 t.a = a
 t._hiding_tag_ = he
 t.tonly = tonly
 
 u = U()
 u.a = a
 u._hiding_tag_ = he
 u.b = b
 
 v = V()
 v.c = c
 
 a = [x for x in [list]]
 
 li = [he, a, b, c, t, u, v, T, U, V, ns, nodeset, list]
 rcli0 = [grc(x) for x in li]
 
 ns |= li + list(range(10000, 10010))
 root.extend(li)
 
 rcli = [grc(x) for x in li]
 
 rec = nodeset([x for x in li])
 x = None
 
 rec.append(rec)
 ns.add(rec)
 rec._hiding_tag_ = rec
 
 hv = heapyc.HeapView(root, heapdefs)
 hv.register__hiding_tag__type(T)
 h = hv.heap()
 assert a in h
 assert c in h
 assert tonly in h
 hv._hiding_tag_ = he
 h = hv.heap()
 del x
 del h
 del hv
 
 ns.discard(rec)
 rec = None
 gc.collect()
 
 nrcli = [grc(x) for x in li]
 self.aseq(rcli, nrcli)
 
 root[:] = []
 ns.clear()
 
 nrcli0 = [grc(x) for x in li]
 
 self.aseq(rcli0, nrcli0)
 
 def test_weaky(self):
 # Test that the extra-type information in heapview
 # will still allow types to come, be used, and go, and be collected
 # This depends on that they are weakly-referenced
 # so internal heapview structures can remove them when they are
 # to be collected.
 
 import gc
 from sys import getrefcount as grc
 
 support.TestCase.setUp(self)
 sets = self.guppy.sets
 heapdefs = getattr(sets.setsc, '_NyHeapDefs_'),
 root = []
 heapyc = self.guppy.heapy.heapyc
 nodeset = sets.NodeSet
 nodegraph = heapyc.NodeGraph
 
 gc.collect()
 
 probe = []
 rcprobe = grc(probe)
 
 class T(object):
 x = probe
 
 class U(T):
 pass
 T.U = U  # Make circular dependency
 t = T()
 u = U()
 
 root.append(t)
 root.append(u)
 
 hv = heapyc.HeapView(root, heapdefs)
 x = hv.heap()
 assert t in x
 x = None
 
 T = t = U = u = None
 root[:] = []
 
 gc.collect()    # 2 collections needed sometimes? Note Apr 15 2005
 
 nrcprobe = grc(probe)
 
 self.aseq(nrcprobe, rcprobe)
 
 
 class TestNodeGraph(TestCase):
 def test_constructor_and_methods(self):
 
 # Test constructor w no arg
 ng = self.nodegraph()
 # Test add_edge
 ng.add_edge(1, 2)
 # Test add_edges_n1
 ng.add_edges_n1([3, 4], 5)
 lng = list(ng)
 lng.sort()
 assert lng == [(1, 2), (3, 5), (4, 5)]
 # Test as_flat_list
 fl = ng.as_flat_list()
 fl.sort()
 assert fl == [1, 2, 3, 4, 5, 5]
 # Test copy
 cp = ng.copy()
 cp.add_edge(5, 6)
 # Test __iter__ explicitly
 lng1 = list(ng.__iter__())
 lng1.sort()
 assert lng1 == lng
 lcp = list(cp)
 lcp.sort()
 assert lcp == [(1, 2), (3, 5), (4, 5), (5, 6)]
 
 # Test domain_covers
 assert ng.domain_covers([1, 3, 4])
 assert not ng.domain_covers([1, 3, 4, 5])
 
 # Test domain_restricted
 rng = ng.domain_restricted([1, 3])
 # Test get_domain
 assert rng.get_domain() == self.nodeset([1, 3])
 lrng = list(rng)
 lrng.sort()
 assert lrng == [(1, 2), (3, 5)]
 # Test get_range
 assert rng.get_range() == self.nodeset([2, 5])
 # Test invert
 rng.invert()
 lrng = list(rng)
 lrng.sort()
 assert lrng == [(2, 1), (5, 3)]
 # Test inverted
 ing = ng.inverted()
 ling = list(ing)
 ling.sort()
 assert ling == [(2, 1), (5, 3), (5, 4)]
 # Test relimg
 assert ing.relimg([2]) == self.nodeset([1])
 assert ing.relimg([2, 5, 3]) == self.nodeset([1, 3, 4])
 # Test update
 ing.update([(3, 7), (4, 8)])
 assert ing.relimg([2, 5, 3]) == self.nodeset([1, 3, 4, 7])
 # Test updated
 uing = ing.updated([(2, 9)])
 assert ing.relimg([2, 5, 3]) == self.nodeset([1, 3, 4, 7])
 assert uing.relimg([2, 5, 3]) == self.nodeset([1, 3, 4, 7, 9])
 
 # Test __getitem__
 tgts = list(uing[2])
 tgts.sort()
 assert tgts == [1, 9]
 # Test __len__
 assert len(uing) == 6
 uing[2] = (2, 8)
 # Test __setitem__
 tgts = list(uing[2])
 tgts.sort()
 assert tgts == [2, 8]
 
 # Test clear
 ng.clear()
 assert list(ng) == []
 
 # Test constructor with iterable
 
 ng = self.nodegraph([(1, 2)])
 assert list(ng) == [(1, 2)]
 assert not ng.is_mapping
 
 # Test constructor with is_mapping flag
 
 ng = self.nodegraph(is_mapping=True)
 assert ng.is_mapping
 assert list(ng) == []
 ng.add_edge(1, 2)
 assert list(ng) == [(1, 2)]
 assert ng[1] == 2
 
 ng = self.nodegraph(is_mapping=False)
 assert not ng.is_mapping
 
 # Test constructor with iterable & is_mapping flag
 
 for ng in (self.nodegraph([(1, 2)], True),
 self.nodegraph(iterable=[(1, 2)], is_mapping=True),
 self.nodegraph([(1, 2)], is_mapping=True),
 self.nodegraph(is_mapping=True, iterable=[(1, 2)])
 ):
 assert ng.is_mapping
 assert list(ng) == [(1, 2)]
 assert ng[1] == 2
 ng[1] = 3
 assert ng[1] == 3
 
 # Test is_sorted flag
 # though this behaviour is not fixed - may change with implementation
 ng = self.nodegraph()
 ng.add_edge(1, 2)
 ng.add_edge(2, 1)
 assert not ng.is_sorted
 ng[1]
 assert ng.is_sorted
 
 def test_inheritance(self):
 class T(self.heapyc.NodeGraph):
 __slots__ = 'x'
 
 def as_sorted_list(self):
 a = list(self)
 a.sort()
 return a
 
 t = T()
 t.add_edge(1, 2)
 t.add_edge(2, 3)
 assert t.as_sorted_list() == [(1, 2), (2, 3)]
 
 t = T([(4, 5), (6, 7)])
 assert t.as_sorted_list() == [(4, 5), (6, 7)]
 
 # Test that the base type functionality has been inherited
 #  by making test_constructor_and_methods think NodeGraph is T
 self.nodegraph = T
 self.test_constructor_and_methods()
 
 # Test with a constructor with new argument
 # and some more attributes
 
 class R(T):
 __slots__ = 'stop',
 
 def __new__(self, stop):
 r = T.__new__(R, is_mapping=1)
 r.add_edges_n1(list(range(stop)), 0)
 r.stop = stop
 return r
 
 def keys(self):
 return list(self.get_domain())
 
 def values(self):
 return [self[k] for k in list(self.keys())]
 
 r = R(10)
 assert r.stop == 10
 assert r.is_mapping
 lr = list(r)
 lr.sort()
 assert lr[-2:] == [(8, 0), (9, 0)]
 
 keys = list(r.keys())
 keys.sort()
 assert keys == list(range(10))
 values = list(r.values())
 assert values == [0]*10
 
 
 class TestClassifiers(TestCase):
 # Some new standalone classifiers tests.
 # Some old are also tested via test_Classifiers.
 
 def test_inrel(self):
 def str_inrel(c):
 c = list(c)
 c.sort()
 return ', '.join(['(%s, %r)' % (x.kind, x.relator) for x in c])
 
 hv = self.hv
 rg = self.nodegraph()
 x = []
 y = [x]
 rg.add_edge(x, y)
 cli = hv.cli_inrel(rg, {}, {})
 c = cli.classify(x)
 self.aseq(str_inrel(c), '(2, 0)')
 
 for i in range(5):
 y.append(x)
 c = cli.classify(x)
 self.aseq(str_inrel(c), '(2, 0), (2, 1), (2, 2), (2, 3), (2, 4), (2, 5)')
 
 for i in range(5):
 r = {str(i): x}
 rg.add_edge(x, r)
 c = cli.classify(x)
 
 def test_nodetuple_richcompare(self):
 hv = self.hv
 
 cli = hv.cli_and((hv.cli_id(),), {})
 
 a, b = cli.classify(1), cli.classify(2)
 
 self.assertTrue(a != b)
 self.assertFalse(a == b)
 self.assertTrue(a >= b or a <= b)
 self.assertFalse(a >= b and a <= b)
 self.assertTrue(a > b or a < b)
 self.assertFalse(a > b and a < b)
 
 a, b = cli.classify(1), cli.classify(1)
 
 self.assertFalse(a != b)
 self.assertTrue(a == b)
 self.assertTrue(a >= b or a <= b)
 self.assertTrue(a >= b and a <= b)
 self.assertFalse(a > b or a < b)
 self.assertFalse(a > b and a < b)
 
 
 def test_main(debug=False):
 support.run_unittest(TestClassifiers, debug)
 support.run_unittest(TestNodeGraph, debug)
 support.run_unittest(TestLeak, debug)
 support.run_unittest(TestHeapView, debug)
 
 
 if __name__ == "__main__":
 test_main()
 
 |